YMatrix подходит для сценариев обработки временных данных в IoT-устройствах любого масштаба. В данном руководстве рассматривается конкретный пример — сбор и анализ временных данных такси в городе, чтобы продемонстрировать методы загрузки, обработки и запросов временных данных в YMatrix.
В городе проживает более 8 миллионов человек и работает около 200 000 автомобилей такси. Городская администрация собирает и публикует информацию о каждой поездке: время посадки и высадки пассажиров, места посадки и высадки, количество пассажиров, сумма оплаты и способ оплаты. С помощью этих данных можно провести анализ таких показателей, как коэффициент использования такси и даже общее состояние дорожного движения. Это позволяет улучшить управление городом и повысить качество жизни жителей и туристов. В рамках данного руководства предоставляется архив данных за один месяц. Нажмите здесь, чтобы начать работу в области управления городским транспортом (код доступа: 1x4u).
Среди собранных данных присутствует способ оплаты. Возможные способы оплаты: наличные, кредитная карта, бесплатная поездка, спорная оплата, неизвестно и недействительна. Такие данные мы называем статическими атрибутами. Эта информация сохраняется путём создания таблицы payment_types, чтобы в дальнейшем можно было связывать эти метаданные при выполнении запросов. Объём данных, связанных с «способом оплаты», небольшой, и требуется возможность их обновления, поэтому используется стандартный движок хранения HEAP. Как правило, если явно не указано иное, по умолчанию применяется именно движок HEAP.
CREATE TABLE IF NOT EXISTS payment_types (
payment_type int,
description text
)
USING HEAP;
Используя оператор IF NOT EXISTS, можно избежать ошибок при повторном создании таблицы с одинаковым именем.
INSERT INTO payment_types VALUES
(1, ‘Credit card’),
(2, ‘Cash’),
(3, ‘No charge’),
(4, ‘Disputed’),
(5, ‘Unknown’),
(6, ‘Invalid trip’);
Также существует тип тарифа, указывающий на цену поездки: стандартный тариф, аэропорт №1, аэропорт №2, специальные зоны, договорная цена, поездка нескольких пассажиров и т.д. Аналогично можно создать статическую таблицу атрибутов rate_codes для хранения этой информации и использовать стандартный движок хранения HEAP:
CREATE TABLE IF NOT EXISTS rate_codes (
rate_code int,
description text
)
USING HEAP;
INSERT INTO rate_codes VALUES
(1, ‘Standard Rate’),
(2, ‘Airport 1’),
(3, ‘Airport 2’),
(4, ‘Special Area’),
(5, ‘Negotiated Price’),
(6, ‘Group’);
Далее можно создать таблицу временных рядов для хранения конкретных данных о поездках. Ниже приведены поля с кратким описанием их значений: pickup_datetime/dropoff_datetime — момент времени посадки и высадки соответственно; pickup_longitude /pickup_latitude — долгота и широта точки посадки; dropoff_longitude/dropoff_latitude — долгота и широта точки высадки; passenger_count — количество пассажиров; trip_distance — расстояние поездки (в милях); total_amount — стоимость поездки; последнее поле trip_duration — это производное поле, которое генерируется при загрузке данных и содержит продолжительность поездки (в минутах).
В сценарии поездок на такси метаданные в таблице поездок представляют собой временные данные, формируемые в результате изменения состояния устройства во времени. Поскольку источники данных в сценариях временных рядов разнообразны и сильно варьируются, предъявляются высокие требования к записи и хранению временных данных, в то время как потребность в обновлении и удалении данных минимальна. Поэтому MARS2 является наилучшим выбором, поскольку он значительно оптимизирован по производительности записи, хранения и запросов временных данных.
Таблица MARS2 зависит от расширения matrixts, поэтому перед созданием таблицы необходимо сначала создать это расширение в базе данных, использующей данный движок хранения. Если расширение уже создано, повторное создание не требуется — можно пропустить этот шаг:
CREATE EXTENSION matrixts;
При создании таблицы следует использовать оператор USING MARS2 для указания движка хранения и оператор WITH для задания соответствующих параметров. Параметр compresstype определяет алгоритм сжатия и поддерживает значения zstd, zlib и lz4; значение по умолчанию — lz4; параметр compresslevel определяет уровень сжатия. Чем меньше значение, тем выше степень сжатия; средние значения обеспечивают баланс между скоростью сжатия и степенью сжатия. Разные алгоритмы имеют разные допустимые диапазоны значений:
CREATE TABLE IF NOT EXISTS trip (
vendor_id text,
pickup_datetime timestamp without time zone,
dropoff_datetime timestamp without time zone,
passenger_count int,
trip_distance numeric,
pickup_longitude numeric,
pickup_latitude numeric,
rate_code_id int,
store_and_fwd_flag text,
dropoff_longitude numeric,
dropoff_latitude numeric,
payment_type int,
fare_amount numeric,
extra numeric,
mta_tax numeric,
tip_amount numeric,
tolls_amount numeric,
improvement_surcharge numeric,
total_amount numeric,
trip_duration numeric GENERATED ALWAYS AS (EXTRACT(EPOCH FROM (dropoff_datetime - pickup_datetime)::INTERVAL)/60) STORED
)
USING MARS2
WITH (compresstype='lz4', compresslevel=1)
DISTRIBUTED BY (vendor_id)
PARTITION BY RANGE (pickup_datetime)
( START (date '2016-01-01') INCLUSIVE
END (date '2016-02-01') EXCLUSIVE
EVERY (INTERVAL '1 day') );
Наличие оператора DISTRIBUTED BY означает, что данные в таблице trip будут распределены по блокам (bucket) с использованием хэш-распределения по столбцу vendor_id, таким образом, данные с одинаковыми значениями окажутся на одном узле. Оператор PARTITION BY выполняет партиционирование данных. Будет создано 31 партиционная таблица с интервалом в один день — с 1 января 2016 года (включительно) по 1 февраля 2016 года (не включая). Таблица trip автоматически разделяется по дням, что упрощает быстрый отбор данных по временным периодам и способствует быстрой обработке устаревших данных в будущем.
Примечание!
Согласно правилам синтаксиса SQL, сначала должен быть указан оператор DISTRIBUTED BY, затем — PARTITION BY. При фактическом выполнении сначала применяется DISTRIBUTED BY для распределения данных по соответствующим узлам, после чего на каждом узле выполняется PARTITION BY для вставки данных в соответствующую подпартицию.
После создания таблицы MARS2 необходимо дополнительно создать индекс типа mars2_btree, чтобы обеспечить нормальную запись и чтение данных. Индексация выполняет несколько основных функций:
CREATE INDEX idx_trip ON trip USING mars2_btree (vendor_id, pickup_datetime);
Вы можете найти путь к файлу yellow_tripdata_2016-01.csv, скачанному по указанной выше ссылке, и использовать команду mxgate для загрузки данных. После параметра tail укажите фактический путь к файлу и используйте параметр --db-master-host для указания имени узла или IP-адреса, например:
tail -n +2 /home/mxadmin/workspace/nyc-taxi-data/yellow_tripdata_2016-01.csv | mxgate --source stdin --db-database postgres --db-master-host mdw --db-master-port 5432 --db-user mxadmin --time-format raw --target trip --parallel 256 --delimiter ',' --exclude-columns trip_duration
Основные параметры mxgate перечислены ниже:
--db-database postgres Specify the database name
--db-master-host mdw master node host name or IP
--db-master-port 5432 database port
--db-user mxadmin database username
--time-format raw original format, no conversion
--target trip table name to import
--parallel 256 parallel number
--delimiter ',' delimiter
Более подробную информацию о mxgate см. в разделе MatrixGate
YMatrix предоставляет функцию time_bucket, которая поддерживает сегментированные вычисления с любым временным интервалом. Перед использованием необходимо установить расширение MatrixTS в базе данных для инициализации компонентов временных данных без необходимости повторного создания.
CREATE EXTENSION matrixts;
После этого можно использовать следующий SQL-запрос для подсчёта количества поездок каждый день:
SELECT time_bucket('24 hours', pickup_datetime) AS day, count(*)
from trip
GROUP BY day
ORDER BY day;
Если вы хотите узнать, сколько людей совершали поездки каждый час 2 января 2016 года, используйте следующий SQL-запрос:
SELECT time_bucket('1 hour', pickup_datetime) AS hour, sum(passenger_count)
FROM trip
WHERE pickup_datetime >= '2016-01-02 00:00:00' AND pickup_datetime < '2016-01-03 00:00:00'
GROUP BY hour
ORDER BY hour;
С помощью функций max и min можно быстро узнать, что максимальное расстояние в текущем наборе данных составляет 485,9 мили. Если вы хотите дополнительно проанализировать общее количество поездок с расстоянием более 10, от 10 до 50, от 50 до 100, от 100 до 200 и более 200 миль, достаточно одного SQL-запроса:
SELECT distance_range, count(*) AS num_of_trips
FROM
(
SELECT
CASE
WHEN trip_distance <= 10 THEN 10
WHEN trip_distance > 10 AND trip_distance <= 50 THEN 50
WHEN trip_distance > 50 AND trip_distance <= 100 THEN 100
WHEN trip_distance > 100 AND trip_distance <= 200 THEN 200
WHEN trip_distance > 200 THEN 500
END AS distance_range
FROM trip
) AS temp
GROUP BY distance_range;
После выполнения вы получите результат:
distance_range | num_of_trips
----------------------------------------------------------------------------------------------------------------------------------
10 | 10308767
50 | 586200
100 | 379
200 | 58
500 | 9