MatrixGate Data Loading Server

MatrixGate, abbreviated as mxgate, is a high-performance streaming data loading server located at bin/mxgate in the MatrixDB installation directory. MatrixGate currently supports data ingestion via HTTP and STDIN interfaces, with support for TEXT and CSV data formats.

1 MatrixGate Working Principle

The data loading logic of MatrixGate is illustrated below:

  1. A data collection system gathers device data or receives data sent by devices.
  2. The collection system continuously sends data to the MatrixGate service process (mxgate) in concurrent micro-batches.
  3. The mxgate process communicates efficiently with the MatrixDB master process to exchange transaction and control information.
  4. Data is directly transmitted to segment nodes for parallel, high-speed writes.

MatrixGate Architecture

2 Using MatrixGate

  • Generate a MatrixGate configuration file by specifying the target database and table:
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf

This command generates a configuration file named mxgate.conf. It allows customized loading settings for testtable and testtable2, while also enabling default global settings for loading into other tables.

  • Modify the mxgate.conf file as needed (e.g., set field delimiters). Skip this step if using defaults. The generated configuration includes entries like:
    [[job.target]]
      # delimiter = "|"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable"
      # null-as = ""
      table = "public.testtable"
      # time-format = "unix-second"
      # use-auto-increment = true

    [[job.target]]
      # delimiter = "|"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable2"
      # null-as = ""
      table = "public.testtable2"
      # time-format = "unix-second"
      # use-auto-increment = true

If testtable uses @ as delimiter and testtable2 uses %, update the config accordingly:

    [[job.target]]
      delimiter = "@"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable"
      # null-as = ""
      table = "public.testtable"
      # time-format = "unix-second"
      # use-auto-increment = true

    [[job.target]]
      delimiter = "%"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable2"
      # null-as = ""
      table = "public.testtable2"
      # time-format = "unix-second"
      # use-auto-increment = true

By default, mxgate listens on port 8086. This can be seen in the mxgate.conf under [source.http]:

[source]

  ## Source plugin is the data entrance to MatrixGate
  ## Types restricted to: http
  source = "http"

  [source.http]

    ## Port of http push
    # http-port = 8086

    ## Maximum request body size (after gzip)
    ## The server rejects requests with bodies exceeding this limit.
    # max-body-bytes = 4194304

    ## The maximum number of concurrent HTTP connections to the server
    ## The server response with 503 after exceed this limit.
    # max-concurrency = 40000

To change the listening port, modify the http-port value.

  • Start mxgate with the configuration file and connect to the demo database to prepare for data loading:
mxgate start --config mxgate.conf
  • Check background service status:
mxgate status
  • Stop the background service:
mxgate stop

To force stop in case of timeout or other issues:

mxgate stop --force

3 MatrixGate Command-Line Parameters

Parameter Default Value Description
[general]
--job-interval-get false Get job interval for write transactions
--job-list false List all write jobs
--job-state false Get state of all write jobs
--stream-prepared-get 11 Get number of active write connections in a transaction
--stream-status-get false Get status of write connections in a transaction
--stream-prepared-cli int 0 Number of active write connections in a transaction
--pause false Pause data writing
--resume false Resume paused writing
[database]
--db-database postgres Target MatrixDB database name
--db-master-host localhost hostname Hostname of MatrixDB master
--db-master-port 5432 Port of MatrixDB master
--db-user current OS user Username to connect to MatrixDB
Note: This user must have permission to create external tables. For non-superusers, grant privileges using:
GRANT CREATE ON DATABASE demo TO username;
--db-password empty Password for MatrixDB user
--db-max-conn 10 Maximum number of connections from MatrixGate to MatrixDB
[job]
--allow-dynamic false When set to true, enables dynamic table mapping based on POST content (first line). Use only when target table is unknown at startup. For known tables, use --target.
--delimiter | Field delimiter within each row
--error-handling accurate Error handling mode:
accurate: Skip malformed rows, log errors, continue processing.
legacy: Reject entire batch on any error.
--exclude-columns empty Columns excluded from input data. Input columns must match table order for included fields. Auto-increment columns skipped via --use-auto-increment need not be listed here.
--format text Data format: text or csv. text is faster but does not allow newlines in text fields. csv is more robust; text fields must be quoted.
--null-as empty string String representing NULL values. Default is unquoted empty string. If column has NOT NULL constraint and receives null, load fails. To use \N, escape backslash: --null-as \\N.
--time-format unix-second Timestamp unit: unix-second, unix-ms, unix-nano, or raw. MatrixGate treats first column as Unix timestamp by default. Use raw if timestamp is not in first column or already in DB format.
--upsert-key empty Key(s) for upsert operations. Table must have UNIQUE constraint on specified key(s).
--deduplicate-key empty Similar to --upsert-key, but only updates NULL fields. New values are discarded if old value is non-NULL. Mutually exclusive with --upsert-key.
--use-auto-increment true Whether to skip auto-increment columns in input and use system-generated values.
--target schemaName.tableName Target table name. Schema defaults to public. Multiple tables allowed: --target table1 --target table2 .... If omitted, use --allow-dynamic for dynamic table resolution.
[misc]
--log-archive-hours 72 Hours after which unchanged log files are compressed
--log-compress true Enable automatic log compression
--log-dir /home/mxadmin/gpAdminLogs Log directory
--log-max-archive-files 0 Max number of archived logs to keep. Oldest deleted when exceeded. 0 means no deletion.
--log-remove-after-days 0 Days after compression before log files are deleted. 0 means never delete.
--log-rotate-size-mb 100 Rotate log file when size exceeds this value (in MB)
[source]
--source http Data source type: http, stdin, kafka, transfer
[source][http]
--http-port 8086 HTTP port for data ingestion
--max-body-bytes 4194304 Maximum size (bytes) of HTTP request body
--max-concurrency 40000 Maximum concurrent HTTP connections
--request-timeout 0 Request timeout in milliseconds. 0 means no timeout. Non-zero values return HTTP 408 on timeout.
--disable-keep-alive false Close connection after each HTTP request
--http-debug false Enable verbose HTTP diagnostic logging
[source][transfer]
--src-host Source database master host IP
--src-port Source database master port
--src-user Username to connect to source (superuser recommended)
--src-password Password for source connection
--src-schema Schema of source table
--src-table Name of source table
--src-sql SQL filter for data migration
--compress Compression method for transfer:
"" (no compression)
gzip (requires gzip installed on segments)
lz4 (requires lz4 installed on segments)
Recommended: lz4 > gzip > none
--port-base Base port for transfer (range starts at 9129)
--local-ip Local IP address reachable from source database
[writer]
--interval 100ms Batch write interval (milliseconds)
--stream-prepared 10 Parallelism level for insert workers
--use-gzip auto Whether to compress data sent to segments: auto, yes, or no
--max-seg-conn 128 Number of segment connections used by external tables to pull data. Increasing consumes more network resources.
--timing false Include timing info for each INSERT in logs
--insert-timeout 0 Timeout (ms) for INSERT execution. 0 means no timeout. Non-zero values abort after specified duration.
Other
--help Show usage and parameter list


4 MatrixGate API

MatrixGate provides an HTTP API that allows data ingestion into MatrixDB from any programming language.

MatrixGate HTTP Protocol Format

Field Format Usage and Example
URL http://mxgate-host:port Address to connect to mxgate
PATH / Only root path / is supported; any suffix is ignored
HTTP Method POST Only POST is supported for data loading
HTTP Header Content-Encoding: gzip Supports gzip compression of request body
Content-Type: text/plain Only text/plain is supported
HTTP Body SchemaName.TableName
Timestamp\|ID\|C1\|C2\|...\|Cn
First line specifies target table. Schema can be omitted (defaults to public). Subsequent lines contain time-series data. Each line corresponds to one row. Fields are separated by |, rows by \n. First field is Unix timestamp (seconds), see --time-format. Second field is TagID (integer). Remaining fields map to table columns. Recommended table DDL follows (Timestamp, TagID, C1, C2, ..., Cn) order.

MatrixGate HTTP Response Codes

Code Meaning Notes
200 StatusOK Partial success. Response body may include error details for failed rows, e.g.:
{"error":"invalid format","line":2}
{"error":"column mismatch","line":3}
204 StatusNoContent All data successfully ingested
400 StatusBadRequest Bad request: invalid POST body, table not found, header-content mismatch, etc.
405 StatusMethodNotAllowed Non-POST HTTP method used
408 StatusTimeout Request timed out
500 StatusInternalServerError Database-side error; load failed. Response body contains detailed error message.
503 StatusServiceUnavailable Service rejected: too many connections, shutting down, etc.

5 MatrixGate HTTP API Command-Line Example

  • Create table testtable in the demo database:
CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
  • Create data file data.txt:
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
  • Start mxgate with configuration:
mxgate --config mxgate.conf
  • Send HTTP request to load data:
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
  • Query database to verify load:
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
 date_part  |          time          | tagid | c1  | c2  | c3
------------+------------------------+-------+-----+-----+-----
 1603777821 | 2020-10-27 13:50:21+08 |     1 | 101 | 201 | 301
 1603777822 | 2020-10-27 13:50:22+08 |     2 | 102 | 202 | 302
 1603777823 | 2020-10-27 13:50:23+08 |     3 | 103 | 203 | 303
(3 rows)

6 Connecting to MatrixGate from Programming Languages


6.1 Java Example for MatrixGate HTTP API

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class MxgateExample {
    public static void main(String[] args) throws Exception {
        MxgateExample http = new MxgateExample();
        http.sendingPostRequest();
    }

    // HTTP Post request
    private void sendingPostRequest() throws Exception {
        // mxgate listens on port 8086 of localhost
        String url = "http://localhost:8086/";
        URL obj = new URL(url);
        HttpURLConnection con = (HttpURLConnection) obj.openConnection();

        // Setting basic post request
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type","text/plain");
        String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";

        con.setDoOutput(true);
        DataOutputStream wr = new DataOutputStream(con.getOutputStream());
        // Encode using UTF-8 if data contains Chinese characters
        wr.write(postJsonData.toString().getBytes("UTF-8"));
        wr.flush();
        wr.close();

        int responseCode = con.getResponseCode();
        System.out.println("Sending 'POST' request to URL : " + url);
        System.out.println("Post Data : " + postJsonData);
        System.out.println("Response Code : " + responseCode);

        BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
        String output;
        StringBuffer response = new StringBuffer();

        while ((output = in.readLine()) != null) {
            response.append(output);
        }
        in.close();

        System.out.println(response.toString());
    }
}


6.2 Python Example for MatrixGate HTTP API

import http.client

class MxgateExample(object):
    def __init__(self):
        # mxgate listens on port 8086 of localhost
        self.url = "localhost:8086"

        self.postData = "public.testtable\n/" \
                        "1603777821|1|101|201|301\n/" \
                        "1603777822|2|102|202|302\n/" \
                        "1603777823|3|103|203|303"
        self.headers = {"Content-Type": "text/plain"}

    # HTTP Post request
    def sending_post_request(self):

        conn = http.client.HTTPConnection(self.url)
        conn.request("POST", "/", self.postData, self.headers)

        response = conn.getresponse()
        response_code = response.getcode()
        print(f"Sending 'POST' request to URL : {self.url}")
        print(f"Post Data : {self.postData}")
        print(f"Response Code : {response_code}")

        output = response.read()
        print(output)


if __name__ == '__main__':
    gate_post = MxgateExample()
    gate_post.sending_post_request()


6.3 C# Example for MatrixGate HTTP API

Recommended: Use C# Core development environment.

using System;
using System.IO;
using System.Net;
using System.Text;

namespace HttpPostTest
{
class Program
    {
        static void Main(string[] args)
        {
            var url = "http://10.13.2.177:8086/";
            var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";

           HttpPost(url,txt);
        }

public static string HttpPost(string url, string content){
    string result = "";
    HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
    req.Method = "POST";
    req.ContentType = "text/plain";

    #region Add Post Parameters
    byte[] data = Encoding.UTF8.GetBytes(content);
    req.ContentLength = data.Length;
    using (Stream reqStream = req.GetRequestStream()){
        reqStream.Write(data, 0, data.Length);
        reqStream.Close();
    }
    #endregion

    HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
    Stream stream = resp.GetResponseStream();
    // Get response content
    using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
        result = reader.ReadToEnd();
        }
        return result;
    }

  }
}

If you encounter error when serving connection ***** body size exceeds the given limit, increase the max-body-bytes value in mxgate.conf.

6.4 Golang Example for MatrixGate HTTP API

package main

import (
    "bytes"
    "net/http"
)

func PostDataToServer(URL string) error {
    data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
    resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
    if err != nil {
        return err
    }
    if resp.StatusCode != 200 {
        // Handle response body
        return nil
    }

    // Handle response body
    return nil
}

func main()  {
    err := PostDataToServer("http://127.0.0.1:8086")
    if err != nil{
        panic(err)
    }

}

--- SPLIT ---

7 MatrixGate Loading Special Data Types

7.1 MatrixGate CSV File Loading Example

  • Create table csvtable in the demo database:

    CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
    DISTRIBUTED BY (tagid);
  • Edit data file data.csv with the following content:

    1603777821|1|101|201|301
    1603777822|2|102|202|302
    1603777823|3|103|203|303
  • Start mxgate, set source to stdin, target table as existing csvtable, and parallelism to 2:

    mxgate \
    --source stdin \
    --db-database demo \
    --db-master-host 127.0.0.1 \
    --db-master-port 5432 \
    --db-user mxadmin \
    --time-format unix-second \
    --delimiter "|" \
    --target csvtable \
    --parallel 2 < data.csv
  • Connect to the database and verify successful data load:

    
    demo=# SELECT * FROM csvtable ;
            time          | tagid | c1  | c2  | c3
    ------------------------+-------+-----+-----+-----
    2020-10-27 05:50:23+08 |     3 | 103 | 203 | 303
    2020-10-27 05:50:22+08 |     2 | 102 | 202 | 302
    2020-10-27 05:50:21+08 |     1 | 101 | 201 | 301

(3 rows)


### 7.2 MatrixGate JSON Field Loading Example  

#### 7.2.1 json  

- Create table:  
```sql
create table json_test(id int, j json);
  • Prepare data file:

    1|"{""a"":10, ""b"":""xyz""}"
  • Load data
    Use stdin mode as an example; other modes are similar.
    The key is using --format csv.

    mxgated \
    --source stdin \
    --db-database postgres \
    --db-master-host 127.0.0.1 \
    --db-master-port 7000 \
    --db-user mxadmin \
    --time-format raw \
    --format csv \
    --delimiter "|" \
    --target json_test < ~/json.csv
  • Check loaded data:

    postgres=# select * from json_test;
    id |           j
    ----+-----------------------
    1 | {"a":10, "b":"xyz"}
    (1 row)

7.2.2 json array

  • Create table:

    create table json_array_test(id int, j _json);
  • Prepare data file:

    1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
  • Load data:

    mxgate \
    --source stdin \
    --db-database postgres \
    --db-master-host 127.0.0.1 \
    --db-master-port 7000 \
    --db-user mxadmin \
    --time-format raw \
    --format csv \
    --delimiter "|" \
    --target json_array_test < ~/json_array.csv
  • Verify result:

    postgres=# select * from json_array_test ;
    id |                      j
    ----+---------------------------------------------
    1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
    (1 row)

    Note: Since JSON columns contain special characters such as quotes, the --format parameter of mxgate must be set to csv.

8 Monitoring mxgate Runtime Metrics

watch is a subcommand of mxgate that displays various metrics describing the status of the mxgate daemon.
It supports two modes:

  • Real-time mode: Outputs metrics every 3 seconds in a format similar to sar.
  • Historical mode: Allows querying import speed over customizable time ranges and intervals (e.g., hourly for yesterday, daily for last month, monthly for last year).

8.1 Real-Time Monitoring

mxgate watch

Collects mxgate runtime metrics every three seconds. Sample output:

                 Time          WCount          ICount        WSpeed/s        ISpeed/s  WBandWidth MB/S     BlocakItems
  2022-04-28 15:20:58        14478858        14527011         2598081         2627887            2395               0
  2022-04-28 15:21:01        22231035        22633254         2584059         2702081            2222               0
  2022-04-28 15:21:04        30494310        30500874         2754425         2622540            3551               0
  2022-04-28 15:21:07        38004210        38032956         2503300         2510694            2862               0
  2022-04-28 15:21:10        46188696        46298223         2728162         2755089            2227               0
  ...

Use the --info parameter to get descriptions of each metric:

mxgate watch --info

By default, only speed metrics are shown. Use --watch-latency to monitor latency metrics for troubleshooting:

mxgate watch --watch-latency

8.2 Historical Data Monitoring

mxgate watch --history

Calculates average ingestion speed per hour over the past 24 hours. Example output:

                TIME RANGE                | SPEED/S  | BANDWIDTH MB/S  | BLOCK ITEMS
  2022-04-28 16:00:00-2022-04-28 17:00:00 |  2208010 |         1254.48 |           0
  2022-04-28 17:00:00-2022-04-28 18:00:00 |  1157920 |         1327.00 |           0
  2022-04-28 18:00:00-2022-04-28 19:00:00 |  2228666 |         2162.32 |           0
  2022-04-28 19:00:00-2022-04-28 20:00:00 |  1371092 |         2881.30 |           0
  2022-04-28 20:00:00-2022-04-28 21:00:00 |  1575320 |         2608.20 |           0
  • SPEED/S: Number of entries ingested per second.
  • BANDWIDTH MB/S: Ingestion bandwidth in MB/s.
  • BLOCK ITEMS: Amount of data blocked in mxgate. This value increases when the database consumption rate cannot keep up with the data source (e.g., HTTP, Kafka) production rate.

You can use --watch-start, --watch-end, and --watch-duration to control the time range and interval of historical observations.
For example:

mxgate watch --history --watch-start '2022-03-27 00:00:00' --watch-end '2022-04-27 00:00:00' --watch-duration '168h'

This returns the weekly (every 168 hours) average ingestion speed from March 27 to April 27.
The --watch-duration parameter supports units: s (seconds), m (minutes), h (hours).

9 Updating Parallel Write Parameters Without Downtime

mxgate allows dynamic updates to parallel write parameters without stopping the service. Supported parameters include "interval" and "stream-prepared".

  • interval: Duration of each write transaction from mxgate to the target table.
  • stream-prepared: Number of active write connections.

In mxgate, only one write connection performs transactions for a given table at any time. To achieve high-speed, efficient data loading, multiple connections are used across different time intervals. You can adjust the interval parameter to optimize write performance.

Examples:

  • Set the number of write connections per table to 3:

    mxgate set --stream-prepared-cli 3

    mxgate

  • Get the current number of active write connections per table:

    mxgate get --stream-prepared-get

    mxgate

  • Set the write interval for all tables to 200ms:

    mxgate set --job-interval 200

    mxgate

  • Get the current write interval for all tables:

    mxgate get --job-interval-get

    mxgate

Note!
To set or get parameters for a specific table, append --job <schema.table_name> to the command. Each job corresponds to a database table. For example, if the table name is test_table in schema public, use --job public.test_table.

10 Updating Table Schema Without Downtime

During data loading, you may find that the original table schema no longer fits evolving time-series data patterns. mxgate supports schema updates without stopping the service. This section describes how to pause writes, reload updated table metadata, and resume ingestion.

Steps:

  • First, run mxgate pause -X to stop all write connections and prepare for schema changes. The -X flag is required—it terminates connections between mxgate and the database. Without this, schema modifications will fail. Use -S to make pause wait synchronously until all connections are terminated before returning.
    mxgate

  • After pausing writes, modify the target table structure—add or drop columns, drop and recreate the table with the same name.

Note!
The new table structure can differ, but the table name must remain unchanged.

  • Finally, run mxgate resume -R to resume write operations and reload the updated table metadata. The -R flag is mandatory. Together, resume and -R trigger the metadata reload.
    mxgate

  • In environments with multiple running mxgate processes, use the -p parameter to specify the process ID. This applies to all commands above.
    mxgate

Note!
Reloading metadata requires that all write connections for the target table are paused first. Otherwise, the following error occurs:
mxgate