Типовой сценарий работы с временными данными

YMatrix подходит для сценариев обработки временных данных в IoT-устройствах любого масштаба. В данном руководстве рассматривается конкретный пример — сбор и анализ временных данных такси в городе, чтобы продемонстрировать методы загрузки, обработки и запросов временных данных в YMatrix.

1 Описание набора временных данных — данные о поездках такси в одном городе

В городе проживает более 8 миллионов человек и работает 200 000 такси. Городская администрация собирает и публикует информацию о каждой поездке такси, включая время посадки и высадки, места посадки и высадки, количество пассажиров, стоимость поездки и способ оплаты. С помощью этих данных можно провести анализ: например, определить коэффициент использования такси или даже общее состояние дорожного движения! Это позволяет улучшить управление городом и повысить качество жизни жителей и туристов. В рамках данного руководства предоставляется архив данных за один месяц. Нажмите здесь, чтобы начать свой путь в управлении городским транспортом (код доступа: 1x4u).

2 Метаданные и схема таблицы

Среди собранных данных есть способ оплаты. Возможные способы оплаты: наличные, кредитная карта, бесплатная поездка, спорная оплата, неизвестно и недействительна. Такие данные мы называем статическими атрибутами. Сохраните эту информацию, создав таблицу payment_types, чтобы в дальнейшем можно было связывать её с этими метаданными при выполнении запросов. Информация, связанная со «способом оплаты», имеет небольшой объём и требует обновления, поэтому используйте стандартный движок хранения HEAP. Обычно движок HEAP используется по умолчанию, если не указано иное.

=# CREATE TABLE IF NOT EXISTS payment_types (
    payment_type int,
    description text
)
USING HEAP;

Используя оператор IF NOT EXISTS, вы можете избежать ошибок повторного создания таблиц с одинаковыми именами.

=# INSERT INTO payment_types VALUES
(1, ‘Credit card’),
(2, ‘Cash’),
(3, ‘No charge’),
(4, ‘Disputed’),
(5, ‘Unknown’),
(6, ‘Invalid trip’);

Также существует тип тарифа, указывающий цену: стандартный тариф, Аэропорт №1, Аэропорт №2, специальные зоны, договорная цена, поездка нескольких пассажиров и т.д. Аналогично, вы можете создать статическую таблицу атрибутов rate_codes для хранения этой информации и использовать стандартный движок хранения HEAP:

=# CREATE TABLE IF NOT EXISTS rate_codes (
    rate_code   int,
    description text
)
USING HEAP;

=# INSERT INTO rate_codes VALUES
(1, ‘Standard Rate’),
(2, ‘Airport 1’),
(3, ‘Airport 2’),
(4, ‘Special Area’),
(5, ‘Negotiated Price’),
(6, ‘Group’);

Далее можно создать таблицу временных рядов для хранения данных о поездках. Ниже приведены поля и краткое описание их значений. Поля pickup_datetime / dropoff_datetime обозначают момент времени посадки и высадки соответственно. Поля pickup_longitude / pickup_latitude — широту и долготу места посадки, dropoff_longitude / dropoff_latitude — широту и долготу места высадки, passenger_count — количество пассажиров, trip_distance — расстояние поездки (в милях), total_amount — стоимость поездки, а последнее поле trip_duration формируется при загрузке данных и содержит продолжительность поездки (в минутах).

В сценарии сбора данных такси метаданные в таблице trip представляют собой временные данные, генерируемые в результате изменения состояния устройства во времени. Поскольку источники данных разнообразны и сильно варьируются в сценариях временных данных, предъявляются высокие требования к записи и хранению таких данных, в то время как потребность в обновлении и удалении данных невелика. Поэтому MARS3 является наилучшим выбором, поскольку он значительно оптимизирован по производительности записи, хранения и запросов временных данных.

Таблица MARS3 зависит от расширения matrixts, поэтому перед созданием таблицы необходимо сначала создать это расширение в базе данных с использованием соответствующего движка хранения. Если расширение уже создано, повторное создание не требуется — просто пропустите этот шаг:

=# CREATE EXTENSION matrixts;

При создании таблицы следует использовать оператор USING MARS3 для указания движка хранения и оператор WITH для задания соответствующих параметров. Параметр compresstype обозначает алгоритм сжатия, поддерживаются значения zstd, zlib и lz4, значение по умолчанию — lz4; compresslevel — уровень сжатия. Чем меньше значение, тем выше степень сжатия; средние значения обеспечивают баланс между скоростью сжатия и степенью сжатия. Разные алгоритмы имеют разные допустимые диапазоны значений:

  • zstd: 1–19
  • zlib: 1–9
  • lz4: 1–20. Значение по умолчанию — 1.
    =# CREATE TABLE IF NOT EXISTS trip (
    vendor_id text,
    pickup_datetime timestamp without time zone,
    dropoff_datetime timestamp without time zone,
    passenger_count int,
    trip_distance numeric,
    pickup_longitude numeric,
    pickup_latitude numeric,
    rate_code_id int,
    store_and_fwd_flag text,
    dropoff_longitude numeric,
    dropoff_latitude numeric,
    payment_type int,
    fare_amount numeric,
    extra numeric,
    mta_tax numeric,
    tip_amount numeric,
    tolls_amount numeric,
    improvement_surcharge numeric,
    total_amount numeric,
    trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
    ) 
    USING MARS3
    WITH (compresstype='lz4', compresslevel=1)
    DISTRIBUTED BY (vendor_id)
    ORDER BY (vendor_id, pickup_datetime)
    PARTITION BY RANGE (pickup_datetime)
    ( START (date '2016-01-01') INCLUSIVE
     END (date '2016-02-01') EXCLUSIVE
     EVERY (INTERVAL '1 day') );
  • Наличие оператора DISTRIBUTED BY означает, что данные в таблице trip будут распределены по блокам (bucketing) и распределяться по хешу согласно столбцу vendor_id, так что данные с одинаковым значением окажутся на одном узле.
  • Оператор ORDER BY обеспечивает сортировку всех данных на каждом узле по ключу сортировки (vendor_id, pickup_datetime), благодаря чему данные хранятся в упорядоченном виде.
  • Оператор PARTITION BY выполняет секционирование данных. В период с 1 января 2016 года (включительно) по 1 февраля 2016 года (исключая этот день) создаются 31 секционная таблица с интервалом в один день. Таблица trip автоматически разделяется по дням, что упрощает быстрый отбор данных по временным периодам и способствует быстрой обработке устаревших данных в будущем.

Примечание!
Синтаксис SQL требует, чтобы сначала указывался DISTRIBUTED BY, а затем PARTITION BY. Однако фактически сначала выполняется DISTRIBUTED BY — распределение данных на соответствующие узлы, а затем на указанном узле выполняется PARTITION BY — вставка данных в соответствующую подсекцию.

3 Загрузка данных

Вы можете найти путь к файлу yellow_tripdata_2016-01.csv, загруженному по указанной выше ссылке, и использовать команду mxgate для загрузки данных. Укажите фактический путь к файлу после параметра tail и имя узла или IP-адрес через параметр --db-master-host, например:

$ tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256  --delimiter ','  --exclude-columns trip_duration 

Основные параметры mxgate следующие:

--db-database postgres // Specify the database name
--db-master-host mdw // master node host name or IP
--db-master-port 5432 // Database port
--db-user mxadmin // Database username
--time-format raw // original format, no conversion
--target trip // The table name to be imported
--parallel 256 // Parallel number
--delimiter ',' // delimiter

Более подробную информацию о mxgate см. в разделе MatrixGate

4 Анализ данных

YMatrix предоставляет функцию time_bucket, которая поддерживает вычисления с разбиением по любому временному интервалу. Перед использованием необходимо установить расширение matrixts в базе данных для инициализации компонентов временных данных (повторное создание не требуется).

=# CREATE EXTENSION matrixts;

Затем можно использовать следующий SQL-запрос, чтобы подсчитать количество поездок каждый день:

=# SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*) 
from trip 
GROUP BY day 
ORDER BY day;

Если вы хотите узнать, сколько людей совершали поездки каждый час 2 января 2016 года, используйте следующий SQL-запрос:

=# SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;

С помощью функций max и min быстро выясняется, что максимальное расстояние в текущем наборе данных составляет 485,9 мили. Если вы хотите дополнительно проанализировать общее количество поездок с расстоянием более 10, 10–50, 50–100, 100–200 и более 200 миль, достаточно одного SQL-запроса:

=# SELECT distance_range, count(*) AS num_of_trips  
FROM
(
SELECT 
CASE
    WHEN trip_distance <= 10 THEN 10
    WHEN trip_distance >  10 AND trip_distance <= 50 THEN 50
    WHEN trip_distance >  50 AND trip_distance <= 100 THEN 100 
    WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
    WHEN trip_distance > 200 THEN 500 
    END AS distance_range  
FROM trip
) AS temp
GROUP BY distance_range;

После выполнения вы получите результат:

 distance_range | num_of_trips
----------------------------------------------------------------------------------------------------------------------------------
             10 |     10308767
             50 |       586200
            100 |          379
            200 |           58
            500 |            9