MatrixGate, abbreviated as mxgate, is a high-performance streaming data loading server located at bin/mxgate in the MatrixDB installation directory. MatrixGate currently supports data ingestion via HTTP and STDIN interfaces, with support for TEXT and CSV data formats.
The data loading logic of MatrixGate is illustrated below:
mxgate) in concurrent micro-batches. mxgate process communicates efficiently with the MatrixDB master process to exchange transaction and control information. 
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf
This command generates a configuration file named mxgate.conf. It allows customized loading settings for testtable and testtable2, while also enabling default global settings for loading into other tables.
mxgate.conf file as needed (e.g., set field delimiters). Skip this step if using defaults. The generated configuration includes entries like: [[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
If testtable uses @ as delimiter and testtable2 uses %, update the config accordingly:
[[job.target]]
delimiter = "@"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
delimiter = "%"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
By default, mxgate listens on port 8086. This can be seen in the mxgate.conf under [source.http]:
[source]
## Source plugin is the data entrance to MatrixGate
## Types restricted to: http
source = "http"
[source.http]
## Port of http push
# http-port = 8086
## Maximum request body size (after gzip)
## The server rejects requests with bodies exceeding this limit.
# max-body-bytes = 4194304
## The maximum number of concurrent HTTP connections to the server
## The server response with 503 after exceed this limit.
# max-concurrency = 40000
To change the listening port, modify the http-port value.
demo database to prepare for data loading:mxgate start --config mxgate.conf
mxgate status
mxgate stop
To force stop in case of timeout or other issues:
mxgate stop --force
| Parameter | Default Value | Description |
|---|---|---|
| [general] | ||
--job-interval-get |
false | Get job interval for write transactions |
--job-list |
false | List all write jobs |
--job-state |
false | Get state of all write jobs |
--stream-prepared-get |
11 | Get number of active write connections in a transaction |
--stream-status-get |
false | Get status of write connections in a transaction |
--stream-prepared-cli int |
0 | Number of active write connections in a transaction |
--pause |
false | Pause data writing |
--resume |
false | Resume paused writing |
| [database] | ||
--db-database |
postgres | Target MatrixDB database name |
--db-master-host |
localhost hostname | Hostname of MatrixDB master |
--db-master-port |
5432 | Port of MatrixDB master |
--db-user |
current OS user | Username to connect to MatrixDB Note: This user must have permission to create external tables. For non-superusers, grant privileges using: GRANT CREATE ON DATABASE demo TO username; |
--db-password |
empty | Password for MatrixDB user |
--db-max-conn |
10 | Maximum number of connections from MatrixGate to MatrixDB |
| [job] | ||
--allow-dynamic |
false | When set to true, enables dynamic table mapping based on POST content (first line). Use only when target table is unknown at startup. For known tables, use --target. |
--delimiter |
| | Field delimiter within each row |
--error-handling |
accurate | Error handling mode:accurate: Skip malformed rows, log errors, continue processing.legacy: Reject entire batch on any error. |
--exclude-columns |
empty | Columns excluded from input data. Input columns must match table order for included fields. Auto-increment columns skipped via --use-auto-increment need not be listed here. |
--format |
text | Data format: text or csv. text is faster but does not allow newlines in text fields. csv is more robust; text fields must be quoted. |
--null-as |
empty string | String representing NULL values. Default is unquoted empty string. If column has NOT NULL constraint and receives null, load fails. To use \N, escape backslash: --null-as \\N. |
--time-format |
unix-second | Timestamp unit: unix-second, unix-ms, unix-nano, or raw. MatrixGate treats first column as Unix timestamp by default. Use raw if timestamp is not in first column or already in DB format. |
--upsert-key |
empty | Key(s) for upsert operations. Table must have UNIQUE constraint on specified key(s). |
--deduplicate-key |
empty | Similar to --upsert-key, but only updates NULL fields. New values are discarded if old value is non-NULL. Mutually exclusive with --upsert-key. |
--use-auto-increment |
true | Whether to skip auto-increment columns in input and use system-generated values. |
--target |
schemaName.tableName | Target table name. Schema defaults to public. Multiple tables allowed: --target table1 --target table2 .... If omitted, use --allow-dynamic for dynamic table resolution. |
| [misc] | ||
--log-archive-hours |
72 | Hours after which unchanged log files are compressed |
--log-compress |
true | Enable automatic log compression |
--log-dir |
/home/mxadmin/gpAdminLogs | Log directory |
--log-max-archive-files |
0 | Max number of archived logs to keep. Oldest deleted when exceeded. 0 means no deletion. |
--log-remove-after-days |
0 | Days after compression before log files are deleted. 0 means never delete. |
--log-rotate-size-mb |
100 | Rotate log file when size exceeds this value (in MB) |
| [source] | ||
--source |
http | Data source type: http, stdin, kafka, transfer |
| [source][http] | ||
--http-port |
8086 | HTTP port for data ingestion |
--max-body-bytes |
4194304 | Maximum size (bytes) of HTTP request body |
--max-concurrency |
40000 | Maximum concurrent HTTP connections |
--request-timeout |
0 | Request timeout in milliseconds. 0 means no timeout. Non-zero values return HTTP 408 on timeout. |
--disable-keep-alive |
false | Close connection after each HTTP request |
--http-debug |
false | Enable verbose HTTP diagnostic logging |
| [source][transfer] | ||
--src-host |
— | Source database master host IP |
--src-port |
— | Source database master port |
--src-user |
— | Username to connect to source (superuser recommended) |
--src-password |
— | Password for source connection |
--src-schema |
— | Schema of source table |
--src-table |
— | Name of source table |
--src-sql |
— | SQL filter for data migration |
--compress |
— | Compression method for transfer: "" (no compression) gzip (requires gzip installed on segments)lz4 (requires lz4 installed on segments)Recommended: lz4 > gzip > none |
--port-base |
— | Base port for transfer (range starts at 9129) |
--local-ip |
— | Local IP address reachable from source database |
| [writer] | ||
--interval |
100ms | Batch write interval (milliseconds) |
--stream-prepared |
10 | Parallelism level for insert workers |
--use-gzip |
auto | Whether to compress data sent to segments: auto, yes, or no |
--max-seg-conn |
128 | Number of segment connections used by external tables to pull data. Increasing consumes more network resources. |
--timing |
false | Include timing info for each INSERT in logs |
--insert-timeout |
0 | Timeout (ms) for INSERT execution. 0 means no timeout. Non-zero values abort after specified duration. |
| Other | ||
--help |
— | Show usage and parameter list |
MatrixGate provides an HTTP API that allows data ingestion into MatrixDB from any programming language.
| Field | Format | Usage and Example |
|---|---|---|
| URL | http://mxgate-host:port |
Address to connect to mxgate |
| PATH | / |
Only root path / is supported; any suffix is ignored |
| HTTP Method | POST | Only POST is supported for data loading |
| HTTP Header | Content-Encoding: gzip |
Supports gzip compression of request body |
Content-Type: text/plain |
Only text/plain is supported |
|
| HTTP Body | SchemaName.TableNameTimestamp\|ID\|C1\|C2\|...\|Cn |
First line specifies target table. Schema can be omitted (defaults to public). Subsequent lines contain time-series data. Each line corresponds to one row. Fields are separated by |, rows by \n. First field is Unix timestamp (seconds), see --time-format. Second field is TagID (integer). Remaining fields map to table columns. Recommended table DDL follows (Timestamp, TagID, C1, C2, ..., Cn) order. |
| Code | Meaning | Notes |
|---|---|---|
| 200 | StatusOK | Partial success. Response body may include error details for failed rows, e.g.:{"error":"invalid format","line":2}{"error":"column mismatch","line":3} |
| 204 | StatusNoContent | All data successfully ingested |
| 400 | StatusBadRequest | Bad request: invalid POST body, table not found, header-content mismatch, etc. |
| 405 | StatusMethodNotAllowed | Non-POST HTTP method used |
| 408 | StatusTimeout | Request timed out |
| 500 | StatusInternalServerError | Database-side error; load failed. Response body contains detailed error message. |
| 503 | StatusServiceUnavailable | Service rejected: too many connections, shutting down, etc. |
testtable in the demo database:CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
data.txt:public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
mxgate --config mxgate.conf
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
date_part | time | tagid | c1 | c2 | c3
------------+------------------------+-------+-----+-----+-----
1603777821 | 2020-10-27 13:50:21+08 | 1 | 101 | 201 | 301
1603777822 | 2020-10-27 13:50:22+08 | 2 | 102 | 202 | 302
1603777823 | 2020-10-27 13:50:23+08 | 3 | 103 | 203 | 303
(3 rows)
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class MxgateExample {
public static void main(String[] args) throws Exception {
MxgateExample http = new MxgateExample();
http.sendingPostRequest();
}
// HTTP Post request
private void sendingPostRequest() throws Exception {
// mxgate listens on port 8086 of localhost
String url = "http://localhost:8086/";
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
// Setting basic post request
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type","text/plain");
String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
// Encode using UTF-8 if data contains Chinese characters
wr.write(postJsonData.toString().getBytes("UTF-8"));
wr.flush();
wr.close();
int responseCode = con.getResponseCode();
System.out.println("Sending 'POST' request to URL : " + url);
System.out.println("Post Data : " + postJsonData);
System.out.println("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String output;
StringBuffer response = new StringBuffer();
while ((output = in.readLine()) != null) {
response.append(output);
}
in.close();
System.out.println(response.toString());
}
}
import http.client
class MxgateExample(object):
def __init__(self):
# mxgate listens on port 8086 of localhost
self.url = "localhost:8086"
self.postData = "public.testtable\n/" \
"1603777821|1|101|201|301\n/" \
"1603777822|2|102|202|302\n/" \
"1603777823|3|103|203|303"
self.headers = {"Content-Type": "text/plain"}
# HTTP Post request
def sending_post_request(self):
conn = http.client.HTTPConnection(self.url)
conn.request("POST", "/", self.postData, self.headers)
response = conn.getresponse()
response_code = response.getcode()
print(f"Sending 'POST' request to URL : {self.url}")
print(f"Post Data : {self.postData}")
print(f"Response Code : {response_code}")
output = response.read()
print(output)
if __name__ == '__main__':
gate_post = MxgateExample()
gate_post.sending_post_request()
Recommended: Use C# Core development environment.
using System;
using System.IO;
using System.Net;
using System.Text;
namespace HttpPostTest
{
class Program
{
static void Main(string[] args)
{
var url = "http://10.13.2.177:8086/";
var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";
HttpPost(url,txt);
}
public static string HttpPost(string url, string content){
string result = "";
HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
req.Method = "POST";
req.ContentType = "text/plain";
#region Add Post Parameters
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
reqStream.Write(data, 0, data.Length);
reqStream.Close();
}
#endregion
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
// Get response content
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
result = reader.ReadToEnd();
}
return result;
}
}
}
If you encounter
error when serving connection ***** body size exceeds the given limit, increase themax-body-bytesvalue inmxgate.conf.
package main
import (
"bytes"
"net/http"
)
func PostDataToServer(URL string) error {
data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
if err != nil {
return err
}
if resp.StatusCode != 200 {
// Handle response body
return nil
}
// Handle response body
return nil
}
func main() {
err := PostDataToServer("http://127.0.0.1:8086")
if err != nil{
panic(err)
}
}
--- SPLIT ---
7 MatrixGate Loading Special Data Types
Create table csvtable in the demo database:
CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
Edit data file data.csv with the following content:
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
Start mxgate, set source to stdin, target table as existing csvtable, and parallelism to 2:
mxgate \
--source stdin \
--db-database demo \
--db-master-host 127.0.0.1 \
--db-master-port 5432 \
--db-user mxadmin \
--time-format unix-second \
--delimiter "|" \
--target csvtable \
--parallel 2 < data.csv
Connect to the database and verify successful data load:
demo=# SELECT * FROM csvtable ;
time | tagid | c1 | c2 | c3
------------------------+-------+-----+-----+-----
2020-10-27 05:50:23+08 | 3 | 103 | 203 | 303
2020-10-27 05:50:22+08 | 2 | 102 | 202 | 302
2020-10-27 05:50:21+08 | 1 | 101 | 201 | 301
(3 rows)
### 7.2 MatrixGate JSON Field Loading Example
#### 7.2.1 json
- Create table:
```sql
create table json_test(id int, j json);
Prepare data file:
1|"{""a"":10, ""b"":""xyz""}"
Load data
Use stdin mode as an example; other modes are similar.
The key is using --format csv.
mxgated \
--source stdin \
--db-database postgres \
--db-master-host 127.0.0.1 \
--db-master-port 7000 \
--db-user mxadmin \
--time-format raw \
--format csv \
--delimiter "|" \
--target json_test < ~/json.csv
Check loaded data:
postgres=# select * from json_test;
id | j
----+-----------------------
1 | {"a":10, "b":"xyz"}
(1 row)
Create table:
create table json_array_test(id int, j _json);
Prepare data file:
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
Load data:
mxgate \
--source stdin \
--db-database postgres \
--db-master-host 127.0.0.1 \
--db-master-port 7000 \
--db-user mxadmin \
--time-format raw \
--format csv \
--delimiter "|" \
--target json_array_test < ~/json_array.csv
Verify result:
postgres=# select * from json_array_test ;
id | j
----+---------------------------------------------
1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
(1 row)
Note: Since JSON columns contain special characters such as quotes, the
--formatparameter of mxgate must be set tocsv.
watch is a subcommand of mxgate that displays various metrics describing the status of the mxgate daemon.
It supports two modes:
sar. mxgate watch
Collects mxgate runtime metrics every three seconds. Sample output:
Time WCount ICount WSpeed/s ISpeed/s WBandWidth MB/S BlocakItems
2022-04-28 15:20:58 14478858 14527011 2598081 2627887 2395 0
2022-04-28 15:21:01 22231035 22633254 2584059 2702081 2222 0
2022-04-28 15:21:04 30494310 30500874 2754425 2622540 3551 0
2022-04-28 15:21:07 38004210 38032956 2503300 2510694 2862 0
2022-04-28 15:21:10 46188696 46298223 2728162 2755089 2227 0
...
Use the --info parameter to get descriptions of each metric:
mxgate watch --info
By default, only speed metrics are shown. Use --watch-latency to monitor latency metrics for troubleshooting:
mxgate watch --watch-latency
mxgate watch --history
Calculates average ingestion speed per hour over the past 24 hours. Example output:
TIME RANGE | SPEED/S | BANDWIDTH MB/S | BLOCK ITEMS
2022-04-28 16:00:00-2022-04-28 17:00:00 | 2208010 | 1254.48 | 0
2022-04-28 17:00:00-2022-04-28 18:00:00 | 1157920 | 1327.00 | 0
2022-04-28 18:00:00-2022-04-28 19:00:00 | 2228666 | 2162.32 | 0
2022-04-28 19:00:00-2022-04-28 20:00:00 | 1371092 | 2881.30 | 0
2022-04-28 20:00:00-2022-04-28 21:00:00 | 1575320 | 2608.20 | 0
SPEED/S: Number of entries ingested per second. BANDWIDTH MB/S: Ingestion bandwidth in MB/s. BLOCK ITEMS: Amount of data blocked in mxgate. This value increases when the database consumption rate cannot keep up with the data source (e.g., HTTP, Kafka) production rate.You can use --watch-start, --watch-end, and --watch-duration to control the time range and interval of historical observations.
For example:
mxgate watch --history --watch-start '2022-03-27 00:00:00' --watch-end '2022-04-27 00:00:00' --watch-duration '168h'
This returns the weekly (every 168 hours) average ingestion speed from March 27 to April 27.
The --watch-duration parameter supports units: s (seconds), m (minutes), h (hours).
mxgate allows dynamic updates to parallel write parameters without stopping the service. Supported parameters include "interval" and "stream-prepared".
interval: Duration of each write transaction from mxgate to the target table. stream-prepared: Number of active write connections. In mxgate, only one write connection performs transactions for a given table at any time. To achieve high-speed, efficient data loading, multiple connections are used across different time intervals. You can adjust the interval parameter to optimize write performance.
Examples:
Set the number of write connections per table to 3:
mxgate set --stream-prepared-cli 3

Get the current number of active write connections per table:
mxgate get --stream-prepared-get

Set the write interval for all tables to 200ms:
mxgate set --job-interval 200

Get the current write interval for all tables:
mxgate get --job-interval-get

Note!
To set or get parameters for a specific table, append--job <schema.table_name>to the command. Each job corresponds to a database table. For example, if the table name istest_tablein schemapublic, use--job public.test_table.
During data loading, you may find that the original table schema no longer fits evolving time-series data patterns. mxgate supports schema updates without stopping the service. This section describes how to pause writes, reload updated table metadata, and resume ingestion.
Steps:
First, run mxgate pause -X to stop all write connections and prepare for schema changes. The -X flag is required—it terminates connections between mxgate and the database. Without this, schema modifications will fail. Use -S to make pause wait synchronously until all connections are terminated before returning.

After pausing writes, modify the target table structure—add or drop columns, drop and recreate the table with the same name.
Note!
The new table structure can differ, but the table name must remain unchanged.
Finally, run mxgate resume -R to resume write operations and reload the updated table metadata. The -R flag is mandatory. Together, resume and -R trigger the metadata reload.

In environments with multiple running mxgate processes, use the -p parameter to specify the process ID. This applies to all commands above.

Note!
Reloading metadata requires that all write connections for the target table are paused first. Otherwise, the following error occurs: