Data batch merge scenario (UPSERT)

This document introduces YMatrix's solutions for data batch merge scenarios. It shows how to implement the UPSERT function with different batch merge methods under different storage engines (MARS3, MARS2, HEAP).

1 What is the data batch merge scenario?

First, take wide-table scenario A from the Internet of Vehicles (IoV) as an example. The wide-table model designed for this scenario (metric data types omitted) is as follows: ![](https://img.ymatrix.cn/ymatrix_home/Wide table model (screenshot)_1681724350.png)

Meanwhile, the on-vehicle data acquisition system collects data that share the same timestamp (that is, data generated at the same moment) and sends them to the YMatrix database in batches: ![](https://img.ymatrix.cn/ymatrix_home/data batch merge scenario (screenshot)_1681724392.png)

As the figures show, the wide-table model uses the car as the device unit for data storage, computation, and analysis, while the on-vehicle data acquisition system uses the sensor as the device unit for data acquisition and upload. As a result, data transmitted from the vehicle to YMatrix is written to the database in batches, one batch per sensor.
This is how a data batch merge scenario arises in YMatrix.

In example scenario A, there are (at least) four batches of uploaded data that must be merged after they are written to the database. If a batch is uploaded more than once, YMatrix updates the data according to two rules, a non-NULL value overwrites a NULL value and a new value overwrites an old value, as shown below: ![](https://img.ymatrix.cn/ymatrix_home/data batch scene logic diagram (screenshot)_1681805538.png)

After the merge, querying the data again returns a single merged row instead of several separate rows.
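The merge rule above can be modelled in a few lines of Python. This is an illustrative sketch, not YMatrix's implementation: rows are plain dicts, `None` stands for NULL, the key is assumed to be `(tag_id, ts)` as in the example tables of section 3, and the helper `merge_batches` is hypothetical. It also assumes that a NULL arriving in a later batch never erases an existing value, which is one reading of the two rules.

```python
from typing import Any, Dict, Iterable, Optional, Tuple

# Hypothetical column list, borrowed from the v2x example tables in section 3.
COLUMNS = ["ts", "tag_id", "longitude", "latitude", "speed",
           "left_turn_signal", "right_turn_signal", "power"]

Row = Dict[str, Optional[Any]]  # None stands in for SQL NULL


def merge_batches(batches: Iterable[Row],
                  key_cols: Tuple[str, ...] = ("tag_id", "ts")) -> Dict[tuple, Row]:
    """Merge batches that share a key, applying them in arrival order."""
    merged: Dict[tuple, Row] = {}
    for batch in batches:
        key = tuple(batch[c] for c in key_cols)
        # A key seen for the first time starts as an all-NULL row.
        row = merged.setdefault(key, dict.fromkeys(COLUMNS))
        for col, value in batch.items():
            # Non-NULL overwrites NULL, and a newer non-NULL overwrites an
            # older value; a NULL in a later batch never erases a value.
            if value is not None:
                row[col] = value
    return merged


TS = "2022-07-19 00:00:00"
batches = [
    {"ts": TS, "tag_id": "tag1", "longitude": -32.3, "latitude": 45},
    {"ts": TS, "tag_id": "tag1", "speed": 70.2},
    {"ts": TS, "tag_id": "tag1", "left_turn_signal": True, "right_turn_signal": False},
    {"ts": TS, "tag_id": "tag1", "power": 52},
    # A repeated upload: speed has a new value, longitude arrives as NULL.
    {"ts": TS, "tag_id": "tag1", "speed": 71.5, "longitude": None},
]
result = merge_batches(batches)
print(len(result))                    # 1
print(result[("tag1", TS)]["speed"])  # 71.5
```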

In practice, data may also arrive at different frequencies, out of order, or with delays during the write stage. However, only the batch scenario requires the database to merge data, so the other cases are not described here.

2 What is UPSERT?

In YMatrix, UPSERT is a combination of the INSERT and UPDATE functions.

When a new row is about to be written to the database:

  • If the specified row already exists in the table, update it.
  • If the specified row does not exist in the table, a new row is inserted.

Notes!
Here, the "specified row" is a row already in the database whose key equals that of the incoming row. The key is the sort key for a MARS3 table, the key of the mars2_btree index for a MARS2 table, or the key of the unique index/constraint for a HEAP table.
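The insert-or-update decision itself can be sketched in Python, with an in-memory dict keyed by the key columns standing in for the table. The helper `upsert` and the key `(tag_id, ts)` are illustrative assumptions; the sketch does not model how any storage engine actually finds the matching row.

```python
from typing import Any, Dict, Tuple

Table = Dict[Tuple[Any, ...], Dict[str, Any]]
KEY_COLS = ("tag_id", "ts")  # hypothetical key, as in the HEAP examples


def upsert(table: Table, row: Dict[str, Any],
           key_cols: Tuple[str, ...] = KEY_COLS) -> str:
    """Insert `row` if no row has the same key; otherwise update that row.

    Returns "insert" or "update" to show which branch was taken.
    """
    key = tuple(row[c] for c in key_cols)
    if key in table:
        table[key].update(row)  # the specified row exists: UPDATE it
        return "update"
    table[key] = dict(row)      # the specified row is absent: INSERT it
    return "insert"


TS = "2022-07-19 00:00:00"
table: Table = {}
print(upsert(table, {"tag_id": "tag1", "ts": TS, "speed": 70.2}))  # insert
print(upsert(table, {"tag_id": "tag1", "ts": TS, "speed": 80.0}))  # update
print(upsert(table, {"tag_id": "tag2", "ts": TS, "power": 66}))    # insert
print(len(table))                                                  # 2
```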

3 Implementation of UPSERT function under different storage engines

In YMatrix, UPSERT is not a SQL keyword, but an operation that integrates INSERT and UPDATE functions. It can be implemented in the following ways:

  • Specify uniquemode=true when creating a MARS3 table.
  • Specify uniquemode=true on the mars2_btree index of a MARS2 table.
  • Use mxgate or the INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE statement with a HEAP table.

Each UPSERT method suits different business scenarios, depending on the storage engine:

| Storage Engine | UPSERT Usage | Applicable Scenarios |
| --- | --- | --- |
| MARS3 | Specify uniquemode=true when creating the MARS3 table | Recommended (best practice). When data is written to MARS3, the rows in a batch are merged as far as possible to reduce the amount of data actually written to disk; the small remainder is merged in real time at query time, so queries directly return merged results. This effectively improves both write and query performance. It is not a universal best practice for all OLAP and OLTP scenarios, but enabling it is recommended whenever data is merged in batches |
| MARS2 | Specify uniquemode=true on the mars2_btree index of the MARS2 table | Recommended (best practice). When data is written to MARS2, the rows in a batch are merged as far as possible to reduce the amount of data actually written to disk; the small remainder is merged in real time at query time, so queries directly return merged results. This effectively improves both write and query performance |
| HEAP | Use mxgate | Recommended for HEAP tables. mxgate is YMatrix's high-performance write tool. This method requires a unique constraint/index on the specified columns (usually the device's unique identifier and the timestamp) |
| HEAP | Use the ON CONFLICT SQL clause | Recommended for small-batch UPSERT operations. Because this method physically merges each batch of data as it is written, it reduces write performance to some extent; for large-scale data writes it performs worse than the methods above. This method also requires a unique constraint/index |

3.1 MARS3 Storage Engine

Here is an example:

First, install the matrixts extension.

=# CREATE EXTENSION matrixts;

Then, create the MARS3 test table.

=# CREATE TABLE v2x_mars3 (
  ts               timestamptz NOT NULL,
  tag_id           text NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) USING MARS3
WITH (uniquemode=true)
DISTRIBUTED BY (tag_id)
ORDER BY (tag_id,ts);

Notes!
If uniquemode is enabled, the first column in the ORDER BY clause must be declared NOT NULL in the table definition.

Insert four rows belonging to the same batch.

=# INSERT INTO v2x_mars3(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars3(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars3(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars3(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);

Querying the data shows that the rows for device tag1 in the 2022-07-19 00:00:00 batch have been merged and displayed as a single row, with the newer non-NULL values overwriting the earlier NULLs.

=# SELECT * FROM v2x_mars3;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power 
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+00 | tag1   |     -32.3 |       45 |  70.2 | t                | f                 |    52
(1 row)

3.2 MARS2 Storage Engine

Here is an example:

First, install the matrixts extension.

=# CREATE EXTENSION matrixts;

Then, create the MARS2 test table.

=# CREATE TABLE v2x_mars2 (
  ts               timestamptz NOT NULL,
  tag_id           text NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) USING MARS2
DISTRIBUTED BY (tag_id);

Create a mars2_btree index with uniquemode=true.

=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);

Insert four rows belonging to the same batch.

=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);

Querying the data shows that the rows for device tag1 in the 2022-07-19 00:00:00 batch have been merged and displayed as a single row, with the newer non-NULL values overwriting the earlier NULLs.

=# SELECT * FROM v2x_mars2;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 | t                | f                 |    52
(1 row)
Time: 4.172 ms

3.3 HEAP Storage Engine

3.3.1 Implemented by mxgate

If your business is a time-series scenario and uses the HEAP storage engine, we recommend performing UPSERT with mxgate, YMatrix's high-speed write tool.
To implement UPSERT, you need to create a unique constraint/index on the specified columns.

In mxgate, specifying --upsert-key implements UPSERT: new values overwrite old values. Specifying --deduplicate-key implements deduplication instead: the old value is kept and the new value is discarded.
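The contrast between the two options can be modelled as follows. This is a simplified sketch, not mxgate's implementation. It assumes that `--upsert-key` replaces every non-key column of the existing row with the incoming values (as `ON CONFLICT ... DO UPDATE` would if it set every column) and that `--deduplicate-key` keeps whichever row arrived first. The helpers `load` and `make_row` and the mode names are hypothetical; the rows mirror test data 1 and 2 used in the examples that follow.

```python
from typing import Any, Dict, List, Optional

COLUMNS = ["ts", "tag_id", "longitude", "latitude", "speed",
           "left_turn_signal", "right_turn_signal", "power"]
KEY_COLS = ("tag_id", "ts")
Row = Dict[str, Optional[Any]]


def make_row(**values: Any) -> Row:
    """Build a full row; columns that are not given are NULL (None)."""
    return {c: values.get(c) for c in COLUMNS}


def load(table: Dict[tuple, Row], rows: List[Row], mode: str) -> Dict[tuple, Row]:
    """Apply rows in order, like one load with the given key option."""
    if mode not in ("upsert", "deduplicate"):
        raise ValueError(f"unknown mode: {mode}")
    for row in rows:
        key = tuple(row[c] for c in KEY_COLS)
        if key not in table or mode == "upsert":
            # upsert: the new row replaces the old one (DO UPDATE on every column)
            # deduplicate: only a key not seen before is stored (DO NOTHING)
            table[key] = dict(row)
    return table


TS = "2022-07-19 00:00:00"
data1 = [make_row(ts=TS, tag_id="tag1", longitude=-32.3, latitude=45, speed=70.2)]
data2 = [
    make_row(ts=TS, tag_id="tag1", longitude=-32.3, latitude=45),
    make_row(ts=TS, tag_id="tag1", longitude=-32.3, latitude=45, speed=80,
             left_turn_signal=False, right_turn_signal=False, power=70),
    make_row(ts=TS, tag_id="tag2", power=66),
]

upserted = load(load({}, data1, "upsert"), data2, "upsert")
deduped = load(load({}, data1, "deduplicate"), data2, "deduplicate")
print(upserted[("tag1", TS)]["speed"], deduped[("tag1", TS)]["speed"])  # 80 70.2
```

Under these assumptions the model agrees with the query results shown later in this section: tag1 ends up with speed 80 after the upsert load and keeps 70.2 after the deduplicating load, while tag2 is inserted in both cases.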

  1. Usage of --upsert-key

Notes!
This usage is equivalent to the SQL statement INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE.

For example, first create a HEAP test table.

=# CREATE TABLE v2x_heap_upsert (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int,
  UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);

Test data 1:

$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2|||

After adding the data, press Esc and enter :wq to save the file and exit.

Test data 2:

$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66

After adding the data, press Esc and enter :wq to save the file and exit.
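Before piping a file into mxgate, a quick pre-check that every line has exactly one field per column can catch a stray delimiter. The sketch below is hypothetical and not part of mxgate; it assumes the eight-column layout of v2x_heap_upsert and, consistent with the query results in this section, treats an empty field as NULL (`None`).

```python
from typing import Dict, List, Optional

# Column order of the v2x_heap_upsert test table.
COLUMNS = ["ts", "tag_id", "longitude", "latitude", "speed",
           "left_turn_signal", "right_turn_signal", "power"]


def parse_line(line: str, delimiter: str = "|") -> Dict[str, Optional[str]]:
    """Split one data line into named fields; an empty field becomes None."""
    fields = line.rstrip("\n").split(delimiter)
    if len(fields) != len(COLUMNS):
        raise ValueError(
            f"expected {len(COLUMNS)} fields, got {len(fields)}: {line!r}")
    return {col: (value if value != "" else None)
            for col, value in zip(COLUMNS, fields)}


demo2 = """2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66"""

rows: List[Dict[str, Optional[str]]] = [parse_line(line) for line in demo2.splitlines()]
for row in rows:
    print(row["tag_id"], row["speed"], row["power"])
```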

Load data 1 with --upsert-key set to tag_id and ts.

$ cat upsert_demo1.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_upsert \
  --upsert-key tag_id \
  --upsert-key ts

Query result 1.

=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |
(1 row)
Time: 18.049 ms

Load data 2 with --upsert-key set to tag_id and ts.

$ cat upsert_demo2.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_upsert \
  --upsert-key tag_id \
  --upsert-key ts

Query result 2 shows that the tag1 values from data 1 (such as speed) have been replaced by the corresponding new values from data 2 and merged into a single row, while tag2 has been inserted as a new row.

=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |                   |    66
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |    80 | f                | f                 |    70
(2 rows)
Time: 19.652 ms

  2. Usage of --deduplicate-key

Notes!
This usage is equivalent to the SQL statement INSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING.

For example, first create a test table.

=# CREATE TABLE v2x_heap_dedu (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int,
  UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);

Test data 1:

$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2|||

After adding the data, press Esc and enter :wq to save the file and exit.

Test data 2:

$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66

After adding the data, press Esc and enter :wq to save the file and exit.

Load data 1 with --deduplicate-key set to tag_id and ts.

$ cat upsert_demo1.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_dedu \
  --deduplicate-key tag_id \
  --deduplicate-key ts

Query result 1.

=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |
(1 row)
Time: 18.010 ms

Load data 2 with --deduplicate-key set to tag_id and ts.

$ cat upsert_demo2.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_dedu \
  --deduplicate-key tag_id \
  --deduplicate-key ts

Query result 2 shows that the speed, left_turn_signal, right_turn_signal, and power values of tag1 in data 2 have all been discarded, and the old values (or NULLs) from data 1 are retained; tag2, which had no existing row, has been inserted.

=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |                   |    66
(2 rows)
Time: 12.881 ms

3.3.2 Implemented through the INSERT statement

Notes!
This method applies to HEAP tables only.

For example, first create a HEAP test table.

=# CREATE TABLE v2x_heap_insert (
  ts               timestamptz   NOT NULL,
  tag_id           text  NOT NULL,
  longitude       float,
  latitude         float,
  speed            float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power            int
) DISTRIBUTED BY (tag_id);

Create a unique index on the key (tag_id, ts).

=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);

Insert test data.

=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET  longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET  speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET  power = excluded.power;

Viewing the test data shows that the three rows inserted for device tag1 at 2022-07-19 00:00:00 have been merged into one row.

=# SELECT * FROM v2x_heap_insert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |    50
(1 row)
Time: 16.340 ms
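These three statements merge cleanly because each SET list names only the columns that statement supplies. The Python model below illustrates this with a dict standing in for the table. `insert_on_conflict` is a hypothetical helper that ignores concurrency, locking, and index maintenance.

```python
from typing import Any, Dict, List, Optional

COLUMNS = ["ts", "tag_id", "longitude", "latitude", "speed",
           "left_turn_signal", "right_turn_signal", "power"]
KEY_COLS = ("tag_id", "ts")
Row = Dict[str, Optional[Any]]


def insert_on_conflict(table: Dict[tuple, Row], values: Row,
                       set_cols: List[str]) -> None:
    """Model INSERT ... ON CONFLICT (tag_id, ts) DO UPDATE SET c = excluded.c."""
    # `excluded` is the proposed row; columns not supplied are NULL.
    excluded = {c: values.get(c) for c in COLUMNS}
    key = tuple(excluded[c] for c in KEY_COLS)
    if key not in table:
        table[key] = excluded                # no conflict: plain INSERT
    else:
        for c in set_cols:                   # conflict: update only SET columns
            table[key][c] = excluded[c]


TS = "2022-07-19 00:00:00"
table: Dict[tuple, Row] = {}
insert_on_conflict(table, {"ts": TS, "tag_id": "tag1", "longitude": -32.3, "latitude": 45},
                   ["longitude", "latitude"])
insert_on_conflict(table, {"ts": TS, "tag_id": "tag1", "speed": 70.2}, ["speed"])
insert_on_conflict(table, {"ts": TS, "tag_id": "tag1", "power": 50}, ["power"])
print(len(table))  # 1
```

The design point is the SET list. If the second statement had also set `longitude = excluded.longitude`, the merged row would lose its longitude. This is because, in PostgreSQL-style ON CONFLICT, the `excluded` row carries the column default (NULL here) for every column the INSERT did not supply.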

Notes!
The INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE statement is also described in the SQL Reference - INSERT section.