YMatrix подходит для сценариев обработки временных данных в IoT-устройствах любого масштаба. В данном руководстве рассматривается конкретный пример — сбор и анализ временных данных такси в городе, чтобы продемонстрировать методы загрузки, обработки и запросов временных данных в YMatrix.
В городе проживает более 8 миллионов человек и работает 200 000 такси. Городская администрация собирает и публикует информацию о каждой поездке такси, включая время посадки и высадки, места посадки и высадки, количество пассажиров, стоимость поездки и способ оплаты. С помощью этих данных можно провести анализ: например, определить коэффициент использования такси или даже общее состояние дорожного движения! Это позволяет улучшить управление городом и повысить качество жизни жителей и туристов. В рамках данного руководства предоставляется архив данных за один месяц. Нажмите здесь, чтобы начать свой путь в управлении городским транспортом (код доступа: 1x4u).
Среди собранных данных есть способ оплаты. Возможные способы оплаты: наличные, кредитная карта, бесплатная поездка, спорная оплата, неизвестно и недействительна. Такие данные мы называем статическими атрибутами. Сохраните эту информацию, создав таблицу payment_types, чтобы в дальнейшем можно было связывать её с этими метаданными при выполнении запросов. Информация, связанная со «способом оплаты», имеет небольшой объём и требует обновления, поэтому используйте стандартный движок хранения HEAP. Обычно движок HEAP используется по умолчанию, если не указано иное.
=# CREATE TABLE IF NOT EXISTS payment_types (
payment_type int,
description text
)
USING HEAP;
Используя оператор IF NOT EXISTS, вы можете избежать ошибок повторного создания таблиц с одинаковыми именами.
=# INSERT INTO payment_types VALUES
(1, ‘Credit card’),
(2, ‘Cash’),
(3, ‘No charge’),
(4, ‘Disputed’),
(5, ‘Unknown’),
(6, ‘Invalid trip’);
Также существует тип тарифа, указывающий цену: стандартный тариф, Аэропорт №1, Аэропорт №2, специальные зоны, договорная цена, поездка нескольких пассажиров и т.д. Аналогично, вы можете создать статическую таблицу атрибутов rate_codes для хранения этой информации и использовать стандартный движок хранения HEAP:
=# CREATE TABLE IF NOT EXISTS rate_codes (
rate_code int,
description text
)
USING HEAP;
=# INSERT INTO rate_codes VALUES
(1, ‘Standard Rate’),
(2, ‘Airport 1’),
(3, ‘Airport 2’),
(4, ‘Special Area’),
(5, ‘Negotiated Price’),
(6, ‘Group’);
Далее можно создать таблицу временных рядов для хранения данных о поездках. Ниже приведены поля и краткое описание их значений. Поля pickup_datetime / dropoff_datetime обозначают момент времени посадки и высадки соответственно. Поля pickup_longitude / pickup_latitude — широту и долготу места посадки, dropoff_longitude / dropoff_latitude — широту и долготу места высадки, passenger_count — количество пассажиров, trip_distance — расстояние поездки (в милях), total_amount — стоимость поездки, а последнее поле trip_duration формируется при загрузке данных и содержит продолжительность поездки (в минутах).
В сценарии сбора данных такси метаданные в таблице trip представляют собой временные данные, генерируемые в результате изменения состояния устройства во времени. Поскольку источники данных разнообразны и сильно варьируются в сценариях временных данных, предъявляются высокие требования к записи и хранению таких данных, в то время как потребность в обновлении и удалении данных невелика. Поэтому MARS3 является наилучшим выбором, поскольку он значительно оптимизирован по производительности записи, хранения и запросов временных данных.
Таблица MARS3 зависит от расширения matrixts, поэтому перед созданием таблицы необходимо сначала создать это расширение в базе данных с использованием соответствующего движка хранения. Если расширение уже создано, повторное создание не требуется — просто пропустите этот шаг:
=# CREATE EXTENSION matrixts;
При создании таблицы следует использовать оператор USING MARS3 для указания движка хранения и оператор WITH для задания соответствующих параметров. Параметр compresstype обозначает алгоритм сжатия, поддерживаются значения zstd, zlib и lz4, значение по умолчанию — lz4; compresslevel — уровень сжатия. Чем меньше значение, тем выше степень сжатия; средние значения обеспечивают баланс между скоростью сжатия и степенью сжатия. Разные алгоритмы имеют разные допустимые диапазоны значений:
=# CREATE TABLE IF NOT EXISTS trip (
vendor_id text,
pickup_datetime timestamp without time zone,
dropoff_datetime timestamp without time zone,
passenger_count int,
trip_distance numeric,
pickup_longitude numeric,
pickup_latitude numeric,
rate_code_id int,
store_and_fwd_flag text,
dropoff_longitude numeric,
dropoff_latitude numeric,
payment_type int,
fare_amount numeric,
extra numeric,
mta_tax numeric,
tip_amount numeric,
tolls_amount numeric,
improvement_surcharge numeric,
total_amount numeric,
trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
)
USING MARS3
WITH (compresstype='lz4', compresslevel=1)
DISTRIBUTED BY (vendor_id)
ORDER BY (vendor_id, pickup_datetime)
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
END (date '2016-02-01') EXCLUSIVE
EVERY (INTERVAL '1 day') );
DISTRIBUTED BY означает, что данные в таблице trip будут распределены по блокам (bucketing) и распределяться по хешу согласно столбцу vendor_id, так что данные с одинаковым значением окажутся на одном узле.(vendor_id, pickup_datetime), благодаря чему данные хранятся в упорядоченном виде.PARTITION BY выполняет секционирование данных. В период с 1 января 2016 года (включительно) по 1 февраля 2016 года (исключая этот день) создаются 31 секционная таблица с интервалом в один день. Таблица trip автоматически разделяется по дням, что упрощает быстрый отбор данных по временным периодам и способствует быстрой обработке устаревших данных в будущем.Примечание!
Синтаксис SQL требует, чтобы сначала указывалсяDISTRIBUTED BY, а затемPARTITION BY. Однако фактически сначала выполняетсяDISTRIBUTED BY— распределение данных на соответствующие узлы, а затем на указанном узле выполняетсяPARTITION BY— вставка данных в соответствующую подсекцию.
Вы можете найти путь к файлу yellow_tripdata_2016-01.csv, загруженному по указанной выше ссылке, и использовать команду mxgate для загрузки данных. Укажите фактический путь к файлу после параметра tail и имя узла или IP-адрес через параметр --db-master-host, например:
$ tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256 --delimiter ',' --exclude-columns trip_duration
Основные параметры mxgate следующие:
--db-database postgres // Specify the database name
--db-master-host mdw // master node host name or IP
--db-master-port 5432 // Database port
--db-user mxadmin // Database username
--time-format raw // original format, no conversion
--target trip // The table name to be imported
--parallel 256 // Parallel number
--delimiter ',' // delimiter
Более подробную информацию о mxgate см. в разделе MatrixGate
YMatrix предоставляет функцию time_bucket, которая поддерживает вычисления с разбиением по любому временному интервалу. Перед использованием необходимо установить расширение matrixts в базе данных для инициализации компонентов временных данных (повторное создание не требуется).
=# CREATE EXTENSION matrixts;
Затем можно использовать следующий SQL-запрос, чтобы подсчитать количество поездок каждый день:
=# SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*)
from trip
GROUP BY day
ORDER BY day;
Если вы хотите узнать, сколько людей совершали поездки каждый час 2 января 2016 года, используйте следующий SQL-запрос:
=# SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;
С помощью функций max и min быстро выясняется, что максимальное расстояние в текущем наборе данных составляет 485,9 мили. Если вы хотите дополнительно проанализировать общее количество поездок с расстоянием более 10, 10–50, 50–100, 100–200 и более 200 миль, достаточно одного SQL-запроса:
=# SELECT distance_range, count(*) AS num_of_trips
FROM
(
SELECT
CASE
WHEN trip_distance <= 10 THEN 10
WHEN trip_distance > 10 AND trip_distance <= 50 THEN 50
WHEN trip_distance > 50 AND trip_distance <= 100 THEN 100
WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
WHEN trip_distance > 200 THEN 500
END AS distance_range
FROM trip
) AS temp
GROUP BY distance_range;
После выполнения вы получите результат:
distance_range | num_of_trips
----------------------------------------------------------------------------------------------------------------------------------
10 | 10308767
50 | 586200
100 | 379
200 | 58
500 | 9