Stream Computing Feature Examples

Nested Query Stream

The WITH [NO] DATA clause must be placed at the outermost level of the stream's defining query (just before the closing parenthesis of AS (...)), not inside the nested subquery.

  • SQL Definition Example
-- Source table plus two dimension tables used to enrich it.
CREATE TABLE t8 (
    id    int,
    d1_id int,
    d2_id int,
    c1    text,
    ts    timestamp
);

CREATE TABLE dim1 (id int, data text);
CREATE TABLE dim2 (id int, data text);

-- The inner query joins each new t8 row with dim1 (INNER JOIN) and dim2
-- (LEFT JOIN); the outer query projects id and c1 and keeps only id > 1.
-- WITH DATA is attached to the outermost query, not to the inner subquery.
CREATE STREAM s8
AS (
    SELECT a.id, a.c1
    FROM (
        SELECT t8.id,
               t8.c1,
               t8.ts,
               t8.d1_id,
               dim1.data AS d1_data,
               t8.d2_id,
               dim2.data AS d2_data
        FROM STREAMING t8
        INNER JOIN dim1 ON t8.d1_id = dim1.id
        LEFT JOIN dim2 ON t8.d2_id = dim2.id
    ) AS a
    WHERE a.id > 1
    WITH DATA
)
DISTRIBUTED BY (id);
  • Result Output
=# INSERT INTO dim1 VALUES (1,'mmt1');
=# INSERT INTO dim1 VALUES (2,'mmt2');
=# INSERT INTO dim2 VALUES (1,'hmz1');
=# INSERT INTO dim2 VALUES (2,'hmz2');

=# INSERT INTO t8 VALUES (1,1,1,'mmt',current_timestamp);

=# select * from s8;
 id | c1 
----+----
(0 rows)

=# INSERT INTO t8 VALUES (2,2,2,'mmt',current_timestamp);

=# select * from s8;
 id | c1  
----+-----
  2 | mmt
(1 row)

Enrichment Stream

  • SQL Definition Example
-- Source table plus two dimension tables used to enrich it.
CREATE TABLE t8 (
    id    int,
    d1_id int,
    d2_id int,
    c1    text,
    ts    timestamp
);

CREATE TABLE dim1 (id int, data text);
CREATE TABLE dim2 (id int, data text);

-- Enrich each new t8 row: rows without a matching dim1 row are dropped
-- (INNER JOIN), while a missing dim2 row leaves d2_data NULL (LEFT JOIN).
CREATE STREAM s8
AS (
    SELECT t8.id,
           t8.c1,
           t8.ts,
           t8.d1_id,
           dim1.data AS d1_data,
           t8.d2_id,
           dim2.data AS d2_data
    FROM STREAMING t8
    INNER JOIN dim1 ON t8.d1_id = dim1.id
    LEFT JOIN dim2 ON t8.d2_id = dim2.id
    WITH NO DATA
);
  • Result Output
-- Insert data into dimension tables
=# INSERT INTO dim1 VALUES (1,'mmt1');
=# INSERT INTO dim1 VALUES (2,'mmt2');

=# INSERT INTO dim2 VALUES (1,'hmz1');
=# INSERT INTO dim2 VALUES (2,'hmz2');

=# INSERT INTO t8 VALUES (1,1,1,'mmt',current_timestamp);
=# select * from t8;
 id | d1_id | d2_id | c1  |             ts             
----+-------+-------+-----+----------------------------
  1 |     1 |     1 | mmt | 2024-05-28 03:59:11.873657
(1 row)

=# select * from s8;
 id | c1  |             ts             | d1_id | d1_data | d2_id | d2_data 
----+-----+----------------------------+-------+---------+-------+---------
  1 | mmt | 2024-05-28 03:59:11.873657 |     1 | mmt1    |     1 | hmz1
(1 row)

=# INSERT INTO t8 VALUES (2,2,2,'mmt',current_timestamp);

=# select * from s8;
 id | c1  |             ts             | d1_id | d1_data | d2_id | d2_data 
----+-----+----------------------------+-------+---------+-------+---------
  2 | mmt | 2024-05-28 04:00:03.687348 |     2 | mmt2    |     2 | hmz2
  1 | mmt | 2024-05-28 03:59:11.873657 |     1 | mmt1    |     1 | hmz1
(2 rows)

Aggregation Stream

  • SQL Definition Example
-- Source table: v1 feeds min() and avg(); v2 feeds a second avg().
CREATE TABLE t1 (
    id  int,
    id2 int,
    id3 int,
    v1  int,
    v2  bigint
)
DISTRIBUTED BY (id);

-- One output row per (id, id2, id3) group holding min(v1), avg(v1) and avg(v2).
CREATE STREAM s1 (id, id2, id3, min1, avg1, avg2)
AS (
    SELECT id,
           id2,
           id3,
           min(v1),
           avg(v1),
           avg(v2)
    FROM STREAMING t1
    GROUP BY id, id2, id3
    WITH NO DATA
)
DISTRIBUTED BY (id);
  • Result Output
=# INSERT INTO t1 VALUES(1,2,3,10,20);
=# INSERT INTO t1 VALUES(1,2,2,5,10);

=# INSERT INTO t1 VALUES(1,2,4,5,10);
=# INSERT INTO t1 VALUES(1,2,4,4,10);

=# SELECT * FROM s1;
 id | id2 | id3 | min1 |        avg1         |        avg2         | partial_avg1 |              partial_avg2              
----+-----+-----+------+---------------------+---------------------+--------------+----------------------------------------
  1 |   2 |   2 |    5 |  5.0000000000000000 | 10.0000000000000000 | {1,5}        | \x00000000000000010001000000000000000a
  1 |   2 |   3 |   10 | 10.0000000000000000 | 20.0000000000000000 | {1,10}       | \x000000000000000100010000000000000014
  1 |   2 |   4 |    4 |  4.5000000000000000 | 10.0000000000000000 | {2,9}        | \x000000000000000200010000000000000014
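(3 rows)

Note: partial_avg1 and partial_avg2 are not part of the stream's declared column list. They hold the intermediate state of avg(v1) and avg(v2): the row count and running sum (for example, {2,9} for the two v1 values 5 and 4). partial_avg2 is stored in a serialized binary form. The stream keeps this state so that the averages can be updated incrementally.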

Multi-level Stream

  • SQL Definition Example
CREATE TABLE t6 (id int, c1 text, ts timestamp);
CREATE TABLE dim1 (id int, col1 text);
CREATE TABLE dim2 (id int, col2 text);

-- Level 1: join new t6 rows with dim1 and stamp them with clock_timestamp()
-- as arrive_s61.
CREATE STREAM s61 (id, c1, ts, col1, arrive_s61)
AS (
    SELECT t6.id,
           t6.c1,
           t6.ts,
           dim1.col1,
           clock_timestamp()
    FROM STREAMING t6
    INNER JOIN dim1 ON t6.id = dim1.id
    WITH NO DATA
)
DISTRIBUTED BY (c1);

-- Level 2: consume s61 as a stream, join it with dim2 and add a second
-- clock_timestamp() column.
CREATE STREAM s62 (id, c1, ts, col1, arrive_s61, col2, arrive_s6_2)
AS (
    SELECT s61.id,
           s61.c1,
           s61.ts,
           s61.col1,
           s61.arrive_s61,
           dim2.col2,
           clock_timestamp()
    FROM STREAMING s61
    INNER JOIN dim2 ON s61.id = dim2.id
    WITH NO DATA
)
DISTRIBUTED BY (id);
  • Result Output
=# INSERT INTO dim1 VALUES(1,'mmt1');
=# INSERT INTO dim1 VALUES(2,'hmz1');
=# INSERT INTO dim2 VALUES(1,'mmt2');
=# INSERT INTO dim2 VALUES(2,'hmz2');

=# INSERT INTO t6 VALUES (1,'mmt',current_timestamp);
=# INSERT INTO t6 VALUES (2,'hmz',current_timestamp);

=# select * from t6;
 id | c1  |             ts             
----+-----+----------------------------
  2 | hmz | 2024-05-28 04:15:07.591406
  1 | mmt | 2024-05-28 04:15:06.775371
(2 rows)

=# select * from s61;
 id | c1  |             ts             | col1 |          arrive_s61           
----+-----+----------------------------+------+-------------------------------
  1 | mmt | 2024-05-28 04:15:06.775371 | mmt1 | 2024-05-28 04:15:07.611297-04
  2 | hmz | 2024-05-28 04:15:07.591406 | hmz1 | 2024-05-28 04:15:08.613346-04
(2 rows)

=# select * from s62;
 id | c1  |             ts             | col1 |          arrive_s61           | col2 |          arrive_s6_2          
----+-----+----------------------------+------+-------------------------------+------+-------------------------------
  2 | hmz | 2024-05-28 04:15:07.591406 | hmz1 | 2024-05-28 04:15:08.613346-04 | hmz2 | 2024-05-28 04:15:08.725532-04
  1 | mmt | 2024-05-28 04:15:06.775371 | mmt1 | 2024-05-28 04:15:07.611297-04 | mmt2 | 2024-05-28 04:15:07.724866-04
(2 rows)

One Table, Multiple Streams

  • SQL Definition Example
CREATE TABLE t5 (id int, c1 text, ts timestamp);

-- Two independent streams read the same source table, each with its own filter.

-- s51 receives only rows with id > 7.
CREATE STREAM s51 (id, c1, ts, arrive_s51)
AS (
    SELECT id, c1, ts, clock_timestamp()
    FROM STREAMING t5
    WHERE id > 7
    WITH NO DATA
)
DISTRIBUTED BY (id);

-- s52 receives only rows with id < 3.
CREATE STREAM s52 (id, c1, ts, arrive_s52)
AS (
    SELECT id, c1, ts, clock_timestamp()
    FROM STREAMING t5
    WHERE id < 3
    WITH NO DATA
)
DISTRIBUTED BY (c1);
  • Result Output
=# INSERT INTO t5 VALUES(1,'mmt',current_timestamp);
-- id 7 satisfies neither filter (id > 7 or id < 3), so it reaches neither stream
=# INSERT INTO t5 VALUES(7,'hmz',current_timestamp);


=# SELECT * FROM s51;
 id | c1 | ts | arrive_s51 
----+----+----+------------
(0 rows)

=# SELECT * FROM s52;
 id | c1  |             ts             |          arrive_s52           
----+-----+----------------------------+-------------------------------
  1 | mmt | 2024-05-28 04:17:38.090422 | 2024-05-28 04:17:38.278591-04
(1 row)

=# INSERT INTO t5 VALUES(8,'hmz2',current_timestamp);

=# SELECT * FROM s51;
 id |  c1  |             ts             |          arrive_s51           
----+------+----------------------------+-------------------------------
  8 | hmz2 | 2024-05-28 04:18:08.299554 | 2024-05-28 04:18:09.241703-04
(1 row)

Multiple References to Upstream Table

  • SQL Definition Example
CREATE TABLE t3 (id int, c1 text, ts timestamp);

-- t3 is referenced twice as a streaming source; the two filtered branches
-- are combined with UNION ALL. WITH NO DATA follows the last branch, i.e.
-- it sits at the outermost level of the defining query.
CREATE STREAM s3 (id, c1, ts, arrive_s3)
AS (
    SELECT id, c1, ts, clock_timestamp()
    FROM STREAMING t3
    WHERE id > 7
    UNION ALL
    SELECT id, c1, ts, clock_timestamp()
    FROM STREAMING t3
    WHERE id < 3
    WITH NO DATA
)
DISTRIBUTED BY (arrive_s3);
  • Result Output
=# INSERT INTO t3 VALUES (1,'mmt',current_timestamp);
=# INSERT INTO t3 VALUES (8,'hmz',current_timestamp);

=# SELECT * FROM s3;
 id | c1  |             ts             |           arrive_s3           
----+-----+----------------------------+-------------------------------
  1 | mmt | 2024-05-28 04:20:28.31711  | 2024-05-28 04:20:28.542517-04
  8 | hmz | 2024-05-28 04:20:29.399698 | 2024-05-28 04:20:29.540877-04
(2 rows)

Multi-engine Support for Stream Tables

  • SQL Definition Example
CREATE TABLE t4 (id int, c1 text, ts timestamp);

-- Store the stream with the mars3 access method (zstd, level 1), ORDER BY id,
-- and monthly RANGE partitions on ts from 2024-03-01 up to (not including)
-- 2025-01-01.
CREATE STREAM s4 (id, c1, ts, arrive_s4)
AS (
    SELECT id, c1, ts, clock_timestamp()
    FROM STREAMING t4
    WITH NO DATA
)
USING mars3
WITH (compresstype = 'zstd', compresslevel = 1)
DISTRIBUTED BY (id, arrive_s4)
ORDER BY id
PARTITION BY RANGE (ts)
(
    START (date '2024-03-01') INCLUSIVE
    END (date '2025-01-01') EXCLUSIVE
    EVERY (INTERVAL '1 month')
);
  • Result Output
=# \dS+ s4
                                          Partitioned stream "public.s4"
  Column   |            Type             | Collation | Nullable | Default | Storage  | Stats target | Description 
-----------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
 id        | integer                     |           |          |         | plain    |              | 
 c1        | text                        |           |          |         | extended |              | 
 ts        | timestamp without time zone |           |          |         | plain    |              | 
 arrive_s4 | timestamp with time zone    |           |          |         | plain    |              | 
Partition key: RANGE (ts)
View definition:

Partitions: s4_1_prt_1 FOR VALUES FROM ('2024-03-01 00:00:00') TO ('2024-04-01 00:00:00'),
            s4_1_prt_10 FOR VALUES FROM ('2024-12-01 00:00:00') TO ('2025-01-01 00:00:00'),
            s4_1_prt_2 FOR VALUES FROM ('2024-04-01 00:00:00') TO ('2024-05-01 00:00:00'),
            s4_1_prt_3 FOR VALUES FROM ('2024-05-01 00:00:00') TO ('2024-06-01 00:00:00'),
            s4_1_prt_4 FOR VALUES FROM ('2024-06-01 00:00:00') TO ('2024-07-01 00:00:00'),
            s4_1_prt_5 FOR VALUES FROM ('2024-07-01 00:00:00') TO ('2024-08-01 00:00:00'),
            s4_1_prt_6 FOR VALUES FROM ('2024-08-01 00:00:00') TO ('2024-09-01 00:00:00'),
            s4_1_prt_7 FOR VALUES FROM ('2024-09-01 00:00:00') TO ('2024-10-01 00:00:00'),
            s4_1_prt_8 FOR VALUES FROM ('2024-10-01 00:00:00') TO ('2024-11-01 00:00:00'),
            s4_1_prt_9 FOR VALUES FROM ('2024-11-01 00:00:00') TO ('2024-12-01 00:00:00')
Distributed by: (id, arrive_s4)
Access method: mars3
Order by: (id)
Options: compresstype=zstd, compresslevel=1



=# INSERT INTO t4 VALUES (1,'mmt', current_timestamp);

=# INSERT INTO t4 VALUES (1,'mmt', to_timestamp('2024-10-02 14:00:00','yyyy-mm-dd hh24:mi:ss'));

=# SELECT * FROM s4;
 id | c1  |             ts             |           arrive_s4           
----+-----+----------------------------+-------------------------------
  1 | mmt | 2024-05-28 04:23:49.271721 | 2024-05-28 04:23:50.32931-04
  1 | mmt | 2024-10-02 14:00:00        | 2024-05-28 04:24:05.668132-04
(2 rows)

=# SELECT * FROM s4_1_prt_3 ;
 id | c1  |             ts             |          arrive_s4           
----+-----+----------------------------+------------------------------
  1 | mmt | 2024-05-28 04:23:49.271721 | 2024-05-28 04:23:50.32931-04
(1 row)

=# SELECT * FROM s4_1_prt_8 ;
 id | c1  |         ts          |           arrive_s4           
----+-----+---------------------+-------------------------------
  1 | mmt | 2024-10-02 14:00:00 | 2024-05-28 04:24:05.668132-04
(1 row)

Dual-stream JOIN

  • SQL Definition Example
-- A dual-stream JOIN requires an index on the join column of each upstream
-- table, so incremental computation cost does not grow with data volume.
-- Without these indexes, stream creation fails by default; the check can be
-- bypassed by enabling the GUC mxstream.domino_join_skip_index_check.

CREATE TABLE t1 (id int, v1 int) DISTRIBUTED BY (id);
CREATE TABLE t2 (id int, v2 int) DISTRIBUTED BY (id);

-- Join-column indexes on both upstream tables.
CREATE INDEX t1_id_idx ON t1 (id);
CREATE INDEX t2_id_idx ON t2 (id);

CREATE STREAM s1 (id, v1, v2)
AS (
    SELECT t1.id, t1.v1, t2.v2
    FROM STREAMING t1
    INNER JOIN STREAMING t2 ON t1.id = t2.id
)
DISTRIBUTED BY (id);
  • Result Output

The stream s1 is defined as a streaming JOIN between tables t1 and t2. The result updates whenever either side receives new data.

-- Insert two unmatched records first
=# INSERT INTO t1 VALUES (1, 100);

=# INSERT INTO t2 VALUES (2, 99);

-- No matching records
=# SELECT * FROM s1;
 id | v1 | v2
----+----+----
(0 rows)

-- Insert four more rows: id 2 matches the row already in t2, and id 3 matches the t2 rows inserted in this step
=# INSERT INTO t1 VALUES (2, NULL), (3, 100);

=# INSERT INTO t2 VALUES (3, NULL), (3, NULL);

-- Matches for (2) and (3) appear
=# SELECT * FROM s1;
 id | v1  | v2
----+-----+----
  2 |     | 99
  3 | 100 |
  3 | 100 |
(3 rows)

Three-way Stream JOIN

  • SQL Definition Example
-- Create three input tables
CREATE TABLE t1 (id int, v1 int) DISTRIBUTED BY (id);
CREATE TABLE t2 (id int, v2 int) DISTRIBUTED BY (id);
CREATE TABLE t3 (id int, v3 int) DISTRIBUTED BY (id);

-- Dual-stream JOINs require indexes on the join columns of the upstream
-- tables; without them stream creation fails by default (unless the GUC
-- mxstream.domino_join_skip_index_check is enabled).
CREATE INDEX ON t1 (id);
CREATE INDEX ON t2 (id);
CREATE INDEX ON t3 (id);

-- Create first stream: s1 = t1 ⨝ t2
CREATE STREAM s1 (id, v1, v2) AS (
        SELECT t1.id, t1.v1, t2.v2 FROM STREAMING t1
        INNER JOIN STREAMING t2 ON t1.id = t2.id
) DISTRIBUTED BY (id);

-- Create second stream: s2 = s1 ⨝ t3 = t1 ⨝ t2 ⨝ t3
-- NOTE(review): s1 is itself an upstream of this dual-stream JOIN; confirm
-- whether the index check also applies to s1 (id) and, if so, how it should
-- be satisfied.
CREATE STREAM s2 (id, v1, v2, v3) AS (
        SELECT s1.id, s1.v1, s1.v2, t3.v3 FROM STREAMING s1
        INNER JOIN STREAMING t3 ON s1.id = t3.id
) DISTRIBUTED BY (id);
  • Result Output
-- Insert first batch: generate new data in s1 but not in s2
INSERT INTO t1 VALUES (1, 100), (2, 100);
INSERT 0 2
INSERT INTO t2 VALUES (1, 100);
INSERT 0 1

-- Each stream is computed independently, so s2 is refreshed only after s1 has finished its own computation.
-- Wait long enough for two computation cycles.
SELECT pg_sleep(2);
 pg_sleep
----------

(1 row)

-- s1 has one record
SELECT * FROM s1;
 id | v1  | v2
----+-----+-----
  1 | 100 | 100
(1 row)

-- s2 has no data because t3 is empty
SELECT * FROM s2;
 id | v1 | v2 | v3
----+----+----+----
(0 rows)

-- Insert second batch: generate new data in both s1 and s2
INSERT INTO t1 VALUES (3, NULL);
INSERT 0 1
INSERT INTO t2 VALUES (2, NULL), (3, 99);
INSERT 0 2
INSERT INTO t3 VALUES (1, NULL), (2, 98), (3, 100);
INSERT 0 3

-- Wait for two computation cycles again
SELECT pg_sleep(2);
 pg_sleep
----------

(1 row)

-- s1 gains two new records
SELECT * FROM s1;
 id | v1  | v2
----+-----+-----
  1 | 100 | 100    -- Previous result
  2 | 100 |        -- t1 ⨝ Δt2
  3 |     |  99    -- Δt1 ⨝ Δt2
(3 rows)

-- s2 gains three new records
SELECT * FROM s2;
 id | v1  | v2  | v3
----+-----+-----+-----
  1 | 100 | 100 |        -- s1 ⨝ Δt3
  2 | 100 |     |  98    -- Δs1 ⨝ Δt3 
  3 |     |  99 | 100    -- Δs1 ⨝ Δt3 
(3 rows)