This document introduces YMatrix's solution for data batch merging scenarios: using different data batch merging methods to implement the UPSERT function under different storage engines (HEAP, MARS2).
First, take wide table scenario A from the Internet of Vehicles as an example. In this scenario, the wide table model we designed (excluding indicator types) is as follows: [Figure: wide table model]
The vehicle-side data acquisition system collects data that share the same timestamp (generated at the same time) and sends it to the YMatrix database in batches: [Figure: data batches uploaded by the vehicle side]
As shown, the wide table model stores, computes, and analyzes data per car, whereas the vehicle-side acquisition system collects and uploads data per sensor. Data sent from the vehicle to YMatrix is therefore written to the database in batches, one sensor at a time.
Under this condition, a data batch merge scenario in YMatrix is formed.
In example scenario A, (at least) four batches of uploaded data need to be merged after they are written to the database. If a batch is uploaded repeatedly, YMatrix updates it so that non-NULL values overwrite NULL values and new values overwrite old values: [Figure: merged data]
After the merge, querying the data again returns a single merged row instead of several rows.
In practice, the write stage may also see differing frequencies, out-of-order data, delays, and similar phenomena, but only the batch scenario requires the database to merge data, so the other cases are not covered here.
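The merge rule described above can be sketched in a few lines of Python. This is only an illustrative model of the semantics, not YMatrix code: the function name `merge_batches` and the dictionary-based row layout are assumptions made for the example, `None` stands in for NULL, and the choice that a NULL never erases a stored value is likewise an assumption.

```python
from typing import Dict, Iterable, Optional, Tuple

Row = Dict[str, Optional[object]]
Key = Tuple[object, object]  # (tag_id, ts)


def merge_batches(batches: Iterable[Row]) -> Dict[Key, Row]:
    """Merge sensor-level batches into one row per (tag_id, ts).

    Hypothetical helper: a non-None value overwrites None, and a newer
    non-None value overwrites an older one. None never erases a value.
    """
    merged: Dict[Key, Row] = {}
    for batch in batches:
        key = (batch["tag_id"], batch["ts"])
        row = merged.setdefault(key, {})
        for column, value in batch.items():
            if value is not None or column not in row:
                row[column] = value
    return merged


ts = "2022-07-19 00:00:00"
batches = [
    {"ts": ts, "tag_id": "tag1", "longitude": -32.3, "latitude": 45.0},
    {"ts": ts, "tag_id": "tag1", "speed": 70.2},
    {"ts": ts, "tag_id": "tag1", "speed": 80.0, "power": None},  # repeated upload
    {"ts": ts, "tag_id": "tag1", "power": 52},
]
result = merge_batches(batches)
print(result[("tag1", ts)])
```

Here the repeated upload replaces the speed 70.2 with 80.0, and the later non-NULL power fills the slot that was previously NULL, leaving one row for the key.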
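YMatrix treats UPSERT as a combination of the INSERT and UPDATE functions.
When a new row is about to be written to the database:
- If the specified row already exists, the existing row is updated with the new data.
- If the specified row does not exist, the new row is inserted.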
Notes!
The "specified row" above is a row already in the database whose key is the same as that of the incoming row. The key is the sort key specified by the mars2_btree index on a MARS2 table, or the key of a unique index/constraint on a HEAP table.
In YMatrix, UPSERT is not a SQL keyword but an operation that combines the INSERT and UPDATE functions. It can be implemented in the following ways:
- Specify uniquemode=true on the index of a MARS2 table.
- Use the INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE statement on a HEAP table.

The usage of UPSERT in YMatrix differs by storage engine, and each usage suits different business scenarios:
| Storage Engine | UPSERT Usage | Applicable Scenarios |
| --- | --- | --- |
| MARS2 | Specify `uniquemode=true` on the MARS2 table index | Best practice for time-series scenarios. The data in a batch is not merged when written; MARS2 merges it automatically in the background at query time and returns the merged result directly. This greatly improves write performance. Combined with the good compression of MARS2 tables, it is the best practice for writing and storing large-scale batch time-series data |
| HEAP | Use mxgate | Recommended for time-series scenarios on HEAP tables. mxgate is the high-performance write tool of YMatrix. This method requires a unique constraint/index on the specified columns (usually the device unique identifier and the timestamp) |
| HEAP | Use the `ON CONFLICT` SQL clause | Recommended for small-batch UPSERT operations. Because this method physically merges a batch of data at write time, write performance suffers to some degree, so for large-scale writes it performs worse than the two methods above. This method also requires a unique constraint/index |
Here is an example:
First, install the matrixts extension.
=# CREATE EXTENSION matrixts;
Then, create the MARS2 test table.
=# CREATE TABLE v2x_mars2 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS2
DISTRIBUTED BY (tag_id);
Create a mars2_btree index with uniquemode=true.
=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);
Insert four pieces of data from the same batch.
=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
Query the data. The rows for device tag1 in the 2022-07-19 00:00:00 batch have been merged and are displayed as one row, with new values overwriting old ones.
=# SELECT * FROM v2x_mars2;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 | t                | f                 |    52
(1 row)
Time: 4.172 ms
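As noted earlier, with uniquemode=true a batch is not merged when it is written; the merge happens at query time. The following Python sketch is a toy model of that write-then-merge-on-read idea, not a description of MARS2 internals. The class `AppendOnlyTable` and its methods are hypothetical names introduced for illustration, and `None` stands in for NULL.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, Optional[object]]


class AppendOnlyTable:
    """Toy model: writes are appended as-is, merging happens on read."""

    def __init__(self, key_columns: Tuple[str, ...]) -> None:
        self.key_columns = key_columns
        self.fragments: List[Row] = []  # every insert is stored unmerged

    def insert(self, row: Row) -> None:
        # Cheap write path: no lookup of existing rows, just append.
        self.fragments.append(dict(row))

    def select_all(self) -> List[Row]:
        # Read path: fold fragments that share a key into one row.
        merged: Dict[Tuple[object, ...], Row] = {}
        for fragment in self.fragments:
            key = tuple(fragment[c] for c in self.key_columns)
            row = merged.setdefault(key, {})
            for column, value in fragment.items():
                if value is not None:
                    row[column] = value  # a later non-NULL value wins
        return list(merged.values())


table = AppendOnlyTable(key_columns=("tag_id", "ts"))
ts = "2022-07-19 00:00:00"
table.insert({"ts": ts, "tag_id": "tag1", "longitude": -32.3, "latitude": 45.0})
table.insert({"ts": ts, "tag_id": "tag1", "speed": 70.2})
table.insert({"ts": ts, "tag_id": "tag1", "left_turn_signal": True, "right_turn_signal": False})
table.insert({"ts": ts, "tag_id": "tag1", "power": 52})

rows = table.select_all()
print(len(table.fragments), "fragments stored,", len(rows), "row returned")
```

The write path never looks up existing rows, which is the intuition behind the write-performance benefit; the cost of combining fragments is paid when the data is read.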
If your business scenario is a time-series scenario and the storage engine is HEAP, we recommend performing UPSERT through mxgate, the high-speed writing tool.
To implement UPSERT, you need to create a unique constraint/index on the specified columns.
In mxgate, specifying --upsert-key overwrites the old value with the new value, which implements UPSERT; specifying --deduplicate-key keeps the old value and discards the new value, which implements deduplication.
--upsert-key
Notes!
This usage is equivalent to the following SQL statement: INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE.
Examples are as follows: Create a HEAP test table.
=# CREATE TABLE v2x_heap_upsert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Data for testing 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2|||
After adding the data, press Esc to leave insert mode, then type :wq to save and exit.
Data for testing 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
After adding the data, press Esc to leave insert mode, then type :wq to save and exit.
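Hand-typed delimited files are easy to get wrong by one delimiter, so the lines can also be generated. The sketch below is a hypothetical helper (the name `to_dat_line` is not part of mxgate) that renders rows in the column order of the test table, leaving an empty field wherever a value is missing, which is how the test data above represents NULL.

```python
from typing import Dict, List, Optional

# Column order of the v2x_heap_upsert table defined above.
COLUMNS: List[str] = [
    "ts", "tag_id", "longitude", "latitude", "speed",
    "left_turn_signal", "right_turn_signal", "power",
]


def to_dat_line(row: Dict[str, Optional[object]], delimiter: str = "|") -> str:
    """Render one row as a delimited line; missing or None fields stay empty."""
    fields = []
    for column in COLUMNS:
        value = row.get(column)
        if value is None:
            fields.append("")
        elif isinstance(value, bool):
            fields.append("true" if value else "false")
        else:
            fields.append(str(value))
    return delimiter.join(fields)


ts = "2022-07-19 00:00:00"
rows = [
    {"ts": ts, "tag_id": "tag1", "longitude": -32.3, "latitude": 45},
    {"ts": ts, "tag_id": "tag1", "longitude": -32.3, "latitude": 45,
     "speed": 80, "left_turn_signal": False, "right_turn_signal": False, "power": 70},
    {"ts": ts, "tag_id": "tag2", "power": 66},
]
lines = [to_dat_line(r) for r in rows]
for line in lines:
    print(line)
```

With eight columns, every generated line contains exactly seven delimiters, and the three lines printed here match the contents of upsert_demo2.dat.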
Load data 1, setting --upsert-key to tag_id and ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
Query result 1.
=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |
(1 row)
Time: 18.049 ms
Load data 2, setting --upsert-key to tag_id and ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
In query result 2, you can see that the speed of tag1 from data 1 was replaced by the corresponding new value from data 2, and that the tag1 data was merged into one row.
=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |                   |    66
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |    80 | f                | f                 |    70
(2 rows)
Time: 19.652 ms
--deduplicate-key
Notes!
This usage is equivalent to the following SQL statement: INSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING.
Examples are as follows: Create a test table.
=# CREATE TABLE v2x_heap_dedu (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Data for testing 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2|||
After adding the data, press Esc to leave insert mode, then type :wq to save and exit.
Data for testing 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
After adding the data, press Esc to leave insert mode, then type :wq to save and exit.
Load data 1, setting --deduplicate-key to tag_id and ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
Query result 1.
=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |
(1 row)
Time: 18.010 ms
Load data 2, setting --deduplicate-key to tag_id and ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
In query result 2, you can see that the speed, left_turn_signal, right_turn_signal, and power values of tag1 in data 2 were all discarded, and the old values (or NULLs) from data 1 were retained.
=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |                   |    66
(2 rows)
Time: 12.881 ms
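The notes above equate --upsert-key with ON CONFLICT ... DO UPDATE and --deduplicate-key with ON CONFLICT ... DO NOTHING. To see the two policies side by side without a YMatrix cluster, the following sketch runs both statements against an in-memory SQLite database from Python's standard library. Everything here is a stand-in: SQLite is not YMatrix, the table names `demo_upsert` and `demo_dedup` are hypothetical, the column set is reduced, the SET list that overwrites every non-key column is an assumption, and the bundled SQLite is assumed to be version 3.24 or later, which is when this syntax was added.

```python
import sqlite3

DDL = """
CREATE TABLE {name} (
    ts TEXT NOT NULL,
    tag_id TEXT NOT NULL,
    speed REAL,
    power INTEGER,
    UNIQUE (tag_id, ts)
)
"""

# New values overwrite old ones when the key already exists.
UPSERT = """
INSERT INTO demo_upsert (ts, tag_id, speed, power) VALUES (?, ?, ?, ?)
ON CONFLICT (tag_id, ts) DO UPDATE
SET speed = excluded.speed, power = excluded.power
"""

# The incoming row is dropped when the key already exists.
DEDUP = """
INSERT INTO demo_dedup (ts, tag_id, speed, power) VALUES (?, ?, ?, ?)
ON CONFLICT (tag_id, ts) DO NOTHING
"""

conn = sqlite3.connect(":memory:")
conn.execute(DDL.format(name="demo_upsert"))
conn.execute(DDL.format(name="demo_dedup"))

ts = "2022-07-19 00:00:00"
first_load = [(ts, "tag1", 70.2, None)]
second_load = [(ts, "tag1", 80.0, 70), (ts, "tag2", None, 66)]

for statement in (UPSERT, DEDUP):
    conn.executemany(statement, first_load)
    conn.executemany(statement, second_load)

query = "SELECT tag_id, speed, power FROM {name} ORDER BY tag_id"
upsert_rows = conn.execute(query.format(name="demo_upsert")).fetchall()
dedup_rows = conn.execute(query.format(name="demo_dedup")).fetchall()
print(upsert_rows)  # tag1 carries the values from the second load
print(dedup_rows)   # tag1 keeps the values from the first load
```

In both tables tag2 is simply inserted, since no row with that key existed; the two policies differ only in how they treat the second tag1 row.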
Notes!
This usage is for use with HEAP tables only.
Examples are as follows: Create a HEAP test table.
=# CREATE TABLE v2x_heap_insert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) DISTRIBUTED BY (tag_id);
Create a unique index with (tag_id,ts) as the key.
=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);
Insert test data.
=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET power = excluded.power;
By viewing the test data, you can see that the three rows inserted for device tag1 at 2022-07-19 00:00:00 have been merged into one row.
=# SELECT * FROM v2x_heap_insert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |    50
(1 row)
Time: 16.340 ms
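In the statements above, each INSERT updates only the columns that its batch carries, so the SET list changes from batch to batch. When batches arrive as key/value records in application code, that statement can be generated rather than written by hand. The sketch below is one possible way to do so; `build_upsert` is a hypothetical helper, and the `%s` placeholders assume a Python database driver that uses that parameter style.

```python
from typing import Dict, Sequence, Tuple


def build_upsert(table: str, key_columns: Sequence[str],
                 row: Dict[str, object]) -> Tuple[str, tuple]:
    """Return (sql, params) for an INSERT ... ON CONFLICT ... DO UPDATE.

    Hypothetical helper: only the non-key columns present in `row` are
    updated, so columns the batch does not carry keep their stored values.
    Identifiers are interpolated as-is and must come from trusted code.
    """
    columns = list(row)
    missing = [k for k in key_columns if k not in row]
    if missing:
        raise ValueError(f"row lacks key columns: {missing}")
    updates = [c for c in columns if c not in key_columns]
    if not updates:
        raise ValueError("row must carry at least one non-key column")
    placeholders = ", ".join(["%s"] * len(columns))
    assignments = ", ".join(f"{c} = excluded.{c}" for c in updates)
    sql = (
        f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders}) "
        f"ON CONFLICT ({', '.join(key_columns)}) DO UPDATE SET {assignments}"
    )
    return sql, tuple(row[c] for c in columns)


sql, params = build_upsert(
    "v2x_heap_insert",
    ("tag_id", "ts"),
    {"ts": "2022-07-19 00:00:00", "tag_id": "tag1", "speed": 70.2},
)
print(sql)
print(params)
```

The generated statement for this record has the same shape as the second INSERT in the example above: it inserts ts, tag_id, and speed, and on a key conflict updates speed only.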
Notes!
The INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE statement is also described in the SQL Reference - INSERT section.