Практические примеры использования потоковых вычислений

Потоковые таблицы могут создаваться только суперпользователями. Необходимо предоставить привилегии суперпользователя.
Инструкции по созданию суперпользователя см. в разделе CREATE_ROLE.

В этом документе представлены базовые практические примеры, которые помогут пользователям быстро начать работу с функциями потоковых вычислений в YMatrix Database.


Потоковые вычисления — это метод обработки данных, позволяющий быстро анализировать и непрерывно обрабатывать данные в реальном времени. В YMatrix Database вы можете использовать SQL для быстрого создания собственных потоков данных. Помимо поддержки операций вставки, фильтрации, корректировки и заполнения, система также поддерживает операции потоковых вычислений в реальном времени, такие как расширение измерений, агрегация, каскадирование, ветвление и объединение. Это значительно повышает возможности системы хранилища данных в части обработки данных в реальном времени и одновременно снижает сложность системы. Благодаря встроенным потоковым вычислениям в YMatrix вы можете добиться обновления результатов на уровне секунд, в режиме реального времени и с инкрементальным обновлением.

В настоящее время потоковые вычисления широко применяются во многих областях, например:

  • В финансах: многомерный анализ данных, мониторинг денежных потоков в реальном времени, анализ инвестиционных рисков;
  • В производстве: мониторинг и оповещение в реальном времени, прогнозирующее техническое обслуживание, контроль качества;
  • В транспорте: интеллектуальное управление дорожным движением, анализ траекторий транспортных средств, прогнозирование объёмов транспортного потока.

Кроме того, потоковые вычисления активно используются в военной сфере, моделировании, электронной коммерции, цепочках поставок и Интернете вещей (IoT).


Пример 1: Быстрое расширение измерений данных о продуктах

«Расширение измерений» обычно означает обогащение существующей таблицы за счёт соединения дополнительных столбцов или атрибутов из другой таблицы, в результате чего формируется широкая денормализованная таблица. YMatrix поддерживает расширение измерений в реальном времени.
Мы продемонстрируем это на простом примере, в котором данные о заказах дополняются информацией о товарах, что позволяет анализировать общий объём продаж по категориям товаров.


Структура таблиц

  • ods_order: Информация о заказах — факт-таблица, хранящая записи о заказах

    Поле Описание
    id Идентификатор заказа
    prod_id Идентификатор продукта
    ts Временная метка заказа
  • dim_prod: Информация о продуктах — таблица измерений, хранящая детали о продуктах

    Поле Описание
    id Идентификатор продукта
    pord_name Название продукта
    pord_detail Детали продукта
  • dwd_order_detail: Детали заказа — расширенная таблица заказов, содержащая полную информацию о продуктах

    Поле Описание
    id Идентификатор заказа
    ts Временная метка
    prod_id Идентификатор продукта
    pord_name Название продукта
    pord_detail Детали продукта

Операции

  1. Сначала создайте таблицы ods_order и dim_prod.

     CREATE TABLE ods_order (
        id int,
        prod_id int,
        ts timestamp
        )
     DISTRIBUTED BY (id);
    
     CREATE TABLE dim_prod (
         id int,
         prod_name text,
         prod_detail text
      )
     DISTRIBUTED BY (id);
  2. Затем создайте поток dwd_order_detail, чтобы выполнять расширение измерений над исходными транзакционными данными в реальном времени. При поступлении новых данных в dim_prod, таблица dwd_order_detail будет инкрементально обновляться в реальном времени, автоматически записывая последние расширенные записи транзакций.

     CREATE STREAM dwd_order_detail(id, ts, prod_id, prod_name, prod_detail)
     AS (
        SELECT
           ods_order.id,
           ods_order.ts,
           ods_order.prod_id,
           dim_prod.prod_name,
           dim_prod.prod_detail
        FROM STREAMING ALL dim_prod
        INNER JOIN ods_order
            ON dim_prod.id = ods_order.prod_id
     ) PRIMARY KEY (id);
  3. Подготовьте тестовые данные.

  • Вставка данных о заказах

      -- Order 1
      INSERT INTO ods_order
      VALUES (
          1,
          1,
          current_timestamp
      );
      -- Order 2
      INSERT INTO ods_order
      VALUES (
          2,
          2,
          current_timestamp
      );
  • Вставка данных о продуктах

      INSERT INTO dim_prod
      VALUES (
          1,
          'apple',
          'fruit_001'
      );
    
      INSERT INTO dim_prod
      VALUES (
          2,
          'cola',
          'drink_001'
      );

Посредством соединения dwd_order_detail и ods_order через поток dim_prod, при каждой вставке новых данных в dim_prod, потоковая таблица dwd_order_detail немедленно обновляется, отображая расширенные записи заказов.

  • Запрос текущих результатов

      SELECT * FROM dwd_order_detail;
    
       id |             ts             | prod_id | prod_name | prod_detail
      ----+----------------------------+---------+-----------+------------
        1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
        2 | 2024-08-01 15:50:35.115252 |       2 | cola      | drink_001
      (2 rows)
  1. Чтобы обновить данные в таблице dim_prod, используйте оператор UPDATE.

     -- Update cola -> pepsi
     UPDATE dim_prod SET prod_name = 'pepsi' WHERE id = 2;
  • Запрос последних результатов

       SELECT * FROM dwd_order_detail;
    
       id |             ts             | prod_id | prod_name | prod_detail
      ----+----------------------------+---------+-----------+------------
        1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
        2 | 2024-08-01 15:50:35.115252 |       2 | pepsi     | drink_001
      (2 rows)
    
  1. Чтобы удалить данные из таблицы dim_prod, используйте оператор DELETE.

     -- Delete order 2
     DELETE FROM dim_prod WHERE id = 2;
  • Запрос последних результатов

      SELECT * FROM dwd_order_detail;
    
       id |             ts             | prod_id | prod_name | prod_detail
      ----+----------------------------+---------+-----------+------------
        1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
      (1 row)

Пример 2: Многоуровневая агрегация производственных данных

«Агрегация» обычно означает суммирование набора данных для вычисления статистики, такой как сумма, среднее значение, максимум и минимум. YMatrix поддерживает агрегацию в реальном времени над вставленными, обновлёнными и удалёнными данными.
Мы продемонстрируем эту возможность на примере мониторинга в умном производстве, где производственные данные агрегируются в реальном времени для отслеживания месячного и годового выпуска продукции.


Структура таблиц

  • dwd_production: Таблица данных о выпуске продукции

    Поле Описание
    id Идентификатор записи
    category Категория продукта
    value Объём производства
    ts Временная метка
  • dws_stream_agg_month: Агрегированные месячные показатели

    Поле Описание
    category Категория продукта
    y Год
    m Месяц
    ym Год-месяц
    month_sum Общий объём производства за месяц
    month_cut Количество записей за месяц
  • dws_stream_agg_year: Агрегированные годовые показатели

    Поле Описание
    category Категория продукта
    y Год
    year_sum Накопленный объём производства за год
    year_cut Накопленное количество записей за год

Операции

  1. Сначала создайте таблицу dwd_production для хранения производственных данных и вставьте образцы записей.

     -- Create production data table
     CREATE TABLE dwd_production (
         id        bigserial,
         category int,
         value     bigint,
         ts        timestamp
     ) DISTRIBUTED BY (id);
    
     -- Insert data
     INSERT INTO dwd_production(category, value, ts) VALUES
        (1002, 59, '2023-12-12 03:44:05'),
        (1001, 15, '2024-01-02 11:22:33'),
        (1001, 20, '2024-01-03 22:33:44'),
        (1002, 34, '2024-01-04 01:02:03'),
        (1001, 27, '2024-02-11 02:03:04'),
        (1002, 57, '2024-02-12 03:04:05');
  2. Создайте поток dws_stream_agg_month для месячной агрегации и поток dws_stream_agg_year для годовой агрегации. При получении новых данных оба потока будут автоматически обновлять свои результаты.

     -- Create stream dws_stream_agg_month for monthly production
     CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
        SELECT
          category,
          extract(year FROM date_trunc('year', ts)::date),
          extract(month FROM date_trunc('month', ts)::date),
          date_trunc('month', ts)::date,
          sum(value),
          count(value)
        FROM STREAMING ALL dwd_production
        GROUP BY 1, 2, 3, 4 -- Group by category, year, month, year-month
      )
      DISTRIBUTED BY (category, y, m);
    
     -- Create stream dws_stream_agg_year for annual production
     CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS (
        SELECT
          dws_stream_agg_month.category,
          dws_stream_agg_month.y,
          sum(dws_stream_agg_month.month_sum),
          sum(dws_stream_agg_month.month_cnt)
        FROM STREAMING ALL dws_stream_agg_month
        GROUP BY 1, 2 -- Group by category, year
      )
      DISTRIBUTED BY (category, year);
  3. Проанализируйте и запросите результаты.

  • Запрос текущих данных в dwd_production:

      -- Sort by category and timestamp
      SELECT * FROM dwd_production ORDER BY 2,4;
  • Запрос dws_stream_agg_month и dws_stream_agg_year для получения месячных и годовых агрегатов:

      -- Query monthly production, sorted by category, year, month
      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt 
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |         2
           1001 | 2024 |  2 | 2024-02-01 |        27 |         1
           1002 | 2023 | 12 | 2023-12-01 |        59 |         1
           1002 | 2024 |  1 | 2024-01-01 |        34 |         1
           1002 | 2024 |  2 | 2024-02-01 |        57 |         1
      (5 rows)
    
      -- Query annual production, sorted by category, year
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
        category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |       62 |        3
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)
  • Вставьте новую запись в dwd_production. Потоки dws_stream_agg_month и dws_stream_agg_year выполнит инкрементальную агрегацию новых данных.

      INSERT INTO dwd_production VALUES(7,1001,30,'2024-04-04 01:23:44');
  • Повторно запросите dws_stream_agg_month и dws_stream_agg_year, чтобы увидеть обновлённые результаты:

      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt 
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |        2
           1001 | 2024 |  2 | 2024-02-01 |        27 |        1
           1001 | 2024 |  4 | 2024-04-01 |        30 |        1
           1002 | 2023 | 12 | 2023-12-01 |        59 |        1
           1002 | 2024 |  1 | 2024-01-01 |        34 |        1
           1002 | 2024 |  2 | 2024-02-01 |        57 |        1
      (6 rows)
    
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
        category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |       92 |        4
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)
  1. Чтобы обновить данные в dwd_production, используйте оператор UPDATE.

     UPDATE dwd_production SET value = 100 WHERE id = 7;
  • Запросите потоки dws_stream_agg_month и dws_stream_agg_year, чтобы проверить обновлённые агрегаты:

      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt 
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |         2
           1001 | 2024 |  2 | 2024-02-01 |        27 |         1
           1001 | 2024 |  4 | 2024-04-01 |       100 |         1
           1002 | 2023 | 12 | 2023-12-01 |        59 |         1
           1002 | 2024 |  1 | 2024-01-01 |        34 |         1
           1002 | 2024 |  2 | 2024-02-01 |        57 |         1
    
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
    
       category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |      162 |        4
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)
    
  1. Чтобы удалить данные из dwd_production, используйте оператор DELETE.

     DELETE FROM dwd_production WHERE id = 7;
  • Запросите потоки dws_stream_agg_month и dws_stream_agg_year, чтобы проверить обновлённые результаты:

      -- Due to inherent limitations in reverse aggregation, after deleting a row, downstream streams cannot revert SUM() to NULL. Instead, the aggregated result is set to 0.
      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt      
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |         2 
           1001 | 2024 |  2 | 2024-02-01 |        27 |         1 
           1001 | 2024 |  4 | 2024-04-01 |         0 |         0 
           1002 | 2023 | 12 | 2023-12-01 |        59 |         1 
           1002 | 2024 |  1 | 2024-01-01 |        34 |         1 
           1002 | 2024 |  2 | 2024-02-01 |        57 |         1 
    
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
       category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |       62 |        3
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)

Пример 3: Соединение двух потоков для анализа транспортных данных

«Соединение двух потоков (Dual-stream JOIN)» означает объединение двух потоков данных в реальном времени по общему ключу для совместного анализа данных из обоих источников. YMatrix поддерживает инкрементальные операции JOIN над вновь поступающими данными. Мы продемонстрируем это на примере мониторинга дорожного движения в реальном времени: соединение данных о транспортных потоках и количестве аварий позволяет быстро оценить состояние дорог и выдавать ранние предупреждения при превышении заданных пороговых значений.


Структура таблиц

  • dwd_traffic_flow: Данные об объёме транспортного потока

    Поле Описание
    id Идентификатор дороги
    road_n Количество транспортных средств
  • dwd_traffic_event: Данные об авариях

    Поле Описание
    id Идентификатор дороги
    traf_n Количество аварий
  • dws_stream_trafficinfo_total: Таблица объединённых транспортных данных

    Поле Описание
    id Идентификатор дороги
    road_n Количество транспортных средств
    traf_n Количество аварий

Операции

  1. Сначала создайте таблицу dwd_traffic_flow и таблицу dwd_traffic_event для хранения потоков данных об объёме транспортного потока и дорожно-транспортных происшествиях соответственно.

     CREATE TABLE dwd_traffic_flow (
         id int,
         road_n int
     )
     DISTRIBUTED BY (id);
     CREATE INDEX ON dwd_traffic_flow (id);
    
     CREATE TABLE dwd_traffic_event (
         id int,
         traf_n int
     )
     DISTRIBUTED BY (id);
     CREATE INDEX ON dwd_traffic_event (id);
  2. Затем создайте поток dws_stream_trafficinfo_total, определённый как потоковое соединение (JOIN) между таблицей dwd_traffic_flow и таблицей dwd_traffic_event. Это означает, что результат потока будет обновляться в реальном времени при любом изменении данных о транспортном потоке или происшествиях.

     CREATE STREAM dws_stream_trafficinfo_total(id, road_n, traf_n)
     AS (
         SELECT
             dwd_traffic_flow.id,
             dwd_traffic_flow.road_n,
             dwd_traffic_event.traf_n
         FROM STREAMING ALL dwd_traffic_flow
         INNER JOIN STREAMING ALL dwd_traffic_event
             ON dwd_traffic_flow.id = dwd_traffic_event.id
     ) PRIMARY KEY (id);
  3. Проанализируйте и запросите результаты. Используйте данные из таблицы dws_stream_trafficinfo_total для выявления транспортных паттернов и прогнозирования заторов. Выдавайте оповещения при превышении объёма транспортного потока порогового значения или при возникновении происшествий.

  • Вставьте по одной записи в каждую из таблиц dwd_traffic_flow и dwd_traffic_event.

      INSERT INTO dwd_traffic_flow VALUES (1, 80);
    
      INSERT INTO dwd_traffic_event VALUES (2, 3);
  • Запросите потоковую таблицу dws_stream_trafficinfo_total. Поскольку для соединения двух потоков нет совпадающих ключей, в таблице dws_stream_trafficinfo_total не будет результатов.

      SELECT * FROM dws_stream_trafficinfo_total;
    
       id | road_n | traf_n
      ----+--------+--------
      (0 rows)
  • Вставьте ещё по две записи в каждую из таблиц dwd_traffic_flow и dwd_traffic_event.

      INSERT INTO dwd_traffic_flow VALUES (2, NULL), (3, 100);
    
      INSERT INTO dwd_traffic_event VALUES (1, 5), (3, NULL);
  • Запросите потоковую таблицу dws_stream_trafficinfo_total. Поток dws_stream_trafficinfo_total выполнит инкрементальные вычисления над новыми данными и вернёт последние результаты соединения двух потоков.

      SELECT * FROM dws_stream_trafficinfo_total;
       id | road_n | traf_n
      ----+--------+--------
        2 |        |      3
        1 |     80 |      5
        3 |    100 |
      (3 rows)
  1. Чтобы обновить данные в таблице dwd_traffic_flow или dwd_traffic_event, используйте оператор UPDATE.

     UPDATE dwd_traffic_flow SET road_n = 101 WHERE id=2;
  • Запросите потоковую таблицу dws_stream_trafficinfo_total. Поток dws_stream_trafficinfo_total выполнит соединение в реальном времени с обновлёнными данными и отобразит последние результаты.

      SELECT * FROM dws_stream_trafficinfo_total;
    
       id | road_n | traf_n
      ----+--------+--------
        1 |     80 |      5
        3 |    100 |
        2 |    101 |      3
      (3 rows)
  1. Чтобы удалить данные из таблицы dwd_traffic_flow или dwd_traffic_event, используйте оператор DELETE.

     DELETE FROM dwd_traffic_event WHERE id = 2;
  • Запросите потоковую таблицу dws_stream_trafficinfo_total. Поток dws_stream_trafficinfo_total пересчитает соединение после удаления и вернёт обновлённые результаты.

      SELECT * FROM dws_stream_trafficinfo_total;
    
       id | road_n  | traf_n
      ----+---------+--------
        1 |   80    |   5
        3 |   100   |
      (2 rows)