This document describes YMatrix's solutions for batch data merge scenarios: implementing UPSERT functionality using different methods under various storage engines (HEAP, MARS2).
Consider a connected vehicle wide-table scenario A as an example.
In this scenario, the designed wide-table model (excluding metric types) is shown below:

The onboard data collection system collects data from the same vehicle at the same timestamp (simultaneously generated) and sends it to the YMatrix database in batches.
The wide table is designed to store, compute, and analyze data by vehicle, while the onboard system collects and uploads data by sensor. Therefore, data transmitted from the vehicle to YMatrix is written into the database in sensor-level batches.
This creates the batch data merge scenario in YMatrix.
In Example Scenario A, at least four batches of uploaded data require merging after being written to the database. If duplicate data exists in a batch, YMatrix updates the record by overwriting NULL values with non-NULL ones and newer values with older ones:

After merging, querying these records returns a single consolidated row instead of multiple rows.
In practice, phenomena such as out-of-order arrival, delayed ingestion, or irregular frequency may occur during data ingestion. However, only batch merge scenarios require the database to perform data consolidation; other cases are not discussed here.
YMatrix defines UPSERT as a combination of INSERT and UPDATE operations.
When inserting new data:
Note!
The "specified row" refers to an existing row in the current database where the sort key defined by a mars2_btree index on a MARS2 table, or the unique index/constraint key on a HEAP table matches the incoming row.
In YMatrix, UPSERT is not a SQL keyword but an operation combining INSERT and UPDATE functionalities. It can be implemented through the following methods:
uniquemode=true when creating a MARS3 table.uniquemode=true on a MARS2 table index.ON CONFLICT clause on a HEAP table.The use of UPSERT across different storage engines suits distinct business scenarios:
| Storage Engine | UPSERT Method | Use Case |
| MARS3 | Set `uniquemode=true` when creating a MARS3 table | Best practice for time-series scenarios: data within a batch is merged as much as possible during write-in to reduce physical disk footprint; remaining minor merges occur at query time, returning fully merged results directly. This significantly improves both write and query performance. No universal best practice applies to OLAP or OLTP; enable uniquemode if batch merging is required. |
| MARS2 | Set `uniquemode=true` on a MARS2 table index | Best practice for time-series scenarios: similar to MARS3, partial merge occurs during write-in to minimize disk usage, with final merging done at query time for immediate presentation of consolidated results. Enhances write and query performance effectively. |
| HEAP | Use mxgate on HEAP tables | Recommended for time-series workloads. mxgate is YMatrix’s high-performance data ingestion tool offering superior write throughput. Requires defining a unique constraint/index on specific columns (typically device ID and timestamp). |
| Use the `ON CONFLICT` SQL clause on HEAP tables | Suitable for small-scale UPSERT operations. Since this method performs physical merging during ingestion, it impacts write performance. Not ideal for large-scale writes compared to the above two options. Also requires a unique constraint/index. |
Example:
First, install the matrixts extension.
=# CREATE EXTENSION matrixts;
Create a test MARS3 table.
=# CREATE TABLE v2x_mars3 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS3
WITH (uniquemode=true)
DISTRIBUTED BY (tag_id)
ORDER BY (tag_id,ts);
Note!
When enabling Unique Mode, the first column in the ORDER BY clause must have a NOT NULL constraint.
Insert four rows from the same batch.
=# INSERT INTO v2x_mars3(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars3(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars3(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars3(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
Query the data. The data for device tag1 at batch 2022-07-19 00:00:00 is merged into one row, with new values replacing old ones.
=# SELECT * FROM v2x_mars3;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
2022-07-19 00:00:00+00 | tag1 | -32.3 | 45 | 70.2 | t | f | 52
(1 row)
Example:
Install the matrixts extension.
=# CREATE EXTENSION matrixts;
Create a test MARS2 table.
=# CREATE TABLE v2x_mars2 (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) USING MARS2
DISTRIBUTED BY (tag_id);
Create a mars2_btree index with uniquemode=true.
=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);
Insert four rows from the same batch.
=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);
Query the data. The data for device tag1 at batch 2022-07-19 00:00:00 is merged into one row, with new values replacing old ones.
=# SELECT * FROM v2x_mars2;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | t | f |
52
(1 row)
Time: 4.172 ms
For time-series workloads using the HEAP storage engine, we recommend using mxgate for high-speed data ingestion with UPSERT support.
To enable UPSERT, define a unique constraint or index on relevant columns.
In mxgate:
--upsert-key to overwrite old values with new ones (UPSERT).--deduplicate-key to retain old values and discard new duplicates (deduplication).--upsert-key UsageNote!
This is equivalent to the SQL statementINSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATEdescribed later.
Example:
Create a test HEAP table.
=# CREATE TABLE v2x_heap_upsert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Test data 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
After editing, press esc to exit, then enter :wq to save and quit.
Test data 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
After editing, press esc to exit, then enter :wq to save and quit.
Load data 1, setting --upsert-key to tag_id and ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
Query result 1:
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal |
power
------------------------+--------+------------+----------+-------+-----------------+------------------+
-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | | |
(1 row)
Time: 18.049 ms
Load data 2, setting --upsert-key to tag_id and ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_upsert \
--upsert-key tag_id \
--upsert-key ts
Query result 2: The speed, left_turn_signal, right_turn_signal, and power fields for device tag1 in batch speed are updated with new values and merged into one row.
=# SELECT * FROM v2x_heap_upsert;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 80 | f | f
| 70
(2 rows)
Time: 19.652 ms
--deduplicate-key UsageNote!
This is equivalent to the SQL statementINSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHINGdescribed later.
Example:
Create a test table.
=# CREATE TABLE v2x_heap_dedu (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int,
UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);
Test data 1:
$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2||||
After editing, press esc to exit, then enter :wq to save and quit.
Test data 2:
$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66
After editing, press esc to exit, then enter :wq to save and quit.
Load data 1, setting --deduplicate-key to tag_id and ts.
$ cat upsert_demo1.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
Query result 1:
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
(1 row)
Time: 18.010 ms
Load data 2, setting --deduplicate-key to tag_id and ts.
$ cat upsert_demo2.dat|mxgate --source stdin \
--db-database postgres \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--time-format raw \
--delimiter "|" \
--stream-prepared 0 \
--target v2x_heap_dedu \
--deduplicate-key tag_id \
--deduplicate-key ts
Query result 2: For device tag1, the values of speed, left_turn_signal, right_turn_signal, and power in data 2 are discarded. Original values (or nulls) from data 1 are preserved.
=# SELECT * FROM v2x_heap_dedu;
ts | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+-----------+----------+-------+------------------+-------------------
+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
|
2022-07-19 00:00:00+08 | tag2 | | | | |
| 66
(2 rows)
Time: 12.881 ms
Note!
This method applies only to HEAP tables.
Example:
Create a test HEAP table.
=# CREATE TABLE v2x_heap_insert (
ts timestamptz NOT NULL,
tag_id text NOT NULL,
longitude float,
latitude float,
speed float,
left_turn_signal boolean,
right_turn_signal boolean,
power int
) DISTRIBUTED BY (tag_id);
Create a unique index on keys (tag_id,ts).
=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);
Insert test data.
=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45) ON CONFLICT(tag_id,ts) DO UPDATE
SET longitude = excluded.longitude,latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2) ON CONFLICT(tag_id,ts) DO UPDATE
SET speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50) ON CONFLICT(tag_id,ts) DO UPDATE
SET power = excluded.power;
Query the data. The three inserted records for device tag1 at batch 2022-07-19 00:00:00+00 are merged into one row.
=# SELECT * FROM v2x_heap_insert;
ts | tag_id | longtitude | latitude | speed | left_turn_signal | right_turn_signal
| power
------------------------+--------+------------+----------+-------+------------------+------------------
-+-------
2022-07-19 00:00:00+08 | tag1 | -32.3 | 45 | 70.2 | |
| 50
(1 row)
Time: 16.340 ms
Note!
TheINSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATEclause is also documented in the SQL Reference - INSERT.