Потоковые таблицы могут создаваться только суперпользователями. Необходимо предоставить привилегии суперпользователя.
Инструкции по созданию суперпользователя см. в разделе CREATE_ROLE.
В этом документе представлены базовые практические примеры, которые помогут пользователям быстро начать работу с функциями потоковых вычислений в YMatrix Database.
Потоковые вычисления — это метод обработки данных, позволяющий быстро анализировать и непрерывно обрабатывать данные в реальном времени. В YMatrix Database вы можете использовать SQL для быстрого создания собственных потоков данных. Помимо поддержки операций вставки, фильтрации, корректировки и заполнения, система также поддерживает операции потоковых вычислений в реальном времени, такие как расширение измерений, агрегация, каскадирование, ветвление и объединение. Это значительно повышает возможности системы хранилища данных в части обработки данных в реальном времени и одновременно снижает сложность системы. Благодаря встроенным потоковым вычислениям в YMatrix вы можете добиться обновления результатов на уровне секунд, в режиме реального времени и с инкрементальным обновлением.
В настоящее время потоковые вычисления широко применяются во многих областях, например:
Кроме того, потоковые вычисления активно используются в военной сфере, моделировании, электронной коммерции, цепочках поставок и Интернете вещей (IoT).
«Расширение измерений» обычно означает обогащение существующей таблицы за счёт соединения дополнительных столбцов или атрибутов из другой таблицы, в результате чего формируется широкая денормализованная таблица. YMatrix поддерживает расширение измерений в реальном времени.
Мы продемонстрируем это на простом примере, в котором данные о заказах дополняются информацией о товарах, что позволяет анализировать общий объём продаж по категориям товаров.
ods_order: Информация о заказах — факт-таблица, хранящая записи о заказах
| Поле | Описание |
|---|---|
| id | Идентификатор заказа |
| prod_id | Идентификатор продукта |
| ts | Временная метка заказа |
dim_prod: Информация о продуктах — таблица измерений, хранящая детали о продуктах
| Поле | Описание |
|---|---|
| id | Идентификатор продукта |
| pord_name | Название продукта |
| pord_detail | Детали продукта |
dwd_order_detail: Детали заказа — расширенная таблица заказов, содержащая полную информацию о продуктах
| Поле | Описание |
|---|---|
| id | Идентификатор заказа |
| ts | Временная метка |
| prod_id | Идентификатор продукта |
| pord_name | Название продукта |
| pord_detail | Детали продукта |
Сначала создайте таблицы ods_order и dim_prod.
CREATE TABLE ods_order (
id int,
prod_id int,
ts timestamp
)
DISTRIBUTED BY (id);
CREATE TABLE dim_prod (
id int,
prod_name text,
prod_detail text
)
DISTRIBUTED BY (id);
Затем создайте поток dwd_order_detail, чтобы выполнять расширение измерений над исходными транзакционными данными в реальном времени. При поступлении новых данных в dim_prod, таблица dwd_order_detail будет инкрементально обновляться в реальном времени, автоматически записывая последние расширенные записи транзакций.
CREATE STREAM dwd_order_detail(id, ts, prod_id, prod_name, prod_detail)
AS (
SELECT
ods_order.id,
ods_order.ts,
ods_order.prod_id,
dim_prod.prod_name,
dim_prod.prod_detail
FROM STREAMING ALL dim_prod
INNER JOIN ods_order
ON dim_prod.id = ods_order.prod_id
) PRIMARY KEY (id);
Подготовьте тестовые данные.
Вставка данных о заказах
-- Order 1
INSERT INTO ods_order
VALUES (
1,
1,
current_timestamp
);
-- Order 2
INSERT INTO ods_order
VALUES (
2,
2,
current_timestamp
);
Вставка данных о продуктах
INSERT INTO dim_prod
VALUES (
1,
'apple',
'fruit_001'
);
INSERT INTO dim_prod
VALUES (
2,
'cola',
'drink_001'
);
Посредством соединения dwd_order_detail и ods_order через поток dim_prod, при каждой вставке новых данных в dim_prod, потоковая таблица dwd_order_detail немедленно обновляется, отображая расширенные записи заказов.
Запрос текущих результатов
SELECT * FROM dwd_order_detail;
id | ts | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+------------
1 | 2024-08-01 15:50:23.117737 | 1 | apple | fruit_001
2 | 2024-08-01 15:50:35.115252 | 2 | cola | drink_001
(2 rows)
Чтобы обновить данные в таблице dim_prod, используйте оператор UPDATE.
-- Update cola -> pepsi
UPDATE dim_prod SET prod_name = 'pepsi' WHERE id = 2;
Запрос последних результатов
SELECT * FROM dwd_order_detail;
id | ts | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+------------
1 | 2024-08-01 15:50:23.117737 | 1 | apple | fruit_001
2 | 2024-08-01 15:50:35.115252 | 2 | pepsi | drink_001
(2 rows)
Чтобы удалить данные из таблицы dim_prod, используйте оператор DELETE.
-- Delete order 2
DELETE FROM dim_prod WHERE id = 2;
Запрос последних результатов
SELECT * FROM dwd_order_detail;
id | ts | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+------------
1 | 2024-08-01 15:50:23.117737 | 1 | apple | fruit_001
(1 row)
«Агрегация» обычно означает суммирование набора данных для вычисления статистики, такой как сумма, среднее значение, максимум и минимум. YMatrix поддерживает агрегацию в реальном времени над вставленными, обновлёнными и удалёнными данными.
Мы продемонстрируем эту возможность на примере мониторинга в умном производстве, где производственные данные агрегируются в реальном времени для отслеживания месячного и годового выпуска продукции.
dwd_production: Таблица данных о выпуске продукции
| Поле | Описание |
|---|---|
| id | Идентификатор записи |
| category | Категория продукта |
| value | Объём производства |
| ts | Временная метка |
dws_stream_agg_month: Агрегированные месячные показатели
| Поле | Описание |
|---|---|
| category | Категория продукта |
| y | Год |
| m | Месяц |
| ym | Год-месяц |
| month_sum | Общий объём производства за месяц |
| month_cut | Количество записей за месяц |
dws_stream_agg_year: Агрегированные годовые показатели
| Поле | Описание |
|---|---|
| category | Категория продукта |
| y | Год |
| year_sum | Накопленный объём производства за год |
| year_cut | Накопленное количество записей за год |
Сначала создайте таблицу dwd_production для хранения производственных данных и вставьте образцы записей.
-- Create production data table
CREATE TABLE dwd_production (
id bigserial,
category int,
value bigint,
ts timestamp
) DISTRIBUTED BY (id);
-- Insert data
INSERT INTO dwd_production(category, value, ts) VALUES
(1002, 59, '2023-12-12 03:44:05'),
(1001, 15, '2024-01-02 11:22:33'),
(1001, 20, '2024-01-03 22:33:44'),
(1002, 34, '2024-01-04 01:02:03'),
(1001, 27, '2024-02-11 02:03:04'),
(1002, 57, '2024-02-12 03:04:05');
Создайте поток dws_stream_agg_month для месячной агрегации и поток dws_stream_agg_year для годовой агрегации. При получении новых данных оба потока будут автоматически обновлять свои результаты.
-- Create stream dws_stream_agg_month for monthly production
CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
SELECT
category,
extract(year FROM date_trunc('year', ts)::date),
extract(month FROM date_trunc('month', ts)::date),
date_trunc('month', ts)::date,
sum(value),
count(value)
FROM STREAMING ALL dwd_production
GROUP BY 1, 2, 3, 4 -- Group by category, year, month, year-month
)
DISTRIBUTED BY (category, y, m);
-- Create stream dws_stream_agg_year for annual production
CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS (
SELECT
dws_stream_agg_month.category,
dws_stream_agg_month.y,
sum(dws_stream_agg_month.month_sum),
sum(dws_stream_agg_month.month_cnt)
FROM STREAMING ALL dws_stream_agg_month
GROUP BY 1, 2 -- Group by category, year
)
DISTRIBUTED BY (category, year);
Проанализируйте и запросите результаты.
Запрос текущих данных в dwd_production:
-- Sort by category and timestamp
SELECT * FROM dwd_production ORDER BY 2,4;
Запрос dws_stream_agg_month и dws_stream_agg_year для получения месячных и годовых агрегатов:
-- Query monthly production, sorted by category, year, month
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
(5 rows)
-- Query annual production, sorted by category, year
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 62 | 3
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
Вставьте новую запись в dwd_production. Потоки dws_stream_agg_month и dws_stream_agg_year выполнит инкрементальную агрегацию новых данных.
INSERT INTO dwd_production VALUES(7,1001,30,'2024-04-04 01:23:44');
Повторно запросите dws_stream_agg_month и dws_stream_agg_year, чтобы увидеть обновлённые результаты:
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1001 | 2024 | 4 | 2024-04-01 | 30 | 1
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
(6 rows)
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 92 | 4
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
Чтобы обновить данные в dwd_production, используйте оператор UPDATE.
UPDATE dwd_production SET value = 100 WHERE id = 7;
Запросите потоки dws_stream_agg_month и dws_stream_agg_year, чтобы проверить обновлённые агрегаты:
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1001 | 2024 | 4 | 2024-04-01 | 100 | 1
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 162 | 4
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
Чтобы удалить данные из dwd_production, используйте оператор DELETE.
DELETE FROM dwd_production WHERE id = 7;
Запросите потоки dws_stream_agg_month и dws_stream_agg_year, чтобы проверить обновлённые результаты:
-- Due to inherent limitations in reverse aggregation, after deleting a row, downstream streams cannot revert SUM() to NULL. Instead, the aggregated result is set to 0.
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
category | y | m | ym | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
1001 | 2024 | 1 | 2024-01-01 | 35 | 2
1001 | 2024 | 2 | 2024-02-01 | 27 | 1
1001 | 2024 | 4 | 2024-04-01 | 0 | 0
1002 | 2023 | 12 | 2023-12-01 | 59 | 1
1002 | 2024 | 1 | 2024-01-01 | 34 | 1
1002 | 2024 | 2 | 2024-02-01 | 57 | 1
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
category | year | year_sum | year_cnt
----------+------+----------+----------
1001 | 2024 | 62 | 3
1002 | 2023 | 59 | 1
1002 | 2024 | 91 | 2
(3 rows)
«Соединение двух потоков (Dual-stream JOIN)» означает объединение двух потоков данных в реальном времени по общему ключу для совместного анализа данных из обоих источников. YMatrix поддерживает инкрементальные операции JOIN над вновь поступающими данными. Мы продемонстрируем это на примере мониторинга дорожного движения в реальном времени: соединение данных о транспортных потоках и количестве аварий позволяет быстро оценить состояние дорог и выдавать ранние предупреждения при превышении заданных пороговых значений.
dwd_traffic_flow: Данные об объёме транспортного потока
| Поле | Описание |
|---|---|
| id | Идентификатор дороги |
| road_n | Количество транспортных средств |
dwd_traffic_event: Данные об авариях
| Поле | Описание |
|---|---|
| id | Идентификатор дороги |
| traf_n | Количество аварий |
dws_stream_trafficinfo_total: Таблица объединённых транспортных данных
| Поле | Описание |
|---|---|
| id | Идентификатор дороги |
| road_n | Количество транспортных средств |
| traf_n | Количество аварий |
Сначала создайте таблицу dwd_traffic_flow и таблицу dwd_traffic_event для хранения потоков данных об объёме транспортного потока и дорожно-транспортных происшествиях соответственно.
CREATE TABLE dwd_traffic_flow (
id int,
road_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON dwd_traffic_flow (id);
CREATE TABLE dwd_traffic_event (
id int,
traf_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON dwd_traffic_event (id);
Затем создайте поток dws_stream_trafficinfo_total, определённый как потоковое соединение (JOIN) между таблицей dwd_traffic_flow и таблицей dwd_traffic_event. Это означает, что результат потока будет обновляться в реальном времени при любом изменении данных о транспортном потоке или происшествиях.
CREATE STREAM dws_stream_trafficinfo_total(id, road_n, traf_n)
AS (
SELECT
dwd_traffic_flow.id,
dwd_traffic_flow.road_n,
dwd_traffic_event.traf_n
FROM STREAMING ALL dwd_traffic_flow
INNER JOIN STREAMING ALL dwd_traffic_event
ON dwd_traffic_flow.id = dwd_traffic_event.id
) PRIMARY KEY (id);
Проанализируйте и запросите результаты. Используйте данные из таблицы dws_stream_trafficinfo_total для выявления транспортных паттернов и прогнозирования заторов. Выдавайте оповещения при превышении объёма транспортного потока порогового значения или при возникновении происшествий.
Вставьте по одной записи в каждую из таблиц dwd_traffic_flow и dwd_traffic_event.
INSERT INTO dwd_traffic_flow VALUES (1, 80);
INSERT INTO dwd_traffic_event VALUES (2, 3);
Запросите потоковую таблицу dws_stream_trafficinfo_total. Поскольку для соединения двух потоков нет совпадающих ключей, в таблице dws_stream_trafficinfo_total не будет результатов.
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
(0 rows)
Вставьте ещё по две записи в каждую из таблиц dwd_traffic_flow и dwd_traffic_event.
INSERT INTO dwd_traffic_flow VALUES (2, NULL), (3, 100);
INSERT INTO dwd_traffic_event VALUES (1, 5), (3, NULL);
Запросите потоковую таблицу dws_stream_trafficinfo_total. Поток dws_stream_trafficinfo_total выполнит инкрементальные вычисления над новыми данными и вернёт последние результаты соединения двух потоков.
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
2 | | 3
1 | 80 | 5
3 | 100 |
(3 rows)
Чтобы обновить данные в таблице dwd_traffic_flow или dwd_traffic_event, используйте оператор UPDATE.
UPDATE dwd_traffic_flow SET road_n = 101 WHERE id=2;
Запросите потоковую таблицу dws_stream_trafficinfo_total. Поток dws_stream_trafficinfo_total выполнит соединение в реальном времени с обновлёнными данными и отобразит последние результаты.
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+--------+--------
1 | 80 | 5
3 | 100 |
2 | 101 | 3
(3 rows)
Чтобы удалить данные из таблицы dwd_traffic_flow или dwd_traffic_event, используйте оператор DELETE.
DELETE FROM dwd_traffic_event WHERE id = 2;
Запросите потоковую таблицу dws_stream_trafficinfo_total. Поток dws_stream_trafficinfo_total пересчитает соединение после удаления и вернёт обновлённые результаты.
SELECT * FROM dws_stream_trafficinfo_total;
id | road_n | traf_n
----+---------+--------
1 | 80 | 5
3 | 100 |
(2 rows)