Only superusers can create stream tables, so the user running these examples must have superuser privileges.
For instructions on creating a superuser, refer to CREATE_ROLE.
This document provides basic use cases to help users quickly get started with stream computing features in YMatrix Database.
Stream computing is a data processing technique for continuously processing and quickly analyzing real-time data. In YMatrix Database, you can create your own data streams with SQL. In addition to basic operations such as insert, filter, correction, and fill, YMatrix supports real-time stream computing operations including dimension expansion, aggregation, cascading, branching, and merging. This improves the real-time capabilities of a data warehouse while reducing system complexity. Because stream computing runs inside the database, YMatrix updates results incrementally and in real time, with latency on the order of seconds.
Currently, stream computing has broad applications across many domains, for example:
In addition, stream computing is widely used in military, simulation, e-commerce, supply chain, and IoT fields.
"Dimension expansion" typically refers to enriching an existing table by joining additional columns or attributes from another table, resulting in a wide denormalized table. YMatrix stream computing supports real-time dimension expansion.
We will demonstrate this with a simple example that expands order data with product information, enabling analysis of total sales per business line by product category.
ods_order: Order information — fact table storing order records
| Field | Description |
|---|---|
| id | Order ID |
| prod_id | Product ID |
| ts | Order timestamp |
dim_prod: Product information — dimension table storing product details
| Field | Description |
|---|---|
| id | Product ID |
| prod_name | Product name |
| prod_detail | Product detail |
dwd_order_detail: Order detail — expanded order table containing full product information
| Field | Description |
|---|---|
| id | Order ID |
| ts | Timestamp |
| prod_id | Product ID |
| prod_name | Product name |
| prod_detail | Product detail |
First, create tables ods_order and dim_prod.
```sql
CREATE TABLE ods_order (
    id int,
    prod_id int,
    ts timestamp
)
DISTRIBUTED BY (id);

CREATE TABLE dim_prod (
    id int,
    prod_name text,
    prod_detail text
)
DISTRIBUTED BY (id);
```
Next, create stream dwd_order_detail to perform real-time dimension expansion on raw transaction data. When new data arrives in dim_prod, dwd_order_detail will incrementally refresh in real time, automatically writing the latest expanded transaction records.
```sql
CREATE STREAM dwd_order_detail(id, ts, prod_id, prod_name, prod_detail)
AS (
    SELECT
        ods_order.id,
        ods_order.ts,
        ods_order.prod_id,
        dim_prod.prod_name,
        dim_prod.prod_detail
    FROM STREAMING ALL dim_prod
    INNER JOIN ods_order
        ON dim_prod.id = ods_order.prod_id
) PRIMARY KEY (id);
```
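The following minimal Python model makes the incremental behavior of stream dwd_order_detail concrete. It is a conceptual sketch under stated assumptions, not YMatrix's implementation. The class `DimExpansionStream` and its methods are hypothetical. Following the description above, the model assumes that only changes to the `STREAMING ALL` source (dim_prod) trigger recomputation, and that ods_order is read in its current state. Result rows are upserted by the stream's primary key (`id`).

```python
class DimExpansionStream:
    """Hypothetical in-memory model of stream dwd_order_detail (not YMatrix code)."""

    def __init__(self):
        self.ods_order = {}  # order id -> (prod_id, ts); read as-is, never streamed
        self.result = {}     # PRIMARY KEY (order id) -> (ts, prod_id, prod_name, prod_detail)

    def on_dim_prod_upsert(self, prod_id, prod_name, prod_detail):
        """INSERT or UPDATE of one dim_prod row: join it with the matching orders."""
        for order_id, (order_prod_id, ts) in self.ods_order.items():
            if order_prod_id == prod_id:  # ON dim_prod.id = ods_order.prod_id
                # Upsert by primary key, so a newer product version overwrites the old row.
                self.result[order_id] = (ts, prod_id, prod_name, prod_detail)

    def on_dim_prod_delete(self, prod_id):
        """DELETE of one dim_prod row: retract every result row it produced."""
        stale = [oid for oid, row in self.result.items() if row[1] == prod_id]
        for order_id in stale:
            del self.result[order_id]


# Replay the walkthrough; timestamps are shortened to placeholder labels.
stream = DimExpansionStream()
stream.ods_order[1] = (1, "t1")  # Order 1
stream.ods_order[2] = (2, "t2")  # Order 2
stream.on_dim_prod_upsert(1, "apple", "fruit_001")
stream.on_dim_prod_upsert(2, "cola", "drink_001")
stream.on_dim_prod_upsert(2, "pepsi", "drink_001")  # UPDATE: cola -> pepsi
stream.on_dim_prod_delete(2)                         # DELETE FROM dim_prod WHERE id = 2
print(stream.result)  # -> {1: ('t1', 1, 'apple', 'fruit_001')}
```

The sketch does not handle an UPDATE that changes dim_prod.id itself. That case would need to be modeled as a delete of the old ID followed by an insert of the new one.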
Prepare test data.
Insert order data
```sql
-- Order 1
INSERT INTO ods_order VALUES (1, 1, current_timestamp);
-- Order 2
INSERT INTO ods_order VALUES (2, 2, current_timestamp);
```
Insert product data
```sql
INSERT INTO dim_prod VALUES (1, 'apple', 'fruit_001');
INSERT INTO dim_prod VALUES (2, 'cola', 'drink_001');
```
Stream dwd_order_detail uses dim_prod as its streaming source and joins it with ods_order. As a result, each insert into dim_prod immediately updates dwd_order_detail with the expanded order records.
Query current results
```sql
SELECT * FROM dwd_order_detail;
 id |             ts             | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+-------------
  1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
  2 | 2024-08-01 15:50:35.115252 |       2 | cola      | drink_001
(2 rows)
```
To update data in table dim_prod, use the UPDATE statement.
```sql
-- Update cola -> pepsi
UPDATE dim_prod SET prod_name = 'pepsi' WHERE id = 2;
```
Query latest results
```sql
SELECT * FROM dwd_order_detail;
 id |             ts             | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+-------------
  1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
  2 | 2024-08-01 15:50:35.115252 |       2 | pepsi     | drink_001
(2 rows)
```
To delete data from table dim_prod, use the DELETE statement.
```sql
-- Delete product 2 (pepsi)
DELETE FROM dim_prod WHERE id = 2;
```
Query latest results
```sql
SELECT * FROM dwd_order_detail;
 id |             ts             | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+-------------
  1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
(1 row)
```
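One way to sanity-check the stream is to run its defining SELECT as an ordinary batch query after each step and compare the results. The sketch below does this with Python's built-in `sqlite3` module as a stand-in database, not YMatrix. For brevity, it selects only the order ID and product name. In this walkthrough, each batch result matches the stream output shown above.

```python
import sqlite3

# SQLite stands in for YMatrix: the stream's defining SELECT is re-run as an
# ordinary batch query after each step of the walkthrough.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE ods_order (id INTEGER, prod_id INTEGER, ts TEXT);
    CREATE TABLE dim_prod (id INTEGER, prod_name TEXT, prod_detail TEXT);
    INSERT INTO ods_order VALUES (1, 1, '2024-08-01 15:50:23.117737'),
                                 (2, 2, '2024-08-01 15:50:35.115252');
""")

JOIN_SQL = """
    SELECT ods_order.id, dim_prod.prod_name
    FROM dim_prod
    INNER JOIN ods_order ON dim_prod.id = ods_order.prod_id
    ORDER BY ods_order.id
"""


def batch_join():
    """Return (order id, product name) pairs from a full re-run of the join."""
    return conn.execute(JOIN_SQL).fetchall()


conn.execute("INSERT INTO dim_prod VALUES (1, 'apple', 'fruit_001'), (2, 'cola', 'drink_001')")
after_insert = batch_join()
conn.execute("UPDATE dim_prod SET prod_name = 'pepsi' WHERE id = 2")
after_update = batch_join()
conn.execute("DELETE FROM dim_prod WHERE id = 2")
after_delete = batch_join()
print(after_insert, after_update, after_delete, sep="\n")
```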
"Aggregation" generally refers to summarizing a dataset to compute statistics such as sum, average, maximum, and minimum values. YMatrix stream computing supports real-time aggregation over inserted, updated, and deleted data.
We will demonstrate this feature using a smart manufacturing monitoring scenario, aggregating production data in real time to track monthly and yearly output.
dwd_production: Product output data table
| Field | Description |
|---|---|
| id | Record ID |
| category | Product category |
| value | Quantity produced |
| ts | Timestamp |
dws_stream_agg_month: Monthly aggregated output
| Field | Description |
|---|---|
| category | Product category |
| y | Year |
| m | Month |
| ym | Year-month |
| month_sum | Total monthly production |
| month_cnt | Number of records per month |
dws_stream_agg_year: Yearly aggregated output
| Field | Description |
|---|---|
| category | Product category |
| year | Year |
| year_sum | Cumulative annual production |
| year_cnt | Cumulative number of records per year |
First, create table dwd_production to store production data and insert sample records.
```sql
-- Create production data table
CREATE TABLE dwd_production (
    id bigserial,
    category int,
    value bigint,
    ts timestamp
) DISTRIBUTED BY (id);

-- Insert data
INSERT INTO dwd_production(category, value, ts) VALUES
    (1002, 59, '2023-12-12 03:44:05'),
    (1001, 15, '2024-01-02 11:22:33'),
    (1001, 20, '2024-01-03 22:33:44'),
    (1002, 34, '2024-01-04 01:02:03'),
    (1001, 27, '2024-02-11 02:03:04'),
    (1002, 57, '2024-02-12 03:04:05');
```
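Create stream dws_stream_agg_month for monthly aggregation and stream dws_stream_agg_year for yearly aggregation. dws_stream_agg_year reads from dws_stream_agg_month rather than from the base table, so the two streams form a cascade. When new data arrives in dwd_production, both streams update their results automatically.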
```sql
-- Create stream dws_stream_agg_month for monthly production
CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
    SELECT
        category,
        extract(year FROM date_trunc('year', ts)::date),
        extract(month FROM date_trunc('month', ts)::date),
        date_trunc('month', ts)::date,
        sum(value),
        count(value)
    FROM STREAMING ALL dwd_production
    GROUP BY 1, 2, 3, 4  -- Group by category, year, month, year-month
)
DISTRIBUTED BY (category, y, m);

-- Create stream dws_stream_agg_year for annual production
CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS (
    SELECT
        dws_stream_agg_month.category,
        dws_stream_agg_month.y,
        sum(dws_stream_agg_month.month_sum),
        sum(dws_stream_agg_month.month_cnt)
    FROM STREAMING ALL dws_stream_agg_month
    GROUP BY 1, 2  -- Group by category, year
)
DISTRIBUTED BY (category, year);
```
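The cascade can be modeled with signed deltas. In this minimal Python sketch, an INSERT adds a row's value and a count of 1, and a DELETE subtracts them (reverse aggregation). An UPDATE is treated as a delete of the old row followed by an insert of the new one. Each delta is applied to the monthly group and passed on unchanged to the yearly group, mirroring how dws_stream_agg_year sums month_sum and month_cnt. The class `CascadedAgg` is hypothetical and is not YMatrix's implementation. The sketch assumes `value` is never NULL, so COUNT(value) changes by exactly 1 per row.

```python
from collections import defaultdict
from datetime import datetime


class CascadedAgg:
    """Hypothetical model of dws_stream_agg_month feeding dws_stream_agg_year."""

    def __init__(self):
        self.month = defaultdict(lambda: [0, 0])  # (category, y, m) -> [month_sum, month_cnt]
        self.year = defaultdict(lambda: [0, 0])   # (category, y)    -> [year_sum, year_cnt]

    def _apply(self, category, value, ts, sign):
        """Apply one row change; sign is +1 for INSERT and -1 for DELETE."""
        t = datetime.fromisoformat(ts)
        # The monthly delta is forwarded as-is to the yearly group (the cascade).
        for state in (self.month[(category, t.year, t.month)],
                      self.year[(category, t.year)]):
            state[0] += sign * value  # SUM(value)
            state[1] += sign          # COUNT(value), assuming value is never NULL

    def insert(self, category, value, ts):
        self._apply(category, value, ts, +1)

    def delete(self, category, value, ts):
        self._apply(category, value, ts, -1)

    def update(self, old_row, new_row):
        """UPDATE = retract the old row, then add the new one."""
        self.delete(*old_row)
        self.insert(*new_row)


agg = CascadedAgg()
for row in [(1002, 59, "2023-12-12 03:44:05"), (1001, 15, "2024-01-02 11:22:33"),
            (1001, 20, "2024-01-03 22:33:44"), (1002, 34, "2024-01-04 01:02:03"),
            (1001, 27, "2024-02-11 02:03:04"), (1002, 57, "2024-02-12 03:04:05")]:
    agg.insert(*row)

april_30 = (1001, 30, "2024-04-04 01:23:44")
april_100 = (1001, 100, "2024-04-04 01:23:44")
agg.insert(*april_30)            # INSERT ... VALUES (7, 1001, 30, ...)
agg.update(april_30, april_100)  # UPDATE ... SET value = 100 WHERE id = 7
agg.delete(*april_100)           # DELETE ... WHERE id = 7
print(agg.month[(1001, 2024, 4)], agg.year[(1001, 2024)])  # -> [0, 0] [62, 3]
```

In the model, subtracting the deleted row leaves the April group in place at zero instead of removing it. This matches the 0 values that the walkthrough shows after the DELETE.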
Analyze and query results.
Query current data in dwd_production:
```sql
-- Sort by category and timestamp
SELECT * FROM dwd_production ORDER BY 2, 4;
```
Query dws_stream_agg_month and dws_stream_agg_year for monthly and yearly aggregates:
```sql
-- Query monthly production, sorted by category, year, month
SELECT * FROM dws_stream_agg_month ORDER BY 1, 2, 3;
 category |  y   | m  |     ym     | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1
(5 rows)

-- Query annual production, sorted by category, year
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1, 2;
 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       62 |        3
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
```
Insert a new record into dwd_production. Streams dws_stream_agg_month and dws_stream_agg_year will incrementally aggregate the new data.
```sql
INSERT INTO dwd_production VALUES (7, 1001, 30, '2024-04-04 01:23:44');
```
Re-query dws_stream_agg_month and dws_stream_agg_year to see updated results:
```sql
SELECT * FROM dws_stream_agg_month ORDER BY 1, 2, 3;
 category |  y   | m  |     ym     | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1
     1001 | 2024 |  4 | 2024-04-01 |        30 |         1
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1
(6 rows)

SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1, 2;
 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       92 |        4
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
```
To update data in dwd_production, use the UPDATE statement.
```sql
UPDATE dwd_production SET value = 100 WHERE id = 7;
```
Query streams dws_stream_agg_month and dws_stream_agg_year to verify updated aggregates:
```sql
SELECT * FROM dws_stream_agg_month ORDER BY 1, 2, 3;
 category |  y   | m  |     ym     | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1
     1001 | 2024 |  4 | 2024-04-01 |       100 |         1
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1
(6 rows)

SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1, 2;
 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |      162 |        4
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
```
To delete data from dwd_production, use the DELETE statement.
```sql
DELETE FROM dwd_production WHERE id = 7;
```
Query streams dws_stream_agg_month and dws_stream_agg_year to verify the updated results. Because of an inherent limitation of reverse aggregation, deleting a row cannot revert SUM() in the downstream streams to NULL. Instead, the affected group is kept and its aggregated values are set to 0.
```sql
SELECT * FROM dws_stream_agg_month ORDER BY 1, 2, 3;
 category |  y   | m  |     ym     | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1
     1001 | 2024 |  4 | 2024-04-01 |         0 |         0
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1
(6 rows)

SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1, 2;
 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       62 |        3
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
```
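For comparison, a batch re-run of the monthly GROUP BY after the delete has no April 2024 group at all. In SQL, SUM() over zero rows is NULL, while COUNT() is 0. The sketch below shows this with Python's built-in `sqlite3` module as a stand-in, not YMatrix. Because SQLite lacks `date_trunc` and `extract`, the sketch uses `strftime('%Y-%m', ts)` instead. The IDs 1 to 6 assume that the bigserial column numbered the sample rows in insertion order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dwd_production (id INTEGER, category INTEGER, value INTEGER, ts TEXT);
    INSERT INTO dwd_production VALUES
        (1, 1002, 59, '2023-12-12 03:44:05'),
        (2, 1001, 15, '2024-01-02 11:22:33'),
        (3, 1001, 20, '2024-01-03 22:33:44'),
        (4, 1002, 34, '2024-01-04 01:02:03'),
        (5, 1001, 27, '2024-02-11 02:03:04'),
        (6, 1002, 57, '2024-02-12 03:04:05'),
        (7, 1001, 100, '2024-04-04 01:23:44');
    DELETE FROM dwd_production WHERE id = 7;
""")

# Batch re-run of the monthly aggregation: groups with no rows do not appear.
monthly = conn.execute("""
    SELECT category, strftime('%Y-%m', ts) AS ym, sum(value), count(value)
    FROM dwd_production
    GROUP BY category, ym
    ORDER BY category, ym
""").fetchall()

# Aggregating zero rows: SUM() returns NULL (None in Python), COUNT() returns 0.
empty_sum, empty_cnt = conn.execute(
    "SELECT sum(value), count(value) FROM dwd_production WHERE id = 7"
).fetchone()

print(len(monthly), empty_sum, empty_cnt)  # -> 5 None 0
```

The stream, by contrast, keeps the (1001, 2024, 4) group with month_sum and month_cnt at 0, consistent with the note above that reverse aggregation cannot revert SUM() to NULL. If downstream queries should ignore such emptied groups, one option is to filter the stream table with `WHERE month_cnt > 0`.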
"Dual-stream JOIN" refers to joining two real-time data streams on a common key to combine and analyze data from both sources. YMatrix stream computing supports incremental JOIN operations on newly arriving data. We will demonstrate this using a real-time traffic monitoring scenario, where joining vehicle flow and accident counts enables rapid road condition assessment and early warning based on predefined thresholds.
dwd_traffic_flow: Traffic volume data
| Field | Description |
|---|---|
| id | Road ID |
| road_n | Vehicle count |
dwd_traffic_event: Accident data
| Field | Description |
|---|---|
| id | Road ID |
| traf_n | Number of accidents |
dws_stream_trafficinfo_total: Joined traffic data table
| Field | Description |
|---|---|
| id | Road ID |
| road_n | Vehicle count |
| traf_n | Number of accidents |
Operations
First, create table dwd_traffic_flow and table dwd_traffic_event to store data streams for road traffic volume and traffic incidents, respectively.
```sql
CREATE TABLE dwd_traffic_flow (
    id int,
    road_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON dwd_traffic_flow (id);

CREATE TABLE dwd_traffic_event (
    id int,
    traf_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON dwd_traffic_event (id);
```
Next, create stream dws_stream_trafficinfo_total, defined as a streaming JOIN between table dwd_traffic_flow and table dwd_traffic_event. This means the stream result updates in real time whenever either the road traffic data or incident data changes.
```sql
CREATE STREAM dws_stream_trafficinfo_total(id, road_n, traf_n)
AS (
    SELECT
        dwd_traffic_flow.id,
        dwd_traffic_flow.road_n,
        dwd_traffic_event.traf_n
    FROM STREAMING ALL dwd_traffic_flow
    INNER JOIN STREAMING ALL dwd_traffic_event
        ON dwd_traffic_flow.id = dwd_traffic_event.id
) PRIMARY KEY (id);
```
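Both sides of this join are declared STREAMING ALL, so a change on either table must be matched against the current contents of the other. The minimal Python model below keeps each side's rows in a dictionary keyed by road ID and recomputes only the affected key on every change. The class `DualStreamJoin` is hypothetical and is not YMatrix's implementation. The model assumes at most one row per road ID on each side, in line with the stream's PRIMARY KEY (id), although the base tables themselves do not enforce this. Python `None` stands for SQL NULL in the non-key columns.

```python
class DualStreamJoin:
    """Hypothetical model of stream dws_stream_trafficinfo_total."""

    def __init__(self):
        self.flow = {}    # dwd_traffic_flow:  road id -> road_n
        self.event = {}   # dwd_traffic_event: road id -> traf_n
        self.result = {}  # PRIMARY KEY (id) -> (road_n, traf_n)

    def _refresh(self, road_id):
        # INNER JOIN: a result row exists only while both sides have this road id.
        if road_id in self.flow and road_id in self.event:
            self.result[road_id] = (self.flow[road_id], self.event[road_id])
        else:
            self.result.pop(road_id, None)

    def upsert_flow(self, road_id, road_n):
        self.flow[road_id] = road_n
        self._refresh(road_id)

    def upsert_event(self, road_id, traf_n):
        self.event[road_id] = traf_n
        self._refresh(road_id)

    def delete_flow(self, road_id):
        self.flow.pop(road_id, None)
        self._refresh(road_id)

    def delete_event(self, road_id):
        self.event.pop(road_id, None)
        self._refresh(road_id)


join = DualStreamJoin()
join.upsert_flow(1, 80)
join.upsert_event(2, 3)   # IDs differ, so join.result stays empty
join.upsert_flow(2, None)
join.upsert_flow(3, 100)
join.upsert_event(1, 5)
join.upsert_event(3, None)
join.upsert_flow(2, 101)  # UPDATE dwd_traffic_flow SET road_n = 101 WHERE id = 2
join.delete_event(2)      # DELETE FROM dwd_traffic_event WHERE id = 2
print(sorted(join.result.items()))  # -> [(1, (80, 5)), (3, (100, None))]
```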
Analyze and query the results. Use data from table dws_stream_trafficinfo_total to identify traffic patterns and predict congestion. Issue alerts when traffic volume exceeds a threshold or when incidents occur.
Insert one record into each of table dwd_traffic_flow and table dwd_traffic_event.
```sql
INSERT INTO dwd_traffic_flow VALUES (1, 80);
INSERT INTO dwd_traffic_event VALUES (2, 3);
```
Query the stream table dws_stream_trafficinfo_total. The two rows have different road IDs (1 and 2), so the JOIN finds no matching keys and dws_stream_trafficinfo_total remains empty.
```sql
SELECT * FROM dws_stream_trafficinfo_total;
 id | road_n | traf_n
----+--------+--------
(0 rows)
```
Insert two additional records into each of table dwd_traffic_flow and table dwd_traffic_event.
```sql
INSERT INTO dwd_traffic_flow VALUES (2, NULL), (3, 100);
INSERT INTO dwd_traffic_event VALUES (1, 5), (3, NULL);
```
Query the stream table dws_stream_trafficinfo_total. Stream dws_stream_trafficinfo_total performs incremental computation on the new data and returns the latest two-stream JOIN results.
```sql
SELECT * FROM dws_stream_trafficinfo_total;
 id | road_n | traf_n
----+--------+--------
  2 |        |      3
  1 |     80 |      5
  3 |    100 |
(3 rows)
```
To update data in table dwd_traffic_flow or dwd_traffic_event, use the UPDATE statement.
```sql
UPDATE dwd_traffic_flow SET road_n = 101 WHERE id = 2;
```
Query the stream table dws_stream_trafficinfo_total. Stream dws_stream_trafficinfo_total computes the JOIN in real time with the updated data and displays the latest results.
```sql
SELECT * FROM dws_stream_trafficinfo_total;
 id | road_n | traf_n
----+--------+--------
  1 |     80 |      5
  3 |    100 |
  2 |    101 |      3
(3 rows)
```
To delete data from table dwd_traffic_flow or dwd_traffic_event, use the DELETE statement.
```sql
DELETE FROM dwd_traffic_event WHERE id = 2;
```
Query the stream table dws_stream_trafficinfo_total. Stream dws_stream_trafficinfo_total recomputes the JOIN after deletion and returns the updated results.
```sql
SELECT * FROM dws_stream_trafficinfo_total;
 id | road_n | traf_n
----+--------+--------
  1 |     80 |      5
  3 |    100 |
(2 rows)
```
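The alerting use case mentioned earlier reduces to an ordinary query over the stream table. The sketch below uses Python's built-in `sqlite3` module as a stand-in, not YMatrix, and loads the final rows shown above. It flags roads whose vehicle count exceeds a threshold or that report at least one incident. `ROAD_N_THRESHOLD = 90` and the output column names are illustrative, hypothetical choices.

```python
import sqlite3

# Hypothetical alert threshold for illustration only; not from YMatrix.
ROAD_N_THRESHOLD = 90

# SQLite stands in for YMatrix; the table holds the final stream rows shown above.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dws_stream_trafficinfo_total (id INTEGER, road_n INTEGER, traf_n INTEGER)"
)
conn.executemany(
    "INSERT INTO dws_stream_trafficinfo_total VALUES (?, ?, ?)",
    [(1, 80, 5), (3, 100, None)],
)

# Flag heavy traffic (road_n above the threshold) and incidents (traf_n > 0).
alerts = conn.execute(
    """
    SELECT id,
           road_n > ? AS heavy_traffic,
           coalesce(traf_n, 0) > 0 AS has_incident
    FROM dws_stream_trafficinfo_total
    WHERE road_n > ? OR traf_n > 0
    ORDER BY id
    """,
    (ROAD_N_THRESHOLD, ROAD_N_THRESHOLD),
).fetchall()
print(alerts)  # -> [(1, 0, 1), (3, 1, 0)]
```

With the threshold written as a literal, the same SELECT should run directly against dws_stream_trafficinfo_total in a PostgreSQL-compatible database. In that case, the flag columns would be booleans rather than SQLite's 0 and 1.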