Скользящие окна — распространённая функция в сценариях потоковой обработки данных. В условиях потоковой обработки можно непрерывно выполнять агрегацию данных за последний период времени.
Скользящие окна обычно используются совместно с мониторингом и оповещениями. Когда данные за последний временной интервал соответствуют заранее заданным условиям, сервер данных отправляет клиенту сообщение-оповещение.
Например, если каждую минуту рассчитывается средняя температура для каждого устройства, при превышении 90 градусов будет сгенерировано предупреждение.
Скользящие окна (Sliding Window) отличаются от календарных окон тем, что их временные интервалы могут перекрываться.
У скользящего окна есть два параметра: WINDOW_SIZE и SLIDING_SIZE. WINDOW_SIZE определяет диапазон действия окна, а SLIDING_SIZE — шаг смещения окна при каждом перемещении.
При перемещении окна используется время вставки (Insert Time) — момент времени, когда данные поступают в базу данных.
Шаг перемещения окна определяется параметром SLIDING_SIZE.
Если SLIDING_SIZE < WINDOW_SIZE, окна будут перекрываться, и каждый элемент данных может попасть в несколько временных окон.
Если SLIDING_SIZE = WINDOW_SIZE, поведение эквивалентно календарному окну: окна не перекрываются, и каждый элемент данных учитывается только один раз.
Если SLIDING_SIZE > WINDOW_SIZE, получаем «прыгающее» окно: между окнами возникают пропуски, некоторые данные могут быть пропущены.
Каждый раз при сдвиге окна база данных проверяет данные в текущем окне. Если данные удовлетворяют заданным условиям, отправляется сообщение-оповещение.
Отправка выполняется через механизм pg_notify, предоставляемый PostgreSQL.
Клиент использует интерфейс listener, предоставляемый протоколом libpq. (Например, в языке Go — libpq.NewListener; в Java — java.sql.Listener.)
При создании скользящего окна имя очереди сообщений указывается с помощью параметра CHANNEL_NAME.
Одну и ту же очередь сообщений могут одновременно прослушивать несколько клиентов.
Также несколько скользящих окон могут использовать одну и ту же очередь сообщений, то есть один и тот же клиент может одновременно получать оповещения от нескольких скользящих окон.
Скользящее окно зависит от расширения matrixts, сначала необходимо создать это расширение:
CREATE EXTENSION matrixts;
CREATE TABLE metrics(
time timestamp,
tag_id int,
sensor float8
)
DISTRIBUTED BY (tag_id);
CREATE VIEW sv1 WITH (
CONTINUOUS,
WINDOW_SIZE='10 min',
SLIDING_SIZE='1 min',
CHANNEL_NAME='my_channel'
) AS
SELECT tag_id, COUNT(*), SUM(sensor)
FROM metrics GROUP BY tag_id
HAVING MAX(sensor) > 10;
Параметры разделены на две части. Параметры в блоке OPTION относятся к скользящему окну и отправке сообщений. SQL определяет, как потоковые данные отображаются в скользящем окне.
CONTINUOUS — объявляет представление как агрегированное скользящее окно.WINDOW_SIZE — размер временного окна, в течение которого скользящее окно отслеживает данные.SLIDING_SIZE — интервал времени, через который окно сдвигается.CHANNEL_NAME — имя очереди сообщений, в которую база данных отправляет уведомления при выполнении условий мониторинга.SELECT — содержимое данных, отправляемых при каждом оповещении. (Можно использовать встроенные JSON-функции PostgreSQL для формирования формата сообщения.)FROM — источник данных для окна. (Скользящее окно не поддерживает данные из нескольких таблиц.)WHERE — какие данные из исходной таблицы могут быть проанализированы окном.HAVING — условия срабатывания отправки сообщения. Если условие HAVING опущено, сообщение отправляется при каждом сдвиге окна.Вычисляется максимальная температура каждого устройства за последние 2 минуты, обновление происходит каждые 30 секунд. (То есть временное окно — 2 минуты, сдвиг — каждые 30 секунд.)
Формат сообщения: (номер устройства, максимальная температура).
Сообщения отправляются в очередь temp_monitor.
Сначала создайте таблицу данных устройств metrics_1:
CREATE TABLE metrics_1(
time timestamp,
tag_id int,
temp float8
)
DISTRIBUTED BY (tag_id);
Затем создайте скользящее окно:
CREATE VIEW temp_sv WITH (
CONTINUOUS,
WINDOW_SIZE='2 min',
SLIDING_SIZE='30 seconds',
CHANNEL_NAME='temp_monitor'
) AS
SELECT tag_id, MAX(temp)
FROM metrics_1 GROUP BY tag_id;
Рассчитывается значение напряжения каждого устройства за последнюю минуту, обновление — каждые 10 секунд.
Срабатывание оповещения происходит, если минимальное напряжение превышает 10.
Формат сообщения — (device number, average voltage, minimum voltage, maximum voltage).
Сообщения отправляются в очередь over_volt_alarm.
Сначала создайте таблицу данных устройств metrics_2:
CREATE TABLE metrics_2(
time timestamp,
tag_id int,
volt float8
)
DISTRIBUTED BY (tag_id);
Затем создайте скользящее окно:
CREATE VIEW volt_alarm WITH (
CONTINUOUS,
WINDOW_SIZE='1 min',
SLIDING_SIZE='10 seconds',
CHANNEL_NAME='over_volt_alarm'
) AS
SELECT tag_id, AVG(volt), MIN(volt), MAX(volt)
FROM metrics_2 GROUP BY tag_id
HAVING MIN(volt) > 10;
см.: https://pkg.go.dev/github.com/lib/pq/example/listen
package main
import (
"database/sql"
"fmt"
"time"
"github.com/lib/pq"
)
func waitForNotification(l *pq.Listener) {
select {
case n := <-l.Notify:
fmt.Println("received notification, new work available")
fmt.Println(n.Channel)
fmt.Println(n.Extra)
case <-time.After(90 * time.Second):
go l.Ping()
// Check if there's more work available, just in case it takes
// a while for the Listener to notice connection loss and
// reconnect.
fmt.Println("received no work for 90 seconds, checking for new work")
}
}
func main() {
var conninfo string = ""
db, err := sql.Open("postgres", conninfo)
reportProblem := func(ev pq.ListenerEventType, err error) {
if err != nil {
fmt.Println(err.Error())
}
}
minReconn := 10 * time.Second
maxReconn := time.Minute
fmt.Println("entering conn")
listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem)
var err = listener.Listen("my_channel")
if err != nil {
panic(err)
}
fmt.Println("entering main loop")
for {
// process all available work before waiting for notifications
// getWork(db)
waitForNotification(listener)
}
}
см.: https://jdbc.postgresql.org/documentation/81/listennotify.html
import java.sql.*;
public class NotificationTest {
public static void main(String args[]) throws Exception {
Class.forName("org.postgresql.Driver");
String url = "jdbc:postgresql://localhost:5432/test";
// Create two distinct connections, one for the notifier
// and another for the listener to show the communication
// works across connections although this example would
// work fine with just one connection.
Connection lConn = DriverManager.getConnection(url,"test","");
Connection nConn = DriverManager.getConnection(url,"test","");
// Create two threads, one to issue notifications and
// the other to receive them.
Listener listener = new Listener(lConn);
// Notifier notifier = new Notifier(nConn);
listener.start();
// notifier.start();
}
}
class Listener extends Thread {
private Connection conn;
private org.postgresql.PGConnection pgconn;
Listener(Connection conn) throws SQLException {
this.conn = conn;
this.pgconn = (org.postgresql.PGConnection)conn;
Statement stmt = conn.createStatement();
stmt.execute("LISTEN my_channel");
stmt.close();
}
public void run() {
while (true) {
try {
// Issue a dummy query to contact the backend
// and receive any pending notifications.
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1");
rs.close();
stmt.close();
org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
if (notifications != null) {
for (int i=0; i<notifications.length; i++) {
System.out.println("Got notification: " + notifications[i].getName());
}
}
// wait a while before checking again for new
// Notifications
Thread.sleep(500);
} catch (SQLException sqle) {
sqle.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
class Notifier extends Thread {
private Connection conn;
public Notifier(Connection conn) {
this.conn = conn;
}
public void run() {
while (true) {
try {
Statement stmt = conn.createStatement();
stmt.execute("NOTIFY mymessage");
stmt.close();
Thread.sleep(2000);
} catch (SQLException sqle) {
sqle.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
Нет. Устаревшие данные регулярно удаляются.
Да. Данные в скользящем окне используют тот же идентификатор транзакции, что и данные в исходной таблице.
Оконные функции пока не поддерживаются.
Поддерживает.
Не поддерживает.
Можно.
Могут.