Sliding window

1. Background introduction

Sliding windows are a common feature of stream computing. In stream computing scenarios, aggregations can be computed continuously over the most recent period of data. Sliding windows are often used for monitoring and alerting: when the data in the most recent period meets a preset condition, the database server sends an alert message to the client. For example, the average temperature of each device can be computed every minute, and an alert raised if it exceeds 90 degrees.

2. What is a sliding window?

Unlike tumbling windows, which do not overlap, sliding windows (Sliding Window) can overlap in time. A sliding window has two parameters: WINDOW_SIZE and SLIDING_SIZE. WINDOW_SIZE specifies the time range the window covers, and SLIDING_SIZE is the step by which the window advances on each slide.

3. How does the time window slide?

The window slides along insert time, that is, the time at which data is written into the database. SLIDING_SIZE determines the sliding step of the window.

  • When SLIDING_SIZE < WINDOW_SIZE, consecutive windows overlap, and each row is observed by multiple windows.

  • When SLIDING_SIZE = WINDOW_SIZE, the result is equivalent to a tumbling window: windows do not overlap, and each row is observed exactly once.

  • When SLIDING_SIZE > WINDOW_SIZE, the result is a jumping window: windows do not overlap, there are gaps between them, and some rows may not be observed by any window.
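To make the three cases concrete, the following Go sketch lists the windows that a single row falls into. It is only an illustration under assumptions not stated above: times are whole seconds of insert time, each window covers the half-open range [start, start + WINDOW_SIZE), and window starts are aligned to multiples of SLIDING_SIZE beginning at 0. The function name windowStarts is hypothetical, and matrixts may align windows differently.

```go
package main

import "fmt"

// windowStarts returns the start times of every window [start, start+size)
// that contains insert time t. Assumption (hypothetical): window starts are
// aligned to multiples of slide, beginning at time 0. slide must be positive.
func windowStarts(t, size, slide int64) []int64 {
	if slide <= 0 {
		return nil
	}
	var starts []int64
	// The latest window that could contain t starts at the last multiple of
	// slide that is <= t; walk backwards while the window still covers t.
	for start := (t / slide) * slide; start >= 0 && start+size > t; start -= slide {
		starts = append(starts, start)
	}
	return starts
}

func main() {
	const minute = 60
	t := int64(17*minute + 30) // a row inserted at 00:17:30

	// SLIDING_SIZE < WINDOW_SIZE: the row falls into several overlapping windows.
	fmt.Println(len(windowStarts(t, 10*minute, 1*minute))) // prints 10
	// SLIDING_SIZE = WINDOW_SIZE (tumbling): exactly one window.
	fmt.Println(len(windowStarts(t, 10*minute, 10*minute))) // prints 1
	// SLIDING_SIZE > WINDOW_SIZE (jumping): this row lands in a gap.
	fmt.Println(len(windowStarts(t, 5*minute, 10*minute))) // prints 0
}
```

With a 10-minute window sliding every minute, each row is counted in WINDOW_SIZE / SLIDING_SIZE = 10 windows, which is why overlapping windows multiply the work done per row.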

4. How does a sliding window monitor and raise alerts?

Each time the window slides, the database evaluates the data in the current window. If the data meets the preset condition, a message is pushed. Messages are sent with pg_notify, provided by PostgreSQL, and clients receive them through the PostgreSQL LISTEN/NOTIFY interface exposed by their driver (for example, pq.NewListener in the Go lib/pq driver, or PGConnection.getNotifications() in the PostgreSQL JDBC driver). When creating a sliding window, declare the name of the message queue with the CHANNEL_NAME parameter.


The same message queue can be listened to by multiple clients at the same time.

Multiple sliding windows can also share the same message queue, so a single client can receive alert messages from several sliding windows at once.


5. How to use

5.1 Create an extension

Sliding windows depend on the matrixts extension. Create the extension first:

CREATE EXTENSION matrixts;

5.2 Create a data table

CREATE TABLE metrics(
    time timestamp,
    tag_id int,
    sensor float8
)
DISTRIBUTED BY (tag_id);

5.3 Create a sliding window

CREATE VIEW sv1 WITH (
    CONTINUOUS, 
    WINDOW_SIZE='10 min',
    SLIDING_SIZE='1 min',
    CHANNEL_NAME='my_channel'
) AS
SELECT tag_id, COUNT(*), SUM(sensor)
FROM metrics GROUP BY tag_id
HAVING MAX(sensor) > 10;
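As a rough mental model of what sv1 computes, the sketch below evaluates its query once, in memory, for a single window. Row, Result and evaluateSV1 are hypothetical names, the window is assumed to be the half-open insert-time range [end - WINDOW_SIZE, end), and this does not reflect how matrixts actually executes the view.

```go
package main

import (
	"fmt"
	"sort"
)

// Row is a hypothetical in-memory stand-in for a row of the metrics table;
// InsertTime is the time (in seconds) the row was written to the database.
type Row struct {
	InsertTime int64
	TagID      int
	Sensor     float64
}

// Result mirrors one output row of sv1: tag_id, COUNT(*), SUM(sensor).
type Result struct {
	TagID int
	Count int
	Sum   float64
}

// evaluateSV1 imitates one evaluation of sv1 for the window [end-size, end):
// it groups rows by tag_id, computes COUNT(*) and SUM(sensor), and keeps only
// groups that satisfy HAVING MAX(sensor) > 10.
func evaluateSV1(rows []Row, end, size int64) []Result {
	type agg struct {
		count    int
		sum, max float64
	}
	groups := map[int]*agg{}
	for _, r := range rows {
		if r.InsertTime < end-size || r.InsertTime >= end {
			continue // outside the current window
		}
		g, ok := groups[r.TagID]
		if !ok {
			g = &agg{max: r.Sensor}
			groups[r.TagID] = g
		}
		g.count++
		g.sum += r.Sensor
		if r.Sensor > g.max {
			g.max = r.Sensor
		}
	}
	var out []Result
	for tag, g := range groups {
		if g.max > 10 { // HAVING MAX(sensor) > 10
			out = append(out, Result{TagID: tag, Count: g.count, Sum: g.sum})
		}
	}
	// Sort for deterministic output; the SQL query itself specifies no order.
	sort.Slice(out, func(i, j int) bool { return out[i].TagID < out[j].TagID })
	return out
}

func main() {
	rows := []Row{
		{InsertTime: 100, TagID: 1, Sensor: 12},
		{InsertTime: 200, TagID: 1, Sensor: 3},
		{InsertTime: 300, TagID: 2, Sensor: 5},
		{InsertTime: 900, TagID: 2, Sensor: 20}, // after the window below ends
	}
	// A 10-minute window ending at t = 600 s: tag 2 never exceeds 10 here.
	fmt.Println(evaluateSV1(rows, 600, 600)) // prints [{1 2 15}]
}
```

Note that HAVING filters whole groups: tag 1 is pushed with COUNT(*) = 2 even though only one of its readings exceeds 10.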

5.4 Parameter description

The definition has two parts. The OPTION list in the WITH clause configures the sliding window and message push, and the SQL query specifies what is computed over the stream data in each window and when a message is pushed.

OPTION:

  • CONTINUOUS: declares the view as an aggregating sliding window.
  • WINDOW_SIZE: the time range of data that the sliding window monitors.
  • SLIDING_SIZE: the amount of time by which the window advances on each slide.
  • CHANNEL_NAME: the name of the message queue to which the database pushes messages when the monitoring condition is met.

SQL:

  • SELECT: the content of each pushed message. (You can use PostgreSQL's built-in JSON functions to build the message format.)
  • FROM: the table the window data comes from. (A sliding window cannot read from multiple tables.)
  • WHERE: which rows of the source table the window monitors.
  • HAVING: the condition that triggers a push. If HAVING is omitted, a message is pushed every time the window slides.
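As an illustration of the JSON tip for SELECT, suppose (hypothetically) that a window's SELECT list were json_build_object('tag_id', tag_id, 'max_temp', MAX(temp)), and that the resulting JSON text arrives as the notification payload. A client could then decode each payload as in this Go sketch; the TempAlert type, its field names and parseAlert are illustrative assumptions, not part of matrixts.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TempAlert is a hypothetical message shape, matching a window whose SELECT
// list is json_build_object('tag_id', tag_id, 'max_temp', MAX(temp)).
type TempAlert struct {
	TagID   int     `json:"tag_id"`
	MaxTemp float64 `json:"max_temp"`
}

// parseAlert decodes a notification payload (for example pq.Notification.Extra
// in Go, or PGNotification.getParameter() in Java) into a TempAlert.
func parseAlert(payload string) (TempAlert, error) {
	var a TempAlert
	err := json.Unmarshal([]byte(payload), &a)
	return a, err
}

func main() {
	// json_build_object output uses " : " between keys and values.
	a, err := parseAlert(`{"tag_id" : 7, "max_temp" : 93.5}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("device %d: max temperature %.1f\n", a.TagID, a.MaxTemp) // prints device 7: max temperature 93.5
}
```

Building the payload as JSON keeps the client independent of column order; if several windows share one CHANNEL_NAME, adding a literal identifier field to the object is one way a client could tell them apart.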

5.5 Example

Example 1: Maximum temperature monitoring

The maximum temperature of each device over the past 2 minutes is computed and updated every 30 seconds (that is, a 2-minute window that slides every 30 seconds). Because there is no HAVING clause, a message is pushed on every slide. The message format is (device number, maximum temperature). Messages are pushed to the message queue temp_monitor.
First, create the device data table metrics_1.

CREATE TABLE metrics_1(
    time timestamp,
    tag_id int,
    temp float8
)
DISTRIBUTED BY (tag_id);

Then create a sliding window.

CREATE VIEW temp_sv WITH (
    CONTINUOUS, 
    WINDOW_SIZE='2 min',
    SLIDING_SIZE='30 seconds',
    CHANNEL_NAME='temp_monitor'
) AS
SELECT tag_id, MAX(temp)
FROM metrics_1 GROUP BY tag_id;

Example 2: Minimum Voltage Alarm

Voltage statistics for each device over the past 1 minute are computed and updated every 10 seconds. An alarm is triggered when the minimum voltage exceeds 10. The message format is (device number, average voltage, minimum voltage, maximum voltage). Messages are pushed to the message queue over_volt_alarm.
First, create the device data table metrics_2.

CREATE TABLE metrics_2(
    time timestamp,
    tag_id int,
    volt float8
)
DISTRIBUTED BY (tag_id);

Then create a sliding window.

CREATE VIEW volt_alarm WITH (
    CONTINUOUS, 
    WINDOW_SIZE='1 min',
    SLIDING_SIZE='10 seconds',
    CHANNEL_NAME='over_volt_alarm'
) AS
SELECT tag_id, AVG(volt), MIN(volt), MAX(volt)
FROM metrics_2 GROUP BY tag_id
HAVING MIN(volt) > 10;
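To see how sliding and the HAVING clause interact, the following Go sketch replays volt_alarm for a single device over its first two minutes, with WINDOW_SIZE = 60 s and SLIDING_SIZE = 10 s. It assumes each slide evaluates the half-open insert-time range [end - 60 s, end) and that the first slide happens 10 s after time 0; Reading, Alarm and voltAlarms are hypothetical names. With the sample readings, alarms fire only while the reading of 9 is outside the window.

```go
package main

import "fmt"

// Reading is a hypothetical in-memory stand-in for one metrics_2 row of a
// single device; At is the insert time in seconds.
type Reading struct {
	At   int64
	Volt float64
}

// Alarm mirrors one pushed message: slide time plus AVG, MIN and MAX of volt.
type Alarm struct {
	End           int64
	Avg, Min, Max float64
}

// voltAlarms imitates volt_alarm for one device. At each slide time
// end = step, 2*step, ... up to until, it aggregates the readings whose
// insert time lies in [end-size, end) and emits an Alarm when MIN(volt) > 10,
// mirroring the HAVING clause. An empty window forms no group and never alarms.
func voltAlarms(readings []Reading, size, step, until int64) []Alarm {
	var alarms []Alarm
	for end := step; end <= until; end += step {
		n, sum := 0, 0.0
		var minV, maxV float64
		for _, r := range readings {
			if r.At < end-size || r.At >= end {
				continue // outside this window
			}
			if n == 0 || r.Volt < minV {
				minV = r.Volt
			}
			if n == 0 || r.Volt > maxV {
				maxV = r.Volt
			}
			sum += r.Volt
			n++
		}
		if n > 0 && minV > 10 { // HAVING MIN(volt) > 10
			alarms = append(alarms, Alarm{End: end, Avg: sum / float64(n), Min: minV, Max: maxV})
		}
	}
	return alarms
}

func main() {
	readings := []Reading{{At: 5, Volt: 12}, {At: 25, Volt: 9}, {At: 50, Volt: 11}, {At: 70, Volt: 13}}
	// WINDOW_SIZE = 60 s, SLIDING_SIZE = 10 s, simulated for the first 2 minutes.
	for _, a := range voltAlarms(readings, 60, 10, 120) {
		fmt.Printf("t=%ds avg=%.1f min=%.1f max=%.1f\n", a.End, a.Avg, a.Min, a.Max)
	}
}
```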

Client Example 1: Go Language

ref: https://pkg.go.dev/github.com/lib/pq/example/listen

package main

import (
    "fmt"
    "time"

    "github.com/lib/pq"
)

// waitForNotification blocks until a notification arrives on the listener
// or 90 seconds pass without one.
func waitForNotification(l *pq.Listener) {
    select {
    case n := <-l.Notify:
        // lib/pq sends nil on this channel after the connection has been
        // re-established; notifications may have been lost in between.
        if n == nil {
            fmt.Println("listener reconnected")
            return
        }
        fmt.Println("received notification")
        fmt.Println("channel:", n.Channel)
        fmt.Println("payload:", n.Extra)

    case <-time.After(90 * time.Second):
        // Ping the server so that a lost connection is noticed and the
        // Listener can reconnect.
        go l.Ping()
        fmt.Println("received no notification for 90 seconds, pinging server")
    }
}

func main() {
    conninfo := "user=mxadmin password=mxadmin dbname=postgres sslmode=disable"

    // Log errors reported by the Listener (connection loss, reconnect failures).
    reportProblem := func(ev pq.ListenerEventType, err error) {
        if err != nil {
            fmt.Println(err.Error())
        }
    }

    minReconn := 10 * time.Second
    maxReconn := time.Minute
    fmt.Println("entering conn")
    listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem)
    // Subscribe to the channel given by CHANNEL_NAME in the sliding window definition.
    if err := listener.Listen("my_channel"); err != nil {
        panic(err)
    }

    fmt.Println("entering main loop")
    for {
        waitForNotification(listener)
    }
}

Client Example 2: Java Language

ref: https://jdbc.postgresql.org/documentation/81/listennotify.html

package test;

import java.sql.*;

public class NotificationTest {

    public static void main(String[] args) throws Exception {
        Class.forName("org.postgresql.Driver");
        String url = "jdbc:postgresql://172.16.100.32:5432/postgres";

        // Use a dedicated connection for the listener thread.
        Connection lConn = DriverManager.getConnection(url, "mxadmin", "mxadmin");

        // Start a thread that receives notifications.
        Listener listener = new Listener(lConn);
        listener.start();
    }

}

class Listener extends Thread {

    private Connection conn;
    private org.postgresql.PGConnection pgconn;

    Listener(Connection conn) throws SQLException {
        this.conn = conn;
        this.pgconn = (org.postgresql.PGConnection)conn;
        // Subscribe to the channel given by CHANNEL_NAME in the sliding window definition.
        Statement stmt = conn.createStatement();
        stmt.execute("LISTEN my_channel");
        stmt.close();
    }

    public void run() {
        while (true) {
            try {
                // Issue a dummy query to contact the backend
                // and receive any pending notifications.
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT 1");
                rs.close();
                stmt.close();

                // getNotifications() may return null when nothing is pending,
                // so check for null before reading the array.
                org.postgresql.PGNotification[] notifications = pgconn.getNotifications();
                if (notifications != null) {
                    for (int i = 0; i < notifications.length; i++) {
                        System.out.println("Got notification: " + notifications[i].getName() + " with payload: " + notifications[i].getParameter());
                    }
                }

                // Wait a while before checking again for new notifications.
                Thread.sleep(500);
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            } catch (InterruptedException ie) {
                // Stop listening when the thread is interrupted.
                ie.printStackTrace();
                return;
            }
        }
    }
}

6. FAQ

  1. Is the data in a sliding window persisted?
No. Expired data is cleaned up periodically.
  2. Does the sliding window guarantee ACID?
Yes. Data in the sliding window uses the same transaction ID as the corresponding data in the source table.
  3. Does the sliding window support window functions?
Not yet. Window functions are not currently supported.
  4. Does the sliding window support partitioned tables?
Yes.
  5. Does the sliding window support monitoring multiple tables?
No.
  6. Can one table have multiple sliding windows defined on it?
Yes.
  7. Can messages from the same sliding window be received by multiple clients?
Yes.