YMatrix подходит для сценариев временных рядов в IoT с устройствами различных масштабов. В этом руководстве рассматривается конкретный пример анализа данных поездок такси — типичного случая использования временных рядов, чтобы продемонстрировать, как загружать, обрабатывать и запрашивать данные временных рядов в YMatrix.
В городе с населением более 8 миллионов человек работает более 200 000 автомобилей-такси. Городское транспортное управление собирает и публикует записи о каждой поездке такси, включая время и место посадки и высадки, количество пассажиров, стоимость поездки и способ оплаты. Какие выводы можно сделать на основе этих данных? Среди них — коэффициент использования такси и даже общее состояние дорожного движения. Полученные сведения позволяют улучшить городское управление и повысить качество жизни жителей и гостей города. В рамках данного руководства предоставляется архив данных за один месяц. Нажмите здесь, чтобы начать погружение в анализ городской мобильности (пароль: 1x4u).
Одно из собранных полей указывает способ оплаты. Возможные значения: наличные, кредитная карта, бесплатная поездка, спорная, неизвестная и недействительная — эти значения называются статическими атрибутами. Создайте таблицу payment_types, чтобы хранить эту информацию для последующего соединения при выполнении запросов. Поскольку данные о «способе оплаты» небольшие по объему и могут требовать обновления, используйте хранилище по умолчанию — движок HEAP. Вообще говоря, если не указано иное, таблицы автоматически используют движок HEAP.
=# CREATE TABLE IF NOT EXISTS payment_types (
payment_type int,
description text
)
USING HEAP;
Использование предложения IF NOT EXISTS предотвращает ошибки при попытке создать таблицу, которая уже существует.
=# INSERT INTO payment_types VALUES
(1, 'Credit Card'),
(2, 'Cash'),
(3, 'No Charge'),
(4, 'Disputed'),
(5, 'Unknown'),
(6, 'Invalid Journey');
Другое поле указывает тип тарифа: стандартный тариф, аэропорт №1, аэропорт №2, специальная зона, договорная цена и групповая поездка. Аналогично создайте справочную таблицу rate_codes, используя движок хранения по умолчанию HEAP:
=# CREATE TABLE IF NOT EXISTS rate_codes (
rate_code int,
description text
)
USING HEAP;
=# INSERT INTO rate_codes VALUES
(1, 'Standard Rate'),
(2, 'Airport 1'),
(3, 'Airport 2'),
(4, 'Special Zone'),
(5, 'Negotiated Price'),
(6, 'Group');
Теперь создадим таблицу временных рядов для хранения фактических данных о поездках. Ниже приведены пояснения ключевых полей:
pickup_datetime / dropoff_datetime: Временные метки начала и окончания поездки pickup_longitude / pickup_latitude: Долгота и широта места посадки dropoff_longitude / dropoff_latitude: Долгота и широта места высадки passenger_count: Количество пассажиров trip_distance: Расстояние поездки в милях total_amount: Сумма оплаты за поездку trip_duration: Вычисляемый столбец, формируемый при загрузке данных, представляющий продолжительность поездки в минутах В данном сценарии временных рядов поездок такси данные в таблице trip представляют собой изменяющиеся во времени измерения с устройств. Учитывая разнообразие и изменчивость источников данных в приложениях для временных рядов, высокая производительность при вставке и хранении данных имеет критическое значение, тогда как требования к операциям обновления и удаления минимальны. Поэтому MARS3 является оптимальным выбором, обеспечивая значительные оптимизации при вставке, хранении и запросах к данным временных рядов.
Таблицы MARS3 зависят от расширения matrixts. Перед созданием таких таблиц убедитесь, что расширение установлено в целевой базе данных. Если оно уже установлено, этот шаг можно пропустить.
=# CREATE EXTENSION matrixts;
При создании таблицы используйте предложение USING MARS3 для указания движка хранения и предложение WITH для задания параметров.
compresstype: Алгоритм сжатия. Поддерживаемые значения: zstd, zlib, lz4; по умолчанию используется lz4. compresslevel: Уровень сжатия. Меньшие значения обеспечивают более быстрое сжатие; большие значения дают лучшее сжатие. Средние значения обеспечивают баланс между скоростью и степенью сжатия. Допустимые диапазоны различаются в зависимости от алгоритма:=# CREATE TABLE IF NOT EXISTS trip (
vendor_id text,
pickup_datetime timestamp without time zone,
dropoff_datetime timestamp without time zone,
passenger_count int,
trip_distance numeric,
pickup_longitude numeric,
pickup_latitude numeric,
rate_code_id int,
store_and_fwd_flag text,
dropoff_longitude numeric,
dropoff_latitude numeric,
payment_type int,
fare_amount numeric,
extra numeric,
mta_tax numeric,
tip_amount numeric,
tolls_amount numeric,
improvement_surcharge numeric,
total_amount numeric,
trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
)
USING MARS3
WITH (compresstype='lz4', compresslevel=1)
DISTRIBUTED BY (vendor_id)
ORDER BY (vendor_id, pickup_datetime)
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
END (date '2016-02-01') EXCLUSIVE
EVERY (INTERVAL '1 day') );
DISTRIBUTED BY распределяет данные по сегментам с помощью хеширования по столбцу trip, гарантируя, что строки с одинаковым значением vendor_id находятся на одном сегменте.ORDER BY сортирует все данные внутри каждого сегмента по ключу сортировки (vendor_id, pickup_datetime), обеспечивая упорядоченное хранение.PARTITION BY определяет партиционирование по диапазону по столбцу trip, создавая 31 ежедневную партицию с 1 января 2016 года (включительно) по 1 февраля 2016 года (исключительно). Ежедневное партиционирование позволяет быстро отсекать данные при временных запросах и упрощает будущее управление данными на основе срока хранения.Примечание!
Согласно правилам синтаксиса SQL,DISTRIBUTED BYдолжен быть объявлен доPARTITION BY. Однако при выполнении сначала применяетсяDISTRIBUTED BY, чтобы распределить данные по соответствующим сегментам, а затемPARTITION BYвставляет данные в соответствующие подтаблицы партиций.
Найдите загруженный файл yellow_tripdata_2016-01.csv и используйте команду mxgate для его загрузки. Укажите фактический путь к файлу после tail и используйте параметр --db-master-host, чтобы задать имя хоста или IP-адрес главного узла. Пример:
$ tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256 --delimiter ',' --exclude-columns trip_duration
Основные параметры mxgate:
--db-database postgres // Target database name
--db-master-host mdw // MXMaster hostname or IP
--db-master-port 5432 // Database port
--db-user mxadmin // Database user
--time-format raw // Raw format, no conversion
--target trip // Target table name
--parallel 256 // Degree of parallelism
--delimiter ',' // Field delimiter
Дополнительную информацию о mxgate см. в разделе MatrixGate.
YMatrix предоставляет функцию time_bucket, поддерживающую агрегацию по произвольным временным интервалам. Перед использованием установите расширение matrixts, чтобы инициализировать компоненты временных рядов (если они еще не созданы):
=# CREATE EXTENSION matrixts;
Теперь можно выполнить следующий SQL-запрос для подсчета общего количества поездок в день:
=# SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*)
FROM trip
GROUP BY day
ORDER BY day;
Чтобы проанализировать количество пассажиров по часам за 2 января 2016 года:
=# SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;
Используя max и min, можно быстро определить, что самая длинная поездка в наборе данных составляет 485,9 мили. Чтобы дополнительно проанализировать количество поездок, попадающих в различные диапазоны расстояний (≤10, 10–50, 50–100, 100–200, >200 миль), используйте один SQL-запрос:
=# SELECT distance_range, count(*) AS num_of_trips
FROM
(
SELECT
CASE
WHEN trip_distance <= 10 THEN 10
WHEN trip_distance > 10 AND trip_distance <= 50 THEN 50
WHEN trip_distance > 50 AND trip_distance <= 100 THEN 100
WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
WHEN trip_distance > 200 THEN 500
END AS distance_range
FROM trip
) AS temp
GROUP BY distance_range;
Результат должен выглядеть примерно так:
distance_range | num_of_trips
----------------+--------------
10 | 10308767
50 | 586200
100 | 379
200 | 58
500 | 9