Скользящее окно

В этом документе описана функция скользящего окна в YMatrix.

1. Предпосылки

Скользящие окна — распространённая функция в сценариях обработки потоковых данных. В таких случаях можно выполнять непрерывную агрегацию данных за последний временной период.
Скользящие окна обычно используются совместно с системами мониторинга и оповещения. Когда данные в недавнем временном окне соответствуют заранее заданным условиям, сервер базы данных отправляет клиенту сообщение-оповещение.
Например, вычислять среднюю температуру по устройству каждую минуту и формировать оповещение, если она превышает 90 градусов.

2. Что такое скользящее окно?

Скользящее окно отличается от каскадного (tumbling) тем, что его временные интервалы могут перекрываться.
Скользящее окно имеет два параметра: WINDOW_SIZE и SLIDING_SIZE.
WINDOW_SIZE определяет временной диапазон окна, а SLIDING_SIZE задаёт шаг смещения при каждом сдвиге.

3. Как происходит сдвиг временного окна?

Для скользящего окна используется время вставки, то есть момент, когда данные поступают в базу данных.
Шаг сдвига управляется параметром SLIDING_SIZE.

  • Если SLIDING_SIZE < WINDOW_SIZE, окна перекрываются. Каждая запись данных может попасть в несколько окон.

  • Если SLIDING_SIZE = WINDOW_SIZE, поведение эквивалентно каскадному окну — без перекрытий, и каждая запись попадает ровно в одно окно.

  • Если SLIDING_SIZE > WINDOW_SIZE, получается скачкообразное окно (hopping window) — без перекрытий, но с промежутками между окнами. Некоторые данные могут не попасть ни в одно окно.

4. Как работает оповещение со скользящими окнами?

Каждый раз при сдвиге окна база данных анализирует данные в текущем окне. Если данные удовлетворяют заданному условию, отправляется сообщение.
Сообщения передаются через механизм pg_notify PostgreSQL.
Клиенты получают сообщения с помощью интерфейса listener, предоставляемого протоколом libpq (например, libpq.NewListener на Go, java.sql.Listener на Java).
При создании скользящего окна используйте параметр CHANNEL_NAME, чтобы указать имя очереди сообщений.

Несколько клиентов могут одновременно прослушивать одну и ту же очередь сообщений.

Несколько скользящих окон также могут использовать одну и ту же очередь сообщений. Таким образом, один клиент может получать оповещения от нескольких скользящих окон.

5. Использование

5.1 Создание расширения

Функция скользящего окна зависит от расширения matrixts. Сначала создайте расширение:

=# CREATE EXTENSION matrixts;

5.2 Создание таблицы данных

=# CREATE TABLE metrics(
   time timestamp,
   tag_id int,
   sensor float8
   )
   USING MARS3
   DISTRIBUTED BY (tag_id)
   ORDER BY (time,tag_id);

5.3 Создание скользящего окна

=# CREATE VIEW sv1 WITH (
    CONTINUOUS, 
    WINDOW_SIZE='10 min',
    SLIDING_SIZE='1 min',
    CHANNEL_NAME='my_channel'
) AS
SELECT tag_id, COUNT(*), SUM(sensor)
FROM metrics GROUP BY tag_id
HAVING MAX(sensor) > 10;

5.4 Описание параметров

Параметры делятся на две категории: те, что определены в предложении WITH и относятся к скользящему окну и отправке сообщений, и SQL-запрос, определяющий способ агрегации потоковых данных внутри окна.

Предложение WITH

  • CONTINUOUS: Объявляет представление как непрерывное скользящее окно.
  • WINDOW_SIZE: Указывает продолжительность времени для скользящего окна.
  • SLIDING_SIZE: Задаёт интервал шага сдвига.
  • CHANNEL_NAME: Имя очереди сообщений, в которую публикуются оповещения при выполнении условий.

SQL-запрос

  • SELECT: Содержимое, отправляемое в каждом сообщении (можно использовать встроенные JSON-функции PostgreSQL для форматирования полезной нагрузки).
  • FROM: Исходная таблица, из которой читаются потоковые данные (скользящие окна не поддерживают объединения или несколько таблиц).
  • WHERE: Определяет, какая часть базовой таблицы видима для окна.
  • HAVING: Условие, вызывающее отправку сообщения. Если HAVING опущено, сообщение отправляется при каждом сдвиге окна.

5.5 Примеры

Пример 1: Мониторинг максимальной температуры

Вычислить максимальную температуру по устройству за последние 2 минуты, обновляя каждые 30 секунд (то есть 2-минутное окно, сдвигающееся каждые 30 секунд).
Формат сообщения: (идентификатор устройства, максимальная температура).
Сообщения отправляются в канал temp_monitor.
Сначала создайте таблицу метрик metrics_1.

=# CREATE TABLE metrics_1(
   time timestamp,
   tag_id int,
   temp float8
   )
   USING MARS3
   DISTRIBUTED BY (tag_id)
   ORDER BY (time,tag_id);

Затем создайте скользящее окно:

=# CREATE VIEW temp_sv WITH (
    CONTINUOUS, 
    WINDOW_SIZE='2 min',
    SLIDING_SIZE='30 seconds',
    CHANNEL_NAME='temp_monitor'
) AS
SELECT tag_id, MAX(temp)
FROM metrics_1 GROUP BY tag_id;

Пример 2: Оповещение о низком напряжении

Мониторинг значений напряжения по устройствам за последнюю минуту, обновление каждые 10 секунд.
Срабатывание оповещения, когда минимальное напряжение превышает 10.
Формат сообщения: (设备号,平均电压,最低电压,最高电压).
Сообщения отправляются в канал over_volt_alarm.
Сначала создайте таблицу данных metrics_2.

=# CREATE TABLE metrics_2(
   time timestamp,
   tag_id int,
   volt float8
   )
   USING MARS3
   DISTRIBUTED BY (tag_id)
   ORDER BY (time,tag_id);

Затем создайте скользящее окно:

=# CREATE VIEW volt_alarm WITH (
    CONTINUOUS, 
    WINDOW_SIZE='1 min',
    SLIDING_SIZE='10 seconds',
    CHANNEL_NAME='over_volt_alarm'
) AS
SELECT tag_id, AVG(volt), MIN(volt), MAX(volt)
FROM metrics_2 GROUP BY tag_id
HAVING MIN(volt) > 10;

Пример клиента 1: Язык Go

ссылка: https://pkg.go.dev/github.com/lib/pq/example/listen

package main

import (
    "database/sql"
    "fmt"
    "time"

    "github.com/lib/pq"
)

func waitForNotification(l *pq.Listener) {
    select {
    case n := <-l.Notify:
        fmt.Println("received notification, new work available")
        fmt.Println(n.Channel)
        fmt.Println(n.Extra)

    case <-time.After(90 * time.Second):
        go l.Ping()
        // Check if there's more work available, just in case it takes
        // a while for the Listener to notice connection loss and
        // reconnect.
        fmt.Println("received no work for 90 seconds, checking for new work")
    }
}

func main() {
    var conninfo string = "user=mxadmin password=mxadmin dbname=postgres sslmode=disable"

    _, _ = sql.Open("postgres", conninfo)

    reportProblem := func(ev pq.ListenerEventType, err error) {
        if err != nil {
            fmt.Println(err.Error())
        }
    }

    minReconn := 10 * time.Second
    maxReconn := time.Minute
    fmt.Println("entering conn")
    listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem)
    var err = listener.Listen("my_channel")
    if err != nil {
        panic(err)
    }

    fmt.Println("entering main loop")
    for {
        // process all available work before waiting for notifications
        //getWork(db)
        waitForNotification(listener)
    }
}

Пример клиента 2: Язык Java

ссылка: https://jdbc.postgresql.org/documentation/81/listennotify.html

package test;

import java.sql.*;

public class NotificationTest {

    public static void main(String args[]) throws Exception {
        Class.forName("org.postgresql.Driver");
        String url = "jdbc:postgresql://172.16.100.32:5432/postgres";

        // Create two distinct connections, one for the notifier
        // and another for the listener to show the communication
        // works across connections although this example would
        // work fine with just one connection.
        Connection lConn = DriverManager.getConnection(url,"mxadmin","mxadmin");
        Connection nConn = DriverManager.getConnection(url,"mxadmin","mxadmin");

        // Create two threads, one to issue notifications and
        // the other to receive them.
        Listener listener = new Listener(lConn);
//        Notifier notifier = new Notifier(nConn);
        listener.start();
//        notifier.start();
    }

}

class Listener extends Thread {

    private Connection conn;
    private org.postgresql.PGConnection pgconn;

    Listener(Connection conn) throws SQLException {
        this.conn = conn;
        this.pgconn = (org.postgresql.PGConnection)conn;
        Statement stmt = conn.createStatement();
        stmt.execute("LISTEN my_channel");
        stmt.close();
    }

    public void run() {
        while (true) {
            try {
                // issue a dummy query to contact the backend
                // and receive any pending notifications.
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT 1");
                rs.close();
                stmt.close();

                org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
                System.out.println(notifications.length);
                if (notifications != null) {
                    for (int i=0; i<notifications.length; i++) {
                        System.out.println("Got notification: " + notifications[i].getName());
                        System.out.println("Got notification: " + notifications[i].getName() + " with payload: " + notifications[i].getParameter());
                    }
                }

                // wait a while before checking again for new
                // notifications
                Thread.sleep(500);
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

6. Часто задаваемые вопросы

  1. Сохраняются ли данные в скользящих окнах?
    Нет. Устаревшие данные периодически удаляются.

  2. Гарантирует ли скользящее окно ACID?
    Да. Данные в скользящих окнах имеют тот же идентификатор транзакции, что и базовая таблица.

  3. Поддерживаются ли оконные функции в скользящих окнах?
    На данный момент не поддерживаются.

  4. Поддерживает ли скользящее окно секционированные таблицы?
    Да.

  5. Может ли скользящее окно отслеживать несколько таблиц?
    Нет.

  6. Можно ли определить несколько скользящих окон на одной таблице?
    Да.

  7. Могут ли несколько клиентов одновременно получать оповещения от одного скользящего окна?
    Да.