Stream Computing Use Case Practice

Only superusers can create stream tables, so the account you use must have superuser privileges.
For instructions on creating a superuser, refer to CREATE_ROLE.
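
As a rough sketch of that prerequisite, the statements below show one way a superuser role might be set up. They assume that YMatrix accepts the PostgreSQL-style role syntax described on the CREATE_ROLE page. The role names stream_admin and existing_user and the password are hypothetical placeholders, and the statements must themselves be run by an existing superuser.

```sql
-- Hypothetical role name and password; replace with your own.
CREATE ROLE stream_admin WITH LOGIN SUPERUSER PASSWORD 'change_me';

-- Alternatively, promote a role that already exists (hypothetical name).
ALTER ROLE existing_user WITH SUPERUSER;
```

Check the CREATE_ROLE page for the options your version actually supports before relying on this form.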

This document provides basic use cases to help users quickly get started with stream computing features in YMatrix Database.


Stream computing is a data processing technique that enables fast analysis and continuous processing of real-time data. In YMatrix Database, you can use SQL to rapidly create your own data streams. Beyond supporting operations such as insert, filter, correction, and fill, it also supports real-time stream computing operations including dimension expansion, aggregation, cascading, branching, and merging. This significantly enhances the real-time capabilities of data warehouse systems while reducing system complexity. With in-database stream computing in YMatrix, you can achieve second-level, real-time, and incremental result updates.

Currently, stream computing has broad applications across many domains, for example:

  • In finance: multi-dimensional data analysis, real-time fund flow monitoring, and investment risk analysis;
  • In manufacturing: real-time monitoring and alerting, predictive maintenance, and quality control;
  • In transportation: intelligent traffic management, vehicle trajectory analysis, and traffic volume forecasting.

In addition, stream computing is widely used in military, simulation, e-commerce, supply chain, and IoT fields.


Use Case 1: Fast Dimension Expansion of Product Data

"Dimension expansion" typically refers to enriching an existing table by joining additional columns or attributes from another table, resulting in a wide denormalized table. YMatrix stream computing supports real-time dimension expansion.
We will demonstrate this with a simple example that expands order data with product information, enabling analysis of total sales per business line by product category.


Table Structure

  • ods_order: Order information (fact table storing order records)

    Field        Description
    id           Order ID
    prod_id      Product ID
    ts           Order timestamp
  • dim_prod: Product information (dimension table storing product details)

    Field        Description
    id           Product ID
    prod_name    Product name
    prod_detail  Product detail
  • dwd_order_detail: Order detail (expanded order table containing full product information)

    Field        Description
    id           Order ID
    ts           Order timestamp
    prod_id      Product ID
    prod_name    Product name
    prod_detail  Product detail

Operations

  1. First, create tables ods_order and dim_prod.

     CREATE TABLE ods_order (
        id int,
        prod_id int,
        ts timestamp
        )
     DISTRIBUTED BY (id);
    
     CREATE TABLE dim_prod (
         id int,
         prod_name text,
         prod_detail text
      )
     DISTRIBUTED BY (id);
  2. Next, create stream dwd_order_detail to perform real-time dimension expansion on raw transaction data. When new data arrives in dim_prod, dwd_order_detail will incrementally refresh in real time, automatically writing the latest expanded transaction records.

     CREATE STREAM dwd_order_detail(id, ts, prod_id, prod_name, prod_detail)
     AS (
        SELECT
           ods_order.id,
           ods_order.ts,
           ods_order.prod_id,
           dim_prod.prod_name,
           dim_prod.prod_detail
        FROM STREAMING ALL dim_prod
        INNER JOIN ods_order
            ON dim_prod.id = ods_order.prod_id
     ) PRIMARY KEY (id);
  3. Prepare test data.

  • Insert order data

      -- Order 1
      INSERT INTO ods_order
      VALUES (
          1,
          1,
          current_timestamp
      );
      -- Order 2
      INSERT INTO ods_order
      VALUES (
          2,
          2,
          current_timestamp
      );
  • Insert product data

      INSERT INTO dim_prod
      VALUES (
          1,
          'apple',
          'fruit_001'
      );
    
      INSERT INTO dim_prod
      VALUES (
          2,
          'cola',
          'drink_001'
      );

Because stream dwd_order_detail joins the streamed table dim_prod with ods_order, the stream table dwd_order_detail updates immediately whenever new data is inserted into dim_prod, showing the expanded order records.

  • Query current results

      SELECT * FROM dwd_order_detail;
    
       id |             ts             | prod_id | prod_name | prod_detail
      ----+----------------------------+---------+-----------+------------
        1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
        2 | 2024-08-01 15:50:35.115252 |       2 | cola      | drink_001
      (2 rows)
  4. To update data in table dim_prod, use the UPDATE statement.

     -- Update cola -> pepsi
     UPDATE dim_prod SET prod_name = 'pepsi' WHERE id = 2;
  • Query latest results

       SELECT * FROM dwd_order_detail;
    
       id |             ts             | prod_id | prod_name | prod_detail
      ----+----------------------------+---------+-----------+------------
        1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
        2 | 2024-08-01 15:50:35.115252 |       2 | pepsi     | drink_001
      (2 rows)
    
  5. To delete data from table dim_prod, use the DELETE statement.

     -- Delete product 2
     DELETE FROM dim_prod WHERE id = 2;
  • Query latest results

      SELECT * FROM dwd_order_detail;
    
       id |             ts             | prod_id | prod_name | prod_detail
      ----+----------------------------+---------+-----------+------------
        1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
      (1 row)
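
As a sanity check on the walkthrough above, the stream's content can be compared with an ordinary batch join over the same two tables, and the expanded table can then be queried like any other table. The sketch below uses only standard SQL. The aliases o and p and the column alias order_cnt are illustrative names that do not appear in the original example. The comparison is meant only for the data loaded in this walkthrough and makes no claim about how the stream behaves for other load orders.

```sql
-- Batch equivalent of the stream definition: join orders to products directly.
SELECT
    o.id,
    o.ts,
    o.prod_id,
    p.prod_name,
    p.prod_detail
FROM ods_order AS o
INNER JOIN dim_prod AS p
    ON p.id = o.prod_id
ORDER BY o.id;

-- Example analysis on the expanded table: number of orders per product.
SELECT prod_name, count(*) AS order_cnt
FROM dwd_order_detail
GROUP BY prod_name
ORDER BY prod_name;
```

After the DELETE in step 5, product 2 no longer exists in dim_prod, so the batch join should list only order 1, matching the stream output shown above.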

Use Case 2: Multi-Level Aggregation of Manufacturing Data

"Aggregation" generally refers to summarizing a dataset to compute statistics such as sum, average, maximum, and minimum values. YMatrix stream computing supports real-time aggregation over inserted, updated, and deleted data.
We will demonstrate this feature using a smart manufacturing monitoring scenario, aggregating production data in real time to track monthly and yearly output.


Table Structure

  • dwd_production: Product output data table

    Field      Description
    id         Record ID
    category   Product category
    value      Quantity produced
    ts         Timestamp
  • dws_stream_agg_month: Monthly aggregated output

    Field      Description
    category   Product category
    y          Year
    m          Month
    ym         Year-month (first day of the month)
    month_sum  Total monthly production
    month_cnt  Number of records per month
  • dws_stream_agg_year: Yearly aggregated output

    Field      Description
    category   Product category
    year       Year
    year_sum   Cumulative annual production
    year_cnt   Cumulative number of records per year

Operations

  1. First, create table dwd_production to store production data and insert sample records.

     -- Create production data table
     CREATE TABLE dwd_production (
         id        bigserial,
         category  int,
         value     bigint,
         ts        timestamp
     ) DISTRIBUTED BY (id);
    
     -- Insert data
     INSERT INTO dwd_production(category, value, ts) VALUES
        (1002, 59, '2023-12-12 03:44:05'),
        (1001, 15, '2024-01-02 11:22:33'),
        (1001, 20, '2024-01-03 22:33:44'),
        (1002, 34, '2024-01-04 01:02:03'),
        (1001, 27, '2024-02-11 02:03:04'),
        (1002, 57, '2024-02-12 03:04:05');
  2. Create stream dws_stream_agg_month for monthly aggregation and stream dws_stream_agg_year for yearly aggregation. Upon receiving new data, both streams automatically update their results.

     -- Create stream dws_stream_agg_month for monthly production
     CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
        SELECT
          category,
          extract(year FROM date_trunc('year', ts)::date),
          extract(month FROM date_trunc('month', ts)::date),
          date_trunc('month', ts)::date,
          sum(value),
          count(value)
        FROM STREAMING ALL dwd_production
        GROUP BY 1, 2, 3, 4 -- Group by category, year, month, year-month
      )
      DISTRIBUTED BY (category, y, m);
    
     -- Create stream dws_stream_agg_year for annual production
     CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS (
        SELECT
          dws_stream_agg_month.category,
          dws_stream_agg_month.y,
          sum(dws_stream_agg_month.month_sum),
          sum(dws_stream_agg_month.month_cnt)
        FROM STREAMING ALL dws_stream_agg_month
        GROUP BY 1, 2 -- Group by category, year
      )
      DISTRIBUTED BY (category, year);
  3. Analyze and query results.

  • Query current data in dwd_production:

      -- Sort by category and timestamp
      SELECT * FROM dwd_production ORDER BY 2,4;
  • Query dws_stream_agg_month and dws_stream_agg_year for monthly and yearly aggregates:

      -- Query monthly production, sorted by category, year, month
      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt 
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |         2
           1001 | 2024 |  2 | 2024-02-01 |        27 |         1
           1002 | 2023 | 12 | 2023-12-01 |        59 |         1
           1002 | 2024 |  1 | 2024-01-01 |        34 |         1
           1002 | 2024 |  2 | 2024-02-01 |        57 |         1
      (5 rows)
    
      -- Query annual production, sorted by category, year
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
       category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |       62 |        3
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)
  • Insert a new record into dwd_production. Streams dws_stream_agg_month and dws_stream_agg_year will incrementally aggregate the new data.

      INSERT INTO dwd_production VALUES(7,1001,30,'2024-04-04 01:23:44');
  • Re-query dws_stream_agg_month and dws_stream_agg_year to see updated results:

      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt 
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |         2
           1001 | 2024 |  2 | 2024-02-01 |        27 |         1
           1001 | 2024 |  4 | 2024-04-01 |        30 |         1
           1002 | 2023 | 12 | 2023-12-01 |        59 |         1
           1002 | 2024 |  1 | 2024-01-01 |        34 |         1
           1002 | 2024 |  2 | 2024-02-01 |        57 |         1
      (6 rows)
    
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
       category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |       92 |        4
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)
  4. To update data in dwd_production, use the UPDATE statement.

     UPDATE dwd_production SET value = 100 WHERE id = 7;
  • Query streams dws_stream_agg_month and dws_stream_agg_year to verify updated aggregates:

      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt 
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |         2
           1001 | 2024 |  2 | 2024-02-01 |        27 |         1
           1001 | 2024 |  4 | 2024-04-01 |       100 |         1
           1002 | 2023 | 12 | 2023-12-01 |        59 |         1
           1002 | 2024 |  1 | 2024-01-01 |        34 |         1
           1002 | 2024 |  2 | 2024-02-01 |        57 |         1
      (6 rows)
    
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
    
       category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |      162 |        4
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)
    
  5. To delete data from dwd_production, use the DELETE statement.

     DELETE FROM dwd_production WHERE id = 7;
  • Query streams dws_stream_agg_month and dws_stream_agg_year to verify updated results:

      -- Due to an inherent limitation of reverse aggregation, deleting the last
      -- row of a group cannot revert SUM() to NULL in downstream streams.
      -- Instead, the aggregated result is set to 0 (see the 2024-04 row below).
      SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
       category |  y   | m  |     ym     | month_sum | month_cnt
      ----------+------+----+------------+-----------+-----------
           1001 | 2024 |  1 | 2024-01-01 |        35 |         2
           1001 | 2024 |  2 | 2024-02-01 |        27 |         1
           1001 | 2024 |  4 | 2024-04-01 |         0 |         0
           1002 | 2023 | 12 | 2023-12-01 |        59 |         1
           1002 | 2024 |  1 | 2024-01-01 |        34 |         1
           1002 | 2024 |  2 | 2024-02-01 |        57 |         1
      (6 rows)
    
      SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
       category | year | year_sum | year_cnt
      ----------+------+----------+----------
           1001 | 2024 |       62 |        3
           1002 | 2023 |       59 |        1
           1002 | 2024 |       91 |        2
      (3 rows)
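
To see how the stream result differs from a from-scratch computation after the DELETE, the monthly aggregates can be recomputed directly from dwd_production with an ordinary batch query. This is a minimal sketch in standard SQL that mirrors the grouping of dws_stream_agg_month. It is offered as a cross-check, not as part of the stream definition.

```sql
-- Recompute the monthly aggregates directly from the base table.
SELECT
    category,
    extract(year FROM ts)          AS y,
    extract(month FROM ts)         AS m,
    date_trunc('month', ts)::date  AS ym,
    sum(value)                     AS month_sum,
    count(value)                   AS month_cnt
FROM dwd_production
GROUP BY 1, 2, 3, 4
ORDER BY 1, 2, 3;
```

Because no rows for category 1001 in April 2024 remain in dwd_production, a batch GROUP BY produces no group for that month, whereas the stream keeps the row with month_sum and month_cnt set to 0. Filtering the stream with a condition such as month_cnt > 0 is one way to hide such emptied groups when querying it.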

Use Case 3: Dual-Stream Join for Traffic Data

"Dual-stream JOIN" refers to joining two real-time data streams on a common key to combine and analyze data from both sources. YMatrix stream computing supports incremental JOIN operations on newly arriving data. We will demonstrate this using a real-time traffic monitoring scenario, where joining vehicle flow and accident counts enables rapid road condition assessment and early warning based on predefined thresholds.


Table Structure

  • dwd_traffic_flow: Traffic volume data

    Field   Description
    id      Road ID
    road_n  Vehicle count
  • dwd_traffic_event: Accident data

    Field   Description
    id      Road ID
    traf_n  Number of accidents
  • dws_stream_trafficinfo_total: Joined traffic data table

    Field   Description
    id      Road ID
    road_n  Vehicle count
    traf_n  Number of accidents

Operations

  1. First, create table dwd_traffic_flow and table dwd_traffic_event to store data streams for road traffic volume and traffic incidents, respectively.

     CREATE TABLE dwd_traffic_flow (
         id int,
         road_n int
     )
     DISTRIBUTED BY (id);
     CREATE INDEX ON dwd_traffic_flow (id);
    
     CREATE TABLE dwd_traffic_event (
         id int,
         traf_n int
     )
     DISTRIBUTED BY (id);
     CREATE INDEX ON dwd_traffic_event (id);
  2. Next, create stream dws_stream_trafficinfo_total, defined as a streaming JOIN between table dwd_traffic_flow and table dwd_traffic_event. This means the stream result updates in real time whenever either the road traffic data or incident data changes.

     CREATE STREAM dws_stream_trafficinfo_total(id, road_n, traf_n)
     AS (
         SELECT
             dwd_traffic_flow.id,
             dwd_traffic_flow.road_n,
             dwd_traffic_event.traf_n
         FROM STREAMING ALL dwd_traffic_flow
         INNER JOIN STREAMING ALL dwd_traffic_event
             ON dwd_traffic_flow.id = dwd_traffic_event.id
     ) PRIMARY KEY (id);
  3. Analyze and query the results. Use data from table dws_stream_trafficinfo_total to identify traffic patterns and predict congestion. Issue alerts when traffic volume exceeds a threshold or when incidents occur.

  • Insert one record into each of table dwd_traffic_flow and table dwd_traffic_event.

      INSERT INTO dwd_traffic_flow VALUES (1, 80);
    
      INSERT INTO dwd_traffic_event VALUES (2, 3);
  • Query the stream table dws_stream_trafficinfo_total. Since no matching keys exist for the two-stream JOIN, no results appear in table dws_stream_trafficinfo_total.

      SELECT * FROM dws_stream_trafficinfo_total;
    
       id | road_n | traf_n
      ----+--------+--------
      (0 rows)
  • Insert two additional records into each of table dwd_traffic_flow and table dwd_traffic_event.

      INSERT INTO dwd_traffic_flow VALUES (2, NULL), (3, 100);
    
      INSERT INTO dwd_traffic_event VALUES (1, 5), (3, NULL);
  • Query the stream table dws_stream_trafficinfo_total. Stream dws_stream_trafficinfo_total performs incremental computation on the new data and returns the latest two-stream JOIN results.

      SELECT * FROM dws_stream_trafficinfo_total;
       id | road_n | traf_n
      ----+--------+--------
        2 |        |      3
        1 |     80 |      5
        3 |    100 |
      (3 rows)
  4. To update data in table dwd_traffic_flow or dwd_traffic_event, use the UPDATE statement.

     UPDATE dwd_traffic_flow SET road_n = 101 WHERE id=2;
  • Query the stream table dws_stream_trafficinfo_total. Stream dws_stream_trafficinfo_total computes the JOIN in real time with the updated data and displays the latest results.

      SELECT * FROM dws_stream_trafficinfo_total;
    
       id | road_n | traf_n
      ----+--------+--------
        1 |     80 |      5
        3 |    100 |
        2 |    101 |      3
      (3 rows)
  5. To delete data from table dwd_traffic_flow or dwd_traffic_event, use the DELETE statement.

     DELETE FROM dwd_traffic_event WHERE id = 2;
  • Query the stream table dws_stream_trafficinfo_total. Stream dws_stream_trafficinfo_total recomputes the JOIN after deletion and returns the updated results.

      SELECT * FROM dws_stream_trafficinfo_total;
    
       id | road_n | traf_n
      ----+--------+--------
        1 |     80 |      5
        3 |    100 |
      (2 rows)
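
The threshold-based alerting mentioned in step 3 can be sketched as a plain query over the stream table. The threshold of 90 vehicles and the column aliases high_volume and has_incident are hypothetical choices for illustration only. COALESCE treats a missing count (NULL) as 0, so roads with incomplete data are not flagged by accident.

```sql
-- Hypothetical threshold: flag roads with more than 90 vehicles
-- or with at least one recorded accident.
SELECT
    id,
    road_n,
    traf_n,
    coalesce(road_n, 0) > 90 AS high_volume,
    coalesce(traf_n, 0) > 0  AS has_incident
FROM dws_stream_trafficinfo_total
WHERE coalesce(road_n, 0) > 90
   OR coalesce(traf_n, 0) > 0
ORDER BY id;
```

Because the stream table is kept up to date incrementally, a monitoring job could run a query of this kind on a schedule without re-joining the two source tables each time.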