Batch Data Merge Scenarios (UPSERT)

This document describes YMatrix's solutions for batch data merge scenarios: implementing UPSERT functionality using different methods under various storage engines (HEAP, MARS2, MARS3).

1 What Is a Batch Data Merge Scenario?

Consider a connected vehicle wide-table scenario A as an example.
In this scenario, the designed wide-table model (excluding metric types) is shown below:

The onboard data collection system collects data from the same vehicle at the same timestamp (simultaneously generated) and sends it to the YMatrix database in batches.

The wide table is designed to store, compute, and analyze data by vehicle, while the onboard system collects and uploads data by sensor. Therefore, data transmitted from the vehicle to YMatrix is written into the database in sensor-level batches.
This creates the batch data merge scenario in YMatrix.

In Example Scenario A, at least four batches of uploaded data require merging after being written to the database. If duplicate data exists in a batch, YMatrix updates the record by overwriting NULL values with non-NULL ones and older values with newer ones.

After merging, querying these records returns a single consolidated row instead of multiple rows.
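
The merge rule described above can be sketched in a few lines of Python. This is only an illustration of the stated semantics (non-NULL overwrites NULL, newer overwrites older), not YMatrix's implementation; the function names `merge_rows` and `merge_batches` are hypothetical, and the sample values are taken from the examples later in this document.

```python
from typing import Any, Dict, Iterable, Optional

Row = Dict[str, Optional[Any]]


def merge_rows(existing: Row, incoming: Row) -> Row:
    """Merge two rows that share the same key.

    A non-None incoming value overwrites whatever is stored;
    a None incoming value leaves the stored value untouched.
    """
    merged = dict(existing)
    for column, value in incoming.items():
        if value is not None:
            merged[column] = value
    return merged


def merge_batches(batches: Iterable[Row]) -> Row:
    """Fold batches, in arrival order, into one consolidated row."""
    result: Row = {}
    for batch in batches:
        result = merge_rows(result, batch)
    return result


# Four sensor-level batches for one vehicle at one timestamp.
KEY = {"ts": "2022-07-19 00:00:00", "tag_id": "tag1"}
batches = [
    {**KEY, "longitude": -32.3, "latitude": 45.0},
    {**KEY, "speed": 70.2},
    {**KEY, "left_turn_signal": True, "right_turn_signal": False},
    {**KEY, "power": 52},
]
consolidated = merge_batches(batches)
print(consolidated)
```

Folding the batches in arrival order is what makes "newer overwrites older" hold; a later batch that carries NULL for a column never erases a value that an earlier batch supplied.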

In practice, phenomena such as out-of-order arrival, delayed ingestion, or irregular frequency may occur during data ingestion. However, only batch merge scenarios require the database to perform data consolidation; other cases are not discussed here.

2 What Is UPSERT?

YMatrix defines UPSERT as a combination of INSERT and UPDATE operations.

When inserting new data:

  • If the specified row already exists in the table, update it.
  • If the specified row does not exist, insert a new row.

Note!
The "specified row" is an existing row in the current database whose key matches the key of the incoming row. The key is the ORDER BY key on a MARS3 table, the sort key defined by the mars2_btree index on a MARS2 table, or the unique index/constraint key on a HEAP table.
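
As a rough illustration of the update-or-insert decision, the sketch below models a table as a Python dictionary keyed by (tag_id, ts). The names `upsert` and `KEY_COLUMNS` are hypothetical, and the in-memory dictionary is only a stand-in for a real table; on the update path, only the columns supplied by the incoming row are changed.

```python
from typing import Any, Dict, Tuple

Key = Tuple[Any, ...]
Table = Dict[Key, Dict[str, Any]]

# Assumed key for this sketch: the same columns the examples in this document use.
KEY_COLUMNS = ("tag_id", "ts")


def upsert(table: Table, row: Dict[str, Any]) -> str:
    """Update the matching row if the key exists, otherwise insert a new row."""
    key = tuple(row[column] for column in KEY_COLUMNS)
    if key in table:
        table[key].update(row)  # only the supplied columns change
        return "update"
    table[key] = dict(row)
    return "insert"


table: Table = {}
first = upsert(table, {"tag_id": "tag1", "ts": "2022-07-19 00:00:00", "speed": 70.2})
second = upsert(table, {"tag_id": "tag1", "ts": "2022-07-19 00:00:00", "power": 52})
third = upsert(table, {"tag_id": "tag2", "ts": "2022-07-19 00:00:00", "power": 66})
print(first, second, third, len(table))  # insert update insert 2
```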

3 Implementing UPSERT Across Different Storage Engines

In YMatrix, UPSERT is not a SQL keyword but an operation combining INSERT and UPDATE functionalities. It can be implemented through the following methods:

  • Set uniquemode=true when creating a MARS3 table.
  • Enable uniquemode=true on a MARS2 table index.
  • Use mxgate or the ON CONFLICT clause on a HEAP table.

The use of UPSERT across different storage engines suits distinct business scenarios:

| Storage Engine | UPSERT Method | Use Case |
| --- | --- | --- |
| MARS3 | Set `uniquemode=true` when creating a MARS3 table | Best practice for time-series scenarios: data within a batch is merged as much as possible during write-in to reduce physical disk footprint; remaining minor merges occur at query time, returning fully merged results directly. This significantly improves both write and query performance. No universal best practice applies to OLAP or OLTP; enable uniquemode if batch merging is required. |
| MARS2 | Set `uniquemode=true` on a MARS2 table index | Best practice for time-series scenarios: similar to MARS3, partial merge occurs during write-in to minimize disk usage, with final merging done at query time for immediate presentation of consolidated results. Enhances write and query performance effectively. |
| HEAP | Use mxgate on HEAP tables | Recommended for time-series workloads. mxgate is YMatrix’s high-performance data ingestion tool offering superior write throughput. Requires defining a unique constraint/index on specific columns (typically device ID and timestamp). |
| HEAP | Use the `ON CONFLICT` SQL clause on HEAP tables | Suitable for small-scale UPSERT operations. Since this method performs physical merging during ingestion, it impacts write performance. Not ideal for large-scale writes compared to the above two options. Also requires a unique constraint/index. |

3.1 MARS3 Storage Engine

Example:

First, install the matrixts extension.

=# CREATE EXTENSION matrixts;

Create a test MARS3 table.

=# CREATE TABLE v2x_mars3 (
  ts                timestamptz NOT NULL,
  tag_id            text NOT NULL,
  longitude         float,
  latitude          float,
  speed             float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power             int
) USING MARS3
WITH (uniquemode=true)
DISTRIBUTED BY (tag_id)
ORDER BY (tag_id,ts);

Note!
When enabling Unique Mode, the first column in the ORDER BY clause must have a NOT NULL constraint.

Insert four rows from the same batch.

=# INSERT INTO v2x_mars3(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars3(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars3(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars3(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);

Query the data. The four rows for device tag1 at timestamp 2022-07-19 00:00:00 are merged into one row, with each insert filling in the columns it supplies.

=# SELECT * FROM v2x_mars3;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power 
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+00 | tag1   |     -32.3 |       45 |  70.2 | t                | f                 |    52
(1 row)

3.2 MARS2 Storage Engine

Example:

Install the matrixts extension.

=# CREATE EXTENSION matrixts;

Create a test MARS2 table.

=# CREATE TABLE v2x_mars2 (
  ts                timestamptz NOT NULL,
  tag_id            text NOT NULL,
  longitude         float,
  latitude          float,
  speed             float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power             int
) USING MARS2
DISTRIBUTED BY (tag_id);

Create a mars2_btree index with uniquemode=true.

=# CREATE INDEX ON v2x_mars2 USING mars2_btree(tag_id,ts) WITH (uniquemode=true);

Insert four rows from the same batch.

=# INSERT INTO v2x_mars2(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45);
=# INSERT INTO v2x_mars2(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2);
=# INSERT INTO v2x_mars2(ts,tag_id,left_turn_signal,right_turn_signal) VALUES('2022-07-19 00:00:00','tag1',true,false);
=# INSERT INTO v2x_mars2(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',52);

Query the data. The four rows for device tag1 at timestamp 2022-07-19 00:00:00 are merged into one row, with each insert filling in the columns it supplies.

=# SELECT * FROM v2x_mars2;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 | t                | f                 |    52
(1 row)
Time: 4.172 ms

3.3 HEAP Storage Engine

3.3.1 Using mxgate

For time-series workloads using the HEAP storage engine, we recommend using mxgate for high-speed data ingestion with UPSERT support.
To enable UPSERT, define a unique constraint or index on relevant columns.

In mxgate:

  • Use --upsert-key to overwrite old values with new ones (UPSERT).
  • Use --deduplicate-key to retain old values and discard new duplicates (deduplication).

1. --upsert-key Usage

Note!
This is equivalent to the SQL statement INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE described later.

Example:
Create a test HEAP table.

=# CREATE TABLE v2x_heap_upsert (
  ts                timestamptz NOT NULL,
  tag_id            text NOT NULL,
  longitude         float,
  latitude          float,
  speed             float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power             int,
  UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);

Test data 1:

$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2|||

After editing, press Esc to leave insert mode, then type :wq to save and quit.

Test data 2:

$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66

After editing, press Esc to leave insert mode, then type :wq to save and quit.

Load data 1, setting --upsert-key to tag_id and ts.

$ cat upsert_demo1.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_upsert \
  --upsert-key tag_id \
  --upsert-key ts

Query result 1:

=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |
(1 row)
Time: 18.049 ms

Load data 2, setting --upsert-key to tag_id and ts.

$ cat upsert_demo2.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_upsert \
  --upsert-key tag_id \
  --upsert-key ts

Query result 2: The speed, left_turn_signal, right_turn_signal, and power fields for device tag1 in batch 2022-07-19 00:00:00 are updated with the new values and merged into one row. Device tag2 has no existing row, so it is inserted as a new row.

=# SELECT * FROM v2x_heap_upsert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |                   |    66
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |    80 | f                | f                 |    70
(2 rows)
Time: 19.652 ms

2. --deduplicate-key Usage

Note!
This is equivalent to the SQL statement INSERT INTO ... VALUES ... ON CONFLICT ... DO NOTHING described later.

Example:
Create a test table.

=# CREATE TABLE v2x_heap_dedu (
  ts                timestamptz NOT NULL,
  tag_id            text NOT NULL,
  longitude         float,
  latitude          float,
  speed             float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power             int,
  UNIQUE(tag_id,ts)
) DISTRIBUTED BY (tag_id);

Test data 1:

$ vim upsert_demo1.dat
2022-07-19 00:00:00|tag1|-32.3|45|70.2|||

After editing, press Esc to leave insert mode, then type :wq to save and quit.

Test data 2:

$ vim upsert_demo2.dat
2022-07-19 00:00:00|tag1|-32.3|45||||
2022-07-19 00:00:00|tag1|-32.3|45|80|false|false|70
2022-07-19 00:00:00|tag2||||||66

After editing, press Esc to leave insert mode, then type :wq to save and quit.

Load data 1, setting --deduplicate-key to tag_id and ts.

$ cat upsert_demo1.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_dedu \
  --deduplicate-key tag_id \
  --deduplicate-key ts

Query result 1:

=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |
(1 row)
Time: 18.010 ms

Load data 2, setting --deduplicate-key to tag_id and ts.

$ cat upsert_demo2.dat|mxgate --source stdin \
  --db-database postgres \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --stream-prepared 0 \
  --target v2x_heap_dedu \
  --deduplicate-key tag_id \
  --deduplicate-key ts

Query result 2: For device tag1, the values of speed, left_turn_signal, right_turn_signal, and power in data 2 are discarded. Original values (or nulls) from data 1 are preserved.

=# SELECT * FROM v2x_heap_dedu;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |
 2022-07-19 00:00:00+08 | tag2   |           |          |       |                  |                   |    66
(2 rows)
Time: 12.881 ms
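
The difference between the two flags comes down to what happens when an incoming row hits an existing key. The sketch below replays the same test data against an in-memory SQLite database from Python, using ON CONFLICT ... DO UPDATE for the upsert behavior and ON CONFLICT ... DO NOTHING for the deduplicate behavior. It is an assumption-laden stand-in: SQLite is not YMatrix, mxgate is not involved, the table name `v2x` and the helper `load_with_policy` are hypothetical, and rows are applied one at a time in file order.

```python
import sqlite3

COLUMNS = ("ts", "tag_id", "longitude", "latitude", "speed",
           "left_turn_signal", "right_turn_signal", "power")
VALUE_COLUMNS = COLUMNS[2:]  # everything except the key (tag_id, ts)

TS = "2022-07-19 00:00:00"
# Same content as upsert_demo1.dat / upsert_demo2.dat. Empty fields are None,
# and false is written as 0 because SQLite has no boolean type.
DATA1 = [(TS, "tag1", -32.3, 45.0, 70.2, None, None, None)]
DATA2 = [
    (TS, "tag1", -32.3, 45.0, None, None, None, None),
    (TS, "tag1", -32.3, 45.0, 80.0, 0, 0, 70),
    (TS, "tag2", None, None, None, None, None, 66),
]

INSERT = "INSERT INTO v2x ({}) VALUES ({})".format(
    ", ".join(COLUMNS), ", ".join("?" for _ in COLUMNS))
ON_CONFLICT = {
    # Overwrite every non-key column with the incoming value.
    "upsert": " ON CONFLICT (tag_id, ts) DO UPDATE SET " + ", ".join(
        "{0} = excluded.{0}".format(c) for c in VALUE_COLUMNS),
    # Keep the stored row and drop the incoming one.
    "deduplicate": " ON CONFLICT (tag_id, ts) DO NOTHING",
}


def load_with_policy(policy):
    """Load DATA1 then DATA2 under the given conflict policy and return the rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE v2x (ts TEXT NOT NULL, tag_id TEXT NOT NULL, "
        "longitude REAL, latitude REAL, speed REAL, "
        "left_turn_signal INTEGER, right_turn_signal INTEGER, power INTEGER, "
        "UNIQUE (tag_id, ts))")
    for row in DATA1 + DATA2:  # applied one row at a time, in order
        conn.execute(INSERT + ON_CONFLICT[policy], row)
    rows = conn.execute(
        "SELECT tag_id, speed, left_turn_signal, right_turn_signal, power "
        "FROM v2x ORDER BY tag_id").fetchall()
    conn.close()
    return rows


upsert_rows = load_with_policy("upsert")
dedup_rows = load_with_policy("deduplicate")
print(upsert_rows)  # tag1 ends with the last incoming values
print(dedup_rows)   # tag1 keeps the values from data 1
```

Under these assumptions the outcome matches the query results shown above: with the upsert policy tag1 ends up with speed 80 and power 70, while with the deduplicate policy tag1 keeps speed 70.2 and its NULL columns. In both cases tag2 is simply inserted.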

3.3.2 Using INSERT Statements

Note!
This method applies only to HEAP tables.

Example:
Create a test HEAP table.

=# CREATE TABLE v2x_heap_insert (
  ts                timestamptz NOT NULL,
  tag_id            text NOT NULL,
  longitude         float,
  latitude          float,
  speed             float,
  left_turn_signal  boolean,
  right_turn_signal boolean,
  power             int
) DISTRIBUTED BY (tag_id);

Create a unique index on keys (tag_id,ts).

=# CREATE UNIQUE INDEX ON v2x_heap_insert(tag_id,ts);

Insert test data.

=# INSERT INTO v2x_heap_insert(ts,tag_id,longitude,latitude) VALUES('2022-07-19 00:00:00','tag1',-32.3,45)
   ON CONFLICT(tag_id,ts) DO UPDATE SET longitude = excluded.longitude, latitude = excluded.latitude;
=# INSERT INTO v2x_heap_insert(ts,tag_id,speed) VALUES('2022-07-19 00:00:00','tag1',70.2)
   ON CONFLICT(tag_id,ts) DO UPDATE SET speed = excluded.speed;
=# INSERT INTO v2x_heap_insert(ts,tag_id,power) VALUES('2022-07-19 00:00:00','tag1',50)
   ON CONFLICT(tag_id,ts) DO UPDATE SET power = excluded.power;

Query the data. The three inserted records for device tag1 at batch 2022-07-19 00:00:00 are merged into one row.

=# SELECT * FROM v2x_heap_insert;
           ts           | tag_id | longitude | latitude | speed | left_turn_signal | right_turn_signal | power
------------------------+--------+-----------+----------+-------+------------------+-------------------+-------
 2022-07-19 00:00:00+08 | tag1   |     -32.3 |       45 |  70.2 |                  |                   |    50
(1 row)
Time: 16.340 ms
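
The statements above update only the columns each INSERT supplies. If a writer instead sends full-width rows with NULL in the columns it does not know, a plain `col = excluded.col` assignment would overwrite stored values with NULL. One common way to avoid that is COALESCE, sketched below against an in-memory SQLite database from Python. This is an illustration under assumptions, not a YMatrix-verified recipe: SQLite stands in for the HEAP table, only two value columns are modeled, and in PostgreSQL-style SQL the stored column on the right-hand side may need to be qualified with the table name (for example `v2x_heap_insert.speed`).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE v2x_heap_insert (ts TEXT NOT NULL, tag_id TEXT NOT NULL, "
    "speed REAL, power INTEGER, UNIQUE (tag_id, ts))")

# COALESCE(excluded.col, col): take the incoming value unless it is NULL,
# in which case keep the value already stored in the row.
MERGE_SQL = """
INSERT INTO v2x_heap_insert (ts, tag_id, speed, power) VALUES (?, ?, ?, ?)
ON CONFLICT (tag_id, ts) DO UPDATE SET
  speed = COALESCE(excluded.speed, speed),
  power = COALESCE(excluded.power, power)
"""

TS = "2022-07-19 00:00:00"
conn.execute(MERGE_SQL, (TS, "tag1", 70.2, None))  # first row: plain insert
conn.execute(MERGE_SQL, (TS, "tag1", None, 50))    # NULL speed keeps 70.2
conn.execute(MERGE_SQL, (TS, "tag1", 80.0, None))  # newer speed wins, power kept

merged = conn.execute(
    "SELECT tag_id, speed, power FROM v2x_heap_insert").fetchall()
print(merged)  # [('tag1', 80.0, 50)]
conn.close()
```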

Note!
The INSERT INTO ... VALUES ... ON CONFLICT ... DO UPDATE clause is also documented in the SQL Reference - INSERT.