Сценарии пакетного объединения данных (UPSERT)

В этом документе описываются решения YMatrix для сценариев пакетного объединения данных: реализация функциональности UPSERT различными методами в разных механизмах хранения (HEAP, MARS2).

1 Что такое сценарий пакетного объединения данных?

Рассмотрим в качестве примера сценарий A — широкую таблицу подключённого транспортного средства.
В этом сценарии проект широкой таблицы (без учёта типов метрик) показан ниже:

Бортовая система сбора данных собирает информацию от одного и того же транспортного средства в один и тот же момент времени (одновременно сгенерированные данные) и отправляет её в базу данных YMatrix пакетами.

Широкая таблица предназначена для хранения, вычислений и анализа данных по транспортным средствам, в то время как бортовая система собирает и загружает данные по датчикам. Таким образом, данные, передаваемые от транспортного средства в YMatrix, записываются в базу данных пакетами на уровне датчиков.
Это и формирует сценарий пакетного объединения данных в YMatrix.

В примере сценария A требуется объединить как минимум четыре пакета загруженных данных после их записи в базу. Если в пакете присутствуют дублирующиеся данные, YMatrix обновляет запись, заменяя значения NULL на непустые, а более новые значения — на более старые:

После объединения при запросе этих записей возвращается одна объединённая строка вместо нескольких.

На практике при вводе данных могут возникать такие явления, как неупорядоченное поступление, задержки при загрузке или нерегулярная частота. Однако здесь рассматриваются только сценарии пакетного объединения, требующие консолидации данных на уровне базы; другие случаи в данном документе не обсуждаются.

2 Что такое UPSERT?

YMatrix определяет UPSERT как комбинацию операций INSERT и UPDATE.

При вставке новых данных:

  • Если указанная строка уже существует в таблице, она обновляется.
  • Если указанной строки нет, добавляется новая строка.

Примечание!
Под «указанной строкой» понимается существующая строка в текущей базе данных, у которой ключ сортировки, определённый индексом mars2_btree в таблице MARS2, либо уникальный индекс/ограничение в таблице HEAP, совпадает с входящей строкой.

3 Реализация UPSERT в различных механизмах хранения

В YMatrix UPSERT не является ключевым словом SQL, а представляет собой операцию, сочетающую функции INSERT и UPDATE. Она может быть реализована следующими способами:

  • Установка параметра uniquemode=true при создании таблицы MARS3.
  • Включение параметра uniquemode=true в индексе таблицы MARS2.
  • Использование mxgate или предложения ON CONFLICT в таблице HEAP.

Применение UPSERT в различных механизмах хранения соответствует разным бизнес-сценариям:

Механизм храненияМетод UPSERTСценарий использования
MARS3Установка `uniquemode=true` при создании таблицы MARS3Рекомендуемый подход для временных рядов: данные внутри пакета максимально объединяются при записи, что снижает объём на физическом диске; оставшееся небольшое объединение выполняется во время запроса, возвращая полностью объединённые результаты. Значительно повышает производительность записи и запросов. Не является универсальным решением для OLAP или OLTP; включайте uniquemode, если требуется объединение пакетов.
MARS2Установка `uniquemode=true` в индексе таблицы MARS2Рекомендуемый подход для временных рядов: аналогично MARS3, частичное объединение происходит при записи для минимизации использования диска, окончательное объединение — при выполнении запроса, обеспечивая немедленное получение объединённых результатов. Эффективно повышает производительность записи и запросов.
HEAPИспользование mxgate в таблицах HEAPРекомендуется для рабочих нагрузок с временными рядами. mxgate — это высокопроизводительный инструмент ввода данных YMatrix, обеспечивающий высокую пропускную способность записи. Требуется определить уникальное ограничение или индекс по определённым столбцам (обычно ID устройства и временная метка).
Использование предложения `ON CONFLICT` в таблицах HEAPПодходит для небольших операций UPSERT. Поскольку этот метод выполняет физическое объединение во время ввода, он влияет на производительность записи. Менее эффективен при массовой записи по сравнению с двумя предыдущими вариантами. Также требует наличия уникального ограничения или индекса.

3.1 Механизм хранения MARS3

Пример:

Сначала установите расширение matrixts.

=# CREATE EXTENSION matrixts;

Создайте тестовую таблицу MARS3.

=# CREATE TABLE v2x_mars3 (
  ts               timestamptz NOT NULL,
  tag_id           text NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) USING MARS3
WITH (uniquemode=true)
DISTRIBUTED BY (tag_id)
ORDER BY (tag_id,ts);

Примечание!
При включении режима Unique Mode первый столбец в предложении ORDER BY должен иметь ограничение NOT NULL.

Вставьте четыре строки из одного пакета.

=# INSERT INTO v2x_mars3(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars3(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars3(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars3(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);

Выполните запрос данных. Данные устройства tag1 за пакет 2022-07-19 00:00:00 объединены в одну строку, новые значения заменили старые.

=# SELECT * FROM v2x_mars3;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power 
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+00 | tag1   |     -32.3 |       45 |  70.2 | t                | f                 |    52
(1 row)

3.2 Механизм хранения MARS2

Пример:

Установите расширение matrixts.

=# CREATE EXTENSION matrixts;

Создайте тестовую таблицу MARS2.

=# CREATE TABLE v2x_mars2 (
  ts               timestamptz NOT NULL,
  tag_id           text NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) USING MARS2
DISTRIBUTED BY (tag_id);

Создайте индекс mars2_btree с параметром uniquemode=true.

=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);

Вставьте четыре строки из одного пакета.

=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);

Выполните запрос данных. Данные устройства tag1 за пакет 2022-07-19 00:00:00 объединены в одну строку, новые значения заменили старые.

=# SELECT * FROM v2x_mars2;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
 power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
 2022-07-19 00:00:00+08 | tag1   |      -32.3 |       45 |  70.2 | t               | f                |
    52
(1 row)
Time: 4.172 ms

3.3 Механизм хранения HEAP

3.3.1 Использование mxgate

Для рабочих нагрузок с временными рядами с использованием механизма хранения HEAP рекомендуется использовать mxgate для высокоскоростного ввода данных с поддержкой UPSERT.
Для активации UPSERT необходимо определить уникальное ограничение или индекс на соответствующих столбцах.

В mxgate:

  • Используйте --upsert-key, чтобы заменить старые значения новыми (UPSERT).
  • Используйте --deduplicate-key, чтобы сохранить старые значения и отбросить новые дубликаты (удаление дублей).
  1. Использование --upsert-key

Примечание!
Это эквивалентно SQL-выражению INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE, описанному далее.

Пример:
Создайте тестовую таблицу HEAP.

=# CREATE TABLE v2x_heap_upsert (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int,
  UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);

Тестовые данные 1:

$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||

После редактирования нажмите esc, чтобы выйти, затем введите :wq, чтобы сохранить и завершить.

Тестовые данные 2:

$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66

После редактирования нажмите esc, чтобы выйти, затем введите :wq, чтобы сохранить и завершить.

Загрузите данные 1, установив --upsert-key в значение tag_id и ts.

$ cat upsert_demo1.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_upsert \
  --upsert-key tag_id \
  --upsert-key ts

Результат запроса 1:

=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
 power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
 2022-07-19 00:00:00+08 | tag1   |      -32.3 |       45 |  70.2 |                 |                  |
(1 row)
Time: 18.049 ms

Загрузите данные 2, установив --upsert-key в значение tag_id и ts.

$ cat upsert_demo2.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_upsert \
  --upsert-key tag_id \
  --upsert-key ts

Результат запроса 2: Поля speed, left_turn_signal, right_turn_signal и power устройства tag1 в пакете speed обновлены новыми значениями и объединены в одну строку.

=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |
|    66
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |    80 | f                | f
|    70
(2 rows)
Time: 19.652 ms
  1. Использование --deduplicate-key

Примечание!
Это эквивалентно SQL-выражению INSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING, описанному далее.

Пример:
Создайте тестовую таблицу.

=# CREATE TABLE v2x_heap_dedu (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int,
  UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);

Тестовые данные 1:

$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||

После редактирования нажмите esc, чтобы выйти, затем введите :wq, чтобы сохранить и завершить.

Тестовые данные 2:

$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66

После редактирования нажмите esc, чтобы выйти, затем введите :wq, чтобы сохранить и завершить.

Загрузите данные 1, установив --deduplicate-key в значение tag_id и ts.

$ cat upsert_demo1.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_dedu \
  --deduplicate-key tag_id \
  --deduplicate-key ts

Результат запроса 1:

=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |
|
(1 row)
Time: 18.010 ms

Загрузите данные 2, установив --deduplicate-key в значение tag_id и ts.

$ cat upsert_demo2.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_dedu \
  --deduplicate-key tag_id \
  --deduplicate-key ts

Результат запроса 2: Для устройства tag1 значения полей speed, left_turn_signal, right_turn_signal и power из данных 2 отброшены. Сохранены исходные значения (или null) из данных 1.

=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |
|
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |
|    66
(2 rows)
Time: 12.881 ms

3.3.2 Использование операторов INSERT

Примечание!
Этот метод применим только к таблицам HEAP.

Пример:
Создайте тестовую таблицу HEAP.

=# CREATE TABLE v2x_heap_insert (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) DISTRIBUTED BY (tag_id);

Создайте уникальный индекс по ключам (tag_id,ts).

=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);

Вставьте тестовые данные.

=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET  longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET  speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET  power = excluded.power;

Выполните запрос данных. Три вставленные записи устройства tag1 за пакет 2022-07-19 00:00:00+00 объединены в одну строку.

=# SELECT * FROM v2x_heap_insert;
           ts           | tag_id | longtitude | latitude | speed | left_turn_signal | right_turn_signal
 | power
------------------------+--------+------------+----------+-------+------------------+------------------
-+-------
 2022-07-19 00:00:00+08 | tag1   |      -32.3 |       45 |  70.2 |                  |
 |    50
(1 row)
Time: 16.340 ms

Примечание!
Предложение INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE также описано в справочнике SQL — INSERT.