Скользящее окно

В этом документе описываются функции скользящего окна в YMatrix.

1 Введение в тему

Скользящие окна — распространённая функция в сценариях потоковой обработки данных. В таких сценариях можно непрерывно выполнять агрегацию данных за последний период времени.
Скользящие окна обычно используются совместно с системами мониторинга и оповещения. Когда данные за последний временной интервал соответствуют заранее заданным условиям, сервер отправляет клиенту сообщение-оповещение.
Например, если средняя температура для каждого устройства рассчитывается каждую минуту, то при превышении 90 градусов будет сгенерировано предупреждение.

2 Что такое скользящее окно?

Скользящие окна (Sliding Window) отличаются от прокручиваемых окон тем, что их временные интервалы могут перекрываться.
У скользящего окна есть два параметра: WINDOW_SIZE и SLIDING_SIZE. WINDOW_SIZE определяет диапазон отображения окна, а SLIDING_SIZE — шаг смещения при каждом перемещении окна.

3 Как происходит сдвиг временного окна?

При сдвиге окна используется время вставки (Insert Time) — момент, когда данные поступают в базу данных.
Шаг сдвига окна определяется параметром SLIDING_SIZE.

  • Если SLIDING_SIZE < WINDOW_SIZE, окна будут перекрываться, и каждый элемент данных может попасть в несколько временных окон.

  • Если SLIDING_SIZE = WINDOW_SIZE, это эквивалентно прокручиваемому окну без перекрытий, и каждый элемент данных будет обработан только один раз.

  • Если SLIDING_SIZE > WINDOW_SIZE, получается «прыгающее» окно — без перекрытий, но с промежутками между окнами, из-за чего некоторые данные могут быть пропущены.

4 Как осуществляется мониторинг и оповещение в скользящем окне?

Каждый раз при сдвиге окна база данных проверяет данные в текущем окне. Если они соответствуют заданным условиям, отправляется уведомление.
Отправка выполняется через механизм pg_notify, предоставляемый PostgreSQL.
Клиент использует интерфейс listener, предоставляемый протоколом libpq. (Например, в языке Go — libpq.NewListener; в Java — java.sql.Listener).
При создании скользящего окна имя очереди сообщений указывается с помощью параметра CHANNEL_NAME.

Одну и ту же очередь сообщений могут одновременно прослушивать несколько клиентов.

Также несколько скользящих окон могут использовать одну и ту же очередь сообщений, то есть один клиент может одновременно получать оповещения от нескольких скользящих окон.

5 Как использовать

5.1 Создание расширения

Скользящее окно зависит от расширения matrixts, поэтому сначала необходимо создать расширение:

=# CREATE EXTENSION matrixts;

5.2 Создание таблицы данных

=# CREATE TABLE metrics(
   time timestamp,
   tag_id int,
   sensor float8
   )
   USING MARS3
   DISTRIBUTED BY (tag_id)
   ORDER BY (time,tag_id);

5.3 Создание скользящего окна

=# CREATE VIEW sv1 WITH (
    CONTINUOUS, 
    WINDOW_SIZE='10 min',
    SLIDING_SIZE='1 min',
    CHANNEL_NAME='my_channel'
) AS
SELECT tag_id, COUNT(*), SUM(sensor)
FROM metrics GROUP BY tag_id
HAVING MAX(sensor) > 10;

5.4 Описание параметров

Параметры разделены на две части. Параметры в блоке OPTION связаны со скользящим окном и отправкой сообщений. SQL определяет, как потоковые данные отображаются в скользящем окне.

OPTION

  • CONTINUOUS — объявляет представление как агрегированное скользящее окно.
  • WINDOW_SIZE — размер временного окна, которое скользящее окно отслеживает.
  • SLIDING_SIZE — величина временного шага, с которым окно сдвигается.
  • CHANNEL_NAME — имя очереди сообщений, в которую база данных отправляет уведомления при выполнении условий мониторинга.

SQL

  • SELECT — содержимое данных, отправляемых при каждом срабатывании. (Можно использовать встроенные JSON-функции PostgreSQL для формирования формата сообщения.)
  • FROM — источник данных для окна. (Скользящее окно не поддерживает данные из нескольких таблиц.)
  • WHERE — какие данные из исходной таблицы могут быть проанализированы окном.
  • HAVING — условия срабатывания отправки сообщения. Если условие HAVING опущено, сообщение отправляется при каждом сдвиге окна.

5.5 Примеры

Пример 1: Мониторинг максимальной температуры

Вычисляется максимальная температура для каждого устройства за последние 2 минуты, обновление каждые 30 секунд (то есть временное окно 2 минуты, сдвиг каждые 30 секунд).
Формат сообщения: (номер устройства, максимальная температура).
Сообщения отправляются в очередь temp_monitor.
Сначала создайте таблицу данных устройств metrics_1.

=# CREATE TABLE metrics_1(
   time timestamp,
   tag_id int,
   temp float8
   )
   USING MARS3
   DISTRIBUTED BY (tag_id)
   ORDER BY (time,tag_id);

Затем создайте скользящее окно.

=# CREATE VIEW temp_sv WITH (
    CONTINUOUS, 
    WINDOW_SIZE='2 min',
    SLIDING_SIZE='30 seconds',
    CHANNEL_NAME='temp_monitor'
) AS
SELECT tag_id, MAX(temp)
FROM metrics_1 GROUP BY tag_id;

Пример 2: Оповещение о минимальном напряжении

Рассчитывается значение напряжения для каждого устройства за последнюю минуту, обновление каждые 10 секунд.
Срабатывание тревоги при значении минимального напряжения выше 10.
Формат сообщения: (device number, average voltage, minimum voltage, maximum voltage).
Сообщения отправляются в очередь over_volt_alarm.
Сначала создайте таблицу данных устройств metrics_2.

=# CREATE TABLE metrics_2(
   time timestamp,
   tag_id int,
   volt float8
   )
   USING MARS3
   DISTRIBUTED BY (tag_id)
   ORDER BY (time,tag_id);

Затем создайте скользящее окно.

=# CREATE VIEW volt_alarm WITH (
    CONTINUOUS, 
    WINDOW_SIZE='1 min',
    SLIDING_SIZE='10 seconds',
    CHANNEL_NAME='over_volt_alarm'
) AS
SELECT tag_id, AVG(volt), MIN(volt), MAX(volt)
FROM metrics_2 GROUP BY tag_id
HAVING MIN(volt) > 10;

Пример клиента 1: Язык Go

ссылка: https://pkg.go.dev/github.com/lib/pq/example/listen

package main

import (
    "database/sql"
    "fmt"
    "time"

    "github.com/lib/pq"
)

func waitForNotification(l *pq.Listener) {
    select {
    case n := <-l.Notify:
        fmt.Println("received notification, new work available")
        fmt.Println(n.Channel)
        fmt.Println(n.Extra)

    case <-time.After(90 * time.Second):
        go l.Ping()
        // Check if there's more work available, just in case it takes
        // a while for the Listener to notice connection loss and
        // reconnect.
        fmt.Println("received no work for 90 seconds, checking for new work")
    }
}

func main() {
    var conninfo string = "user=mxadmin password=mxadmin dbname=postgres sslmode=disable"

    _, _ = sql.Open("postgres", conninfo)

    reportProblem := func(ev pq.ListenerEventType, err error) {
        if err != nil {
            fmt.Println(err.Error())
        }
    }

    minReconn := 10 * time.Second
    maxReconn := time.Minute
    fmt.Println("entering conn")
    listener := pq.NewListener(conninfo, minReconn, maxReconn, reportProblem)
    var err = listener.Listen("my_channel")
    if err != nil {
        panic(err)
    }

    fmt.Println("entering main loop")
    for {
        // process all available work before waiting for notifications
        //getWork(db)
        waitForNotification(listener)
    }
}

Пример клиента 2: Язык Java

ссылка: https://jdbc.postgresql.org/documentation/81/listennotify.html

package test;

import java.sql.*;

public class NotificationTest {

    public static void main(String args[]) throws Exception {
        Class.forName("org.postgresql.Driver");
        String url = "jdbc:postgresql://172.16.100.32:5432/postgres";

        // Create two distinct connections, one for the notifier
        // and another for the listener to show the communication
        // works across connections although this example would
        // work fine with just one connection.
        Connection lConn = DriverManager.getConnection(url,"mxadmin","mxadmin");
        Connection nConn = DriverManager.getConnection(url,"mxadmin","mxadmin");

        // Create two threads, one to issue notifications and
        // the other to receive them.
        Listener listener = new Listener(lConn);
//        Notifier notifier = new Notifier(nConn);
        listener.start();
//        notifier.start();
    }

}

class Listener extends Thread {

    private Connection conn;
    private org.postgresql.PGConnection pgconn;

    Listener(Connection conn) throws SQLException {
        this.conn = conn;
        this.pgconn = (org.postgresql.PGConnection)conn;
        Statement stmt = conn.createStatement();
        stmt.execute("LISTEN my_channel");
        stmt.close();
    }

    public void run() {
        while (true) {
            try {
                // Issue a dummy query to contact the backend
                // and receive any pending notifications.
                Statement stmt = conn.createStatement();
                ResultSet rs = stmt.executeQuery("SELECT 1");
                rs.close();
                stmt.close();

                org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
                System.out.println(notifications.length);
                if (notifications != null) {
                    for (int i=0; i<notifications.length; i++) {
                        System.out.println("Got notification: " + notifications[i].getName());
                        System.out.println("Got notification: " + notifications[i].getName() + " with payload: " + notifications[i].getParameter());
                    }
                }

                // wait a while before checking again for new
                // Notifications
                Thread.sleep(500);
            } catch (SQLException sqle) {
                sqle.printStackTrace();
            } catch (InterruptedException ie) {
                ie.printStackTrace();
            }
        }
    }
}

6 Часто задаваемые вопросы

  1. Будут ли данные в скользящем окне сохраняться?
Нет. Устаревшие данные регулярно очищаются.
  1. Гарантирует ли скользящее окно ACID?
Да. Данные в скользящем окне используют тот же идентификатор транзакции, что и данные в исходной таблице.
  1. Поддерживаются ли оконные функции в скользящем окне?
Оконные функции пока не поддерживаются.
  1. Поддерживает ли скользящее окно секционированные таблицы?
Поддерживает.
  1. Поддерживает ли скользящее окно мониторинг нескольких таблиц?
Не поддерживает.
  1. Можно ли определить несколько скользящих окон для одной таблицы?
Можно.
  1. Могут ли несколько клиентов одновременно прослушивать сообщения от одного скользящего окна?
Могут.