Типовые запросы для сценариев временных рядов IoT

В этом документе приведены примеры типовых запросов в сценариях временных рядов. Поскольку большинство реальных шаблонов запросов носят составной характер, классификация здесь не является строго исключительной.

Примечание!
Понимание требований к запросам сценария — один из ключевых этапов построения точной модели данных.


1 Базовые запросы для одного устройства

1.1 Точечный запрос по одному устройству

Запрос значения одного или нескольких метрик для одного устройства в определённый момент времени. Наиболее распространённый случай — получение последнего или последнего ненулевого значения определённых метрик, например, текущей скорости движущегося транспортного средства:

=# SELECT timestamp, car_id, value FROM car
    WHERE car_id = '2' 
    ORDER BY timestamp DESC
    LIMIT 1;

1.2 Подробный запрос по одному устройству

Получение детализированных данных по одному устройству за указанный временной интервал. Может включать одну или несколько метрик и обычно используется для глубокого анализа, например, при поиске первопричин в послепродажном обслуживании:

=# SELECT * FROM analysis
    WHERE device_id = '2' 
    AND timestamp >= '2023-10-08 00:00:00' 
    AND timestamp <= '2023-10-08 09:00:00';

1.3 Агрегированный запрос по одному устройству

Вычисление агрегированных значений (например, количество, сумма, среднее, максимум, минимум) определённых метрик для одного устройства в заданном временном диапазоне:

=# SELECT count(1),sum(c1),avg(c1),max(c2),min(c2) FROM metrics
    WHERE device_id = '1'
    AND timestamp >= '2023-10-08 00:00:00' 
    AND timestamp <= '2023-10-08 09:00:00';


2 Базовые запросы для нескольких устройств

2.1 Точечный запрос по нескольким устройствам

Запрос значений метрик по нескольким устройствам в определённый момент времени. Может включать одну или несколько метрик, например, комбинированная проверка сигнала:

=# SELECT timestamp, device_id, value FROM metrics
    WHERE device_id = '2' OR device_id = '3'
    AND timestamp = '2023-10-08 00:00:00'
    ORDER BY device_id, timestamp DESC;

Совет
Результат возвращает строки, удовлетворяющие условиям, сначала отсортированные по возрастанию по device_id. Если есть несколько записей с одинаковым значением device_id, они затем сортируются по убыванию по timestamp. Чтобы отсортировать оба столбца по убыванию, измените SQL-запрос, используя ORDER BY device_id DESC, timestamp DESC.

2.2 Подробный запрос по нескольким устройствам

Извлечение детализированных данных с нескольких устройств за временной интервал. Такой запрос поддерживает такие случаи использования, как обнаружение аномалий, анализ первопричин или отслеживание новых продуктов, и часто применяется командами R&D:

=# SELECT d.device_id, d.timestamp, d.value, s.device_name, l.location_name
FROM device_data AS d
JOIN device_status AS s ON d.device_id = s.device_id
JOIN device_location AS l ON d.device_id = l.device_id
WHERE d.device_id IN ('device1', 'device2', 'device3')
  AND d.timestamp >= '2023-11-01 00:00:00'
  AND d.timestamp <= '2023-11-10 23:59:59'
ORDER BY d.device_id, d.timestamp;

Это пример анализа первопричин, при условии существования следующих таблиц:

  • device_data: Таблица с данными устройств, содержащая столбцы device_id, timestamp, value
  • device_status: Таблица хранения состояния устройств с полями device_id, device_name
  • device_location: Таблица хранения информации о местоположении устройств с полями device_id, location_name

Ключевые моменты:

  • Запрос объединяет все три таблицы для получения данных об устройстве, его состоянии и местоположении.
    • Предложение JOIN связывает соответствующие таблицы по идентификатору устройства.
    • Предложение WHERE фильтрует данные по конкретным идентификаторам устройств и временному диапазону.

2.3 Агрегированный запрос по нескольким устройствам

Вычисление агрегированных метрик (например, максимум, минимум, сумма, среднее) по нескольким устройствам в заданном временном интервале с группировкой по устройству или другим атрибутам — например, проверка сетевых условий по регионам:

=# SELECT l.location_name, COUNT(d.device_id) AS total_devices, AVG(d.value) AS average_value
FROM device_data AS d
JOIN device_location AS l ON d.device_id = l.device_id
WHERE l.location_name IN ('location1', 'location2', 'location3')
  AND d.timestamp >= '2023-11-01 00:00:00'
  AND d.timestamp <= '2023-11-10 23:59:59'
GROUP BY l.location_name;

В этом примере предполагается наличие двух таблиц:

  • device_data: Содержит данные устройств с полями device_id, timestamp, value
  • device_location: Хранит информацию о местоположении устройств с полями device_id, location_name

Ключевые моменты:

  • Запрос объединяет две таблицы для получения данных об устройстве и его местоположении.
    • Предложение JOIN соединяет таблицы по идентификатору устройства.
    • Предложение WHERE фильтрует данные по региону и времени.
    • Предложение GROUP BY группирует результаты по местоположению.
    • Агрегирующие функции COUNT и AVG вычисляют количество устройств и среднее значение.

3 Расширенные запросы


3.1 Пространственно-временные запросы

Запросы, включающие временные функции, такие как first() и last(), иногда в сочетании с пространственными данными — например, отслеживание начального и конечного уровней топлива.

Для использования функций временных рядов YMatrix необходимо сначала создать расширение:

=# CREATE EXTENSION matrixts;

Расширение создаётся на уровне базы данных; его нужно создавать один раз для каждой базы данных.


3.1.1 Запрос самого раннего значения

Возвращает значение из первой записи указанного столбца:

=# SELECT id,first(c1, ts) AS c,
    FROM t
    GROUP BY id
    ORDER BY id;


3.1.2 Запрос последнего значения

Возвращает значение из последней записи указанного столбца:

=# SELECT device_id, last(value, timestamp) AS last_value
    FROM t
    GROUP BY device_id
    ORDER BY device_id;


3.1.3 Вычисление среднего за временной интервал

=# SELECT time_bucket('5 s', timestamp) AS five_second, count(*) FROM t
    GROUP BY five_second;

Приведённый выше SQL означает, что таблица t дискретизируется каждые 5s секунд с использованием count(*).

Ключевой момент:

  • Группировка исходных данных с помощью GROUP BY на более широкие временные интервалы и вычисление сводных статистик называется понижающей дискретизацией (downsampling). Понижающая дискретизация снижает затраты на хранение, сохраняя при этом ключевые характеристики данных, что позволяет проводить анализ исторических тенденций и прогнозирование.


3.1.4 Последнее ненулевое значение

=# SELECT 
    device_id,
    max(timestamp) as max_timestamp,
    last_not_null_value(value1, timestamp) as lastvalue1,
    last_not_null_value(value2, timestamp) as lastvalue2,
    last_not_null_value(value3, timestamp) as lastvalue3
   FROM t
     GROUP BY device_id;

Или:

=# SELECT 
    device_id,
    max(timestamp) as max_timestamp,
    last_not_null(value1, timestamp) as lastvalue1,
    last_not_null(value2, timestamp) as lastvalue2,
    last_not_null(value3, timestamp) as lastvalue3
   FROM t
     GROUP BY device_id;

Ключевые моменты:

  • Используйте функции last_not_null() или last_not_null_value().
  • По сравнению с last_not_null(), функция last_not_null_value() возвращает как значение, так и отметку времени. Тип возвращаемого значения — строка, формат: [<value>, <time>].


3.1.5 Вычисление разницы

Вычисление разницы между последовательными значениями для обнаружения изменений — например, мониторинг поведения акций:

=# SELECT
    device_id,
    timestamp, 
    value,
    lag(value) OVER(PARTITION BY device_id ORDER BY timestamp),
    lead(value) OVER(PARTITION BY device_id ORDER BY timestamp),
    lead(value) OVER(PARTITION BY device_id ORDER BY timestamp) - value AS lead_value 
FROM t;

Ключевые моменты:

  • Оконная функция lag() извлекает значение из предыдущей строки.
  • Оконная функция lead() извлекает значение из следующей строки.
  • Предложение PARTITION BY с device_id применяет оконную функцию внутри каждого раздела по устройству.
  • lead(value) - value вычисляет разницу между следующим и текущим значением, которая называется lead_value.


3.2 Очистка аномальных данных

=# SELECT time_bucket_gapfill('45 minutes', timestamp) AS five_min,
          locf(AVG(value)) As locf_value,
          interpolate(AVG(value)) AS interpolate_value
   FROM t
       WHERE device_id = '1'
       AND timestamp >= '2021-12-01 00:00:00'
       AND timestamp < '2021-12-02 06:00:00'
   GROUP BY five_min
   ORDER BY five_min;

Ключевые моменты:

  • Запрос использует time_bucket_gapfill() вместе с методом LOCF (последнее наблюдение переносится вперёд) и линейной интерполяцией для заполнения пропущенных значений в таблице t.
    • time_bucket_gapfill('45 minutes', timestamp): Группирует временные метки (timestamp) в фиксированные интервалы (45-минутные блоки) и заполняет пробелы для обеспечения непрерывности.
    • locf(AVG(c1)): Применяет метод LOCF для заполнения value в каждом блоке, используя последнее ненулевое наблюдённое значение.
    • interpolate(AVG(c1)): Использует линейную интерполяцию для оценки value на основе окружающих ненулевых значений.


3.3 Обнаружение аномалий

3.3.1 Накопленная сумма, среднее и медиана

Обнаружение аномалий по изменению накопленного среднего или медианы:

-- Cumulative sum
=# SELECT device_id, timestamp, value,
          SUM(value) OVER(PARTITION BY device_id, timestamp::date ORDER BY timestamp) AS accumulation_value
   FROM t;

-- Cumulative average
=# SELECT device_id, timestamp, value,
       AVG(value) OVER(PARTITION BY device_id, timestamp::date ORDER BY timestamp) AS accumulation_value
   FROM t;

-- Median
=# SELECT device_id,
       percentile_cont(0.5) WITHIN GROUP (ORDER BY value) AS accumulation_value
   FROM t
   GROUP BY device_id
   ORDER BY device_id;

3.3.2 Вычисление приращения

Вычисление приращений для монотонных последовательностей или простых изменений между последовательными записями, полезно для обнаружения аномалий.

Пример 1

Вычисляет изменение чтения с диска для диска tag_id с идентификатором тега 1 между 2021-4-10 21:00:00 и 21:01:00:

=# SELECT
      time,
      (
       CASE WHEN lag(read) OVER (ORDER BY time) IS NULL THEN NULL
       ELSE round(read - lag(read) OVER (ORDER BY time))
       END
      ) AS read
   FROM disk
      WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 21:01:00'::timestamp
      AND tag_id = 1
      ORDER BY time;

Ключевые моменты:

  • Используется оконная функция lag() для получения предыдущего значения.
  • Положительные значения указывают на увеличение; отрицательные — на уменьшение по сравнению с предыдущей секундой.

Пример 2

Вычисляет ежедневное приращение продаж с использованием таблицы sales:

=# SELECT
      date,
      sales_amount,
      sales_amount - LAG(sales_amount) OVER (ORDER BY date) AS sales_increment
   FROM
      sales
   ORDER BY
      date;

Ключевые моменты:

  • Ежедневное приращение продаж вычисляется как разница между продажами текущего дня и предыдущего.

3.3.3 Вычисление скорости изменения

Пример 1

Разделите приращение на временной интервал, чтобы получить скорость изменения:

=# SELECT
    time,
    (
     CASE WHEN lag(read) OVER (ORDER BY time) IS NULL THEN NULL
     ELSE round(read - lag(read) OVER (ORDER BY time))
     END
    ) / extract(epoch from time - lag(time) OVER (ORDER BY time)) AS read_rate,
    extract(epoch from time - lag(time) OVER (ORDER BY time)) AS "time lag"
    FROM disk
    WHERE time BETWEEN '2021-04-10 21:00:00'::timestamp AND '2021-04-10 21:01:00'::timestamp
    AND tag_id = 1
    ORDER BY time;

Ключевые моменты:

  • Прирост / Временной интервал = Темп роста.


3.4 Обнаружение скачков метрик

Определение скачков значений метрик с возвратом временных меток, идентификаторов устройств и значений до и после скачка — полезно для аудита торговых операций с акциями или обнаружения резких изменений состояния:

=# SELECT stock_id, timestamp, value, lag_value, lead_value, lag_diff_value, lead_diff_value FROM (
    SELECT stock_id, timestamp, value,
        lag(value) OVER (PARTITION BY stock_id ORDER BY timestamp) AS lag_value,  -- Previous value in value column
        lead(value) OVER (PARTITION BY stock_id ORDER BY timestamp) AS lead_value,  -- Next value in value column
        value - lag(value) OVER (PARTITION BY stock_id ORDER BY timestamp) AS lag_diff_value,
        value - lead(value) OVER (PARTITION BY stock_id ORDER BY timestamp) AS lead_diff_value
    FROM t
       WHERE 1=1
       AND stock_id in ('1')
       AND timestamp >= '2021-12-01 00:00:00'
       AND timestamp < '2021-12-02 23:59:59'::timestamp
       AND value IS NOT NULL
   ) ht 
   WHERE abs(lag_diff_c1) > 3 OR abs(lead_diff_c1) > 3
   ORDER BY stock_id, timestamp;

Ключевые моменты:

  • Подзапрос использует оконные функции lag() и lead() для получения предыдущего и следующего значений в рамках каждого раздела stock_id, упорядоченного по timestamp.
    • lag_value: Предыдущее значение value.
    • lead_value: Следующее значение value.
    • lag_diff_value: Разница между текущим value и предыдущим value.
    • lead_diff_value: Разница между текущим value и следующим value.
  • Внешний запрос фильтрует строки, где абсолютная разница превышает 3.

Примечание!
Обнаружение скачков выявляет резкие изменения метрики, превышающие пороговое значение между соседними временными метками для заданного устройства в определённом временном окне.


3.5 Мониторинговая панель

3.5.1 Функции процентилей

Использование функций процентилей для вычисления метрик в реальном времени на панели мониторинга — например, использование 90-го процентиля для отражения тенденций акций:

=# SELECT stock_id,
          percentile_cont(0.9) WITHIN GROUP (ORDER BY value) AS value_9
    FROM t
    GROUP BY stock_id
    ORDER BY stock_id;

3.5.2 Функция кумулятивного распределения (CDF)

Использование CDF для вычисления кумулятивного распределения цен или метрик во времени. Это помогает оценить относительное положение и определить диапазоны цен и статистические свойства — идеально подходит для отображения статистики распределения на панелях в реальном времени:

=# SELECT stock_id, timestamp, price, 
          cume_dist() OVER(PARTITION BY stock_id, timestamp::date ORDER BY timestamp) AS cume_dist
   FROM t;

Ключевые моменты:

  • Функция cume_dist() вычисляет кумулятивное распределение: Значение кумулятивного распределения = (Количество строк в секции до текущей строки или на том же уровне) / Общее количество строк в секции


3.6 Специальный анализ (Ad Hoc Analysis)

В сценариях временных рядов операции преобразования (преобразование строк в столбцы или столбцов в строки) поддерживают гибкий специальный анализ.

3.6.1 Преобразование строк в столбцы

=# SELECT name,
          max(CASE WHEN attribute='age' THEN value ELSE 0 END) AS age,
          max(CASE WHEN attribute='height' THEN value ELSE 0 END) AS height,
          max(CASE WHEN attribute='weight' THEN value ELSE 0 END) AS weight,
          max(CASE WHEN attribute='shoe_size' THEN value ELSE 0 END) AS shoe_size
   FROM t
   GROUP BY name
   ORDER BY age DESC;

Пример структуры таблицы t:

+------+----------+-------+
| name | attribute | value |
+------+----------+-------+
| John | age      | 30    |
| John | height   | 175   |
| John | weight   | 70    |
| John | shoe_size| 9.5   |
| Mary | age      | 25    |
| Mary | height   | 160   |
| Mary | weight   | 55    |
| Mary | shoe_size| 8     |
+------+----------+-------+

Пример вывода при преобразовании строк в столбцы:

+------+-----+-------+--------+-----------+
| name | age | height| weight | shoe_size |
+------+-----+-------+--------+-----------+
| John | 30  | 175   | 70     | 9.5       |
| Mary | 25  | 160   | 55     | 8         |
+------+-----+-------+--------+-----------+

Ключевые моменты:

  • Это преобразование полезно при сравнении или представлении атрибутов на основе строк в виде столбцов. Условные выражения преобразуют атрибуты в столбцы, а агрегирующие функции (например, max) извлекают их значения.

3.6.2 Преобразование столбцов в строки

=# SELECT currenttimestamp, 
          deviceid, 
          devicetemplatecode,
          statisticstype,
          (b.rec).key AS key, 
          (b.rec).value AS value 
   FROM
     (SELECT currenttimestamp, 
             deviceid, 
             devicetemplatecode,
             statisticstype,
             jsonb_each_text(row_to_json(t.*)::jsonb-'currenttimestamp'-'deviceid'-'devicetemplatecode'-'statisticstype') AS rec  
     FROM t
   ) b
   WHERE (b.rec).value IS NOT NULL;

Пример структуры таблицы t:

+---------------------+----------+-------------------+----------------+--------+--------+--------+
| currenttimestamp    | deviceid | devicetemplatecode| statisticstype | key1   | key2   | key3   |
+---------------------+----------+-------------------+----------------+--------+--------+--------+
| 2023-11-13 08:30:45 | 1234567  | template1         | type1          | value1 | value2 | value3 |
+---------------------+----------+-------------------+----------------+--------+--------+--------+

Пример вывода при преобразовании столбцов в строки:

+---------------------+----------+-------------------+----------------+------+-------+
| currenttimestamp    | deviceid | devicetemplatecode| statisticstype | key  | value |
+---------------------+----------+-------------------+----------------+------+-------+
| 2023-11-13 08:30:45 | 123456   | template1         | type1          | key1 | value1|
| 2023-11-13 08:30:45 | 123456   | template1         | type1          | key2 | value2|
| 2023-11-13 08:30:45 | 123456   | template1         | type1          | key3 | value3|
| ...                 | ...      | ...               | ...            | ...  | ...   |
+---------------------+----------+-------------------+----------------+------+-------+

Ключевые моменты:

  • Используется row_to_json() для преобразования каждой строки в JSON-объект, затем jsonb_each_text() для развёртывания его в пары «ключ-значение», что обеспечивает преобразование столбцов в строки.

3.7 Присвоение меток в реальном времени

Отображение состояния машины, учётной записи или лица:

=# SELECT first_name,
          last_name,
          salary,
    CASE
     WHEN salary >= 80000 THEN 'senior'
     WHEN salary >= 60000 THEN 'intermediate'
    ELSE 'junior'
    END AS employee_level
   FROM employees;

Ключевые моменты:

  • Используется предложение CASE для вычисления составных метрик:
    • Если salary больше или равно 80000, столбец employee_level будет помечен как 'senior'.
    • Если salary больше или равно 60000 и меньше 80000, столбец employee_level будет помечен как 'intermediate'.
    • В противном случае, когда salary меньше 60000, столбец employee_level будет помечен как 'junior'.


3.8 Обнаружение пиков

Определение 10 наибольших пиков для указанной метрики заданной акции в заданном временном диапазоне вместе с их временными метками:

=# SELECT  stock_id, timestamp, value FROM (
    SELECT stock_id, timestamp, value,
           row_number() OVER (PARTITION BY stock_id ORDER BY value DESC) AS rn
    FROM t_test
      WHERE stock_id ='1'
      AND timestamp >= '2021-12-01 00:00:00'
      AND timestamp < '2021-12-03 00:00:00'
   ) AS derived_table
   WHERE rn <= 10;

Ключевые моменты:

  • Запрос использует подзапрос и оконную функцию row_number() для фильтрации и сортировки данных из таблицы t_test.
  • Возвращаются 10 строк с наибольшим значением value для каждого stock_id.


3.9 Запрос происхождения данных

Используется для трассировки происхождения данных и обхода метрик в полях типа JSON:

=# WITH RECURSIVE json_recursive AS (
    SELECT id,
           'person' AS key,
           data->'person' AS value
    FROM my_table
    UNION ALL
    SELECT j.id,
           k AS key,
           v AS value
    FROM json_recursive j,
         jsonb_each(j.value) AS kv(k, v)
    WHERE jsonb_typeof(j.value) = 'object'
   )
   SELECT id, key, value
   FROM json_recursive;

Ключевые моменты:

  • Запрос использует рекурсивный CTE (WITH RECURSIVE) и два подзапроса.
    • Якорный элемент (первый подзапрос) выбирает id, фиксированный ключ 'person' и значение JSON data->'person' из таблицы my_table в качестве начального набора результатов.
    • Рекурсивный элемент (второй подзапрос) итеративно соединяет CTE json_recursive с результатом функции jsonb_each(). На каждой итерации текущий JSON-объект (value) разбивается на пары «ключ-значение» с помощью jsonb_each(). Полученные ключ (k) и значение (v) становятся частью следующей итерации.
    • Условие WHERE jsonb_typeof(j.value) = 'object' гарантирует, что обрабатываются только записи, в которых значение JSON является объектом, обеспечивая условие завершения рекурсии.
  • Окончательный вывод выбирает id, key и value из CTE json_recursive для предоставления полной информации о происхождении данных.


3.10 Машинное обучение

Машинное обучение может использоваться для задач прогнозирования и классификации.

Например, использовать данные с 1 по 10 декабря для построения модели линейной регрессии и прогнозирования значения метрики 11 декабря:

=# SELECT '2021-12-11 00:00:01' AS timestamp,
    extract(epoch FROM '2021-12-11 00:00:01'::timestamp) * slope + intercept AS c1_value
   FROM (
    SELECT regr_slope(value, extract(epoch from timestamp)) AS slope,
         regr_intercept(value, extract(epoch from timestamp)) AS intercept
    FROM t_test
    WHERE vin = '1'
        AND timestamp >= '2021-12-01 00:00:00'
        AND timestamp < '2021-12-11 00:00:00'
   ) AS liner_regression;

Ключевые моменты:

  • Этот запрос использует два подзапроса и функции линейной регрессии regr_slope() и regr_intercept() для выполнения линейной регрессии над таблицей t_test.
  • Эти функции вычисляют наклон и смещение линейной модели, позволяя выполнять базовые задачи машинного обучения непосредственно в SQL.
    • Внутренний подзапрос фильтрует записи, где vin равен '1', а timestamp находится между '2021-12-01 00:00:00' (включительно) и '2021-12-11 00:00:00' (исключительно). Он вычисляет наклон и смещение между столбцом value и временем Unix-эпохи, извлечённым из timestamp.
    • Внешний запрос устанавливает timestamp равным '2021-12-11 00:00:01', затем вычисляет прогнозируемое значение c1_value по линейному уравнению: slope * epoch_time + intercept.