Сценарий слияния пакетов данных (UPSERT)

В этом документе описывается решение YMatrix для сценариев слияния пакетов данных: использование различных методов слияния пакетов данных для реализации функции UPSERT в разных системах хранения (HEAP, MARS2).

1 Что такое сценарий слияния пакетов данных?

Рассмотрим в качестве примера сценарий A — широкую таблицу подключенного транспортного средства.
В этом сценарии проект широкой таблицы (без учёта типов метрик) показан ниже:

Бортовая система сбора данных собирает информацию от одного и того же транспортного средства в один и тот же момент времени (одновременно сгенерированные данные) и отправляет её в базу данных YMatrix пакетами.

Широкая таблица предназначена для хранения, вычисления и анализа данных по транспортным средствам, в то время как бортовая система собирает и передаёт данные по датчикам. Таким образом, данные, передаваемые от транспортного средства в YMatrix, записываются в базу данных пакетами на уровне датчиков.
Это приводит к появлению сценария слияния пакетных данных в YMatrix.

В примере сценария A требуется объединить как минимум четыре пакета загруженных данных после их записи в базу данных. Если в пакете есть дублирующиеся данные, YMatrix обновляет запись, заменяя значения NULL на непустые и более новые значения — на более старые:

После слияния при запросе этих записей возвращается одна объединённая строка вместо нескольких.

На практике при вводе данных могут возникать такие явления, как неупорядоченное поступление, задержка приёма или нерегулярная частота. Однако только сценарии пакетного слияния требуют от базы данных выполнения консолидации данных; другие случаи здесь не рассматриваются.

2 Что такое UPSERT?

YMatrix считает, что UPSERT — это комбинация функций INSERT и UPDATE.

Когда новые данные должны быть сохранены в базе:

  • Если указанная строка уже существует в таблице, она обновляется.
  • Если указанной строки нет в таблице, вставляется новая строка.

Примечание!
Под «указанной строкой» выше понимается ключ сортировки, который уже существует в базе данных: для таблицы MARS2 — это ключ, указанный при создании индекса mars2_btree, или ключ, указанный при создании уникального индекса/ограничения в таблице HEAP, совпадающий со строкой новых данных, которые будут добавлены в базу данных.

3 Реализация функции UPSERT в различных системах хранения

В YMatrix UPSERT не является SQL-ключевым словом, а представляет собой операцию, объединяющую функции INSERT и UPDATE. Она может быть реализована следующими способами:

  • Указание uniquemode=true при создании таблицы MARS3.
  • Прямое указание uniquemode=true индекса таблицы MARS2.
  • Использование mxgate или INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE в таблице HEAP.

Использование UPSERT в YMatrix в зависимости от системы хранения применимо к различным бизнес-сценариям:

Система храненияИспользование UPSERTПрименимые сценарии
MARS3Указать uniquemode=trueРекомендованная лучшая практика при создании таблиц MARS3: за исключением случая записи в MARS3, данные в пакете максимально объединяются, чтобы уменьшить объем фактически сохраняемых данных. Оставшийся небольшой объем данных объединяется в режиме реального времени при запросе, чтобы сразу отображать результаты объединённого запроса. Этот метод эффективно повышает производительность записи и запроса данных; общих лучших практик для OLAP и OLTP сценариев нет, но если данные объединяются пакетами, рекомендуется включить эту опцию
MARS2Временно указать uniquemode=trueРекомендованная лучшая практика для индекса таблицы MARS2: за исключением случая записи в MARS3, данные в пакете максимально объединяются, чтобы уменьшить объем фактически сохраняемых данных. Оставшийся небольшой объем данных объединяется в режиме реального времени при запросе, чтобы сразу отображать результаты объединённого запроса. Этот метод эффективно повышает производительность записи и запроса данных
HEAPИспользовать mxgateРекомендуется для сценариев под таблицей HEAP. mxgate — это высокопроизводительный инструмент записи YMatrix с превосходной производительностью записи. Для этого метода необходимо создать уникальное ограничение/индекс в указанном столбце (обычно уникальный идентификатор устройства и временная метка)
Использование предложения ON CONFLICT SQL в таблице HEAPРекомендуется для операций UPSERT с малыми пакетами. Поскольку этот метод физически объединяет пакет данных во время записи, он несколько снижает производительность записи, поэтому при необходимости массовой записи его производительность хуже, чем у двух предыдущих. Этот метод также требует создания уникальных ограничений/индексов

3.1 Система хранения MARS3

Пример:

Сначала установите расширение matrixts.

=# CREATE EXTENSION matrixts;

Затем создайте тестовую таблицу MARS3.

=# CREATE TABLE v2x_mars3 (
  ts               timestamptz NOT NULL,
  tag_id           text NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) USING MARS3
WITH (uniquemode=true)
DISTRIBUTED BY (tag_id)
ORDER BY (tag_id,ts);

Примечание!
Если режим Unique Mode включён, первый элемент в предложении ORDER BY должен быть указан с ограничением NOT NULL при определении.

Вставьте четыре записи из одного пакета.

=# INSERT INTO v2x_mars3(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars3(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars3(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars3(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);

Выполните запрос данных и убедитесь, что данные устройства tag1 за пакет 2022-07-19 00:00:00 были объединены и отображаются как одна строка, причём новое значение перезаписывает старое.

=# SELECT * FROM v2x_mars3;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power 
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 2022-07-19 00:00:00+00 | tag1   |     -32.3 |       45 |  70.2 | t                | f                 |    52
(1 row)

3.2 Система хранения MARS2

Пример:

Сначала установите расширение matrixts.

=# CREATE EXTENSION matrixts;

Затем создайте тестовую таблицу MARS2.

=# CREATE TABLE v2x_mars2 (
  ts               timestamptz NOT NULL,
  tag_id           text NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) USING MARS2
DISTRIBUTED BY (tag_id);

Создайте индекс mars2_btree для uniquemode=true.

=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);

Вставьте четыре записи из одного пакета.

=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);

Выполните запрос данных и убедитесь, что данные устройства tag1 за пакет 2022-07-19 00:00:00 были объединены и отображаются как одна строка, причём новое значение перезаписывает старое.

=# SELECT * FROM v2x_mars2;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
 power
----------------------------+-----------------------------------------------------------------------------------------------------
-------
 2022-07-19 00:00:00+08 | tag1   |      -32.3 |       45 |  70.2 | t               | f                |
    52
(1 row)
Time: 4.172 ms

3.3 Система хранения HEAP

3.3.1 Реализация через mxgate

Если ваш бизнес-сценарий относится к временным данным, а используемая система хранения — HEAP, мы рекомендуем использовать высокоскоростной инструмент записи mxgate для реализации UPSERT.
Для реализации функции UPSERT необходимо создать уникальное ограничение/индекс в указанном поле.

В mxgate старое значение можно перезаписать новым, указав --upsert-key, чтобы реализовать операцию UPSERT; --deduplicate-key сохранит старое значение, отбросит новое и реализует функцию удаления дубликатов.

  1. Использование --upsert-key

Примечание!
Этот способ эквивалентен следующему SQL-выражению: INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE.

Пример: Создайте тестовую таблицу HEAP.

=# CREATE TABLE v2x_heap_upsert (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int,
  UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);

Тестовые данные 1:

$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||

После добавления нажмите клавишу esc, чтобы выйти из файла, и введите :wq, чтобы сохранить и выйти.

Тестовые данные 2:

$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66

После добавления нажмите клавишу esc, чтобы выйти из файла, и введите :wq, чтобы сохранить и выйти.

Загрузите данные 1, установив --upsert-key равным tag_id и ts.

$ cat upsert_demo1.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_upsert \
  --upsert-key tag_id \
  --upsert-key ts

Результат запроса 1.

=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
 power
----------------------------+-----------------------------------------------------------------------------------------------------
-------
 2022-07-19 00:00:00+08 | tag1   |      -32.3 |       45 |  70.2 |                 |                  |
(1 row)
Time: 18.049 ms

Загрузите данные 2, установив --upsert-key равным tag_id и ts.

$ cat upsert_demo2.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_upsert \
  --upsert-key tag_id \
  --upsert-key ts

В результате запроса 2 видно, что значение speed устройства tag1 из исходных данных 1 было заменено соответствующим новым значением из данных 2 и объединено в одну строку.

=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |
|    66
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |    80 | f                | f
|    70
(2 rows)
Time: 19.652 ms
  1. Использование --deduplicate-key

Примечание!
Этот способ эквивалентен следующему SQL-выражению: INSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING.

Пример: Создайте тестовую таблицу.

=# CREATE TABLE v2x_heap_dedu (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int,
  UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);

Тестовые данные 1:

$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||

После добавления нажмите клавишу esc, чтобы выйти из файла, и введите :wq, чтобы сохранить и выйти.

Тестовые данные 2:

$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66

После добавления нажмите клавишу esc, чтобы выйти из файла, и введите :wq, чтобы сохранить и выйти.

Загрузите данные 1, установив --deduplicate-key равным tag_id и ts.

$ cat upsert_demo1.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_dedu \
  --deduplicate-key tag_id \
  --deduplicate-key ts

Результат запроса 1.

=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |
|
(1 row)
Time: 18.010 ms

Загрузите данные 2, установив --deduplicate-key равным tag_id и ts.

$ cat upsert_demo2.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_dedu \
  --deduplicate-key tag_id \
  --deduplicate-key ts

В результате запроса 2 видно, что информация speed, left_turn_signal, right_turn_signal и power устройства tag1 из данных 2 была полностью отброшена, сохранены соответствующие старые значения (или значения NULL) из данных 1.

=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |
|
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |
|    66
(2 rows)
Time: 12.881 ms

3.3.2 Реализация через оператор INSERT

Примечание!
Этот способ предназначен только для использования с таблицами HEAP.

Пример: Создайте тестовую таблицу HEAP.

=# CREATE TABLE v2x_heap_insert (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) DISTRIBUTED BY (tag_id);

Создайте уникальный индекс и укажите ключевое значение как (tag_id,ts).

=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);

Вставьте тестовые данные.

=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET  longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET  speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET  power = excluded.power;

Просматривая тестовые данные, можно увидеть, что данные трёх устройств tag1, вставленные в пакетах 2022-07-19 00:00:00+00, были объединены в одну строку.

=# SELECT * FROM v2x_heap_insert;
           ts           | tag_id | longtitude | latitude | speed | left_turn_signal | right_turn_signal
 | power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-+-------
 2022-07-19 00:00:00+08 | tag1   |      -32.3 |       45 |  70.2 |                  |
 |    50
(1 row)
Time: 16.340 ms

Примечание!
Оператор INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE также описан в справочнике SQL — раздел INSERT.