The `WITH [NO] DATA` clause must be placed at the outermost layer of the stream's query, not inside a nested subquery.
CREATE TABLE t8 (id int, d1_id int, d2_id int, c1 text, ts timestamp);
CREATE TABLE dim1 (id int, data text);
CREATE TABLE dim2 (id int, data text);
-- WITH DATA follows the outer WHERE clause: it belongs to the outermost
-- query of the stream, not to the derived table "a".
CREATE STREAM s8 AS (
    SELECT a.id, a.c1
    FROM (
        SELECT t8.id, t8.c1, ts,
               t8.d1_id, dim1.data AS d1_data,
               t8.d2_id, dim2.data AS d2_data
        FROM STREAMING t8
        INNER JOIN dim1 ON t8.d1_id = dim1.id
        LEFT JOIN dim2 ON t8.d2_id = dim2.id
    ) AS a
    WHERE a.id > 1
    WITH DATA
) DISTRIBUTED BY (id);
- Results Display
=# INSERT INTO dim1 VALUES (1,'mmt1'); =# INSERT INTO dim1 VALUES (2,'mmt2'); =# INSERT INTO dim2 VALUES (1,'hmz1'); =# INSERT INTO dim2 VALUES (2,'hmz2');
=# INSERT INTO t8 VALUES (1,1,1,'mmt',current_timestamp);
=# select * from s8; id | c1 ----+---- (0 rows)
=# INSERT INTO t8 VALUES (2,2,2,'mmt',current_timestamp);
=# select * from s8;
id | c1
----+-----
2 | mmt
(1 row)
### Extension stream (dimension-table enrichment)
- SQL Definition Example
CREATE TABLE t8 (id int, d1_id int, d2_id int, c1 text, ts timestamp);
CREATE TABLE dim1 (id int, data text);
CREATE TABLE dim2 (id int, data text);
-- Enrich each streamed t8 row with dimension data: dim1 is an inner join
-- (a match is required); dim2 is a left join (d2_data is NULL when absent).
CREATE STREAM s8 AS (
    SELECT t8.id,
           t8.c1,
           ts,
           t8.d1_id,
           dim1.data AS d1_data,
           t8.d2_id,
           dim2.data AS d2_data
    FROM STREAMING t8
    INNER JOIN dim1 ON t8.d1_id = dim1.id
    LEFT JOIN dim2 ON t8.d2_id = dim2.id
    WITH NO DATA
);
- Results Display
--Insert data on the dimension table =# INSERT INTO dim1 VALUES (1,'mmt1'); =# INSERT INTO dim1 VALUES (2,'mmt2');
=# INSERT INTO dim2 VALUES (1,'hmz1'); =# INSERT INTO dim2 VALUES (2,'hmz2');
=# INSERT INTO t8 VALUES (1,1,1,'mmt',current_timestamp);
=# select * from t8;
id | d1_id | d2_id | c1 | ts
----+-------+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | 1 | 1 | mmt | 2024-05-28 03:59:11.873657
(1 row)
=# select * from s8; id | c1 | ts | d1_id | d1_data | d2_id | d2_data ----+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 1 | mmt | 2024-05-28 03:59:11.873657 | 1 | mmt1 | 1 | hmz1 (1 row)
=# INSERT INTO t8 VALUES (2,2,2,'mmt',current_timestamp);
=# select * from s8; id | c1 | ts | d1_id | d1_data | d2_id | d2_data ----+-----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 2 | mmt | 2024-05-28 04:00:03.687348 | 2 | mmt2 | 2 | hmz2 1 | mmt | 2024-05-28 03:59:11.873657 | 1 | mmt1 | 1 | hmz1 (2 rows)
### Aggregation stream
- SQL Definition Example
CREATE TABLE t1 (id int, id2 int, id3 int, v1 int, v2 bigint)
    DISTRIBUTED BY (id);
-- Per-group aggregates over rows streamed from t1, grouped by (id, id2, id3).
CREATE STREAM s1 (id, id2, id3, min1, avg1, avg2) AS (
    SELECT id, id2, id3,
           min(v1),
           avg(v1),
           avg(v2)
    FROM STREAMING t1
    GROUP BY id, id2, id3
    WITH NO DATA
) DISTRIBUTED BY (id);
- Results Display
-- Groups (1,2,3) and (1,2,2) get one row each; group (1,2,4) gets two rows.
INSERT INTO t1 (id, id2, id3, v1, v2) VALUES (1, 2, 3, 10, 20);
INSERT INTO t1 (id, id2, id3, v1, v2) VALUES (1, 2, 2, 5, 10);
INSERT INTO t1 (id, id2, id3, v1, v2) VALUES (1, 2, 4, 5, 10);
INSERT INTO t1 (id, id2, id3, v1, v2) VALUES (1, 2, 4, 4, 10);
mmt=# SELECT * FROM s1;
id | id2 | id3 | min1 | avg1 | avg2 | partial_avg1 | partial_avg2
----+-----+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | 2 | 2 | 5 | 5.0000000000000000 | 10.0000000000000000 | {1,5} | \x00000000000000010001000000000000000a
1 | 2 | 3 | 10 | 10.0000000000000000 | 20.0000000000000000 | {1,10} | \x000000000000000100010000000000000014
1 | 2 | 4 | 4 | 4.5000000000000000 | 10.0000000000000000 | {2,9} | \x000000000000000200010000000000000014
(3 rows)
### Multi-level stream
- SQL Definition Example
CREATE TABLE t6 (id int, c1 text, ts timestamp);
CREATE TABLE dim1 (id int, col1 text);
CREATE TABLE dim2 (id int, col2 text);
-- Level 1: join streamed t6 rows with dim1 and stamp each result row with
-- clock_timestamp() as arrive_s61.
CREATE STREAM s61 (id, c1, ts, col1, arrive_s61) AS (
    SELECT t6.*, dim1.col1, clock_timestamp()
    FROM STREAMING t6
    INNER JOIN dim1 ON t6.id = dim1.id
    WITH NO DATA
) DISTRIBUTED BY (c1);
-- Level 2: consume s61 as a streaming source, join with dim2, and stamp
-- each result row with clock_timestamp() as arrive_s6_2.
CREATE STREAM s62 (id, c1, ts, col1, arrive_s61, col2, arrive_s6_2) AS (
    SELECT s61.*, dim2.col2, clock_timestamp()
    FROM STREAMING s61
    INNER JOIN dim2 ON s61.id = dim2.id
    WITH NO DATA
) DISTRIBUTED BY (id);
- Results Display
-- Dimension rows for ids 1 and 2 in both lookup tables.
INSERT INTO dim1 (id, col1) VALUES (1, 'mmt1');
INSERT INTO dim1 (id, col1) VALUES (2, 'hmz1');
INSERT INTO dim2 (id, col2) VALUES (1, 'mmt2');
INSERT INTO dim2 (id, col2) VALUES (2, 'hmz2');
-- Source rows; each one passes through s61 and then s62.
INSERT INTO t6 (id, c1, ts) VALUES (1, 'mmt', current_timestamp);
INSERT INTO t6 (id, c1, ts) VALUES (2, 'hmz', current_timestamp);
=# select * from t6;
id | c1 | ts
----+-----+-----------------------------------------------------------------------------------------------------------------------
2 | hmz | 2024-05-28 04:15:07.591406
1 | mmt | 2024-05-28 04:15:06.775371
(2 rows)
=# select * from s61;
id | c1 | ts | col1 | arrive_s61
----+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | mmt | 2024-05-28 04:15:06.775371 | mmt1 | 2024-05-28 04:15:07.611297-04
2 | hmz | 2024-05-28 04:15:07.591406 | hmz1 | 2024-05-28 04:15:08.613346-04
(2 rows)
=# select * from s62;
id | c1 | ts | col1 | arrive_s61 | col2 | arrive_s6_2
----+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2 | hmz | 2024-05-28 04:15:07.591406 | hmz1 | 2024-05-28 04:15:08.613346-04 | hmz2 | 2024-05-28 04:15:08.725532-04
1 | mmt | 2024-05-28 04:15:06.775371 | mmt1 | 2024-05-28 04:15:07.611297-04 | mmt2 | 2024-05-28 04:15:07.724866-04
(2 rows)
### One table and multiple streams
- SQL Definition Example
CREATE TABLE t5 (id int, c1 text, ts timestamp);
-- Two independent streams read the same source table, each with its own filter.
CREATE STREAM s51 (id, c1, ts, arrive_s51) AS (
    SELECT *, clock_timestamp()
    FROM STREAMING t5
    WHERE id > 7
    WITH NO DATA
) DISTRIBUTED BY (id);
CREATE STREAM s52 (id, c1, ts, arrive_s52) AS (
    SELECT *, clock_timestamp()
    FROM STREAMING t5
    WHERE id < 3
    WITH NO DATA
) DISTRIBUTED BY (c1);
- Results Display
-- id 1 satisfies s52 (id < 3); id 7 satisfies neither s51 (id > 7) nor s52,
-- which is why s51 is still empty in the query that follows.
INSERT INTO t5 (id, c1, ts) VALUES (1, 'mmt', current_timestamp);
INSERT INTO t5 (id, c1, ts) VALUES (7, 'hmz', current_timestamp);
=# SELECT * FROM s51; id | c1 | ts | arrive_s51 ----+----+------------------------------------------------------------------------------------------------------------------------ (0 rows)
=# SELECT * FROM s52;
id | c1 | ts | arrive_s52
----+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | mmt | 2024-05-28 04:17:38.090422 | 2024-05-28 04:17:38.278591-04
(1 row)
=# INSERT INTO t5 VALUES(8,'hmz2',current_timestamp);
=# SELECT * FROM s51;
id | c1 | ts | arrive_s51
----+------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
8 | hmz2 | 2024-05-28 04:18:08.299554 | 2024-05-28 04:18:09.241703-04
(1 row)
### Upstream table referenced multiple times
- SQL Definition Example
CREATE TABLE t3 (id int, c1 text, ts timestamp);
-- t3 is read as a streaming source twice, once per UNION ALL branch.
-- The filters are disjoint, so UNION ALL (no dedup) is correct.
-- Each branch selects all t3 columns plus clock_timestamp() as arrive_s3.
CREATE STREAM s3 (id, c1, ts, arrive_s3) AS (
    SELECT *, clock_timestamp() FROM STREAMING t3 WHERE id > 7
    UNION ALL
    SELECT *, clock_timestamp() FROM STREAMING t3 WHERE id < 3
    WITH NO DATA
) DISTRIBUTED BY (arrive_s3);
- Results Display
INSERT INTO t3 (id, c1, ts) VALUES (1, 'mmt', current_timestamp);  -- matches the id < 3 branch
INSERT INTO t3 (id, c1, ts) VALUES (8, 'hmz', current_timestamp);  -- matches the id > 7 branch
=# SELECT * FROM s3;
id | c1 | ts | arrive_s3
----+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | mmt | 2024-05-28 04:20:28.31711 | 2024-05-28 04:20:28.542517-04
8 | hmz | 2024-05-28 04:20:29.399698 | 2024-05-28 04:20:29.540877-04
(2 rows)
### Multi-engine support for stream tables
- SQL Definition Example
CREATE TABLE t4 (id int, c1 text, ts timestamp);
-- Storage options for s4:
--   * access method mars3, zstd compression (level 1), ordered by id;
--   * range-partitioned on ts into monthly partitions from 2024-03-01
--     (inclusive) to 2025-01-01 (exclusive).
CREATE STREAM s4 (id, c1, ts, arrive_s4) AS (
    SELECT *, clock_timestamp()
    FROM STREAMING t4
    WITH NO DATA
)
USING mars3
WITH (compresstype = 'zstd', compresslevel = 1)
DISTRIBUTED BY (id, arrive_s4)
ORDER BY id
PARTITION BY RANGE (ts)
(
    START (date '2024-03-01') INCLUSIVE
    END (date '2025-01-01') EXCLUSIVE
    EVERY (INTERVAL '1 month')
);
- Results Display
id | integer | | | | plain | | c1 | text | | | | extended | | ts | timestamp without time zone | | | | plain | | arrive_s4 | timestamp with time zone | | | | plain | | Partition key: RANGE (ts) View definition:
Partitions: s4_1_prt_1 FOR VALUES FROM ('2024-03-01 00:00:00') TO ('2024-04-01 00:00:00'), s4_1_prt_10 FOR VALUES FROM ('2024-12-01 00:00:00') TO ('2025-01-01 00:00:00'), s4_1_prt_2 FOR VALUES FROM ('2024-04-01 00:00:00') TO ('2024-05-01 00:00:00'), s4_1_prt_3 FOR VALUES FROM ('2024-05-01 00:00:00') TO ('2024-06-01 00:00:00'), s4_1_prt_4 FOR VALUES FROM ('2024-06-01 00:00:00') TO ('2024-07-01 00:00:00'), s4_1_prt_5 FOR VALUES FROM ('2024-07-01 00:00:00') TO ('2024-08-01 00:00:00'), s4_1_prt_6 FOR VALUES FROM ('2024-08-01 00:00:00') TO ('2024-09-01 00:00:00'), s4_1_prt_7 FOR VALUES FROM ('2024-09-01 00:00:00') TO ('2024-10-01 00:00:00'), s4_1_prt_8 FOR VALUES FROM ('2024-10-01 00:00:00') TO ('2024-11-01 00:00:00'), s4_1_prt_9 FOR VALUES FROM ('2024-11-01 00:00:00') TO ('2024-12-01 00:00:00') Distributed by: (id, arrive_s4) Access method: mars3 Order by: (id) Options: compresstype=zstd, compresslevel=1
=# INSERT INTO t4 VALUES (1,'mmt', current_timestamp);
=# INSERT INTO t4 VALUES (1,'mmt', to_timestamp('2024-10-02 14:00:00','yyyy-mm-dd hh24:mi:ss'));
=# SELECT * FROM s4;
id | c1 | ts | arrive_s4
----+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | mmt | 2024-05-28 04:23:49.271721 | 2024-05-28 04:23:50.32931-04
1 | mmt | 2024-10-02 14:00:00 | 2024-05-28 04:24:05.668132-04
(2 rows)
=# SELECT * FROM s4_1_prt_3 ;
id | c1 | ts | arrive_s4
----+-----+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | mmt | 2024-05-28 04:23:49.271721 | 2024-05-28 04:23:50.32931-04
(1 row)
=# SELECT * FROM s4_1_prt_8 ;
id | c1 | ts | arrive_s4
----+-----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
1 | mmt | 2024-10-02 14:00:00 | 2024-05-28 04:24:05.668132-04
(1 row)
### Dual-stream JOIN
- SQL Definition Example
-- A join between two upstream tables requires an index on the join column of
-- each table. Without one, every incremental computation gets more expensive
-- as the tables grow.
-- By default, CREATE STREAM reports an error if either upstream table lacks an
-- index on its join column. Enable the GUC mxstream.domino_join_skip_index_check
-- to skip this check.
CREATE TABLE t1 (id int, v1 int) DISTRIBUTED BY (id);
CREATE INDEX ON t1 (id);
CREATE TABLE t2 (id int, v2 int) DISTRIBUTED BY (id);
CREATE INDEX ON t2 (id);
CREATE STREAM s1 (id, v1, v2) AS (
    SELECT t1.id, t1.v1, t2.v2
    FROM STREAMING t1
    INNER JOIN STREAMING t2 ON t1.id = t2.id
) DISTRIBUTED BY (id);
- Results Display
As defined, s1 is a streaming JOIN of t1 and t2: new data arriving on either side updates the join result. For example:
-- First insert two rows that do not match each other
=# INSERT INTO t1 VALUES (1, 100);
=# INSERT INTO t2 VALUES (2, 99);
-- No matching record =# SELECT * FROM s1; id | v1 | v2 ----+----+---- (0 rows)
-- Insert 4 more rows: t1 id 2 matches the existing t2 row, and t1 id 3 matches the two new t2 rows
=# INSERT INTO t1 VALUES (2, NULL), (3, 100);
=# INSERT INTO t2 VALUES (3, NULL), (3, NULL);
-- Both id 2 and id 3 now produce matches
=# SELECT * FROM s1;
 id | v1  | v2
----+-----+----
  2 |     | 99
  3 | 100 |
  3 | 100 |
(3 rows)
### Three-stream JOIN
- SQL Definition Example
-- Create 3 input tables. Each gets an index on the join column id, because
-- by default CREATE STREAM reports an error for a stream join whose upstream
-- tables lack indexes on the join columns (unless the GUC
-- mxstream.domino_join_skip_index_check is enabled).
CREATE TABLE t1 (id int, v1 int) DISTRIBUTED BY (id);
CREATE INDEX ON t1 (id);
CREATE TABLE t2 (id int, v2 int) DISTRIBUTED BY (id);
CREATE INDEX ON t2 (id);
CREATE TABLE t3 (id int, v3 int) DISTRIBUTED BY (id);
CREATE INDEX ON t3 (id);
-- Create the first stream s1 = t1 ⨝ t2
CREATE STREAM s1 (id, v1, v2) AS (
    SELECT t1.id, t1.v1, t2.v2
    FROM STREAMING t1
    INNER JOIN STREAMING t2 ON t1.id = t2.id
) DISTRIBUTED BY (id);
-- Create the second stream s2 = s1 ⨝ t3 = t1 ⨝ t2 ⨝ t3
-- NOTE(review): s2 joins on s1.id; confirm whether the index check also
-- applies when the upstream side is a stream such as s1.
CREATE STREAM s2 (id, v1, v2, v3) AS (
    SELECT s1.id, s1.v1, s1.v2, t3.v3
    FROM STREAMING s1
    INNER JOIN STREAMING t3 ON s1.id = t3.id
) DISTRIBUTED BY (id);
- Results Display
-- Write the first batch of data: s1 receives one new row (id 1 matches in
-- t1 and t2), while s2 receives none because t3 is still empty.
INSERT INTO t1 (id, v1) VALUES (1, 100), (2, 100);
INSERT INTO t2 (id, v2) VALUES (1, 100);
(1 row)
-- s1 has 1 row
SELECT * FROM s1;
 id | v1  | v2
----+-----+-----
  1 | 100 | 100
(1 row)
-- s2 has no data because t3 is empty SELECT * FROM s2; id | v1 | v2 | v3 ----+----+----+---- (0 rows)
-- Write the second batch of data, producing new rows in both s1 and s2
INSERT INTO t1 VALUES (3, NULL);
INSERT 0 1
INSERT INTO t2 VALUES (2, NULL), (3, 99);
INSERT 0 2
INSERT INTO t3 VALUES (1, NULL), (2, 98), (3, 100);
INSERT 0 3
(1 row)
-- s1 gained 2 new rows
SELECT * FROM s1;
 id | v1  | v2
----+-----+-----
  1 | 100 | 100   -- result from the previous batch
  2 | 100 |       -- t1 ⨝ Δt2
  3 |     |  99   -- Δt1 ⨝ Δt2
(3 rows)
-- s2 gained 3 new rows
SELECT * FROM s2;
 id | v1  | v2  | v3
----+-----+-----+-----
  1 | 100 | 100 |       -- s1 ⨝ Δt3
  2 | 100 |     |  98   -- Δs1 ⨝ Δt3
  3 |     |  99 | 100   -- Δs1 ⨝ Δt3
(3 rows)