Sliding windows are a common feature of stream computing. In stream computing, aggregations can be computed continuously over the data from the most recent period of time. Sliding windows are often used for monitoring and alerting: when the data from the most recent period meets a preset condition, the database server sends an alert message to the client. For example, the average temperature of each device is computed every minute, and an alert is raised if it exceeds 90 degrees.
Unlike tumbling windows, which never overlap, sliding windows can overlap. A sliding window has two parameters: WINDOW_SIZE and SLIDING_SIZE. WINDOW_SIZE specifies the time span covered by the window, and SLIDING_SIZE is the step by which the window advances each time it slides.
Window time is based on insert time, that is, the time at which a row is written into the database. SLIDING_SIZE sets the sliding step of the window.
When SLIDING_SIZE < WINDOW_SIZE, consecutive windows overlap, and each row is observed by multiple windows.
When SLIDING_SIZE = WINDOW_SIZE, the window behaves like a tumbling window: windows do not overlap, and each row is observed exactly once.
When SLIDING_SIZE > WINDOW_SIZE, the window is a hopping window: windows do not overlap, there are gaps between them, and some rows are never observed.
Each time the window slides, the database evaluates the data in the current window. If that data meets the preset condition, a message is pushed.
Messages are pushed through the pg_notify function provided by PostgreSQL.
Clients receive them through the LISTEN/NOTIFY support in their PostgreSQL driver, for example pq.NewListener from github.com/lib/pq in Go, or PGConnection.getNotifications() from the PostgreSQL JDBC driver in Java.
When creating a sliding window, declare the name of the message queue (channel) with the CHANNEL_NAME parameter.
The same message queue can be listened to by multiple clients at the same time.
Multiple sliding windows can also share the same message queue, so a single client can receive alert messages from several sliding windows at once.
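PostgreSQL delivers a notification to every session that has executed LISTEN on that channel, which is why several clients and several windows can share one queue. The Go sketch below is only an in-process analogy of that fan-out built on Go channels; hub, listen, and notify are hypothetical names, and nothing here talks to the database.

```go
package main

import (
	"fmt"
	"sync"
)

// hub is an in-process analogy of a notification channel: every subscriber
// registered under a channel name receives its own copy of each message.
type hub struct {
	mu   sync.Mutex
	subs map[string][]chan string
}

func newHub() *hub { return &hub{subs: make(map[string][]chan string)} }

// listen registers a new subscriber. The buffer keeps notify from
// blocking in this small sketch.
func (h *hub) listen(channel string) <-chan string {
	h.mu.Lock()
	defer h.mu.Unlock()
	c := make(chan string, 16)
	h.subs[channel] = append(h.subs[channel], c)
	return c
}

// notify delivers payload to every subscriber of channel.
func (h *hub) notify(channel, payload string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	for _, c := range h.subs[channel] {
		c <- payload
	}
}

func main() {
	h := newHub()
	clientA := h.listen("my_channel")
	clientB := h.listen("my_channel")
	// Two sliding windows configured with the same CHANNEL_NAME.
	h.notify("my_channel", "alert from sv1")
	h.notify("my_channel", "alert from sv2")
	for i := 0; i < 2; i++ {
		fmt.Println("A got:", <-clientA, "| B got:", <-clientB)
	}
}
```

Both clients see both alerts, and a subscriber on a different channel name would see neither.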
Sliding windows depend on the matrixts extension. Create the extension first:
```sql
CREATE EXTENSION matrixts;
```
Then create a data table and a sliding window on it:
```sql
CREATE TABLE metrics(
  time timestamp,
  tag_id int,
  sensor float8
)
DISTRIBUTED BY (tag_id);

CREATE VIEW sv1 WITH (
  CONTINUOUS,
  WINDOW_SIZE='10 min',
  SLIDING_SIZE='1 min',
  CHANNEL_NAME='my_channel'
) AS
SELECT tag_id, COUNT(*), SUM(sensor)
FROM metrics GROUP BY tag_id
HAVING MAX(sensor) > 10;
```
The definition has two parts: the options in the WITH clause configure the sliding window and message push, and the SQL query specifies what is computed over the stream data in each window.
CONTINUOUS: declares that the view is an aggregating sliding window.
WINDOW_SIZE: the size of the time window that the sliding window monitors.
SLIDING_SIZE: the amount of time by which the window slides at each step.
CHANNEL_NAME: the name of the message queue the database pushes to when the monitoring condition is met.
SELECT: the content of each pushed message. (You can use PostgreSQL's built-in JSON functions to assemble the message format.)
FROM: the table the window data comes from. (Sliding windows do not support multiple tables.)
WHERE: which rows of the source table the window monitors.
HAVING: the condition that triggers a push. If HAVING is omitted, a message is pushed every time the window slides.
Example 1: compute the maximum temperature of each device over the past 2 minutes, updated every 30 seconds (that is, a 2-minute window that slides every 30 seconds).
The message format is (device number, maximum temperature).
Messages are pushed to the message queue temp_monitor.
First, create the device data table metrics_1:
```sql
CREATE TABLE metrics_1(
  time timestamp,
  tag_id int,
  temp float8
)
DISTRIBUTED BY (tag_id);
```
Then create the sliding window:
```sql
CREATE VIEW temp_sv WITH (
  CONTINUOUS,
  WINDOW_SIZE='2 min',
  SLIDING_SIZE='30 seconds',
  CHANNEL_NAME='temp_monitor'
) AS
SELECT tag_id, MAX(temp)
FROM metrics_1 GROUP BY tag_id;
```
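Because temp_sv has no HAVING clause, a message is pushed every time the window slides, so a client will usually want to parse the payload (the notification's payload string, n.Extra in lib/pq). This document does not specify how a result row is serialized into the payload. The Go sketch below assumes, hypothetically, that the SELECT list is rewritten to return a single JSON column built with PostgreSQL's json_build_object('tag_id', tag_id, 'max_temp', MAX(temp)), and decodes it with encoding/json. TempAlert and parseTempAlert are illustrative names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// TempAlert mirrors a hypothetical payload such as {"tag_id": 7, "max_temp": 93.5}.
// The field names are an assumption tied to the json_build_object call above.
type TempAlert struct {
	TagID   int     `json:"tag_id"`
	MaxTemp float64 `json:"max_temp"`
}

// parseTempAlert decodes one notification payload.
func parseTempAlert(payload string) (TempAlert, error) {
	var a TempAlert
	err := json.Unmarshal([]byte(payload), &a)
	return a, err
}

func main() {
	a, err := parseTempAlert(`{"tag_id": 7, "max_temp": 93.5}`)
	if err != nil {
		panic(err)
	}
	fmt.Printf("device %d: max temperature %.1f\n", a.TagID, a.MaxTemp)
}
```

Keeping the payload as JSON lets one client tell alerts apart by field names, which helps when several windows share a channel.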
Example 2: compute the voltage statistics of each device over the past 1 minute, updated every 10 seconds.
An alarm is triggered when the minimum voltage exceeds 10.
The message format is (device number, average voltage, minimum voltage, maximum voltage).
Messages are pushed to the message queue over_volt_alarm.
First, create the device data table metrics_2:
```sql
CREATE TABLE metrics_2(
  time timestamp,
  tag_id int,
  volt float8
)
DISTRIBUTED BY (tag_id);
```
Then create the sliding window:
```sql
CREATE VIEW volt_alarm WITH (
  CONTINUOUS,
  WINDOW_SIZE='1 min',
  SLIDING_SIZE='10 seconds',
  CHANNEL_NAME='over_volt_alarm'
) AS
SELECT tag_id, AVG(volt), MIN(volt), MAX(volt)
FROM metrics_2 GROUP BY tag_id
HAVING MIN(volt) > 10;
```
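Go client example, adapted from the lib/pq listen example (ref: https://pkg.go.dev/github.com/lib/pq/example/listen). It subscribes to my_channel, the queue declared by sv1: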
```go
package main

import (
	"fmt"
	"time"

	"github.com/lib/pq"
)

// waitForNotification blocks until a notification arrives or 90 seconds pass.
func waitForNotification(l *pq.Listener) {
	select {
	case n := <-l.Notify:
		// lib/pq sends nil after the connection is re-established;
		// notifications sent while disconnected may have been lost.
		if n == nil {
			fmt.Println("listener reconnected")
			return
		}
		fmt.Println("received notification on channel:", n.Channel)
		fmt.Println("payload:", n.Extra)
	case <-time.After(90 * time.Second):
		// Nothing for 90 seconds: ping the server so that a lost
		// connection is noticed and the listener reconnects.
		go l.Ping()
		fmt.Println("received no notification for 90 seconds, pinging the server")
	}
}

func main() {
	conninfo := "user=mxadmin password=mxadmin dbname=postgres sslmode=disable"

	// Report listener events such as connection loss or reconnect failures.
	reportProblem := func(ev pq.ListenerEventType, err error) {
		if err != nil {
			fmt.Println(err.Error())
		}
	}

	minReconn := 10 * time.Second
	maxReconn := time.Minute
	listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem)

	// Subscribe to the channel declared by CHANNEL_NAME.
	if err := listener.Listen("my_channel"); err != nil {
		panic(err)
	}

	fmt.Println("entering main loop")
	for {
		waitForNotification(listener)
	}
}
```
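Java client example using the PostgreSQL JDBC driver, adapted from its LISTEN/NOTIFY documentation (ref: https://jdbc.postgresql.org/documentation/81/listennotify.html). It polls for notifications on my_channel: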
```java
package test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

import org.postgresql.PGConnection;
import org.postgresql.PGNotification;

public class NotificationTest {
    public static void main(String[] args) throws Exception {
        Class.forName("org.postgresql.Driver");
        String url = "jdbc:postgresql://172.16.100.32:5432/postgres";
        Connection conn = DriverManager.getConnection(url, "mxadmin", "mxadmin");
        // Start a thread that receives notifications pushed by the sliding window.
        Listener listener = new Listener(conn);
        listener.start();
    }
}

class Listener extends Thread {
    private final Connection conn;
    private final PGConnection pgconn;

    Listener(Connection conn) throws SQLException {
        this.conn = conn;
        this.pgconn = conn.unwrap(PGConnection.class);
        // Subscribe to the channel declared by CHANNEL_NAME.
        Statement stmt = conn.createStatement();
        stmt.execute("LISTEN my_channel");
        stmt.close();
    }

    @Override
    public void run() {
        while (true) {
            try {
                // Issue a dummy query to contact the backend
                // and receive any pending notifications.
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT 1");
                rs.close();
                stmt.close();

                PGNotification[] notifications = pgconn.getNotifications();
                // Depending on the driver version, this is null or empty
                // when no notifications are pending.
                if (notifications != null) {
                    for (PGNotification n : notifications) {
                        System.out.println("Got notification: " + n.getName()
                                + " with payload: " + n.getParameter());
                    }
                }

                // Wait a while before checking again for new notifications.
                Thread.sleep(500);
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            } catch (InterruptedException ie) {
                // Restore the interrupt flag and stop listening.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```
No. Expired data is cleaned up periodically.
Yes. The data in the sliding window uses the same transaction ID as the source table data.
Window functions are not supported yet.
Supported.
Not supported.
Yes.
Yes.