MatrixGate, abbreviated as mxgate, is a high-performance streaming data loading server located in the bin/mxgate directory of the MatrixDB installation. Currently, MatrixGate supports data ingestion via HTTP and STDIN interfaces, with support for TEXT and CSV data formats.
The data loading logic of MatrixGate is illustrated below:

Generate an mxgate configuration file by specifying the target database and table:
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf
This command generates a configuration file named mxgate.conf. It allows users to customize data loading for testtable and testtable2, while also supporting default global settings for loading into other tables.
Modify the mxgate.conf file as needed (e.g., set field delimiters). Skip this step if using default settings. The generated configuration includes entries for testtable and testtable2 like so:
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
If testtable uses "@" as delimiter and testtable2 uses "%", update the configuration accordingly:
[[job.target]]
delimiter = "@"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
delimiter = "%"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
By default, mxgate listens on port 8086 for incoming data. This can be seen in the mxgate.conf under [source.http] as http-port = 8086. You may change it to another port if necessary:
[source]
## Source plugin is the data entrance to MatrixGate
## Types restricted to: http
source = "http"
[source.http]
## Port of http push
# http-port = 8086
## Maximum request body size (after gzip)
## The server rejects requests with bodies exceeding this limit.
# max-body-bytes = 4194304
## The maximum number of concurrent HTTP connections to the server
## The server response with 503 after exceed this limit.
# max-concurrency = 40000
Start mxgate with the configuration file, connect to the demo database, and prepare to receive data load requests:
mxgate start --config mxgate.conf
Check background service status:
mxgate status
Stop the background service:
mxgate stop
To force stop when encountering timeouts or other issues:
mxgate stop --force
| Parameter | Default Value | Description |
|---|---|---|
| [database] | ||
--db-database |
postgres | Name of the MatrixDB database that MatrixGate connects to |
--db-master-host |
local hostname | Hostname of the MatrixDB master node |
--db-master-port |
5432 | Port number of the MatrixDB master node |
--db-user |
current OS user | Username used to connect to MatrixDB Note: This user must have permission to create external tables. For non-superusers, grant privileges using: alter user {username} CREATEEXTTABLE; |
--db-password |
empty | Password for connecting to MatrixDB |
--db-max-conn |
10 | Maximum number of connections from MatrixGate to MatrixDB |
| [job] | ||
--allow-dynamic |
false | When set to true, enables dynamic matching of target tables based on POST data content (first line). Use only when target table names are not known at startup. For fixed tables, use --target to explicitly specify table names |
--delimiter |
| | Character used to separate columns within each row |
--error-handling |
accurate | How to handle malformed rowsaccurate: Skip invalid rows, log errors; other valid rows in batch proceedlegacy: Entire batch fails on any error |
--exclude-columns |
empty | List of column names to exclude during load. Required columns must still match table definition order. Note: Auto-increment columns skipped via --use-auto-increment do not need to be listed here |
--format |
text | Input data format: text or csv. text is faster but does not allow newlines in string fields. csv is more flexible; string fields must be quoted |
--null-as |
empty string | String representation of NULL values. Default is unquoted empty string. If a NOT NULL column contains this value, load will fail. To use \N, escape backslash: --null-as \\N |
--time-format |
unix-second | Timestamp unit: unix-second|unix-ms|unix-nano|raw.MatrixGate assumes first column is Unix timestamp and converts it to DB time type. Use raw if timestamp is not in first column or already formatted |
--upsert-key |
empty | Key(s) for upsert operations. Target table must have UNIQUE constraint, and all constraint keys must be specified |
--use-auto-increment |
true | Whether to skip auto-increment columns in input data and use system-generated values |
--target |
schemaName.tableName | Target table name. Schema defaults to public. Multiple tables allowed: --target table1 --target table2 .... Without this, use --allow-dynamic for dynamic table resolution |
| [misc] | ||
--log-archive-hours |
72 | Log files unchanged for longer than this period are automatically compressed |
--log-compress |
true | Global switch for enabling log compression |
--log-dir |
/home/mxadmin/gpAdminLogs | Directory for log files |
--log-max-archive-files |
0 | Maximum number of compressed log files to retain. Oldest deleted when exceeded. 0 means no deletion |
--log-remove-after-days |
0 | Number of days after which compressed logs are deleted. 0 means never delete |
--log-rotate-size-mb |
100 | Rotate log file when size exceeds this value (in MB); old file immediately compressed |
| [source] | ||
--source |
http | Data source type: supports http, stdin, kafka, transfer |
| [source][http] | ||
--http-port |
8086 | HTTP port for clients to submit data |
--max-body-bytes |
4194304 | Maximum size (bytes) per HTTP request body |
--max-concurrency |
40000 | Maximum concurrent HTTP connections |
| [source][transfer] | ||
--src-host |
IP address of source database master | |
--src-port |
Port of source database master | |
--src-user |
Username to connect to source database (superuser recommended) | |
--src-password |
Password | |
--src-schema |
Schema name of source table | |
--src-table |
Source table name | |
--src-sql |
SQL filter for migrating data | |
--compress |
Compression method from source segment hosts: "" (empty): No compression, plain text gzip: Requires gzip installed on source segments lz4: Requires lz4 installed on source segments Recommendation: lz4 > gzip > no compression |
|
--port-base |
Base port for transfer; range starts at 9129 | |
--local-ip |
IP address reachable from source database | |
| [writer] | ||
--interval |
100ms | Interval (milliseconds) between bulk insert operations |
--stream-prepared |
10 | Parallelism level for insert worker processes |
--use-gzip |
auto | Whether to compress data sent to segments: auto/yes/no |
--timing |
false | Include timing info in logs for each INSERT statement |
| Other | ||
--help |
Display usage and parameter list |
MatrixGate provides an HTTP API allowing various programming languages to import data into MatrixDB via HTTP.
MatrixGate HTTP Protocol Format
| Protocol Element | Format | Usage and Example |
|---|---|---|
| URL | http://mxgate-host:port | Address to connect to mxgate |
| PATH | / | Only / is supported; any path suffix is ignored |
| HTTP Method | POST | Only POST is currently supported |
| HTTP Header | Content-Encoding: gzip | Optional: Compress body with gzip |
| Content-Type: text/plain | Required: MIME type | |
| HTTP Body | SchemaName.TableName Timestamp|ID|C1|C2|...|Cn |
First line specifies target table (SchemaName optional, default public). Subsequent lines contain time-series data. Each line corresponds to one row in the target table, with columns separated by | and rows by \n. First field is Unix timestamp in seconds (see --time-format). Second field is TagID (integer). Remaining fields map to table columns. Table DDL should follow (Timestamp, TagID, C1, C2, ..., Cn) column order |
MatrixGate HTTP Response Codes
| Code | Meaning | Notes |
|---|---|---|
| 200 | StatusOK | Partial success: Some rows failed format check. Response body includes error details, e.g.:At line: 2missing data for column "c3" |
| 204 | StatusNoContent | All data successfully accepted by MatrixGate |
| 400 | StatusBadRequest | Bad request: Invalid POST body, table not found, header-content mismatch, etc. |
| 405 | StatusMethodNotAllowed | Request method is not POST |
| 500 | StatusInternalServerError | Database-side error; load failed. Response body contains detailed error message |
| 503 | StatusServiceUnavailable | MatrixGate rejected request (e.g., max connections reached, shutting down) |
Create table testtable in the demo database:
CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
Edit data file data.txt:
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
Start mxgate with configuration file:
mxgate --config mxgate.conf
Send HTTP request to load data:
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
Query database to verify load:
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
date_part | time | tagid | c1 | c2 | c3
------------+------------------------+-------+-----+-----+-----
1603777821 | 2020-10-27 13:50:21+08 | 1 | 101 | 201 | 301
1603777822 | 2020-10-27 13:50:22+08 | 2 | 102 | 202 | 302
1603777823 | 2020-10-27 13:50:23+08 | 3 | 103 | 203 | 303
(3 rows)
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class MxgateExample {
public static void main(String[] args) throws Exception {
MxgateExample http = new MxgateExample();
http.sendingPostRequest();
}
// HTTP Post request
private void sendingPostRequest() throws Exception {
// mxgate listens on port 8086 of localhost
String url = "http://localhost:8086/";
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
// Setting basic post request
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type","text/plain");
String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
// Encode Chinese characters using UTF-8
wr.write(postJsonData.toString().getBytes("UTF-8"));
wr.flush();
wr.close();
int responseCode = con.getResponseCode();
System.out.println("Sending 'POST' request to URL : " + url);
System.out.println("Post Data : " + postJsonData);
System.out.println("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String output;
StringBuffer response = new StringBuffer();
while ((output = in.readLine()) != null) {
response.append(output);
}
in.close();
System.out.println(response.toString());
}
}
import http.client
class MxgateExample(object):
def __init__(self):
# mxgate listens on port 8086 of localhost
self.url = "localhost:8086"
self.postData = "public.testtable\n/" \
"1603777821|1|101|201|301\n/" \
"1603777822|2|102|202|302\n/" \
"1603777823|3|103|203|303"
self.headers = {"Content-Type": "text/plain"}
# HTTP Post request
def sending_post_request(self):
conn = http.client.HTTPConnection(self.url)
conn.request("POST", "/", self.postData, self.headers)
response = conn.getresponse()
response_code = response.getcode()
print(f"Sending 'POST' request to URL : {self.url}")
print(f"Post Data : {self.postData}")
print(f"Response Code : {response_code}")
output = response.read()
print(output)
if __name__ == '__main__':
gate_post = MxgateExample()
gate_post.sending_post_request()
Recommended to develop using C# Core environment
using System;
using System.IO;
using System.Net;
using System.Text;
namespace HttpPostTest
{
class Program
{
static void Main(string[] args)
{
var url = "http://10.13.2.177:8086/";
var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";
HttpPost(url,txt);
}
public static string HttpPost(string url, string content){
string result = "";
HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
req.Method = "POST";
req.ContentType = "text/plain";
#region 添加Post 参数
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
reqStream.Write(data, 0, data.Length);
reqStream.Close();
}
#endregion
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
//获取响应内容
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
result = reader.ReadToEnd();
}
return result;
}
}
}
If you encounter "error when serving connection ***** body size exceeds the given limit", increase
max-body-bytesinmxgate.conf.
package main
import (
"bytes"
"net/http"
)
func PostDataToServer(URL string) error {
data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
if err != nil {
return err
}
if resp.StatusCode != 200 {
// Deal with the response body.
return nil
}
// Deal with the response body.
return nil
}
func main() {
err := PostDataToServer("http://127.0.0.1:8086")
if err != nil{
panic(err)
}
}
Create table csvtable in the demo database:
CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
Edit data file data.csv:
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
Start mxgate with stdin source, targeting existing csvtable, using 2 parallel streams:
mxgate \
--source stdin \
--db-database demo \
--db-master-host 127.0.0.1 \
--db-master-port 5432 \
--db-user mxadmin \
--time-format unix-second \
--delimiter "|" \
--target csvtable \
--parallel 2 < data.csv
Query database to confirm successful load:
demo=# SELECT * FROM csvtable ;
time | tagid | c1 | c2 | c3
------------------------+-------+-----+-----+-----
2020-10-27 05:50:23+08 | 3 | 103 | 203 | 303
2020-10-27 05:50:22+08 | 2 | 102 | 202 | 302
2020-10-27 05:50:21+08 | 1 | 101 | 201 | 301
(3 rows)
--- SPLIT ---
7.2 MatrixGate JSON Field Loading Examples
Create table
create table json_test(id int, j json);
Create data file
~/json.csv
1|"{""a"":10, ""b"":""xyz""}"
Load data
Use stdin mode as an example; other modes are similar.
The key is --format csv
mxgated \
--source stdin \
--db-database postgres \
--db-master-host 127.0.0.1 \
--db-master-port 7000 \
--db-user mxadmin \
--time-format raw \
--format csv \
--delimiter "|" \
--target json_test < ~/json.csv
View loaded data
postgres=# select * from json_test;
id | j
----+-----------------------
1 | {"a":10, "b":"xyz"}
(1 row)
Create table
create table json_array_test(id int, j _json);
Create data file
~/json_array.csv
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
Load data
mxgate \
--source stdin \
--db-database postgres \
--db-master-host 127.0.0.1 \
--db-master-port 7000 \
--db-user mxadmin \
--time-format raw \
--format csv \
--delimiter "|" \
--target json_array_test < ~/json_array.csv
Verify data
postgres=# select * from json_array_test ;
id | j
----+---------------------------------------------
1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
(1 row)
Note: Because the JSON column contains special characters such as quotes, the
--formatparameter in mxgate must be set tocsv.