В этом документе описываются решения YMatrix для сценариев пакетного объединения данных: реализация функциональности UPSERT различными методами в разных механизмах хранения (HEAP, MARS2).
Рассмотрим в качестве примера сценарий A — широкую таблицу подключённого транспортного средства.
В этом сценарии проект широкой таблицы (без учёта типов метрик) показан ниже:

Бортовая система сбора данных собирает информацию от одного и того же транспортного средства в один и тот же момент времени (одновременно сгенерированные данные) и отправляет её в базу данных YMatrix пакетами.
Широкая таблица предназначена для хранения, вычислений и анализа данных по транспортным средствам, в то время как бортовая система собирает и загружает данные по датчикам. Таким образом, данные, передаваемые от транспортного средства в YMatrix, записываются в базу данных пакетами на уровне датчиков.
Это и формирует сценарий пакетного объединения данных в YMatrix.
В примере сценария A требуется объединить как минимум четыре пакета загруженных данных после их записи в базу. Если в пакете присутствуют дублирующиеся данные, YMatrix обновляет запись, заменяя значения NULL на непустые, а более новые значения — на более старые:

После объединения при запросе этих записей возвращается одна объединённая строка вместо нескольких.
На практике при вводе данных могут возникать такие явления, как неупорядоченное поступление, задержки при загрузке или нерегулярная частота. Однако здесь рассматриваются только сценарии пакетного объединения, требующие консолидации данных на уровне базы; другие случаи в данном документе не обсуждаются.
YMatrix определяет UPSERT как комбинацию операций INSERT и UPDATE.
При вставке новых данных:
Примечание!
Под «указанной строкой» понимается существующая строка в текущей базе данных, у которой ключ сортировки, определённый индексом mars2_btree в таблице MARS2, либо уникальный индекс/ограничение в таблице HEAP, совпадает с входящей строкой.
В YMatrix UPSERT не является ключевым словом SQL, а представляет собой операцию, сочетающую функции INSERT и UPDATE. Она может быть реализована следующими способами:
uniquemode=true при создании таблицы MARS3.uniquemode=true в индексе таблицы MARS2.ON CONFLICT в таблице HEAP.Применение UPSERT в различных механизмах хранения соответствует разным бизнес-сценариям:
| Механизм хранения | Метод UPSERT | Сценарий использования |
| MARS3 | Установка `uniquemode=true` при создании таблицы MARS3 | Рекомендуемый подход для временных рядов: данные внутри пакета максимально объединяются при записи, что снижает объём на физическом диске; оставшееся небольшое объединение выполняется во время запроса, возвращая полностью объединённые результаты. Значительно повышает производительность записи и запросов. Не является универсальным решением для OLAP или OLTP; включайте uniquemode, если требуется объединение пакетов. |
| MARS2 | Установка `uniquemode=true` в индексе таблицы MARS2 | Рекомендуемый подход для временных рядов: аналогично MARS3, частичное объединение происходит при записи для минимизации использования диска, окончательное объединение — при выполнении запроса, обеспечивая немедленное получение объединённых результатов. Эффективно повышает производительность записи и запросов. |
| HEAP | Использование mxgate в таблицах HEAP | Рекомендуется для рабочих нагрузок с временными рядами. mxgate — это высокопроизводительный инструмент ввода данных YMatrix, обеспечивающий высокую пропускную способность записи. Требуется определить уникальное ограничение или индекс по определённым столбцам (обычно ID устройства и временная метка). |
| Использование предложения `ON CONFLICT` в таблицах HEAP | Подходит для небольших операций UPSERT. Поскольку этот метод выполняет физическое объединение во время ввода, он влияет на производительность записи. Менее эффективен при массовой записи по сравнению с двумя предыдущими вариантами. Также требует наличия уникального ограничения или индекса. |
Пример:
Сначала установите расширение matrixts.
=# CREATE EXTENSION matrixts;
Создайте тестовую таблицу MARS3.
=# CREATE TABLE v2x_mars3 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS3
WITH (uniquemode=true)
DISTRIBUTED BY (tag_id)
ORDER BY (tag_id,ts);
Примечание!
При включении режима Unique Mode первый столбец в предложении ORDER BY должен иметь ограничение NOT NULL.
Вставьте четыре строки из одного пакета.
=# INSERT INTO v2x_mars3(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars3(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars3(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars3(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
Выполните запрос данных. Данные устройства tag1 за пакет 2022-07-19 00:00:00 объединены в одну строку, новые значения заменили старые.
=# SELECT * FROM v2x_mars3;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
2022-07-19 00:00:00+00 | tag1 | -32.3 | 45 | 70.2 | t | f | 52
(1 row)
Пример:
Установите расширение matrixts.
=# CREATE EXTENSION matrixts;
Создайте тестовую таблицу MARS2.
=# CREATE TABLE v2x_mars2 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS2
DISTRIBUTED BY (tag_id);
Создайте индекс mars2_btree с параметром uniquemode=true.
=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);
Вставьте четыре строки из одного пакета.
=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
Выполните запрос данных. Данные устройства tag1 за пакет 2022-07-19 00:00:00 объединены в одну строку, новые значения заменили старые.
=# SELECT * FROM v2x_mars2;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | t | f |
52
(1 row)
Time: 4.172 ms
Для рабочих нагрузок с временными рядами с использованием механизма хранения HEAP рекомендуется использовать mxgate для высокоскоростного ввода данных с поддержкой UPSERT.
Для активации UPSERT необходимо определить уникальное ограничение или индекс на соответствующих столбцах.
В mxgate:
--upsert-key, чтобы заменить старые значения новыми (UPSERT).--deduplicate-key, чтобы сохранить старые значения и отбросить новые дубликаты (удаление дублей).--upsert-keyПримечание!
Это эквивалентно SQL-выражениюINSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE, описанному далее.
Пример:
Создайте тестовую таблицу HEAP.
=# CREATE TABLE v2x_heap_upsert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Тестовые данные 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
После редактирования нажмите esc, чтобы выйти, затем введите :wq, чтобы сохранить и завершить.
Тестовые данные 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
После редактирования нажмите esc, чтобы выйти, затем введите :wq, чтобы сохранить и завершить.
Загрузите данные 1, установив --upsert-key в значение tag_id и ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
Результат запроса 1:
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | | |
(1 row)
Time: 18.049 ms
Загрузите данные 2, установив --upsert-key в значение tag_id и ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
Результат запроса 2: Поля speed, left_turn_signal, right_turn_signal и power устройства tag1 в пакете speed обновлены новыми значениями и объединены в одну строку.
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 80 | f | f
| 70
(2 rows)
Time: 19.652 ms
--deduplicate-keyПримечание!
Это эквивалентно SQL-выражениюINSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING, описанному далее.
Пример:
Создайте тестовую таблицу.
=# CREATE TABLE v2x_heap_dedu (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Тестовые данные 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
После редактирования нажмите esc, чтобы выйти, затем введите :wq, чтобы сохранить и завершить.
Тестовые данные 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
После редактирования нажмите esc, чтобы выйти, затем введите :wq, чтобы сохранить и завершить.
Загрузите данные 1, установив --deduplicate-key в значение tag_id и ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
Результат запроса 1:
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
(1 row)
Time: 18.010 ms
Загрузите данные 2, установив --deduplicate-key в значение tag_id и ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
Результат запроса 2: Для устройства tag1 значения полей speed, left_turn_signal, right_turn_signal и power из данных 2 отброшены. Сохранены исходные значения (или null) из данных 1.
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
(2 rows)
Time: 12.881 ms
Примечание!
Этот метод применим только к таблицам HEAP.
Пример:
Создайте тестовую таблицу HEAP.
=# CREATE TABLE v2x_heap_insert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) DISTRIBUTED BY (tag_id);
Создайте уникальный индекс по ключам (tag_id,ts).
=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);
Вставьте тестовые данные.
=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET power = excluded.power;
Выполните запрос данных. Три вставленные записи устройства tag1 за пакет 2022-07-19 00:00:00+00 объединены в одну строку.
=# SELECT * FROM v2x_heap_insert;
ts | tag_id | longtitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+------------+----------+-------+------------------+------------------
-+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
| 50
(1 row)
Time: 16.340 ms
Примечание!
ПредложениеINSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATEтакже описано в справочнике SQL — INSERT.