Sliding Window

This document describes the sliding window feature in YMatrix.

1 Background

Sliding windows are a common feature in streaming data scenarios. In such cases, continuous aggregation can be performed on data from the most recent time period.
Sliding windows are typically used with monitoring and alerting systems. When data within a recent time window meets predefined conditions, the database server sends an alert message to the client.
For example, compute the average temperature per device every minute, and trigger an alert if it exceeds 90 degrees.

2 What Is a Sliding Window?

A sliding window differs from a tumbling window in that its time intervals can overlap.
A sliding window has two parameters: WINDOW_SIZE and SLIDING_SIZE.
WINDOW_SIZE defines the time range of the window, while SLIDING_SIZE specifies the step size for each slide.

3 How Does the Time Window Slide?

The time used for the sliding window is the insert time, i.e., when the data is ingested into the database.
The sliding step size is controlled by setting SLIDING_SIZE.

  • When SLIDING_SIZE < WINDOW_SIZE, the windows overlap. Each data record may be observed by multiple windows.

  • When SLIDING_SIZE = WINDOW_SIZE, the behavior is equivalent to a tumbling window—no overlap, and each record appears in exactly one window.

  • When SLIDING_SIZE > WINDOW_SIZE, it becomes a hopping window—non-overlapping with gaps between windows. Some data may not be included in any window.

4 How Does Alerting Work with Sliding Windows?

Each time the window slides, the database evaluates the data within the current window. If the data meets the specified condition, a message is pushed.
Messages are sent via PostgreSQL's pg_notify mechanism.
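Clients receive messages through the LISTEN/NOTIFY support of their PostgreSQL driver (e.g., pq.NewListener from github.com/lib/pq in Go, or org.postgresql.PGConnection.getNotifications() from the PostgreSQL JDBC driver in Java).
When creating a sliding window, use the CHANNEL_NAME parameter to specify the name of the message queue (the notification channel) to which messages are published.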

Multiple clients can listen to the same message queue simultaneously.

Multiple sliding windows can also share the same message queue. Thus, a single client can receive alert messages from multiple sliding windows.
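
A client may also subscribe to more than one message queue on a single connection. The sketch below shows one possible way to route incoming messages by channel name. The router type and its handlers are hypothetical names introduced for illustration; with github.com/lib/pq the channel and payload would come from the Channel and Extra fields of a notification. When several sliding windows share one channel, the channel name alone cannot tell them apart, so the payload itself would need to carry whatever identifies the source.

```go
package main

import "fmt"

// alertHandler processes the payload of one notification.
type alertHandler func(payload string)

// router maps a channel name to the handler for that channel.
// It is a hypothetical helper, not part of any driver.
type router map[string]alertHandler

// dispatch calls the handler registered for channel and reports
// whether such a handler existed.
func (r router) dispatch(channel, payload string) bool {
    h, ok := r[channel]
    if !ok {
        return false
    }
    h(payload)
    return true
}

func main() {
    r := router{
        "temp_monitor":    func(p string) { fmt.Println("temperature alert:", p) },
        "over_volt_alarm": func(p string) { fmt.Println("voltage alert:", p) },
    }
    // In a real client these arguments would be n.Channel and n.Extra.
    r.dispatch("temp_monitor", "example payload")
    r.dispatch("unknown_channel", "ignored")
}
```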

5 Usage

5.1 Create Extension

The sliding window feature depends on the matrixts extension. First, create the extension:

=# CREATE EXTENSION matrixts;

5.2 Create Data Table

=# CREATE TABLE metrics(
   time timestamp,
   tag_id int,
   sensor float8
   )
   USING MARS3
   DISTRIBUTED BY (tag_id)
   ORDER BY (time,tag_id);

5.3 Create Sliding Window

=# CREATE VIEW sv1 WITH (
    CONTINUOUS, 
    WINDOW_SIZE='10 min',
    SLIDING_SIZE='1 min',
    CHANNEL_NAME='my_channel'
) AS
SELECT tag_id, COUNT(*), SUM(sensor)
FROM metrics GROUP BY tag_id
HAVING MAX(sensor) > 10;

5.4 Parameter Description

A sliding window definition has two parts: the WITH clause, whose parameters control the window and message delivery, and the SQL query, which defines how streaming data is aggregated within the window.

WITH Clause

  • CONTINUOUS: Declares the view as a continuous sliding window.
  • WINDOW_SIZE: Specifies the time duration of the sliding window.
  • SLIDING_SIZE: Defines the sliding step interval.
  • CHANNEL_NAME: The name of the message queue to which alerts are published when conditions are met.

SQL Query

  • SELECT: The content to be sent in each message (PostgreSQL's built-in JSON functions can be used to format the payload).
  • FROM: The source table from which streaming data is read (sliding windows do not support joins or multiple tables).
  • WHERE: Determines which portion of the base table is visible to the window.
  • HAVING: The condition that triggers message delivery. If HAVING is omitted, a message is sent on every slide.
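
If the SELECT list formats each message as JSON, for instance with PostgreSQL's json_build_object, a client can decode the payload into a typed value. The Go sketch below assumes a payload of the form {"tag_id": 7, "max_temp": 93.5}. The field names, the tempAlert type, and the assumption that the JSON text arrives unchanged as the notification payload are all illustrative and should be checked against the messages your window actually emits.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// tempAlert mirrors a hypothetical JSON payload; the field names
// are assumptions chosen for this example.
type tempAlert struct {
    TagID   int     `json:"tag_id"`
    MaxTemp float64 `json:"max_temp"`
}

// parseTempAlert decodes one notification payload.
func parseTempAlert(payload string) (tempAlert, error) {
    var a tempAlert
    err := json.Unmarshal([]byte(payload), &a)
    return a, err
}

func main() {
    a, err := parseTempAlert(`{"tag_id": 7, "max_temp": 93.5}`)
    if err != nil {
        fmt.Println("bad payload:", err)
        return
    }
    fmt.Printf("device %d reached %.1f\n", a.TagID, a.MaxTemp)
}
```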

5.5 Examples

Example 1: Maximum Temperature Monitoring

Compute the maximum temperature per device over the past 2 minutes, updated every 30 seconds (i.e., a 2-minute window sliding every 30 seconds).
Message format: (device ID, max temperature).
Messages are sent to channel temp_monitor.
First, create the metrics table metrics_1.

=# CREATE TABLE metrics_1(
   time timestamp,
   tag_id int,
   temp float8
   )
   USING MARS3
   DISTRIBUTED BY (tag_id)
   ORDER BY (time,tag_id);

Then create the sliding window:

=# CREATE VIEW temp_sv WITH (
    CONTINUOUS, 
    WINDOW_SIZE='2 min',
    SLIDING_SIZE='30 seconds',
    CHANNEL_NAME='temp_monitor'
) AS
SELECT tag_id, MAX(temp)
FROM metrics_1 GROUP BY tag_id;

Example 2: Over-Voltage Alert

Monitor voltage values per device over the last 1 minute, updated every 10 seconds.
Trigger an alert when the minimum voltage exceeds 10.
Message format: (device ID, average voltage, minimum voltage, maximum voltage).
Messages are sent to channel over_volt_alarm.
First, create the data table metrics_2.

=# CREATE TABLE metrics_2(
   time timestamp,
   tag_id int,
   volt float8
   )
   USING MARS3
   DISTRIBUTED BY (tag_id)
   ORDER BY (time,tag_id);

Then create the sliding window:

=# CREATE VIEW volt_alarm WITH (
    CONTINUOUS, 
    WINDOW_SIZE='1 min',
    SLIDING_SIZE='10 seconds',
    CHANNEL_NAME='over_volt_alarm'
) AS
SELECT tag_id, AVG(volt), MIN(volt), MAX(volt)
FROM metrics_2 GROUP BY tag_id
HAVING MIN(volt) > 10;
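
Because the condition is on MIN(volt), a device triggers a message only when every reading in the window is above 10. The Go sketch below reproduces that grouping and filtering in memory for a handful of made-up readings. It is a simplified model of GROUP BY tag_id with HAVING MIN(volt) > 10, not the database's implementation, and the reading and summary types are hypothetical.

```go
package main

import (
    "fmt"
    "sort"
)

// reading is one row of a window: a device ID and a voltage.
type reading struct {
    TagID int
    Volt  float64
}

// summary holds the aggregates sent for one device.
type summary struct {
    TagID         int
    Avg, Min, Max float64
}

// alerts groups readings by TagID and keeps only the groups whose
// minimum exceeds threshold, sorted by TagID.
func alerts(window []reading, threshold float64) []summary {
    type acc struct {
        sum, min, max float64
        n             int
    }
    groups := map[int]*acc{}
    for _, r := range window {
        g, ok := groups[r.TagID]
        if !ok {
            groups[r.TagID] = &acc{sum: r.Volt, min: r.Volt, max: r.Volt, n: 1}
            continue
        }
        g.sum += r.Volt
        g.n++
        if r.Volt < g.min {
            g.min = r.Volt
        }
        if r.Volt > g.max {
            g.max = r.Volt
        }
    }
    var out []summary
    for id, g := range groups {
        if g.min > threshold {
            out = append(out, summary{id, g.sum / float64(g.n), g.min, g.max})
        }
    }
    sort.Slice(out, func(i, j int) bool { return out[i].TagID < out[j].TagID })
    return out
}

func main() {
    // Device 1 stays above 10; device 2 dips to 9 and is filtered out.
    window := []reading{{1, 11}, {1, 13}, {2, 9}, {2, 15}}
    fmt.Println(alerts(window, 10))
}
```

In this sample only device 1 qualifies: device 2 has a maximum of 15 but its minimum of 9 fails the condition.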

Client Example 1: Go Language

ref: https://pkg.go.dev/github.com/lib/pq/example/listen

package main

import (
    "fmt"
    "time"

    "github.com/lib/pq"
)

// waitForNotification blocks until a notification arrives or 90 seconds pass.
func waitForNotification(l *pq.Listener) {
    select {
    case n := <-l.Notify:
        // lib/pq sends a nil notification after it re-establishes a lost
        // connection; notifications may have been missed in the meantime.
        if n == nil {
            fmt.Println("connection re-established, notifications may have been missed")
            return
        }
        fmt.Println("received notification")
        fmt.Println(n.Channel) // channel name, e.g. my_channel
        fmt.Println(n.Extra)   // payload produced by the sliding window

    case <-time.After(90 * time.Second):
        // Ping the server so that a lost connection is noticed and the
        // Listener reconnects.
        go l.Ping()
        fmt.Println("received no notification for 90 seconds, checking connection")
    }
}

func main() {
    conninfo := "user=mxadmin password=mxadmin dbname=postgres sslmode=disable"

    // reportProblem logs connection errors reported by the Listener.
    reportProblem := func(ev pq.ListenerEventType, err error) {
        if err != nil {
            fmt.Println(err.Error())
        }
    }

    minReconn := 10 * time.Second
    maxReconn := time.Minute
    fmt.Println("entering conn")
    listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem)
    // Subscribe to the channel named by CHANNEL_NAME in the sliding window.
    if err := listener.Listen("my_channel"); err != nil {
        panic(err)
    }

    fmt.Println("entering main loop")
    for {
        waitForNotification(listener)
    }
}

Client Example 2: Java Language

ref: https://jdbc.postgresql.org/documentation/81/listennotify.html

package test;

import java.sql.*;

public class NotificationTest {

    public static void main(String[] args) throws Exception {
        Class.forName("org.postgresql.Driver");
        String url = "jdbc:postgresql://172.16.100.32:5432/postgres";

        // Open a connection dedicated to listening. The notifications
        // themselves are produced by the sliding window in the database.
        Connection lConn = DriverManager.getConnection(url,"mxadmin","mxadmin");

        // Start a thread that polls for notifications.
        Listener listener = new Listener(lConn);
        listener.start();
    }

}

class Listener extends Thread {

    private Connection conn;
    private org.postgresql.PGConnection pgconn;

    Listener(Connection conn) throws SQLException {
        this.conn = conn;
        this.pgconn = (org.postgresql.PGConnection)conn;
        Statement stmt = conn.createStatement();
        stmt.execute("LISTEN my_channel");
        stmt.close();
    }

    public void run() {
        while (true) {
            try {
                // issue a dummy query to contact the backend
                // and receive any pending notifications.
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT 1");
                rs.close();
                stmt.close();

                org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
                // Some driver versions return null when nothing is pending,
                // so check before reading the length.
                if (notifications != null) {
                    System.out.println(notifications.length);
                    for (int i=0; i<notifications.length; i++) {
                        System.out.println("Got notification: " + notifications[i].getName() + " with payload: " + notifications[i].getParameter());
                    }
                }

                // wait a while before checking again for new
                // notifications
                Thread.sleep(500);
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

6 FAQ

  1. Is data in sliding windows persisted?
    No. Expired data is cleaned up periodically.

  2. Does the sliding window guarantee ACID?
    Yes. Data in sliding windows shares the same transaction ID as the base table.

  3. Are window functions supported in sliding windows?
    Not currently supported.

  4. Does the sliding window support partitioned tables?
    Yes.

  5. Can a sliding window monitor multiple tables?
    No.

  6. Can multiple sliding windows be defined on a single table?
    Yes.

  7. Can multiple clients listen to alerts from the same sliding window?
    Yes.