В этом документе описывается решение YMatrix для сценариев пакетного объединения данных: использование различных методов пакетного объединения данных для реализации функции UPSERT в разных системах хранения (HEAP, MARS2).
Рассмотрим в качестве примера сценарий A — широкую таблицу подключенного транспортного средства.
В этом сценарии проект широкой таблицы (без учёта типов метрик) показан ниже:

Бортовая система сбора данных собирает информацию от одного и того же транспортного средства в один и тот же момент времени (данные генерируются одновременно) и отправляет её в базу данных YMatrix пакетами.
Широкая таблица предназначена для хранения, вычисления и анализа данных по транспортным средствам, в то время как бортовая система собирает и передаёт данные по датчикам. Таким образом, данные, передаваемые от транспортного средства в YMatrix, записываются в базу данных пакетами на уровне датчиков.
Это приводит к возникновению сценария пакетного объединения данных в YMatrix.
В примере сценария A требуется объединить как минимум четыре пакета загруженных данных после их записи в базу. Если в пакете существуют дублирующиеся данные, YMatrix обновляет запись, заменяя значения NULL на непустые и более новые значения — на устаревшие:

После объединения при запросе этих записей возвращается одна сводная строка вместо нескольких.
На практике при вводе данных могут возникать такие явления, как неупорядоченное поступление, задержка приёма или нерегулярная частота. Однако только сценарии пакетного объединения требуют от базы данных выполнения консолидации данных; другие случаи здесь не рассматриваются.
YMatrix считает, что UPSERT — это комбинация функций INSERT и UPDATE.
Когда новые данные должны быть сохранены в базе:
Примечание!
Под «указанной строкой» выше понимается ключ сортировки, который уже существует в базе данных: для таблицы MARS2 — это ключ, указанный при создании индексаmars2_btree, или ключ, указанный при создании уникального индекса/ограничения в таблице HEAP, совпадающий со строкой новых данных, которые будут добавлены в базу.
В YMatrix UPSERT не является ключевым словом SQL, а представляет собой операцию, объединяющую функции INSERT и UPDATE. Она может быть реализована следующими способами:
uniquemode=true индекса таблицы MARS2.INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE в таблице HEAP.Использование UPSERT в YMatrix в зависимости от системы хранения применимо к различным бизнес-сценариям:
| Система хранения | Использование UPSERT | Применимые сценарии |
| MARS2 | Обычно указывается `uniquemode=true` | рекомендованные практики для временных сценариев при использовании индексов таблицы MARS2. Данные в пакете не объединяются при записи, но автоматически объединяются в фоновом режиме MARS2 во время запроса, сразу отображая объединённые результаты. Этот метод значительно повышает производительность записи. В сочетании с высокой степенью сжатия таблиц MARS2 он становится лучшей практикой для массовой записи и хранения временных рядов |
| HEAP | Использование mxgate | Рекомендуется для временных сценариев в таблицах HEAP. mxgate — это высокопроизводительный инструмент записи YMatrix с отличной скоростью записи. Для этого метода необходимо создать уникальное ограничение/индекс в указанном столбце (обычно уникальный идентификатор устройства и временная метка) |
| Использование предложения SQL `ON CONFLICT` в таблице HEAP | Рекомендуется для небольших пакетных операций UPSERT. Поскольку этот метод физически объединяет пакет данных во время записи, он несколько снижает производительность записи, поэтому при необходимости массовой записи его производительность уступает двум предыдущим методам. Также требует создания уникальных ограничений/индексов |
Пример:
Сначала установите расширение matrixts.
=# CREATE EXTENSION matrixts;
Затем создайте тестовую таблицу MARS2.
=# CREATE TABLE v2x_mars2 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS2
DISTRIBUTED BY (tag_id);
Создайте индекс mars2_btree по uniquemode=true.
=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);
Добавьте четыре записи из одного пакета.
=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
При запросе данных видно, что данные устройства tag1 за пакет 2022-07-19 00:00:00 были объединены и отображаются как одна строка, причём новое значение перезаписывает старое.
=# SELECT * FROM v2x_mars2;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
----------------------------+-----------------------------------------------------------------------------------------------------
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | t | f |
52
(1 row)
Time: 4.172 ms
Если ваш бизнес-сценарий является временным, а используемая система хранения — HEAP, мы рекомендуем использовать высокоскоростной инструмент записи mxgate для реализации UPSERT.
Для реализации функции UPSERT необходимо создать уникальное ограничение/индекс в указанном поле.
В mxgate можно перезаписать старое значение новым, указав --upsert-key, чтобы реализовать операцию UPSERT; --deduplicate-key сохранит старое значение, отбросит новое и реализует функцию удаления дубликатов.
--upsert-keyПримечание!
Этот способ эквивалентен следующему SQL-выражению:INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE.
Пример: Создайте тестовую таблицу HEAP.
=# CREATE TABLE v2x_heap_upsert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Тестовые данные 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
После добавления нажмите клавишу esc, чтобы выйти из файла, затем введите :wq, чтобы сохранить и выйти.
Тестовые данные 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
После добавления нажмите клавишу esc, чтобы выйти из файла, затем введите :wq, чтобы сохранить и выйти.
Загрузите данные 1, установив --upsert-key в tag_id и ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
Результат запроса 1.
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
----------------------------+-----------------------------------------------------------------------------------------------------
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | | |
(1 row)
Time: 18.049 ms
Загрузите данные 2, установив --upsert-key в tag_id и ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
В результате запроса 2 видно, что значение speed устройства tag1 из исходных данных 1 было заменено соответствующим новым значением из данных 2 и объединено в одну строку.
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 80 | f | f
| 70
(2 rows)
Time: 19.652 ms
--deduplicate-keyПримечание!
Этот способ эквивалентен следующему SQL-выражению:INSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING.
Пример: Создайте тестовую таблицу.
=# CREATE TABLE v2x_heap_dedu (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Тестовые данные 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
После добавления нажмите клавишу esc, чтобы выйти из файла, затем введите :wq, чтобы сохранить и выйти.
Тестовые данные 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
После добавления нажмите клавишу esc, чтобы выйти из файла, затем введите :wq, чтобы сохранить и выйти.
Загрузите данные 1, установив --deduplicate-key в tag_id и ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
Результат запроса 1.
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
(1 row)
Time: 18.010 ms
Загрузите данные 2, установив --deduplicate-key в tag_id и ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
В результате запроса 2 видно, что информация speed, left_turn_signal, right_turn_signal и power устройства tag1 из данных 2 была полностью отброшена, сохранены соответствующие старые значения (или значения NULL) из данных 1.
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
(2 rows)
Time: 12.881 ms
Примечание!
Этот способ применяется только для таблиц HEAP.
Пример: Создайте тестовую таблицу HEAP.
=# CREATE TABLE v2x_heap_insert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) DISTRIBUTED BY (tag_id);
Создайте уникальный индекс и укажите ключевые поля как (tag_id,ts).
=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);
Добавьте тестовые данные.
=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET power = excluded.power;
Просмотрев тестовые данные, можно увидеть, что данные трёх устройств tag1, вставленные в пакете 2022-07-19 00:00:00+00, были объединены в одну строку.
=# SELECT * FROM v2x_heap_insert;
ts | tag_id | longtitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
| 50
(1 row)
Time: 16.340 ms
Примечание!
ОператорINSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATEтакже описан в справочнике SQL — раздел INSERT.