Типовой сценарий временных рядов

YMatrix подходит для сценариев временных рядов в IoT с устройствами различных масштабов. В этом руководстве рассматривается конкретный пример анализа данных поездок такси — типичного случая использования временных рядов, чтобы продемонстрировать, как загружать, обрабатывать и запрашивать данные временных рядов в YMatrix.

1 Введение в набор данных временных рядов — Данные о поездках такси в городе

В городе с населением более 8 миллионов человек работает более 200 000 автомобилей-такси. Городское транспортное управление собирает и публикует записи о каждой поездке такси, включая время и место посадки и высадки, количество пассажиров, стоимость поездки и способ оплаты. Какие выводы можно сделать на основе этих данных? Среди них — коэффициент использования такси и даже общее состояние дорожного движения. Полученные сведения позволяют улучшить городское управление и повысить качество жизни жителей и гостей города. В рамках данного руководства предоставляется архив данных за один месяц. Нажмите здесь, чтобы начать погружение в анализ городской мобильности (пароль: 1x4u).

2 Метаданные и схема таблицы

Одно из собранных полей указывает способ оплаты. Возможные значения: наличные, кредитная карта, бесплатная поездка, спорная, неизвестная и недействительная — эти значения называются статическими атрибутами. Создайте таблицу payment_types, чтобы хранить эту информацию для последующего соединения при выполнении запросов. Поскольку данные о «способе оплаты» небольшие по объему и могут требовать обновления, используйте хранилище по умолчанию — движок HEAP. Вообще говоря, если не указано иное, таблицы автоматически используют движок HEAP.

=# CREATE TABLE IF NOT EXISTS payment_types (
    payment_type int,
    description text
)
USING HEAP;

Использование предложения IF NOT EXISTS предотвращает ошибки при попытке создать таблицу, которая уже существует.

=# INSERT INTO payment_types VALUES
(1, 'Credit Card'),
(2, 'Cash'),
(3, 'No Charge'),
(4, 'Disputed'),
(5, 'Unknown'),
(6, 'Invalid Journey');

Другое поле указывает тип тарифа: стандартный тариф, аэропорт №1, аэропорт №2, специальная зона, договорная цена и групповая поездка. Аналогично создайте справочную таблицу rate_codes, используя движок хранения по умолчанию HEAP:

=# CREATE TABLE IF NOT EXISTS rate_codes (
    rate_code   int,
    description text
)
USING HEAP;

=# INSERT INTO rate_codes VALUES
(1, 'Standard Rate'),
(2, 'Airport 1'),
(3, 'Airport 2'),
(4, 'Special Zone'),
(5, 'Negotiated Price'),
(6, 'Group');

Теперь создадим таблицу временных рядов для хранения фактических данных о поездках. Ниже приведены пояснения ключевых полей:

  • pickup_datetime / dropoff_datetime: Временные метки начала и окончания поездки
  • pickup_longitude / pickup_latitude: Долгота и широта места посадки
  • dropoff_longitude / dropoff_latitude: Долгота и широта места высадки
  • passenger_count: Количество пассажиров
  • trip_distance: Расстояние поездки в милях
  • total_amount: Сумма оплаты за поездку
  • trip_duration: Вычисляемый столбец, формируемый при загрузке данных, представляющий продолжительность поездки в минутах

В данном сценарии временных рядов поездок такси данные в таблице trip представляют собой изменяющиеся во времени измерения с устройств. Учитывая разнообразие и изменчивость источников данных в приложениях для временных рядов, высокая производительность при вставке и хранении данных имеет критическое значение, тогда как требования к операциям обновления и удаления минимальны. Поэтому MARS3 является оптимальным выбором, обеспечивая значительные оптимизации при вставке, хранении и запросах к данным временных рядов.

Таблицы MARS3 зависят от расширения matrixts. Перед созданием таких таблиц убедитесь, что расширение установлено в целевой базе данных. Если оно уже установлено, этот шаг можно пропустить.

=# CREATE EXTENSION matrixts;

При создании таблицы используйте предложение USING MARS3 для указания движка хранения и предложение WITH для задания параметров.

  • compresstype: Алгоритм сжатия. Поддерживаемые значения: zstd, zlib, lz4; по умолчанию используется lz4.
  • compresslevel: Уровень сжатия. Меньшие значения обеспечивают более быстрое сжатие; большие значения дают лучшее сжатие. Средние значения обеспечивают баланс между скоростью и степенью сжатия. Допустимые диапазоны различаются в зависимости от алгоритма:
    • zstd: 1–19
    • zlib: 1–9
    • lz4: 1–20 (по умолчанию: 1)
=# CREATE TABLE IF NOT EXISTS trip (
  vendor_id text,
  pickup_datetime timestamp without time zone,
  dropoff_datetime timestamp without time zone,
  passenger_count int,
  trip_distance numeric,
  pickup_longitude numeric,
  pickup_latitude numeric,
  rate_code_id int,
  store_and_fwd_flag text,
  dropoff_longitude numeric,
  dropoff_latitude numeric,
  payment_type int,
  fare_amount numeric,
  extra numeric,
  mta_tax numeric,
  tip_amount numeric,
  tolls_amount numeric,
  improvement_surcharge numeric,
  total_amount numeric,
  trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
) 
USING MARS3
WITH (compresstype='lz4', compresslevel=1)
DISTRIBUTED BY (vendor_id)
ORDER BY (vendor_id, pickup_datetime)
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
   END (date '2016-02-01') EXCLUSIVE
   EVERY (INTERVAL '1 day') );
  • Предложение DISTRIBUTED BY распределяет данные по сегментам с помощью хеширования по столбцу trip, гарантируя, что строки с одинаковым значением vendor_id находятся на одном сегменте.
  • Предложение ORDER BY сортирует все данные внутри каждого сегмента по ключу сортировки (vendor_id, pickup_datetime), обеспечивая упорядоченное хранение.
  • Предложение PARTITION BY определяет партиционирование по диапазону по столбцу trip, создавая 31 ежедневную партицию с 1 января 2016 года (включительно) по 1 февраля 2016 года (исключительно). Ежедневное партиционирование позволяет быстро отсекать данные при временных запросах и упрощает будущее управление данными на основе срока хранения.

Примечание!
Согласно правилам синтаксиса SQL, DISTRIBUTED BY должен быть объявлен до PARTITION BY. Однако при выполнении сначала применяется DISTRIBUTED BY, чтобы распределить данные по соответствующим сегментам, а затем PARTITION BY вставляет данные в соответствующие подтаблицы партиций.

3 Загрузка данных

Найдите загруженный файл yellow_tripdata_2016-01.csv и используйте команду mxgate для его загрузки. Укажите фактический путь к файлу после tail и используйте параметр --db-master-host, чтобы задать имя хоста или IP-адрес главного узла. Пример:

$ tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256  --delimiter ','  --exclude-columns trip_duration 

Основные параметры mxgate:

--db-database postgres     // Target database name    
--db-master-host mdw       // MXMaster hostname or IP
--db-master-port 5432      // Database port
--db-user mxadmin          // Database user
--time-format raw          // Raw format, no conversion
--target trip              // Target table name
--parallel 256             // Degree of parallelism
--delimiter ','            // Field delimiter

Дополнительную информацию о mxgate см. в разделе MatrixGate.

4 Анализ данных

YMatrix предоставляет функцию time_bucket, поддерживающую агрегацию по произвольным временным интервалам. Перед использованием установите расширение matrixts, чтобы инициализировать компоненты временных рядов (если они еще не созданы):

=# CREATE EXTENSION matrixts;

Теперь можно выполнить следующий SQL-запрос для подсчета общего количества поездок в день:

=# SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*) 
FROM trip 
GROUP BY day 
ORDER BY day;

Чтобы проанализировать количество пассажиров по часам за 2 января 2016 года:

=# SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;

Используя max и min, можно быстро определить, что самая длинная поездка в наборе данных составляет 485,9 мили. Чтобы дополнительно проанализировать количество поездок, попадающих в различные диапазоны расстояний (≤10, 10–50, 50–100, 100–200, >200 миль), используйте один SQL-запрос:

=# SELECT distance_range, count(*) AS num_of_trips  
FROM
(
SELECT 
CASE
    WHEN trip_distance <= 10 THEN 10
    WHEN trip_distance >  10 AND trip_distance <= 50 THEN 50
    WHEN trip_distance >  50 AND trip_distance <= 100 THEN 100 
    WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
    WHEN trip_distance > 200 THEN 500 
    END AS distance_range  
FROM trip
) AS temp
GROUP BY distance_range;

Результат должен выглядеть примерно так:

 distance_range | num_of_trips
----------------+--------------
             10 |     10308767
             50 |       586200
            100 |          379
            200 |           58
            500 |            9