MatrixGate Data Loading Server

MatrixGate, abbreviated as mxgate, is a high-performance streaming data loading server. Its executable is bin/mxgate under the MatrixDB installation directory. MatrixGate currently supports data ingestion through HTTP and STDIN interfaces, in TEXT and CSV formats.

1 MatrixGate Working Principle

MatrixGate loads data as follows:

  1. A data collection system gathers device data or receives data sent from devices.
  2. The collection system continuously sends data to the MatrixGate service process (mxgate) in concurrent micro-batches.
  3. The mxgate process communicates efficiently with the MatrixDB master process to exchange transaction and control information.
  4. Data is directly transmitted to segment nodes and written in parallel at high speed.

[Figure: MatrixGate architecture]
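Step 2 above, in which the collection system sends concurrent micro-batches, can be sketched on the client side as follows. This is a minimal illustration, not part of MatrixGate: the `send` callable is a hypothetical stand-in for an HTTP POST to mxgate, and the helper names, batch size, and worker count are arbitrary assumptions.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable, List


def micro_batches(table: str, rows: Iterable[str], batch_size: int) -> List[str]:
    """Group delimited rows into request bodies of at most batch_size rows.

    Each body starts with the target table name, as the HTTP API requires.
    """
    bodies: List[str] = []
    batch: List[str] = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            bodies.append("\n".join([table] + batch))
            batch = []
    if batch:  # flush the last, partially filled batch
        bodies.append("\n".join([table] + batch))
    return bodies


def send_concurrently(bodies: List[str], send: Callable[[str], int], workers: int = 4) -> List[int]:
    """Send bodies in parallel; return the status codes in input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(send, bodies))


if __name__ == "__main__":
    rows = ["1603777821|1|101|201|301", "1603777822|2|102|202|302", "1603777823|3|103|203|303"]
    bodies = micro_batches("public.testtable", rows, batch_size=2)
    # Stand-in sender that pretends every batch was fully accepted (HTTP 204).
    codes = send_concurrently(bodies, lambda body: 204)
    print(len(bodies), codes)  # 2 [204, 204]
```

In a real collector, `send` would POST the body to mxgate with `Content-Type: text/plain` and return the response status.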

2 MatrixGate Usage

  • Generate an mxgate configuration file by specifying the target database and table:

    mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf

    This command writes a configuration file named mxgate.conf that contains a section for each of testtable and testtable2, which you can customize individually. Because --allow-dynamic is specified, data for other tables is also accepted and loaded with the global default settings.

  • Modify mxgate.conf as needed (for example, to set field delimiters); skip this step if the defaults suffice. The generated configuration contains the following entries for testtable and testtable2:

      [[job.target]]
        # delimiter = "|"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable"
        # null-as = ""
        table = "public.testtable"
        # time-format = "unix-second"
        # use-auto-increment = true
    
      [[job.target]]
        # delimiter = "|"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable2"
        # null-as = ""
        table = "public.testtable2"
        # time-format = "unix-second"
        # use-auto-increment = true

    If testtable uses "@" as its delimiter and testtable2 uses "%", uncomment the delimiter lines and set them accordingly:

      [[job.target]]
        delimiter = "@"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable"
        # null-as = ""
        table = "public.testtable"
        # time-format = "unix-second"
        # use-auto-increment = true
    
      [[job.target]]
        delimiter = "%"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable2"
        # null-as = ""
        table = "public.testtable2"
        # time-format = "unix-second"
        # use-auto-increment = true

    By default, mxgate listens on port 8086 for incoming data. This setting appears in mxgate.conf under [source.http] as http-port = 8086 (commented out in the generated file). To use a different port, uncomment the line and change its value:

    [source]
    
    ## Source plugin is the data entrance to MatrixGate
    ## Types restricted to: http
    source = "http"
    
    [source.http]
    
      ## Port of http push
      # http-port = 8086
    
      ## Maximum request body size (after gzip)
      ## The server rejects requests with bodies exceeding this limit.
      # max-body-bytes = 4194304
    
      ## The maximum number of concurrent HTTP connections to the server
      ## The server responds with 503 when this limit is exceeded.
      # max-concurrency = 40000
  • Start mxgate with the configuration file. It connects to the demo database and waits for data load requests:

    mxgate start --config mxgate.conf
  • Check background service status:

    mxgate status
  • Stop the background service:

    mxgate stop

    If the service does not stop normally (for example, because of a timeout), force it to stop:

    mxgate stop --force
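If you regenerate mxgate.conf often, the manual delimiter edit in step 2 can be scripted. The following Python sketch is a hypothetical helper, not an mxgate feature: it assumes the generated layout shown above (one `[[job.target]]` block per table, each with a `table = "..."` line and a possibly commented-out `delimiter` line) and edits the text with regular expressions rather than a full parser for the TOML-style file.

```python
import re
from typing import Dict


def set_delimiters(conf_text: str, delimiters: Dict[str, str]) -> str:
    """Uncomment and set `delimiter` in every [[job.target]] block whose
    `table` value is a key of `delimiters`; other blocks are left unchanged."""
    parts = conf_text.split("[[job.target]]")
    for i in range(1, len(parts)):  # parts[0] is the text before the first target
        m = re.search(r'^[ \t]*table = "([^"]+)"', parts[i], flags=re.MULTILINE)
        if m and m.group(1) in delimiters:
            value = delimiters[m.group(1)]
            # Replace "# delimiter = ..." or "delimiter = ..." but keep its indentation.
            parts[i] = re.sub(r'^([ \t]*)#?[ \t]*delimiter = ".*"$',
                              lambda d: f'{d.group(1)}delimiter = "{value}"',
                              parts[i], count=1, flags=re.MULTILINE)
    return "[[job.target]]".join(parts)
```

For example, passing the contents of mxgate.conf together with `{"public.testtable": "@", "public.testtable2": "%"}` yields the edited target entries shown in step 2; write the result back to the file before starting mxgate.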

3 MatrixGate Command-Line Parameters

Parameters are grouped by section. Each entry has the form: --parameter (default: value): description.

[database]
--db-database (default: postgres): Name of the MatrixDB database that MatrixGate connects to.
--db-master-host (default: local hostname): Hostname of the MatrixDB master node.
--db-master-port (default: 5432): Port number of the MatrixDB master node.
--db-user (default: current OS user): User name used to connect to MatrixDB.
    Note: This user must have permission to create external tables. For a non-superuser, grant it with:
    alter user {username} CREATEEXTTABLE;
--db-password (default: empty): Password for connecting to MatrixDB.
--db-max-conn (default: 10): Maximum number of connections from MatrixGate to MatrixDB.
[job]
--allow-dynamic (default: false): When true, the target table is resolved dynamically from the content of each POST body (its first line). Use this only when the target table names are not known at startup; for a fixed set of tables, specify them with --target.
--delimiter (default: |): Character that separates columns within each row.
--error-handling (default: accurate): How malformed rows are handled:
    accurate: invalid rows are skipped and logged; the other valid rows in the batch are loaded.
    legacy: any error causes the entire batch to fail.
--exclude-columns (default: empty): Column names to exclude from the load. The remaining columns in the input must still follow the column order of the table definition. Auto-increment columns that are skipped via --use-auto-increment do not need to be listed here.
--format (default: text): Input data format, text or csv. text is faster but does not allow newlines in string fields. csv is more flexible, but string fields must be quoted.
--null-as (default: empty string): String that represents a NULL value. The default is an unquoted empty string. If a NOT NULL column receives this value, the load fails. To use \N, escape the backslash: --null-as \\N
--time-format (default: unix-second): Unit of the timestamp: unix-second, unix-ms, unix-nano, or raw.
    MatrixGate assumes that the first column is a Unix timestamp and converts it to the database time type. Use raw if the timestamp is not in the first column or is already formatted.
--upsert-key (default: empty): Key column(s) for upsert operations.
    The target table must have a UNIQUE constraint, and all of the constraint's key columns must be specified.
--use-auto-increment (default: true): Whether to skip auto-increment columns in the input data and use system-generated values instead.
--target (format: schemaName.tableName): Target table name; the schema defaults to public. Multiple tables are allowed: --target table1 --target table2 .... Without --target, use --allow-dynamic to resolve target tables dynamically.
[misc]
--log-archive-hours (default: 72): Log files that have not changed for longer than this many hours are compressed automatically.
--log-compress (default: true): Global switch that enables log compression.
--log-dir (default: /home/mxadmin/gpAdminLogs): Directory for log files.
--log-max-archive-files (default: 0): Maximum number of compressed log files to keep; the oldest files are deleted when the limit is exceeded. 0 means no files are deleted.
--log-remove-after-days (default: 0): Number of days after which compressed log files are deleted. 0 means they are never deleted.
--log-rotate-size-mb (default: 100): Size in MB at which the log file is rotated; the old file is compressed immediately.
[source]
--source (default: http): Data source type: http, stdin, kafka, or transfer.
[source][http]
--http-port (default: 8086): HTTP port on which clients submit data.
--max-body-bytes (default: 4194304): Maximum size in bytes of each HTTP request body.
--max-concurrency (default: 40000): Maximum number of concurrent HTTP connections.
[source][transfer]
--src-host: IP address of the source database master.
--src-port: Port of the source database master.
--src-user: User name for connecting to the source database (a superuser is recommended).
--src-password: Password for the source database user.
--src-schema: Schema of the source table.
--src-table: Name of the source table.
--src-sql: SQL filter that selects the data to migrate.
--compress: Compression used when sending data from the source segment hosts:
    "" (empty): no compression, plain text
    gzip: requires gzip to be installed on the source segment hosts
    lz4: requires lz4 to be installed on the source segment hosts
    Recommendation: lz4 > gzip > no compression
--port-base: Base port for the transfer; the port range starts at 9129.
--local-ip: IP address of this host that is reachable from the source database.
[writer]
--interval (default: 100ms): Interval in milliseconds between bulk insert operations.
--stream-prepared (default: 10): Degree of parallelism of the insert workers.
--use-gzip (default: auto): Whether to compress data sent to the segments: auto, yes, or no.
--timing (default: false): Whether to include timing information in the log for each INSERT statement.
Other
--help: Display usage and the parameter list.
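The three Unix units of --time-format differ only in the divisor applied to the epoch value. The Python sketch below mirrors that conversion so you can check what timestamp a given first column should produce; it is an illustration, not MatrixGate's code, and `to_timestamp` is a hypothetical helper. With raw, MatrixGate does not convert the value, so raw is outside this sketch.

```python
from datetime import datetime, timedelta, timezone

# Divisor that turns an epoch value in each --time-format unit into seconds.
_UNITS = {"unix-second": 1, "unix-ms": 1_000, "unix-nano": 1_000_000_000}


def to_timestamp(value: str, time_format: str = "unix-second") -> datetime:
    """Convert an epoch value (seconds since 1970-01-01 UTC, scaled by the unit)
    to an aware UTC datetime, truncated to microsecond precision."""
    if time_format not in _UNITS:
        raise ValueError(f"not a Unix time format: {time_format}")
    divisor = _UNITS[time_format]
    seconds, remainder = divmod(int(value), divisor)
    micros = remainder * 1_000_000 // divisor
    return datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(microseconds=micros)
```

For example, `to_timestamp("1603777821")` is 2020-10-27 05:50:21 UTC, which a session in UTC+8 displays as 2020-10-27 13:50:21+08, matching the query output in section 5.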


4 MatrixGate API

MatrixGate provides an HTTP API, so programs written in any language with an HTTP client can load data into MatrixDB.

MatrixGate HTTP Protocol Format

Each protocol element is listed with its format, followed by its usage and an example.

URL: http://mxgate-host:port
    Address of the mxgate service.
PATH: /
    Only / is supported; any path suffix is ignored.
HTTP Method: POST
    Only POST is currently supported.
HTTP Header: Content-Encoding: gzip
    Optional: indicates that the body is gzip-compressed.
HTTP Header: Content-Type: text/plain
    Required: MIME type of the body.
HTTP Body:
    SchemaName.TableName
    Timestamp|ID|C1|C2|...|Cn
    The first line specifies the target table (SchemaName is optional and defaults to public). Each subsequent line is one row of time-series data for the target table: columns are separated by | and rows by \n. The first field is a Unix timestamp in seconds (see --time-format), the second field is the integer TagID, and the remaining fields map to the other table columns. The table DDL should therefore define its columns in the order (Timestamp, TagID, C1, C2, ..., Cn).
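The body format above is simple enough to generate programmatically. The following Python sketch builds one request body and the matching headers; the helper name `build_request` and its parameters are illustrative assumptions, with `delimiter` and `null_as` playing the roles of --delimiter and --null-as, and `use_gzip` enabling the optional `Content-Encoding: gzip` header.

```python
import gzip
from typing import Dict, Iterable, Sequence, Tuple


def build_request(table: str, rows: Iterable[Sequence[object]], delimiter: str = "|",
                  null_as: str = "", use_gzip: bool = False) -> Tuple[bytes, Dict[str, str]]:
    """Return (body, headers) for one POST in the MatrixGate text format.

    Values must not contain the delimiter or newlines (text format).
    """
    lines = [table]  # first line: target table
    for row in rows:
        # None becomes the null_as string; other values are formatted with str().
        lines.append(delimiter.join(null_as if v is None else str(v) for v in row))
    body = "\n".join(lines).encode("utf-8")
    headers = {"Content-Type": "text/plain"}
    if use_gzip:
        body = gzip.compress(body)
        headers["Content-Encoding"] = "gzip"
    return body, headers
```

The returned pair can be passed directly to `http.client.HTTPConnection.request("POST", "/", body, headers)`. Note that the max-body-bytes limit applies to the body after gzip compression.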

MatrixGate HTTP Response Codes

Each entry lists the status code and its meaning, followed by notes.

200 StatusOK: Partial success; some rows failed the format check. The response body contains the error details, for example:
    At line: 2
    missing data for column "c3"
204 StatusNoContent: All data was accepted by MatrixGate.
400 StatusBadRequest: Bad request, for example an invalid POST body, a nonexistent table, or a header that does not match the content.
405 StatusMethodNotAllowed: The request method is not POST.
500 StatusInternalServerError: Database-side error; the load failed. The response body contains the detailed error message.
503 StatusServiceUnavailable: MatrixGate rejected the request, for example because the maximum number of connections was reached or the service is shutting down.
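A client typically branches on these codes. The Python sketch below encodes one possible policy; the outcome labels and the choice to retry only 503 are assumptions made for illustration, not behavior defined by MatrixGate.

```python
from typing import Tuple

# Hypothetical client-side policy derived from the response-code table above:
# status -> (outcome, whether to resend the same request later).
_ACTIONS = {
    204: ("ok", False),            # everything accepted
    200: ("partial", False),       # some rows rejected: read the body, fix and resend those rows
    400: ("client_error", False),  # fix the request (body, table name, headers)
    405: ("client_error", False),  # use POST
    500: ("load_failed", False),   # inspect the database error in the body first
    503: ("retry", True),          # overloaded or shutting down: try again later
}


def classify(status: int) -> Tuple[str, bool]:
    """Map an mxgate HTTP status code to (outcome, should_retry_same_request)."""
    return _ACTIONS.get(status, ("unknown", False))
```

Whether a 500 response is worth retrying depends on the database error reported in the body, so the sketch leaves that decision to the caller.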

5 MatrixGate HTTP API Command-Line Examples

  • Create table testtable in the demo database:

    CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
    DISTRIBUTED BY (tagid);
  • Edit data file data.txt:

    public.testtable
    1603777821|1|101|201|301
    1603777822|2|102|202|302
    1603777823|3|103|203|303
  • Start mxgate with configuration file:

    mxgate --config mxgate.conf
  • Send HTTP request to load data:

    curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
  • Query database to verify load:

    demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
    date_part  |          time          | tagid | c1  | c2  | c3
    ------------+------------------------+-------+-----+-----+-----
    1603777821 | 2020-10-27 13:50:21+08 |     1 | 101 | 201 | 301
    1603777822 | 2020-10-27 13:50:22+08 |     2 | 102 | 202 | 302
    1603777823 | 2020-10-27 13:50:23+08 |     3 | 103 | 203 | 303
    (3 rows)

6 Programming Language Integration with MatrixGate


6.1 Java Example for MatrixGate HTTP API

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class MxgateExample {
    public static void main(String[] args) throws Exception {
        MxgateExample http = new MxgateExample();
        http.sendingPostRequest();
    }

    // HTTP POST request
    private void sendingPostRequest() throws Exception {
        // mxgate listens on port 8086 of localhost
        String url = "http://localhost:8086/";
        URL obj = new URL(url);
        HttpURLConnection con = (HttpURLConnection) obj.openConnection();

        // Basic POST request; mxgate requires Content-Type: text/plain
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type", "text/plain");
        // First line: target table; each following line is one row, columns separated by |
        String postData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";

        con.setDoOutput(true);
        DataOutputStream wr = new DataOutputStream(con.getOutputStream());
        // Encode the body as UTF-8 so that non-ASCII (e.g., Chinese) characters are sent correctly
        wr.write(postData.getBytes(StandardCharsets.UTF_8));
        wr.flush();
        wr.close();

        int responseCode = con.getResponseCode();
        System.out.println("Sending 'POST' request to URL : " + url);
        System.out.println("Post Data : " + postData);
        System.out.println("Response Code : " + responseCode);

        // For 4xx/5xx responses the error details are on the error stream;
        // a 204 response has an empty body.
        InputStream body = responseCode >= 400 ? con.getErrorStream() : con.getInputStream();
        if (body == null) {
            return;
        }
        BufferedReader in = new BufferedReader(new InputStreamReader(body, StandardCharsets.UTF_8));
        String output;
        StringBuilder response = new StringBuilder();

        while ((output = in.readLine()) != null) {
            response.append(output).append('\n');
        }
        in.close();

        System.out.println(response.toString());
    }
}


6.2 Python Example for MatrixGate HTTP API

import http.client

class MxgateExample(object):
    def __init__(self):
        # mxgate listens on port 8086 of localhost
        self.url = "localhost:8086"

        # First line: target table; each following line is one row, columns separated by |
        self.postData = "public.testtable\n" \
                        "1603777821|1|101|201|301\n" \
                        "1603777822|2|102|202|302\n" \
                        "1603777823|3|103|203|303"
        self.headers = {"Content-Type": "text/plain"}

    # HTTP POST request
    def sending_post_request(self):
        conn = http.client.HTTPConnection(self.url)
        # Send UTF-8 bytes; http.client would encode a str body as ISO-8859-1
        conn.request("POST", "/", self.postData.encode("utf-8"), self.headers)

        response = conn.getresponse()
        response_code = response.status
        print(f"Sending 'POST' request to URL : {self.url}")
        print(f"Post Data : {self.postData}")
        print(f"Response Code : {response_code}")

        # Empty for 204; error details for 200 (partial success) and 4xx/5xx
        output = response.read()
        print(output.decode("utf-8"))
        conn.close()


if __name__ == '__main__':
    gate_post = MxgateExample()
    gate_post.sending_post_request()


6.3 C# Example for MatrixGate HTTP API

We recommend running this example on .NET Core.

using System;
using System.IO;
using System.Net;
using System.Text;

namespace HttpPostTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var url = "http://10.13.2.177:8086/";
            // This body uses "," as the delimiter and preformatted timestamps, so it assumes
            // the job for public.dest is configured with delimiter = "," and time-format = "raw".
            var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";

            Console.WriteLine(HttpPost(url, txt));
        }

        public static string HttpPost(string url, string content)
        {
            HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
            req.Method = "POST";
            req.ContentType = "text/plain";

            #region Write the POST body
            byte[] data = Encoding.UTF8.GetBytes(content);
            req.ContentLength = data.Length;
            using (Stream reqStream = req.GetRequestStream())
            {
                reqStream.Write(data, 0, data.Length);
            }
            #endregion

            // Read the response body. Note: GetResponse() throws WebException for 4xx/5xx codes.
            using (HttpWebResponse resp = (HttpWebResponse)req.GetResponse())
            using (StreamReader reader = new StreamReader(resp.GetResponseStream(), Encoding.UTF8))
            {
                return reader.ReadToEnd();
            }
        }
    }
}

If you encounter "error when serving connection ***** body size exceeds the given limit", increase max-body-bytes in mxgate.conf.
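Besides raising max-body-bytes, a client can avoid this error by splitting a large load across several requests. The Python sketch below is a hypothetical helper, not part of MatrixGate: it packs rows into bodies that stay under a byte limit. It measures uncompressed bodies, so when gzip is used (the limit applies after compression) the packing is conservative.

```python
from typing import Iterable, List


def split_by_size(table: str, rows: Iterable[str], max_body_bytes: int = 4194304) -> List[bytes]:
    """Pack delimited rows into UTF-8 bodies of at most max_body_bytes bytes.

    Every body repeats the table name as its first line.
    """
    header = table.encode("utf-8")
    bodies: List[bytes] = []
    current, size = [header], len(header)
    for row in rows:
        line = row.encode("utf-8")
        if len(header) + 1 + len(line) > max_body_bytes:
            raise ValueError("a single row does not fit in max_body_bytes")
        if size + 1 + len(line) > max_body_bytes:  # +1 for the "\n" separator
            bodies.append(b"\n".join(current))     # flush the full body
            current, size = [header], len(header)
        current.append(line)
        size += 1 + len(line)
    if len(current) > 1:  # flush the last body if it has any rows
        bodies.append(b"\n".join(current))
    return bodies
```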

6.4 Golang Example for MatrixGate HTTP API

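package main

import (
    "bytes"
    "fmt"
    "io"
    "net/http"
)

// PostDataToServer sends one batch to mxgate. The first line names the
// target table; each following line is one row with columns separated by |.
func PostDataToServer(URL string) error {
    data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
    resp, err := http.Post(URL, "text/plain", bytes.NewBufferString(data))
    if err != nil {
        return err
    }
    // Always close the body so the connection can be reused.
    defer resp.Body.Close()

    body, err := io.ReadAll(resp.Body)
    if err != nil {
        return err
    }
    // 204: all rows accepted. 200: partial success; the body lists the failed rows.
    // Any other code means the request was rejected or the load failed.
    if resp.StatusCode != http.StatusNoContent {
        return fmt.Errorf("mxgate returned %d: %s", resp.StatusCode, body)
    }
    return nil
}

func main() {
    err := PostDataToServer("http://127.0.0.1:8086")
    if err != nil {
        panic(err)
    }
}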

7 Loading Special Data Types with MatrixGate

7.1 Example: Loading CSV Files via MatrixGate

  • Create table csvtable in the demo database:

    CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
    DISTRIBUTED BY (tagid);
  • Edit data file data.csv:

    1603777821|1|101|201|301
    1603777822|2|102|202|302
    1603777823|3|103|203|303
  • Start mxgate with the stdin source, loading into the existing table csvtable with 2 parallel streams:

    mxgate \
    --source stdin \
    --db-database demo \
    --db-master-host 127.0.0.1 \
    --db-master-port 5432 \
    --db-user mxadmin \
    --time-format unix-second \
    --delimiter "|" \
    --target csvtable \
    --parallel 2 < data.csv
  • Query database to confirm successful load:

    demo=# SELECT * FROM csvtable ;
              time          | tagid | c1  | c2  | c3
    ------------------------+-------+-----+-----+-----
     2020-10-27 05:50:23+08 |     3 | 103 | 203 | 303
     2020-10-27 05:50:22+08 |     2 | 102 | 202 | 302
     2020-10-27 05:50:21+08 |     1 | 101 | 201 | 301
    (3 rows)


7.2 MatrixGate JSON Field Loading Examples

7.2.1 json

  • Create table

    create table json_test(id int, j json);
  • Create data file ~/json.csv

    1|"{""a"":10, ""b"":""xyz""}"
  • Load the data. This example uses stdin mode; other modes work similarly. The key setting is --format csv:

    mxgate \
    --source stdin \
    --db-database postgres \
    --db-master-host 127.0.0.1 \
    --db-master-port 7000 \
    --db-user mxadmin \
    --time-format raw \
    --format csv \
    --delimiter "|" \
    --target json_test < ~/json.csv
  • View loaded data

    postgres=# select * from json_test;
    id |           j
    ----+-----------------------
    1 | {"a":10, "b":"xyz"}
    (1 row)
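Writing the doubled quotes in ~/json.csv by hand is error-prone. As a sketch, Python's standard csv and json modules can produce the same line; the helper name `json_csv_line` is illustrative, and the compact JSON separators are chosen only so the output matches the sample file.

```python
import csv
import io
import json


def json_csv_line(row_id: int, obj: dict, delimiter: str = "|") -> str:
    """Format one (id, json) row the way --format csv expects it."""
    text = json.dumps(obj, separators=(", ", ":"))  # e.g. {"a":10, "b":"xyz"}
    buf = io.StringIO()
    # QUOTE_MINIMAL (the default) quotes the field because it contains '"',
    # and doublequote=True (the default) writes each '"' as '""'.
    csv.writer(buf, delimiter=delimiter, lineterminator="\n").writerow([row_id, text])
    return buf.getvalue()
```

For example, `json_csv_line(1, {"a": 10, "b": "xyz"})` returns the line shown in ~/json.csv, followed by a newline.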

7.2.2 json array

  • Create table

    create table json_array_test(id int, j _json);
  • Create data file ~/json_array.csv

    1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
  • Load data

    mxgate \
    --source stdin \
    --db-database postgres \
    --db-master-host 127.0.0.1 \
    --db-master-port 7000 \
    --db-user mxadmin \
    --time-format raw \
    --format csv \
    --delimiter "|" \
    --target json_array_test < ~/json_array.csv
  • Verify data

    postgres=# select * from json_array_test ;
    id |                      j
    ----+---------------------------------------------
    1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
    (1 row)

    Note: Because the JSON column contains special characters such as quotes, the --format parameter in mxgate must be set to csv.
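The two layers of escaping in ~/json_array.csv can also be generated instead of typed. First comes the PostgreSQL array literal, in which each element is double-quoted and any backslash or double quote inside it is backslash-escaped. Then comes CSV quoting, which wraps the whole field in quotes and doubles each quote. The helper names below are illustrative; the sketch uses only the Python standard library.

```python
import csv
import io
from typing import List


def pg_array_literal(elements: List[str]) -> str:
    """Build a PostgreSQL array literal, double-quoting every element and
    backslash-escaping backslashes and double quotes inside it."""
    quoted = []
    for e in elements:
        escaped = e.replace("\\", "\\\\").replace('"', '\\"')
        quoted.append(f'"{escaped}"')
    return "{" + ",".join(quoted) + "}"


def json_array_csv_line(row_id: int, elements: List[str], delimiter: str = "|") -> str:
    """CSV-quote the array literal so it can be loaded with --format csv."""
    buf = io.StringIO()
    csv.writer(buf, delimiter=delimiter, lineterminator="\n").writerow(
        [row_id, pg_array_literal(elements)])
    return buf.getvalue()
```

For example, `json_array_csv_line(1, ['{"a":10, "b":"xyz"}', '{"c": 10}'])` returns the line shown in ~/json_array.csv, followed by a newline.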