Stream tables can only be created by a superuser.
To create a superuser, refer to CREATE_ROLE.
This document provides basic use cases to help you quickly get started with the stream computing features of the YMatrix database.
Stream computing is a data processing technique that continuously analyzes and processes data as it is generated in real time. In the YMatrix database, you can create your own data streams with SQL. Streams support basic operations such as insertion, filtering, correction, and filling. They also support real-time processing operations such as dimension expansion, aggregation, cascading, forking, and merging. Together, these improve the timeliness of the data warehouse and reduce system complexity. With in-database stream computing in YMatrix, results are refreshed incrementally in real time, with second-level latency. Stream computing currently has a wide range of application scenarios, such as:
Stream computing is also widely used in fields such as military, simulation, e-commerce, supply chain, and the Internet of Things.
"Dimensional calculation" generally refers to the process of producing large-wide tables by associating columns or attributes in other tables to extend an existing data table structure. YMatrix's stream computing function supports real-time expanded dimensional calculations of data.
We will use a simple example of performing a dimensional expansion operation on order + product information. Expand business orders through the product purchase information of each customer to obtain the total sales of various products in each business line.
ods_order is a fact table that stores order information.
Fields | Meaning |
---|---|
id | Order id |
prod_id | Product id |
ts | Order time |
dim_prod is a dimension table that stores product details.
Fields | Meaning |
---|---|
id | Product id |
prod_name | Product name |
prod_detail | Product details |
dwd_order_detail is the order detail table. It is the wide table produced by dimension expansion and contains each order together with its product information.
Fields | Meaning |
---|---|
id | Order id |
ts | Order time |
prod_id | Product id |
prod_name | Product name |
prod_detail | Product details |
Step 1: Create the tables ods_order and dim_prod.
CREATE TABLE ods_order (
id int,
prod_id int,
ts timestamp
)
DISTRIBUTED BY (id);
CREATE TABLE dim_prod (
id int,
prod_name text,
prod_detail text
)
DISTRIBUTED BY (id);
Step 2: Create the stream dwd_order_detail, which expands the original order data with product information. When data in dim_prod changes, dwd_order_detail is refreshed incrementally in real time. The latest expanded order information is then written to it automatically.
CREATE STREAM dwd_order_detail(id, ts, prod_id, prod_name, prod_detail)
AS (
SELECT
ods_order.id,
ods_order.ts,
ods_order.prod_id,
dim_prod.prod_name,
dim_prod.prod_detail
FROM STREAMING ALL dim_prod
INNER JOIN ods_order
ON dim_prod.id = ods_order.prod_id
) PRIMARY KEY (id);
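To make the refresh behavior concrete, here is a minimal Python model of what the stream above maintains. It is a conceptual sketch, not YMatrix's implementation. It assumes, as the text describes, that only changes to the STREAMING side (dim_prod) trigger a refresh. It also assumes that each changed product row is joined against the current contents of ods_order and upserted into the result by the primary key id. The handler name on_dim_prod_upsert is hypothetical, and the timestamps are taken from the sample output in Step 3.

```python
from dataclasses import dataclass


@dataclass
class Order:
    id: int
    prod_id: int
    ts: str


# Current contents of ods_order (the non-streaming side of the join).
ods_order = {
    1: Order(1, 1, "2024-08-01 15:50:23.117737"),
    2: Order(2, 2, "2024-08-01 15:50:35.115252"),
}

# Result of the stream, keyed by its primary key (order id).
dwd_order_detail = {}


def on_dim_prod_upsert(prod_id: int, prod_name: str, prod_detail: str) -> None:
    """Hypothetical handler for an INSERT or UPDATE on dim_prod.

    The changed product row is joined against the current orders, and every
    matching order row is upserted into the result by its primary key.
    """
    for order in ods_order.values():
        if order.prod_id == prod_id:
            dwd_order_detail[order.id] = (
                order.id, order.ts, order.prod_id, prod_name, prod_detail
            )


on_dim_prod_upsert(1, "apple", "fruit_001")
on_dim_prod_upsert(2, "cola", "drink_001")
on_dim_prod_upsert(2, "pepsi", "drink_001")  # models UPDATE cola -> pepsi

for row in sorted(dwd_order_detail.values()):
    print(row)
```

In this model, keying the result by id makes the UPDATE overwrite order 2's row in place instead of adding a second row. That is the behavior the PRIMARY KEY (id) clause suggests.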
Step 3: Prepare the data.
Insert order information
-- Order 1
INSERT INTO ods_order
VALUES (
1,
1,
Current_timestamp
);
-- Order 2
INSERT INTO ods_order
VALUES (
2,
2,
Current_timestamp
);
Insert product information
INSERT INTO dim_prod
VALUES (
1,
'apple',
'fruit_001'
);
INSERT INTO dim_prod
VALUES (
2,
'cola',
'drink_001'
);
The stream dwd_order_detail joins ods_order with dim_prod. As soon as new data is inserted into dim_prod, the stream table dwd_order_detail is updated and shows the expanded order data.
Query the current results
SELECT * FROM dwd_order_detail;
 id |             ts             | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+-------------
  1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
  2 | 2024-08-01 15:50:35.115252 |       2 | cola      | drink_001
(2 rows)
To modify data in dim_prod, use the UPDATE statement.
-- Update cola -> pepsi
UPDATE dim_prod SET prod_name = 'pepsi' WHERE id = 2;
Query the latest results
SELECT * FROM dwd_order_detail;
 id |             ts             | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+-------------
  1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
  2 | 2024-08-01 15:50:35.115252 |       2 | pepsi     | drink_001
(2 rows)
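To delete data from dim_prod, use the DELETE statement. Because the stream uses an INNER JOIN, orders that reference a deleted product no longer appear in the result.
-- Delete product 2 (pepsi); order 2 no longer has a matching product
DELETE FROM dim_prod WHERE id = 2;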
Query the latest results
SELECT * FROM dwd_order_detail;
 id |             ts             | prod_id | prod_name | prod_detail
----+----------------------------+---------+-----------+-------------
  1 | 2024-08-01 15:50:23.117737 |       1 | apple     | fruit_001
(1 row)
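The DELETE result follows from the INNER JOIN. Once product 2 no longer exists, order 2 has no matching row and drops out of the result. Here is a minimal Python sketch of that retraction step. It is a conceptual model, not YMatrix internals. The handler name on_dim_prod_delete is hypothetical, and the ts column is omitted for brevity.

```python
# Result rows keyed by order id: (id, prod_id, prod_name, prod_detail).
dwd_order_detail = {
    1: (1, 1, "apple", "fruit_001"),
    2: (2, 2, "pepsi", "drink_001"),
}


def on_dim_prod_delete(prod_id: int) -> None:
    """Hypothetical handler for a DELETE on dim_prod.

    Under an INNER JOIN, an order whose product no longer exists has no match,
    so every result row that referenced the deleted product is retracted.
    """
    stale = [oid for oid, row in dwd_order_detail.items() if row[1] == prod_id]
    for oid in stale:
        del dwd_order_detail[oid]


on_dim_prod_delete(2)
print(sorted(dwd_order_detail))  # [1]
```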
"Aggregation calculation" usually refers to a summary and calculation of a set of data to generate statistical information such as sum, average, maximum, minimum, etc. YMatrix database stream computing supports real-time aggregated computing of newly added, updated, and deleted data.
We will further use this feature with examples of smart manufacturing real-time data monitoring. Keep abreast of monthly and annual product production data information through real-time aggregation of manufacturing data streams.
dwd_production is the product production table.
Fields | Meaning |
---|---|
id | Record id |
category | Product type |
value | Quantity produced |
ts | Production time |
dws_stream_agg_month is the monthly production summary table.
Fields | Meaning |
---|---|
category | Product Type |
y | year |
m | Month |
ym | Year and month (first day of the month) |
month_sum | Total quantity produced in the month |
month_cnt | Number of production records in the month |
dws_stream_agg_year is the annual production summary table.
Fields | Meaning |
---|---|
category | Product Type |
year | Year |
year_sum | Total quantity produced in the year |
year_cnt | Number of production records in the year |
Step 1: Create the table dwd_production to store product production information, and insert data.
-- Create a product production information table
CREATE TABLE dwd_production (
id bigserial,
category int,
value bigint,
ts timestamp
) DISTRIBUTED BY (id);
-- Insert data
INSERT INTO dwd_production(category, value, ts) VALUES
(1002, 59, '2023-12-12 03:44:05'),
(1001, 15, '2024-01-02 11:22:33'),
(1001, 20, '2024-01-03 22:33:44'),
(1002, 34, '2024-01-04 01:02:03'),
(1001, 27, '2024-02-11 02:03:04'),
(1002, 57, '2024-02-12 03:04:05');
Step 2: Create the stream dws_stream_agg_month to aggregate monthly production by product type, and the stream dws_stream_agg_year to aggregate annual production. dws_stream_agg_year reads from dws_stream_agg_month, so the two streams are cascaded. When new data arrives, both streams aggregate it automatically, and their stream tables stay up to date.
--Create stream dws_stream_agg_month, monthly product production
CREATE STREAM dws_stream_agg_month (category, y, m, ym, month_sum, month_cnt) AS (
SELECT
category,
extract(year FROM date_trunc('year', ts)::date),
extract(month FROM date_trunc('month', ts)::date),
date_trunc('month', ts)::date,
sum(value),
count(value)
FROM STREAMING ALL dwd_production
GROUP BY 1, 2, 3, 4 -- Grouped by product type, year, month, year and month
)
DISTRIBUTED BY (category, y, m);
--Create stream dws_stream_agg_year, annual product production
CREATE STREAM dws_stream_agg_year (category, year, year_sum, year_cnt) AS (
SELECT
dws_stream_agg_month.category,
dws_stream_agg_month.y,
sum(dws_stream_agg_month.month_sum),
sum(dws_stream_agg_month.month_cnt)
FROM STREAMING ALL dws_stream_agg_month
GROUP BY 1, 2 -- Grouped by product type, year
)
DISTRIBUTED BY (category, year);
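Conceptually, each stream keeps running aggregate state per group and applies only the change carried by each new row. The yearly stream consumes the changes emitted by the monthly stream rather than re-reading dwd_production. The following Python sketch models that cascade for inserts only. It illustrates incremental aggregation under stated assumptions and is not YMatrix's actual implementation. The names month_state, year_state, and insert_row are hypothetical.

```python
from collections import defaultdict
from datetime import datetime

# Running [sum, count] state per group, one dict per stream.
month_state = defaultdict(lambda: [0, 0])  # key: (category, year, month)
year_state = defaultdict(lambda: [0, 0])   # key: (category, year)


def insert_row(category: int, value: int, ts: str) -> None:
    """Apply one new dwd_production row to both cascaded aggregates."""
    t = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S")
    # Upstream stream: fold the row into its monthly group.
    month = month_state[(category, t.year, t.month)]
    month[0] += value
    month[1] += 1
    # Downstream stream: year_sum = sum(month_sum) and year_cnt = sum(month_cnt),
    # so the delta emitted by the monthly group (+value, +1) is applied to the
    # yearly group directly, without rescanning dwd_production.
    year = year_state[(category, t.year)]
    year[0] += value
    year[1] += 1


# The six rows inserted in Step 1.
rows = [
    (1002, 59, "2023-12-12 03:44:05"),
    (1001, 15, "2024-01-02 11:22:33"),
    (1001, 20, "2024-01-03 22:33:44"),
    (1002, 34, "2024-01-04 01:02:03"),
    (1001, 27, "2024-02-11 02:03:04"),
    (1002, 57, "2024-02-12 03:04:05"),
]
for row in rows:
    insert_row(*row)

print(sorted(year_state.items()))
# [((1001, 2024), [62, 3]), ((1002, 2023), [59, 1]), ((1002, 2024), [91, 2])]
```

The yearly totals produced by this model match the dws_stream_agg_year output in Step 3.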
Step 3: Query and analyze the results.
First, query the production records currently in dwd_production.
-- Sort by product type and time
SELECT * FROM dwd_production ORDER BY 2,4;
Then query dws_stream_agg_month and dws_stream_agg_year. The results show monthly and annual production for each product type.
--Query product monthly production information, sort by type, year, and month
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1
(5 rows)
--Query product annual production volume information, sort by type and year
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       62 |        3
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
Now insert a new production record into dwd_production. The streams dws_stream_agg_month and dws_stream_agg_year aggregate the new data continuously in real time.
INSERT INTO dwd_production VALUES(7,1001,30,'2024-04-04 01:23:44');
Query dws_stream_agg_month and dws_stream_agg_year again to see the latest aggregated results.
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1
     1001 | 2024 |  4 | 2024-04-01 |        30 |         1
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1
(6 rows)
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       92 |        4
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
To modify data in dwd_production, use the UPDATE statement:
UPDATE dwd_production SET value = 100 WHERE id = 7;
Then query the stream tables dws_stream_agg_month and dws_stream_agg_year. The aggregation results have been updated.
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1
     1001 | 2024 |  4 | 2024-04-01 |       100 |         1
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1
(6 rows)
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |      162 |        4
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
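An UPDATE can be understood as retracting the old row and inserting the new one, so that only the difference flows downstream. The Python sketch below works through the numbers for the row with id = 7 under that assumption. It is a conceptual model, not YMatrix's documented behavior, and the function name apply_update is hypothetical.

```python
from typing import List


def apply_update(state: List[int], old_value: int, new_value: int) -> None:
    """Model an UPDATE on one group's [sum, count] state as retract + insert."""
    # Retract the old row.
    state[0] -= old_value
    state[1] -= 1
    # Insert the new row.
    state[0] += new_value
    state[1] += 1


month_2024_04 = [30, 1]  # category 1001, April 2024, before the UPDATE
year_2024 = [92, 4]      # category 1001, 2024, before the UPDATE

for state in (month_2024_04, year_2024):
    apply_update(state, old_value=30, new_value=100)

print(month_2024_04, year_2024)  # [100, 1] [162, 4]
```

The count is unchanged and the sum shifts by 100 - 30 = 70, which matches the 92 to 162 change in the annual table.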
To delete data from dwd_production, use the DELETE statement:
DELETE FROM dwd_production WHERE id = 7;
Then query the stream tables dws_stream_agg_month and dws_stream_agg_year again. The aggregation results reflect the deletion.
-- Due to an inherent limitation of reverse (retraction-based) aggregation, when every row of a group is deleted, the stream cannot reset the sum() result to NULL, so the aggregates for that group become 0 instead.
SELECT * FROM dws_stream_agg_month ORDER BY 1,2,3;
 category |  y   | m  |     ym     | month_sum | month_cnt
----------+------+----+------------+-----------+-----------
     1001 | 2024 |  1 | 2024-01-01 |        35 |         2
     1001 | 2024 |  2 | 2024-02-01 |        27 |         1
     1001 | 2024 |  4 | 2024-04-01 |         0 |         0
     1002 | 2023 | 12 | 2023-12-01 |        59 |         1
     1002 | 2024 |  1 | 2024-01-01 |        34 |         1
     1002 | 2024 |  2 | 2024-02-01 |        57 |         1
(6 rows)
SELECT category, year, year_sum, year_cnt FROM dws_stream_agg_year ORDER BY 1,2;
 category | year | year_sum | year_cnt
----------+------+----------+----------
     1001 | 2024 |       62 |        3
     1002 | 2023 |       59 |        1
     1002 | 2024 |       91 |        2
(3 rows)
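The 0 in the April row reflects the difference between recomputing an aggregate and retracting from it. Recomputing sum() over a group with no rows gives NULL in SQL, modeled below as None. An incrementally maintained state that subtracts the deleted row's contribution ends at 0 instead. Here is a small Python illustration of that difference. It is a conceptual sketch, not YMatrix internals, and the function name sql_sum is hypothetical.

```python
from typing import List, Optional


def sql_sum(values: List[int]) -> Optional[int]:
    """Batch recomputation: SQL sum() over zero rows is NULL (None here)."""
    return sum(values) if values else None


# Category 1001, April 2024: its only row (id = 7, value = 100) is deleted.
print(sql_sum([]))  # None

# Incremental [sum, count] state for that group, before the DELETE.
state = [100, 1]
# Retract the deleted row's contribution instead of recomputing.
state[0] -= 100
state[1] -= 1
print(state)  # [0, 0]
```

If empty groups should be hidden when reading the results, one option is to filter on the count column, for example SELECT * FROM dws_stream_agg_month WHERE month_cnt > 0;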
"Dual-stream JOIN calculation" generally refers to the connection (JOIN) operation of two real-time data streams in stream processing so that data items from two different streams can be combined together for analysis or further processing based on some common keys. YMatrix database stream calculation supports incremental JOIN calculations for new data. We will further use this feature through the example of real-time traffic monitoring to experience the effect of real-time incremental JOIN of dual data streams.
By correlation between traffic flow and traffic accidents, the current road situation can be quickly judged through the pre-set early warning threshold, and timely road conditions warning and accident handling can be achieved.
dwd_traffic_flow is the road traffic flow table.
Fields | Meaning |
---|---|
id | Road number |
road_n | Traffic flow on the road |
dwd_traffic_event is the traffic accident table.
Fields | Meaning |
---|---|
id | Road number |
traf_n | Number of traffic accidents |
dws_stream_trafficinfo_total is the combined traffic table produced by the dual-stream JOIN.
Fields | Meaning |
---|---|
id | Road number |
road_n | Traffic flow on the road |
traf_n | Number of traffic accidents |
Step 1: Create the tables dwd_traffic_flow and dwd_traffic_event to store the road traffic data stream and the traffic accident data stream, respectively.
CREATE TABLE dwd_traffic_flow (
id int,
road_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON dwd_traffic_flow (id);
CREATE TABLE dwd_traffic_event (
id int,
traf_n int
)
DISTRIBUTED BY (id);
CREATE INDEX ON dwd_traffic_event (id);
Step 2: Create the stream dws_stream_trafficinfo_total, defined as a streaming JOIN of the tables dwd_traffic_flow and dwd_traffic_event. Whenever either the road traffic data or the traffic accident data changes, the result in the stream table is updated in real time.
CREATE STREAM dws_stream_trafficinfo_total(id, road_n, traf_n)
AS (
SELECT
dwd_traffic_flow.id,
dwd_traffic_flow.road_n,
dwd_traffic_event.traf_n
FROM STREAMING ALL dwd_traffic_flow
INNER JOIN STREAMING ALL dwd_traffic_event
ON dwd_traffic_flow.id = dwd_traffic_event.id
) PRIMARY KEY (id);
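Because both sides are declared STREAMING ALL, a change on either table can produce output. One common way to model this is a symmetric hash join. Each side keeps its rows indexed by the join key, and each new row is probed against the other side's index. The Python sketch below is an assumption-based model of that technique, not YMatrix's documented implementation. The names on_flow_insert and on_event_insert are hypothetical. It reproduces the two query results shown in Step 3.

```python
from collections import defaultdict
from typing import Optional

# Each side indexes its rows by the join key (road id).
flow_by_id = defaultdict(list)   # road id -> list of road_n values
event_by_id = defaultdict(list)  # road id -> list of traf_n values

# Join result keyed by the stream's primary key (id).
result = {}


def on_flow_insert(road_id: int, road_n: Optional[int]) -> None:
    """New dwd_traffic_flow row: store it, then probe the event side."""
    flow_by_id[road_id].append(road_n)
    for traf_n in event_by_id.get(road_id, []):
        result[road_id] = (road_id, road_n, traf_n)


def on_event_insert(road_id: int, traf_n: Optional[int]) -> None:
    """New dwd_traffic_event row: store it, then probe the flow side."""
    event_by_id[road_id].append(traf_n)
    for road_n in flow_by_id.get(road_id, []):
        result[road_id] = (road_id, road_n, traf_n)


on_flow_insert(1, 80)
on_event_insert(2, 3)
print(result)  # {} (road ids 1 and 2 do not match)

on_flow_insert(2, None)
on_flow_insert(3, 100)
on_event_insert(1, 5)
on_event_insert(3, None)
print(sorted(result.values()))
# [(1, 80, 5), (2, None, 3), (3, 100, None)]
```

NULL values in road_n or traf_n do not prevent a match, because the join condition compares only id. This is consistent with the rows for roads 2 and 3 in the sample output.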
Step 3: Query and analyze the results. By analyzing the data in dws_stream_trafficinfo_total, you can identify traffic flow patterns and anticipate congestion. You can also issue a timely warning when the traffic flow on a road reaches a threshold or an accident occurs.
First, insert one record into each of the tables dwd_traffic_flow and dwd_traffic_event.
INSERT INTO dwd_traffic_flow VALUES (1, 80);
INSERT INTO dwd_traffic_event VALUES (2, 3);
Then query the stream table dws_stream_trafficinfo_total. The two records have different road ids (1 and 2), so no row satisfies the JOIN condition and the stream table is still empty.
SELECT * FROM dws_stream_trafficinfo_total;
 id | road_n | traf_n
----+--------+--------
(0 rows)
Next, insert two more records into each of the tables dwd_traffic_flow and dwd_traffic_event.
INSERT INTO dwd_traffic_flow VALUES (2, NULL), (3, 100);
INSERT INTO dwd_traffic_event VALUES (1, 5), (3, NULL);
Query dws_stream_trafficinfo_total again. The stream joins the newly added rows incrementally, matching them against both new and existing rows of the other table, and shows the latest dual-stream JOIN results.
SELECT * FROM dws_stream_trafficinfo_total;
 id | road_n | traf_n
----+--------+--------
  2 |        |      3
  1 |     80 |      5
  3 |    100 |
(3 rows)
To modify data in dwd_traffic_flow or dwd_traffic_event, use the UPDATE statement:
UPDATE dwd_traffic_flow SET road_n = 101 WHERE id = 2;
Then query dws_stream_trafficinfo_total. The stream recomputes the JOIN for the modified row in real time and shows the latest results.
SELECT * FROM dws_stream_trafficinfo_total;
 id | road_n | traf_n
----+--------+--------
  1 |     80 |      5
  3 |    100 |
  2 |    101 |      3
(3 rows)
To delete data from dwd_traffic_flow or dwd_traffic_event, use the DELETE statement:
DELETE FROM dwd_traffic_event WHERE id = 2;
Then query dws_stream_trafficinfo_total. Road 2 no longer has a matching accident record, so its joined row is removed from the result.
SELECT * FROM dws_stream_trafficinfo_total;
 id | road_n | traf_n
----+--------+--------
  1 |     80 |      5
  3 |    100 |
(2 rows)
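UPDATE and DELETE on a joined stream can be thought of as retraction followed, for an UPDATE, by re-insertion. The old row is removed from its side and from the result, and the new row is probed against the other side again. Here is a minimal Python sketch under that assumption. It starts from the state after the second batch of inserts and, for simplicity, assumes one row per road id on each side. The names update_flow and delete_event are hypothetical and do not describe YMatrix internals.

```python
from typing import Optional

# One row per road id on each side (simplifying assumption for this sketch).
flow = {1: 80, 2: None, 3: 100}   # id -> road_n
event = {2: 3, 1: 5, 3: None}     # id -> traf_n
result = {i: (i, flow[i], event[i]) for i in flow if i in event}


def update_flow(road_id: int, road_n: Optional[int]) -> None:
    """UPDATE on dwd_traffic_flow: retract the old joined row, re-probe with the new value."""
    flow[road_id] = road_n
    result.pop(road_id, None)
    if road_id in event:
        result[road_id] = (road_id, road_n, event[road_id])


def delete_event(road_id: int) -> None:
    """DELETE on dwd_traffic_event: the INNER JOIN match disappears with it."""
    event.pop(road_id, None)
    result.pop(road_id, None)


update_flow(2, 101)
print(result[2])  # (2, 101, 3)
delete_event(2)
print(sorted(result.values()))  # [(1, 80, 5), (3, 100, None)]
```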