Типовой сценарий временных рядов

YMatrix подходит для сценариев IoT с временными рядами, включающих устройства различных масштабов. В этом руководстве рассматривается конкретный пример с данными поездок подключённых такси, чтобы продемонстрировать, как загружать, обрабатывать и запрашивать данные временных рядов в YMatrix.

1 Введение в набор данных временных рядов — Данные о поездках городских такси

В городе с населением более 8 миллионов человек работает более 200 000 такси. Городские власти собирают и публикуют записи о поездках этих такси, включая время и место посадки и высадки, количество пассажиров, стоимость поездки и способ оплаты. Какие выводы можно сделать на основе этих данных? Среди них — коэффициент использования такси и даже общее состояние дорожного движения. Полученные сведения позволяют улучшить управление городом и повысить качество жизни жителей и гостей. В рамках данного руководства предоставляется архив данных за один месяц. Нажмите здесь, чтобы начать работу в области управления городским транспортом (пароль: 1x4u).

2 Метаданные и схема таблицы

Одно из полей в собранных данных указывает способ оплаты. Возможные значения: наличные, кредитная карта, бесплатная поездка, спорная, неизвестная и недействительная — эти значения называются статическими атрибутами. Создайте таблицу payment_types, чтобы хранить эту информацию для последующего использования в операциях соединения при выполнении запросов. Поскольку набор значений атрибута «способ оплаты» небольшой и может обновляться, используйте хранилище по умолчанию — движок HEAP. В целом, если не указано иное, таблицы создаются с использованием движка HEAP по умолчанию.

=# CREATE TABLE IF NOT EXISTS payment_types (
    payment_type int,
    description text
)
USING HEAP;

Предложение IF NOT EXISTS предотвращает ошибки при попытке создать таблицу, которая уже существует.

=# INSERT INTO payment_types VALUES
(1, 'Credit Card'),
(2, 'Cash'),
(3, 'No Charge'),
(4, 'Disputed'),
(5, 'Unknown'),
(6, 'Invalid Journey');

Другое поле указывает тип тарифа, включая стандартный тариф, аэропорт №1, аэропорт №2, специальную зону, договорную стоимость и групповую поездку. Аналогичным образом создайте справочную таблицу rate_codes для хранения этой информации, также используя движок HEAP по умолчанию:

=# CREATE TABLE IF NOT EXISTS rate_codes (
    rate_code   int,
    description text
)
USING HEAP;

=# INSERT INTO rate_codes VALUES
(1, 'Standard Rate'),
(2, 'Airport 1'),
(3, 'Airport 2'),
(4, 'Special Zone'),
(5, 'Negotiated Price'),
(6, 'Group');

Далее создайте таблицу временных рядов для хранения фактических данных о поездках. Ниже приведены пояснения ключевых полей:

  • pickup_datetime / dropoff_datetime: Временные метки посадки и высадки
  • pickup_longitude / pickup_latitude: Долгота и широта места посадки
  • dropoff_longitude / dropoff_latitude: Долгота и широта места высадки
  • passenger_count: Количество пассажиров
  • trip_distance: Расстояние поездки в милях
  • total_amount: Сумма оплаты
  • trip_duration: Вычисляемый столбец, формируемый при загрузке данных, представляющий продолжительность поездки в минутах

В данном сценарии временных рядов поездок на такси данные в таблице trip представляют собой изменяющиеся во времени измерения от устройств. Учитывая разнообразие и изменчивость источников данных в приложениях с временными рядами, требуется высокая производительность при вставке и хранении данных, в то время как операции обновления и удаления встречаются редко. Поэтому MARS3 является оптимальным выбором, поскольку обеспечивает значительные оптимизации при вставке, хранении и запросах к данным временных рядов.

Таблицы MARS3 зависят от расширения matrixts. Перед созданием таких таблиц убедитесь, что расширение установлено в целевой базе данных. Пропустите этот шаг, если расширение уже установлено:

=# CREATE EXTENSION matrixts;

Используйте предложение USING MARS3 для указания движка хранения и WITH для задания параметров.

  • compresstype: Алгоритм сжатия. Поддерживаемые значения: zstd, zlib, lz4; по умолчанию используется lz4.
  • compresslevel: Уровень сжатия. Меньшие значения обеспечивают более быстрое сжатие; большие значения — лучшее соотношение сжатия. Умеренные значения обеспечивают баланс между скоростью и степенью сжатия. Допустимые диапазоны различаются в зависимости от алгоритма:
    • zstd: 1–19
    • zlib: 1–9
    • lz4: 1–20 (по умолчанию: 1)
=# CREATE TABLE IF NOT EXISTS trip (
  vendor_id text,
  pickup_datetime timestamp without time zone,
  dropoff_datetime timestamp without time zone,
  passenger_count int,
  trip_distance numeric,
  pickup_longitude numeric,
  pickup_latitude numeric,
  rate_code_id int,
  store_and_fwd_flag text,
  dropoff_longitude numeric,
  dropoff_latitude numeric,
  payment_type int,
  fare_amount numeric,
  extra numeric,
  mta_tax numeric,
  tip_amount numeric,
  tolls_amount numeric,
  improvement_surcharge numeric,
  total_amount numeric,
  trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
) 
USING MARS3
WITH (compresstype='lz4', compresslevel=1)
DISTRIBUTED BY (vendor_id)
ORDER BY (vendor_id, pickup_datetime)
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
   END (date '2016-02-01') EXCLUSIVE
   EVERY (INTERVAL '1 day') );
  • Предложение DISTRIBUTED BY распределяет данные по сегментам с помощью хэш-распределения по столбцу trip, гарантируя, что строки с одинаковым значением vendor_id находятся на одном сегменте.
  • Предложение ORDER BY сортирует все данные внутри каждого сегмента по ключу сортировки (vendor_id, pickup_datetime), обеспечивая упорядоченное хранение.
  • Предложение PARTITION BY определяет диапазонное секционирование. Оно создаёт 31 ежедневную секцию с 1 января 2016 года (включительно) по 1 февраля 2016 года (исключительно). Автоматическое ежедневное секционирование позволяет быстро выполнять фильтрацию по временным интервалам и упрощает будущее управление данными на основе истечения срока их хранения.

Примечание!
Согласно правилам синтаксиса SQL, DISTRIBUTED BY должен быть объявлен до PARTITION BY. Однако при выполнении сначала применяется DISTRIBUTED BY для распределения данных по соответствующим сегментам, а затем PARTITION BY для вставки данных в соответствующие подтаблицы секций.

3 Загрузка данных

Найдите загруженный файл yellow_tripdata_2016-01.csv и используйте команду mxgate для загрузки данных. Укажите фактический путь к файлу после tail и используйте параметр --db-master-host для указания имени хоста или IP-адреса мастера. Пример:

$ tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256  --delimiter ','  --exclude-columns trip_duration 

Ключевые параметры mxgate:

--db-database postgres     // Target database name    
--db-master-host mdw       // MXMaster hostname or IP
--db-master-port 5432      // Database port
--db-user mxadmin          // Database user
--time-format raw          // Raw format, no conversion
--target trip              // Target table name
--parallel 256             // Parallel degree
--delimiter ','            // Field delimiter

Дополнительную информацию о mxgate см. в разделе MatrixGate.

4 Анализ данных

YMatrix предоставляет функцию time_bucket, поддерживающую агрегацию по произвольным временным интервалам. Перед использованием установите расширение matrixts, чтобы инициализировать компоненты для работы с временными рядами (пропустите, если уже создано):

=# CREATE EXTENSION matrixts;

Теперь вы можете выполнить следующий SQL-запрос для подсчёта общего количества поездок в день:

=# SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*) 
FROM trip 
GROUP BY day 
ORDER BY day;

Чтобы проанализировать количество пассажиров по часам за 2 января 2016 года:

=# SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;

Используя max и min, можно быстро определить, что самая длинная поездка в наборе данных составляет 485,9 мили. Чтобы дополнительно проанализировать количество поездок в разных диапазонах расстояний (≤10, 10–50, 50–100, 100–200, >200 миль), используйте один SQL-запрос:

=# SELECT distance_range, count(*) AS num_of_trips  
FROM
(
SELECT 
CASE
    WHEN trip_distance <= 10 THEN 10
    WHEN trip_distance >  10 AND trip_distance <= 50 THEN 50
    WHEN trip_distance >  50 AND trip_distance <= 100 THEN 100 
    WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
    WHEN trip_distance > 200 THEN 500 
    END AS distance_range  
FROM trip
) AS temp
GROUP BY distance_range;

После выполнения вы должны увидеть результат следующего вида:

 distance_range | num_of_trips
----------------+--------------
             10 |     10308767
             50 |       586200
            100 |          379
            200 |           58
            500 |            9