This document introduces YMatrix's solution for data batch merging scenarios: using different data batch merging methods to implement the UPSERT function under different storage engines (HEAP, MARS2).
Consider a connected vehicle wide-table scenario A as an example.
In this scenario, the designed wide-table model (excluding metric types) is shown below:

The onboard data collection system collects data from the same vehicle at the same timestamp (simultaneously generated) and sends it to the YMatrix database in batches.
The wide table is designed to store, compute, and analyze data by vehicle, while the onboard system collects and uploads data by sensor. Therefore, data transmitted from the vehicle to YMatrix is written into the database in sensor-level batches.
This creates the batch data merge scenario in YMatrix.
In Example Scenario A, at least four batches of uploaded data require merging after being written to the database. If duplicate data exists in a batch, YMatrix updates the record by overwriting NULL values with non-NULL ones and newer values with older ones:

After merging, querying these records returns a single consolidated row instead of multiple rows.
In practice, phenomena such as out-of-order arrival, delayed ingestion, or irregular frequency may occur during data ingestion. However, only batch merge scenarios require the database to perform data consolidation; other cases are not discussed here.
YMatrix believes that UPSERT is a combination of INSERT and UPDATE functions.
When a new data is about to be stored in the library:
Notes!
The above "specified row" refers to the sort key that already exists in the database, the MARS2 table that creates the mars2_btree index specified, or the key that creates a unique index/constraint specified in the HEAP table that is the same row as the new row that is about to be entered into the database.
In YMatrix, UPSERT is not a SQL keyword, but an operation that integrates INSERT and UPDATE functions. It can be implemented in the following ways:
uniquemode=true when creating a MARS3 table.uniquemode=true of the MARS2 table index.INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE statement under the HEAP table.The usage of UPSERT in YMatrix under different storage engines is applicable to different business scenarios:
| Storage Engine | UPSERT Usage | Applicable Scenarios |
| MARS3 | Specify uniquemode=true | Schematic best practices when creating MARS3 tables: except when writing to MARS3, the data in a batch will be merged as much as possible to reduce the size of the actual drop-off data. The remaining small amount of data will be merged in real time during querying to directly display the merged query results. This method effectively improves the writing and query performance of data; there are no general best practices for OLAP and OLTP scenarios, and if data is merged in batches, it is recommended to enable it |
| MARS2 | Temporarily specify uniquemode=true | Schematic best practices for MARS2 table index: except when writing to MARS3, the data in a batch will be merged as much as possible to reduce the size of the actual drop-off data. The remaining small amount of data will be merged in real time during query to directly display the merged query results. This method effectively improves the writing and query performance of data |
| HEAP | Use mxgate | Schematic scenarios under the HEAP table are recommended. mxgate is a high-performance write tool of YMatrix with superior write performance. This method requires creating a unique constraint/index in the specified column (usually device unique identifier and timestamp) |
| Using the ON CONFLICT SQL clause under the HEAP table | Small batch UPSERT operation is recommended. Since this method physically merges a batch of data during writing, it will affect a certain degree of writing performance, so if large-scale data writing is required, the performance is not as good as the above two. This method also requires creating unique constraints/indexes |
Here is an example:
First, install the matrixts extension.
=# CREATE EXTENSION matrixts;
Then, create the MARS3 test table.
=# CREATE TABLE v2x_mars3 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS3
WITH (uniquemode=true)
DISTRIBUTED BY (tag_id)
ORDER BY (tag_id,ts);
Notes!
If Unique Mode is enabled, the first field of the ORDER BY clause needs to be added to the NOT NULL constraint when defining.
Insert four pieces of data from the same batch.
=# INSERT INTO v2x_mars3(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars3(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars3(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars3(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
Query the data and it can be seen that the data of the tag1 device on 2022-07-19 00:00:00 batch has been merged and displayed as a row, and the new value overwrites the old value.
=# SELECT * FROM v2x_mars3;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
2022-07-19 00:00:00+00 | tag1 | -32.3 | 45 | 70.2 | t | f | 52
(1 row)
Here is an example:
First, install the matrixts extension.
=# CREATE EXTENSION matrixts;
Then, create the MARS2 test table.
=# CREATE TABLE v2x_mars2 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS2
DISTRIBUTED BY (tag_id);
Creates a mars2_btree index of uniquemode=true.
=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);
Insert four pieces of data from the same batch.
=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
Query the data and it can be seen that the data of the tag1 device on 2022-07-19 00:00:00 batch has been merged and displayed as a row, and the new value overwrites the old value.
=# SELECT * FROM v2x_mars2;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
----------------------------+-----------------------------------------------------------------------------------------------------
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | t | f |
52
(1 row)
Time: 4.172 ms
If your business scenario is a timing scenario and the storage engine used is HEAP, then we recommend UPSERT through the mxgate high-speed writing tool.
To implement the UPSERT function, you need to create a unique constraint/index in the specified field.
In mxgate, the old value can be overwritten with the new value by specifying --upsert-key to implement the UPSERT operation; --deduplicate-key will maintain the old value, discard the new value, and realize the deduplication function.
--upsert-keyNotes!
This usage is equivalent to the following SQL statementINSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE.
Examples are as follows: Create a HEAP test table.
=# CREATE TABLE v2x_heap_upsert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Data for testing 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
After adding, click the esc key to exit the file and enter :wq to save and exit.
Data for testing 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
After adding, click the esc key to exit the file and enter :wq to save and exit.
Load data 1, set --upsert-key to tag_id and ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
Query result 1.
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
----------------------------+-----------------------------------------------------------------------------------------------------
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | | |
(1 row)
Time: 18.049 ms
Load data 2, set --upsert-key to tag_id and ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
In query result 2, you can see that the speed of tag1 in the original data 1 was replaced by the corresponding new value in data 2 and merged into one row.
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 80 | f | f
| 70
(2 rows)
Time: 19.652 ms
--deduplicate-keyNotes!
This usage is equivalent to the following SQL statementINSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING.
Examples are as follows: Create a test table.
=# CREATE TABLE v2x_heap_dedu (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Data for testing 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
After adding, click the esc key to exit the file and enter :wq to save and exit.
Data for testing 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
After adding, click the esc key to exit the file and enter :wq to save and exit.
Load data 1, set --deduplicate-key to tag_id and ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
Query result 1.
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
(1 row)
Time: 18.010 ms
Load data 2, set --deduplicate-key to tag_id and ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
In query result 2, you can see that the speed, left_turn_signal, right_turn_signal, and power information of tag1 in data 2 are all discarded, retaining the relevant old value (or null value) in data 1.
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
(2 rows)
Time: 12.881 ms
Notes!
This usage is for use with HEAP tables only.
Examples are as follows: Create a HEAP test table.
=# CREATE TABLE v2x_heap_insert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) DISTRIBUTED BY (tag_id);
Create a unique index and specify the key value to (tag_id,ts).
=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);
Insert test data.
=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET power = excluded.power;
By viewing the test data, you can see that the data of the three tag1 devices inserted at 2022-07-19 00:00:00+00 batches have been merged into one row.
=# SELECT * FROM v2x_heap_insert;
ts | tag_id | longtitude | latitude | speed | left_turn_signal | right_turn_signal
| power
----------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
-+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
| 50
(1 row)
Time: 16.340 ms
Notes!
TheINSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATEstatement is also introduced in the SQL Reference - INSERT section.