В этом документе описываются функции скользящего окна в YMatrix.
Скользящие окна — распространённая функция в сценариях потоковой обработки данных. В таких сценариях можно непрерывно выполнять агрегацию данных за последний период времени.
Скользящие окна обычно используются совместно с системами мониторинга и оповещения. Когда данные за последний временной интервал соответствуют заранее заданным условиям, сервер отправляет клиенту сообщение-оповещение.
Например, если средняя температура для каждого устройства рассчитывается каждую минуту, то при превышении 90 градусов будет сгенерировано предупреждение.
Скользящие окна (Sliding Window) отличаются от прокручиваемых окон тем, что их временные интервалы могут перекрываться.
У скользящего окна есть два параметра: WINDOW_SIZE и SLIDING_SIZE. WINDOW_SIZE определяет диапазон отображения окна, а SLIDING_SIZE — шаг смещения при каждом перемещении окна.
При сдвиге окна используется время вставки (Insert Time) — момент, когда данные поступают в базу данных.
Шаг сдвига окна определяется параметром SLIDING_SIZE.
Если SLIDING_SIZE < WINDOW_SIZE, окна будут перекрываться, и каждый элемент данных может попасть в несколько временных окон.
Если SLIDING_SIZE = WINDOW_SIZE, это эквивалентно прокручиваемому окну без перекрытий, и каждый элемент данных будет обработан только один раз.
Если SLIDING_SIZE > WINDOW_SIZE, получается «прыгающее» окно — без перекрытий, но с промежутками между окнами, из-за чего некоторые данные могут быть пропущены.
Каждый раз при сдвиге окна база данных проверяет данные в текущем окне. Если они соответствуют заданным условиям, отправляется уведомление.
Отправка выполняется через механизм pg_notify, предоставляемый PostgreSQL.
Клиент использует интерфейс listener, предоставляемый протоколом libpq. (Например, в языке Go — libpq.NewListener; в Java — java.sql.Listener).
При создании скользящего окна имя очереди сообщений указывается с помощью параметра CHANNEL_NAME.
Одну и ту же очередь сообщений могут одновременно прослушивать несколько клиентов.
Также несколько скользящих окон могут использовать одну и ту же очередь сообщений, то есть один клиент может одновременно получать оповещения от нескольких скользящих окон.
Скользящее окно зависит от расширения matrixts, поэтому сначала необходимо создать расширение:
=# CREATE EXTENSION matrixts;
=# CREATE TABLE metrics(
time timestamp,
tag_id int,
sensor float8
)
USING MARS3
DISTRIBUTED BY (tag_id)
ORDER BY (time,tag_id);
=# CREATE VIEW sv1 WITH (
CONTINUOUS,
WINDOW_SIZE='10 min',
SLIDING_SIZE='1 min',
CHANNEL_NAME='my_channel'
) AS
SELECT tag_id, COUNT(*), SUM(sensor)
FROM metrics GROUP BY tag_id
HAVING MAX(sensor) > 10;
Параметры разделены на две части. Параметры в блоке OPTION связаны со скользящим окном и отправкой сообщений. SQL определяет, как потоковые данные отображаются в скользящем окне.
CONTINUOUS — объявляет представление как агрегированное скользящее окно.WINDOW_SIZE — размер временного окна, которое скользящее окно отслеживает.SLIDING_SIZE — величина временного шага, с которым окно сдвигается.CHANNEL_NAME — имя очереди сообщений, в которую база данных отправляет уведомления при выполнении условий мониторинга.SELECT — содержимое данных, отправляемых при каждом срабатывании.
(Можно использовать встроенные JSON-функции PostgreSQL для формирования формата сообщения.)FROM — источник данных для окна.
(Скользящее окно не поддерживает данные из нескольких таблиц.)WHERE — какие данные из исходной таблицы могут быть проанализированы окном.HAVING — условия срабатывания отправки сообщения.
Если условие HAVING опущено, сообщение отправляется при каждом сдвиге окна.Вычисляется максимальная температура для каждого устройства за последние 2 минуты, обновление каждые 30 секунд (то есть временное окно 2 минуты, сдвиг каждые 30 секунд).
Формат сообщения: (номер устройства, максимальная температура).
Сообщения отправляются в очередь temp_monitor.
Сначала создайте таблицу данных устройств metrics_1.
=# CREATE TABLE metrics_1(
time timestamp,
tag_id int,
temp float8
)
USING MARS3
DISTRIBUTED BY (tag_id)
ORDER BY (time,tag_id);
Затем создайте скользящее окно.
=# CREATE VIEW temp_sv WITH (
CONTINUOUS,
WINDOW_SIZE='2 min',
SLIDING_SIZE='30 seconds',
CHANNEL_NAME='temp_monitor'
) AS
SELECT tag_id, MAX(temp)
FROM metrics_1 GROUP BY tag_id;
Рассчитывается значение напряжения для каждого устройства за последнюю минуту, обновление каждые 10 секунд.
Срабатывание тревоги при значении минимального напряжения выше 10.
Формат сообщения: (device number, average voltage, minimum voltage, maximum voltage).
Сообщения отправляются в очередь over_volt_alarm.
Сначала создайте таблицу данных устройств metrics_2.
=# CREATE TABLE metrics_2(
time timestamp,
tag_id int,
volt float8
)
USING MARS3
DISTRIBUTED BY (tag_id)
ORDER BY (time,tag_id);
Затем создайте скользящее окно.
=# CREATE VIEW volt_alarm WITH (
CONTINUOUS,
WINDOW_SIZE='1 min',
SLIDING_SIZE='10 seconds',
CHANNEL_NAME='over_volt_alarm'
) AS
SELECT tag_id, AVG(volt), MIN(volt), MAX(volt)
FROM metrics_2 GROUP BY tag_id
HAVING MIN(volt) > 10;
ссылка: https://pkg.go.dev/github.com/lib/pq/example/listen
package main
import (
"database/sql"
"fmt"
"time"
"github.com/lib/pq"
)
func waitForNotification(l *pq.Listener) {
select {
case n := <-l.Notify:
fmt.Println("received notification, new work available")
fmt.Println(n.Channel)
fmt.Println(n.Extra)
case <-time.After(90 * time.Second):
go l.Ping()
// Check if there's more work available, just in case it takes
// a while for the Listener to notice connection loss and
// reconnect.
fmt.Println("received no work for 90 seconds, checking for new work")
}
}
func main() {
var conninfo string = "user=mxadmin password=mxadmin dbname=postgres sslmode=disable"
_, _ = sql.Open("postgres", conninfo)
reportProblem := func(ev pq.ListenerEventType, err error) {
if err != nil {
fmt.Println(err.Error())
}
}
minReconn := 10 * time.Second
maxReconn := time.Minute
fmt.Println("entering conn")
listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem)
var err = listener.Listen("my_channel")
if err != nil {
panic(err)
}
fmt.Println("entering main loop")
for {
// process all available work before waiting for notifications
//getWork(db)
waitForNotification(listener)
}
}
ссылка: https://jdbc.postgresql.org/documentation/81/listennotify.html
package test;
import java.sql.*;
public class NotificationTest {
public static void main(String args[]) throws Exception {
Class.forName("org.postgresql.Driver");
String url = "jdbc:postgresql://172.16.100.32:5432/postgres";
// Create two distinct connections, one for the notifier
// and another for the listener to show the communication
// works across connections although this example would
// work fine with just one connection.
Connection lConn = DriverManager.getConnection(url,"mxadmin","mxadmin");
Connection nConn = DriverManager.getConnection(url,"mxadmin","mxadmin");
// Create two threads, one to issue notifications and
// the other to receive them.
Listener listener = new Listener(lConn);
// Notifier notifier = new Notifier(nConn);
listener.start();
// notifier.start();
}
}
class Listener extends Thread {
private Connection conn;
private org.postgresql.PGConnection pgconn;
Listener(Connection conn) throws SQLException {
this.conn = conn;
this.pgconn = (org.postgresql.PGConnection)conn;
Statement stmt = conn.createStatement();
stmt.execute("LISTEN my_channel");
stmt.close();
}
public void run() {
while (true) {
try {
// Issue a dummy query to contact the backend
// and receive any pending notifications.
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("SELECT 1");
rs.close();
stmt.close();
org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
System.out.println(notifications.length);
if (notifications != null) {
for (int i=0; i<notifications.length; i++) {
System.out.println("Got notification: " + notifications[i].getName());
System.out.println("Got notification: " + notifications[i].getName() + " with payload: " + notifications[i].getParameter());
}
}
// wait a while before checking again for new
// Notifications
Thread.sleep(500);
} catch (SQLException sqle) {
sqle.printStackTrace();
} catch (InterruptedException ie) {
ie.printStackTrace();
}
}
}
}
Нет. Устаревшие данные регулярно очищаются.
Да. Данные в скользящем окне используют тот же идентификатор транзакции, что и данные в исходной таблице.
Оконные функции пока не поддерживаются.
Поддерживает.
Не поддерживает.
Можно.
Могут.