В этом документе описывается решение YMatrix для сценариев пакетного объединения данных: использование различных методов пакетного объединения данных для реализации функции UPSERT в разных системах хранения (HEAP, MARS2).
Рассмотрим в качестве примера сценарий A — широкую таблицу подключенного транспортного средства.
Модель широкой таблицы, разработанная в этом сценарии (без учёта типов метрик), показана ниже:

Бортовая система сбора данных собирает информацию от одного и того же транспортного средства в один и тот же момент времени (одновременно сгенерированные данные) и отправляет её в базу данных YMatrix пачками.
Широкая таблица предназначена для хранения, вычислений и анализа данных по транспортным средствам, в то время как бортовая система собирает и передаёт данные по датчикам. Таким образом, данные, передаваемые от транспортного средства в YMatrix, записываются в базу данных пакетами на уровне датчиков.
Это приводит к возникновению сценария пакетного объединения данных в YMatrix.
В примере сценария A требуется объединить как минимум четыре пакета загруженных данных после их записи в базу. Если в пакете есть дублирующиеся данные, YMatrix обновляет запись, заменяя значения NULL на непустые и более новые значения — на более старые:

После объединения при запросе этих записей возвращается одна объединённая строка вместо нескольких.
На практике при вводе данных могут возникать такие явления, как неупорядоченное поступление, задержки при загрузке или нерегулярная частота. Однако только сценарии пакетного объединения требуют от базы данных выполнения консолидации данных; другие случаи здесь не рассматриваются.
YMatrix считает, что UPSERT — это комбинация функций INSERT и UPDATE.
Когда новые данные готовы к сохранению в базе:
Примечание!
Под «указанной строкой» выше понимается ключ сортировки, который уже существует в базе данных: для таблицы MARS2 — это ключ, указанный при создании индексаmars2_btree, или ключ, указанный при создании уникального индекса/ограничения в таблице HEAP, совпадающий со строкой, которая должна быть добавлена в базу данных.
В YMatrix UPSERT не является SQL-ключевым словом, а представляет собой операцию, объединяющую функции INSERT и UPDATE. Она может быть реализована следующими способами:
uniquemode=true при создании таблицы MARS3.uniquemode=true индекса таблицы MARS2.INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE в таблице HEAP.Использование UPSERT в YMatrix в зависимости от системы хранения применимо к различным бизнес-сценариям:
| Система хранения | Использование UPSERT | Применимые сценарии |
| MARS3 | Указать uniquemode=true | Рекомендованная практика при создании таблиц MARS3: за исключением случая записи в MARS3, данные в пакете максимально объединяются, чтобы уменьшить объем фактически сохраняемых данных. Оставшийся небольшой объем данных объединяется в режиме реального времени при выполнении запроса, и результат сразу отображается как объединенный. Этот метод эффективно повышает производительность записи и запросов; нет универсальных рекомендаций для OLAP и OLTP сценариев, но если данные объединяются пакетами, рекомендуется включить эту опцию |
| MARS2 | Временно указать uniquemode=true | Рекомендованная практика для индекса таблицы MARS2: за исключением случая записи в MARS3, данные в пакете максимально объединяются, чтобы уменьшить объем фактически сохраняемых данных. Оставшийся небольшой объем данных объединяется в режиме реального времени при выполнении запроса, и результат сразу отображается как объединенный. Этот метод эффективно повышает производительность записи и запросов |
| HEAP | Использовать mxgate | Рекомендуется для сценариев с таблицей HEAP. mxgate — это высокопроизводительный инструмент записи YMatrix с отличной производительностью записи. Для этого метода необходимо создать уникальное ограничение/индекс в указанном столбце (обычно уникальный идентификатор устройства и временная метка) |
| Использование предложения ON CONFLICT в таблице HEAP | Рекомендуется для операций UPSERT с малыми пакетами. Поскольку этот метод физически объединяет пакет данных во время записи, он несколько снижает производительность записи, поэтому при необходимости массовой записи его производительность уступает двум предыдущим. Также требует создания уникальных ограничений/индексов |
Пример:
Сначала установите расширение matrixts.
=# CREATE EXTENSION matrixts;
Затем создайте тестовую таблицу MARS3.
=# CREATE TABLE v2x_mars3 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS3
WITH (uniquemode=true)
DISTRIBUTED BY (tag_id)
ORDER BY (tag_id,ts);
Примечание!
Если режим Unique Mode включён, первый элемент в предложении ORDER BY должен быть объявлен с ограничением NOT NULL при определении.
Вставьте четыре записи из одного пакета.
=# INSERT INTO v2x_mars3(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars3(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars3(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars3(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
Выполните запрос данных и убедитесь, что данные устройства tag1 за пакет 2022-07-19 00:00:00 были объединены и отображаются как одна строка, причём новое значение перезаписало старое.
=# SELECT * FROM v2x_mars3;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2022-07-19 00:00:00+00 | tag1 | -32.3 | 45 | 70.2 | t | f | 52
(1 row)
Пример:
Сначала установите расширение matrixts.
=# CREATE EXTENSION matrixts;
Затем создайте тестовую таблицу MARS2.
=# CREATE TABLE v2x_mars2 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS2
DISTRIBUTED BY (tag_id);
Создайте индекс mars2_btree для uniquemode=true.
=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);
Вставьте четыре записи из одного пакета.
=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
Выполните запрос данных и убедитесь, что данные устройства tag1 за пакет 2022-07-19 00:00:00 были объединены и отображаются как одна строка, причём новое значение перезаписало старое.
=# SELECT * FROM v2x_mars2;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
----------------------------+-----------------------------------------------------------------------------------------------------
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | t | f |
52
(1 row)
Time: 4.172 ms
Если ваш бизнес-сценарий относится к временным данным, а используемая система хранения — HEAP, мы рекомендуем использовать высокоскоростной инструмент записи mxgate для реализации UPSERT.
Для реализации функции UPSERT необходимо создать уникальное ограничение/индекс в указанном поле.
В mxgate можно перезаписать старое значение новым, указав --upsert-key, чтобы реализовать операцию UPSERT; --deduplicate-key сохранит старое значение, отбросит новое и реализует функцию удаления дубликатов.
--upsert-keyПримечание!
Этот способ эквивалентен следующему SQL-выражению:INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE.
Пример: Создайте тестовую таблицу HEAP.
=# CREATE TABLE v2x_heap_upsert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Тестовые данные 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
После добавления нажмите клавишу esc, чтобы выйти из файла, затем введите :wq, чтобы сохранить и выйти.
Тестовые данные 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
После добавления нажмите клавишу esc, чтобы выйти из файла, затем введите :wq, чтобы сохранить и выйти.
Загрузите данные 1, установив --upsert-key равным tag_id и ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
Результат запроса 1.
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
----------------------------+-----------------------------------------------------------------------------------------------------
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | | |
(1 row)
Time: 18.049 ms
Загрузите данные 2, установив --upsert-key равным tag_id и ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
В результате запроса 2 видно, что значение speed устройства tag1 из первоначальных данных 1 было заменено соответствующим новым значением из данных 2 и объединено в одну строку.
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 80 | f | f
| 70
(2 rows)
Time: 19.652 ms
--deduplicate-keyПримечание!
Этот способ эквивалентен следующему SQL-выражению:INSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING.
Пример: Создайте тестовую таблицу.
=# CREATE TABLE v2x_heap_dedu (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Тестовые данные 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
После добавления нажмите клавишу esc, чтобы выйти из файла, затем введите :wq, чтобы сохранить и выйти.
Тестовые данные 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
После добавления нажмите клавишу esc, чтобы выйти из файла, затем введите :wq, чтобы сохранить и выйти.
Загрузите данные 1, установив --deduplicate-key равным tag_id и ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
Результат запроса 1.
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
(1 row)
Time: 18.010 ms
Загрузите данные 2, установив --deduplicate-key равным tag_id и ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
В результате запроса 2 видно, что информация speed, left_turn_signal, right_turn_signal и power устройства tag1 из данных 2 была полностью отброшена, сохранены соответствующие старые значения (или значения NULL) из данных 1.
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
(2 rows)
Time: 12.881 ms
Примечание!
Этот способ предназначен только для использования с таблицами HEAP.
Пример: Создайте тестовую таблицу HEAP.
=# CREATE TABLE v2x_heap_insert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) DISTRIBUTED BY (tag_id);
Создайте уникальный индекс и укажите ключевое значение как (tag_id,ts).
=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);
Вставьте тестовые данные.
=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET power = excluded.power;
Просмотрев тестовые данные, можно увидеть, что три записи устройства tag1, вставленные в пакете 2022-07-19 00:00:00+00, были объединены в одну строку.
=# SELECT * FROM v2x_heap_insert;
ts | tag_id | longtitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
| 50
(1 row)
Time: 16.340 ms
Примечание!
ОператорINSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATEтакже описан в справочнике SQL — раздел INSERT.