YMatrix подходит для сценариев IoT с временными рядами, включающих устройства различных масштабов. В этом руководстве рассматривается конкретный пример с данными поездок подключённых такси, чтобы продемонстрировать, как загружать, обрабатывать и запрашивать данные временных рядов в YMatrix.
В городе с населением более 8 миллионов человек работает более 200 000 такси. Городские власти собирают и публикуют записи о поездках этих такси, включая время и место посадки и высадки, количество пассажиров, стоимость поездки и способ оплаты. Какие выводы можно сделать на основе этих данных? Среди них — коэффициент использования такси и даже общее состояние дорожного движения. Полученные сведения позволяют улучшить управление городом и повысить качество жизни жителей и гостей. В рамках данного руководства предоставляется архив данных за один месяц. Нажмите здесь, чтобы начать работу в области управления городским транспортом (пароль: 1x4u).
Одно из полей в собранных данных указывает способ оплаты. Возможные значения: наличные, кредитная карта, бесплатная поездка, спорная, неизвестная и недействительная — эти значения называются статическими атрибутами. Создайте таблицу payment_types, чтобы хранить эту информацию для последующего использования в операциях соединения при выполнении запросов. Поскольку набор значений атрибута «способ оплаты» небольшой и может обновляться, используйте хранилище по умолчанию — движок HEAP. В целом, если не указано иное, таблицы создаются с использованием движка HEAP по умолчанию.
=# CREATE TABLE IF NOT EXISTS payment_types (
payment_type int,
description text
)
USING HEAP;
Предложение IF NOT EXISTS предотвращает ошибки при попытке создать таблицу, которая уже существует.
=# INSERT INTO payment_types VALUES
(1, 'Credit Card'),
(2, 'Cash'),
(3, 'No Charge'),
(4, 'Disputed'),
(5, 'Unknown'),
(6, 'Invalid Journey');
Другое поле указывает тип тарифа, включая стандартный тариф, аэропорт №1, аэропорт №2, специальную зону, договорную стоимость и групповую поездку. Аналогичным образом создайте справочную таблицу rate_codes для хранения этой информации, также используя движок HEAP по умолчанию:
=# CREATE TABLE IF NOT EXISTS rate_codes (
rate_code int,
description text
)
USING HEAP;
=# INSERT INTO rate_codes VALUES
(1, 'Standard Rate'),
(2, 'Airport 1'),
(3, 'Airport 2'),
(4, 'Special Zone'),
(5, 'Negotiated Price'),
(6, 'Group');
Далее создайте таблицу временных рядов для хранения фактических данных о поездках. Ниже приведены пояснения ключевых полей:
pickup_datetime / dropoff_datetime: Временные метки посадки и высадки pickup_longitude / pickup_latitude: Долгота и широта места посадки dropoff_longitude / dropoff_latitude: Долгота и широта места высадки passenger_count: Количество пассажиров trip_distance: Расстояние поездки в милях total_amount: Сумма оплаты trip_duration: Вычисляемый столбец, формируемый при загрузке данных, представляющий продолжительность поездки в минутах В данном сценарии временных рядов поездок на такси данные в таблице trip представляют собой изменяющиеся во времени измерения от устройств. Учитывая разнообразие и изменчивость источников данных в приложениях с временными рядами, требуется высокая производительность при вставке и хранении данных, в то время как операции обновления и удаления встречаются редко. Поэтому MARS3 является оптимальным выбором, поскольку обеспечивает значительные оптимизации при вставке, хранении и запросах к данным временных рядов.
Таблицы MARS3 зависят от расширения matrixts. Перед созданием таких таблиц убедитесь, что расширение установлено в целевой базе данных. Пропустите этот шаг, если расширение уже установлено:
=# CREATE EXTENSION matrixts;
Используйте предложение USING MARS3 для указания движка хранения и WITH для задания параметров.
compresstype: Алгоритм сжатия. Поддерживаемые значения: zstd, zlib, lz4; по умолчанию используется lz4. compresslevel: Уровень сжатия. Меньшие значения обеспечивают более быстрое сжатие; большие значения — лучшее соотношение сжатия. Умеренные значения обеспечивают баланс между скоростью и степенью сжатия. Допустимые диапазоны различаются в зависимости от алгоритма:=# CREATE TABLE IF NOT EXISTS trip (
vendor_id text,
pickup_datetime timestamp without time zone,
dropoff_datetime timestamp without time zone,
passenger_count int,
trip_distance numeric,
pickup_longitude numeric,
pickup_latitude numeric,
rate_code_id int,
store_and_fwd_flag text,
dropoff_longitude numeric,
dropoff_latitude numeric,
payment_type int,
fare_amount numeric,
extra numeric,
mta_tax numeric,
tip_amount numeric,
tolls_amount numeric,
improvement_surcharge numeric,
total_amount numeric,
trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
)
USING MARS3
WITH (compresstype='lz4', compresslevel=1)
DISTRIBUTED BY (vendor_id)
ORDER BY (vendor_id, pickup_datetime)
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
END (date '2016-02-01') EXCLUSIVE
EVERY (INTERVAL '1 day') );
DISTRIBUTED BY распределяет данные по сегментам с помощью хэш-распределения по столбцу trip, гарантируя, что строки с одинаковым значением vendor_id находятся на одном сегменте.ORDER BY сортирует все данные внутри каждого сегмента по ключу сортировки (vendor_id, pickup_datetime), обеспечивая упорядоченное хранение.PARTITION BY определяет диапазонное секционирование. Оно создаёт 31 ежедневную секцию с 1 января 2016 года (включительно) по 1 февраля 2016 года (исключительно). Автоматическое ежедневное секционирование позволяет быстро выполнять фильтрацию по временным интервалам и упрощает будущее управление данными на основе истечения срока их хранения.Примечание!
Согласно правилам синтаксиса SQL,DISTRIBUTED BYдолжен быть объявлен доPARTITION BY. Однако при выполнении сначала применяетсяDISTRIBUTED BYдля распределения данных по соответствующим сегментам, а затемPARTITION BYдля вставки данных в соответствующие подтаблицы секций.
Найдите загруженный файл yellow_tripdata_2016-01.csv и используйте команду mxgate для загрузки данных. Укажите фактический путь к файлу после tail и используйте параметр --db-master-host для указания имени хоста или IP-адреса мастера. Пример:
$ tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256 --delimiter ',' --exclude-columns trip_duration
Ключевые параметры mxgate:
--db-database postgres // Target database name
--db-master-host mdw // MXMaster hostname or IP
--db-master-port 5432 // Database port
--db-user mxadmin // Database user
--time-format raw // Raw format, no conversion
--target trip // Target table name
--parallel 256 // Parallel degree
--delimiter ',' // Field delimiter
Дополнительную информацию о mxgate см. в разделе MatrixGate.
YMatrix предоставляет функцию time_bucket, поддерживающую агрегацию по произвольным временным интервалам. Перед использованием установите расширение matrixts, чтобы инициализировать компоненты для работы с временными рядами (пропустите, если уже создано):
=# CREATE EXTENSION matrixts;
Теперь вы можете выполнить следующий SQL-запрос для подсчёта общего количества поездок в день:
=# SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*)
FROM trip
GROUP BY day
ORDER BY day;
Чтобы проанализировать количество пассажиров по часам за 2 января 2016 года:
=# SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;
Используя max и min, можно быстро определить, что самая длинная поездка в наборе данных составляет 485,9 мили. Чтобы дополнительно проанализировать количество поездок в разных диапазонах расстояний (≤10, 10–50, 50–100, 100–200, >200 миль), используйте один SQL-запрос:
=# SELECT distance_range, count(*) AS num_of_trips
FROM
(
SELECT
CASE
WHEN trip_distance <= 10 THEN 10
WHEN trip_distance > 10 AND trip_distance <= 50 THEN 50
WHEN trip_distance > 50 AND trip_distance <= 100 THEN 100
WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
WHEN trip_distance > 200 THEN 500
END AS distance_range
FROM trip
) AS temp
GROUP BY distance_range;
После выполнения вы должны увидеть результат следующего вида:
distance_range | num_of_trips
----------------+--------------
10 | 10308767
50 | 586200
100 | 379
200 | 58
500 | 9