# MatrixGate Data Loading Server

MatrixGate (abbreviated mxgate) is a high-performance streaming data loading server. Its executable is `bin/mxgate` under the MatrixDB installation directory. MatrixGate currently accepts data through HTTP and STDIN interfaces, in TEXT or CSV format.

## 1 How MatrixGate works

The figure below shows how MatrixGate loads data:

1. The data acquisition system collects device data or receives data sent by devices.
2. The acquisition system continuously sends the data to the mxgate service process in concurrent micro-batches.
3. The mxgate process communicates efficiently with the MatrixDB master process to exchange transaction and control information.
4. The data is sent directly to the segment nodes and written in parallel at high speed.

MatrixGate schematic

## 2 MatrixGate usage

- Generate an mxgate configuration file for the target database and target tables:

```shell
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf
```

This command generates the configuration file mxgate.conf, which contains separate settings for testtable and testtable2 so that loading into each table can be customized. Data can also be loaded into other tables using the global default settings.

- Modify the configuration file as needed, for example to set the data delimiter. To use the default configuration, skip this step. The generated settings for testtable and testtable2 look like this:

```toml
  [[job.target]]
    # delimiter = "|"
    # exclude-columns = []
    # format = "text"
    name = "job_text_to_public.testtable"
    # null-as = ""
    table = "public.testtable"
    # time-format = "unix-second"
    # use-auto-increment = true

  [[job.target]]
    # delimiter = "|"
    # exclude-columns = []
    # format = "text"
    name = "job_text_to_public.testtable2"
    # null-as = ""
    table = "public.testtable2"
    # time-format = "unix-second"
    # use-auto-increment = true
```

If testtable uses `@` as its delimiter and testtable2 uses `%`, change the configuration to:

```toml
  [[job.target]]
    delimiter = "@"
    # exclude-columns = []
    # format = "text"
    name = "job_text_to_public.testtable"
    # null-as = ""
    table = "public.testtable"
    # time-format = "unix-second"
    # use-auto-increment = true

  [[job.target]]
    delimiter = "%"
    # exclude-columns = []
    # format = "text"
    name = "job_text_to_public.testtable2"
    # null-as = ""
    table = "public.testtable2"
    # time-format = "unix-second"
    # use-auto-increment = true
```

By default, mxgate listens on port 8086 for incoming data. In mxgate.conf this is the `http-port` item under `[source.http]`, commented out and showing the default value 8086. To use a different port, uncomment it and change the value:

```toml
[source]

## Source plugin is the data entrance to MatrixGate
## Types restricted to: http
source = "http"

[source.http]

  ## Port of http push
  # http-port = 8086

  ## Maximum request body size (after gzip)
  ## The server rejects requests with bodies exceeding this limit.
  # max-body-bytes = 4194304

  ## The maximum number of concurrent HTTP connections to the server
  ## The server response with 503 after exceed this limit.
  # max-concurrency = 40000
```
- Start mxgate. It loads the configuration file, connects to the demo database, and waits for data loading requests:

```shell
mxgate start --config mxgate.conf
```

- Check the status of the background service:

```shell
mxgate status
```

- Stop the background service:

```shell
mxgate stop
```

If stopping times out or runs into other problems, force it:

```shell
mxgate stop --force
```

## 3 MatrixGate HTTP API

| Protocol element | Format | Usage and notes |
| --- | --- | --- |
| URL | `http://mxgate-host:port` | mxgate connection address |
| PATH | `/` | Only `/` is currently supported; anything after `/` in the path is ignored |
| HTTP method | `POST` | Data is loaded with the POST method |
| HTTP header | `Content-Encoding: gzip` | gzip is currently supported for compressing the HTTP body |
| | `Content-Type: text/plain` | `text/plain` is currently supported |
| HTTP body | `SchemaName.TableName`<br>`Timestamp\|TagID\|C1\|C2\|...\|Cn` | See the body format description below |

Body format: the first line of the body names the target table. `SchemaName` is optional and defaults to `public`; `TableName` is required. Each following line is one time-series data row and corresponds to one row of the target table. Columns are separated by `|` and rows by `\n`. The first field of each row is the timestamp, a UNIX timestamp in seconds (see the description of `--time-format`). The second field is the TagID, an integer. The third through last fields correspond to the remaining columns of the target table. We recommend defining the target table's columns in the same order: (Timestamp, TagID, C1, C2, ..., Cn).
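
The body format above is simple enough to assemble programmatically. A minimal Python sketch, assuming the default `|` delimiter and UNIX-second timestamps; `build_body` and `gzip_request` are hypothetical helpers, not part of mxgate. The optional gzip step produces the `Content-Encoding: gzip` header listed in the table:

```python
import gzip


def build_body(table, rows, delimiter="|"):
    """Return an mxgate HTTP body: the target table on the first line,
    then one delimited row per line (Timestamp, TagID, C1..Cn).
    Assumes the default '|' delimiter unless another is configured."""
    lines = [table]
    for row in rows:
        lines.append(delimiter.join(str(v) for v in row))
    return "\n".join(lines)


def gzip_request(body):
    """Compress the body and return (payload, headers) for a POST to mxgate."""
    payload = gzip.compress(body.encode("utf-8"))
    headers = {"Content-Type": "text/plain", "Content-Encoding": "gzip"}
    return payload, headers


if __name__ == "__main__":
    body = build_body("public.testtable", [
        (1603777821, 1, 101, 201, 301),
        (1603777822, 2, 102, 202, 302),
    ])
    payload, headers = gzip_request(body)
    print(body)
    print(len(payload), headers)
```

The payload and headers can be passed straight to `http.client.HTTPConnection.request("POST", "/", payload, headers)`.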

## 4 MatrixGate HTTP response codes

| Response code | Meaning | Remarks |
| --- | --- | --- |
| 200 | StatusOK | Some rows have format errors; the response body contains the failing lines and error messages, for example:<br>`At line: 2`<br>`missing data for column "c3"` |
| 204 | StatusNoContent | Data loaded into MatrixGate successfully |
| 400 | StatusBadRequest | Request error, for example a malformed POST body, a target table that does not exist, or a body compression format that does not match the HTTP request header |
| 405 | StatusMethodNotAllowed | The HTTP request is not a POST |
| 408 | StatusTimeout | Request timed out |
| 500 | StatusInternalServerError | Database error; loading failed. The response body contains detailed error information |
| 503 | StatusServiceUnavailable | MatrixGate rejected the request, for example because the maximum number of connections was exceeded or MatrixGate is shutting down |
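
Clients usually branch on these codes. The sketch below shows one possible policy, which is an assumption and not something mxgate prescribes: 204 is success, 200 is a partial failure whose body lists the rejected lines, 408 and 503 are retried, and everything else is treated as an error. `classify_response` and `failed_lines` are hypothetical helpers, and `failed_lines` assumes errors are reported in the `At line: N` form shown in the table:

```python
import re


def classify_response(status):
    """Map an mxgate HTTP status code to a client action (one possible policy)."""
    if status == 204:
        return "ok"        # all data accepted by MatrixGate
    if status == 200:
        return "partial"   # some rows were malformed; the body names them
    if status in (408, 503):
        return "retry"     # timeout, or mxgate overloaded / shutting down
    return "fail"          # 400, 405, 500, ...: fix the request or check the database


def failed_lines(body):
    """Line numbers reported in a 200 response body (assumes 'At line: N' messages)."""
    return [int(n) for n in re.findall(r"At line: (\d+)", body)]


if __name__ == "__main__":
    for code in (204, 200, 408, 503, 400):
        print(code, classify_response(code))
```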

## 5 MatrixGate HTTP API command-line example

- Create the table testtable in the demo database:

```sql
CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
```

- Create the data file data.txt with the following content:

```
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
```

- Start mxgate with the generated configuration file mxgate.conf:

```shell
mxgate --config mxgate.conf
```

- Send an HTTP request to load the data:

```shell
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
```

- Connect to the database and check that the data was loaded:

```
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
 date_part  |          time          | tagid | c1  | c2  | c3
------------+------------------------+-------+-----+-----+-----
 1603777821 | 2020-10-27 13:50:21+08 |     1 | 101 | 201 | 301
 1603777822 | 2020-10-27 13:50:22+08 |     2 | 102 | 202 | 302
 1603777823 | 2020-10-27 13:50:23+08 |     3 | 103 | 203 | 303
(3 rows)
```
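
The `date_part` column shows that each stored timestamp equals the UNIX-second value sent in the body, while the `time` column renders the same instant in the session time zone, which is UTC+8 in this output. A quick Python check of that conversion, assuming a +08:00 session time zone (the helper names are illustrative):

```python
from datetime import datetime, timedelta, timezone

# Assumed session time zone: UTC+8, matching the "+08" offsets in the output above.
SESSION_TZ = timezone(timedelta(hours=8))


def to_session_time(unix_seconds):
    """Render a unix-second timestamp as it appears in the session time zone."""
    return datetime.fromtimestamp(unix_seconds, tz=SESSION_TZ).isoformat(sep=" ")


def to_unix_seconds(text):
    """Inverse: parse '2020-10-27 13:50:21+08:00' back to unix seconds."""
    return int(datetime.fromisoformat(text).timestamp())


if __name__ == "__main__":
    for ts in (1603777821, 1603777822, 1603777823):
        print(ts, "->", to_session_time(ts))
```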

## 6 Connecting to MatrixGate from programming languages

### 6.1 MatrixGate HTTP API Java example

    
```java
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class MxgateExample {
    public static void main(String[] args) throws Exception {
        MxgateExample http = new MxgateExample();
        http.sendingPostRequest();
    }

    // HTTP POST request
    private void sendingPostRequest() throws Exception {
        // mxgate listens on port 8086 of localhost
        String url = "http://localhost:8086/";
        URL obj = new URL(url);
        HttpURLConnection con = (HttpURLConnection) obj.openConnection();

        // Basic POST request settings
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type", "text/plain");
        // First line: target table; following lines: Timestamp|TagID|C1|C2|C3
        String postData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";

        con.setDoOutput(true);
        try (DataOutputStream wr = new DataOutputStream(con.getOutputStream())) {
            // Encode as UTF-8 so that non-ASCII (e.g. Chinese) data is sent correctly
            wr.write(postData.getBytes(StandardCharsets.UTF_8));
            wr.flush();
        }

        int responseCode = con.getResponseCode();
        System.out.println("Sending 'POST' request to URL : " + url);
        System.out.println("Post Data : " + postData);
        System.out.println("Response Code : " + responseCode);

        // 4xx/5xx responses carry their body on the error stream, not the input stream
        InputStream body = responseCode >= 400 ? con.getErrorStream() : con.getInputStream();
        StringBuilder response = new StringBuilder();
        if (body != null) {
            try (BufferedReader in = new BufferedReader(new InputStreamReader(body, StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    response.append(line).append('\n');
                }
            }
        }
        System.out.println(response.toString());
    }
}
```

<a name="python"><br/></a>
### 6.2 MatrixGate HTTP API Python Example

```python
import http.client


class MxgateExample(object):
    def __init__(self):
        # mxgate listens on port 8086 of localhost
        self.url = "localhost:8086"

        # First line: target table; following lines: Timestamp|TagID|C1|C2|C3
        self.postData = "public.testtable\n" \
                        "1603777821|1|101|201|301\n" \
                        "1603777822|2|102|202|302\n" \
                        "1603777823|3|103|203|303"
        self.headers = {"Content-Type": "text/plain"}

    # HTTP POST request
    def sending_post_request(self):
        conn = http.client.HTTPConnection(self.url)
        # Encode explicitly: http.client would otherwise encode a str body as ISO-8859-1
        conn.request("POST", "/", self.postData.encode("utf-8"), self.headers)

        response = conn.getresponse()
        response_code = response.status
        print(f"Sending 'POST' request to URL : {self.url}")
        print(f"Post Data : {self.postData}")
        print(f"Response Code : {response_code}")

        output = response.read()
        print(output.decode("utf-8"))
        conn.close()


if __name__ == '__main__':
    gate_post = MxgateExample()
    gate_post.sending_post_request()
```

<a name="C#"><br/></a>
### 6.3 MatrixGate HTTP API C# Example
> We recommend developing this code in a .NET Core environment.

```csharp
using System;
using System.IO;
using System.Net;
using System.Text;

namespace HttpPostTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var url = "http://10.13.2.177:8086/";
            // This payload uses ',' as the delimiter and 'YYYY-MM-DD hh:mm:ss' timestamps,
            // so the job for public.dest must be configured with a matching delimiter and time format.
            var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";

            var result = HttpPost(url, txt);
            Console.WriteLine(result);
        }

        public static string HttpPost(string url, string content)
        {
            string result = "";
            HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
            req.Method = "POST";
            req.ContentType = "text/plain";

            #region Add POST body
            byte[] data = Encoding.UTF8.GetBytes(content);
            req.ContentLength = data.Length;
            using (Stream reqStream = req.GetRequestStream())
            {
                reqStream.Write(data, 0, data.Length);
            }
            #endregion

            using (HttpWebResponse resp = (HttpWebResponse)req.GetResponse())
            using (StreamReader reader = new StreamReader(resp.GetResponseStream(), Encoding.UTF8))
            {
                // Get the response content
                result = reader.ReadToEnd();
            }
            return result;
        }
    }
}
```

> If you encounter the error `error when serving connection ***** body size exceeds the given limit`, increase `max-body-bytes` in mxgate.conf.
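
Instead of raising `max-body-bytes`, a client can also split a large load into several smaller requests. A minimal sketch, assuming uncompressed bodies (so the size "after gzip" is simply the raw size) and the 4194304-byte default shown in mxgate.conf; `split_bodies` is a hypothetical helper, and every request repeats the target-table line:

```python
MAX_BODY_BYTES = 4194304  # default max-body-bytes in mxgate.conf


def split_bodies(table, lines, limit=MAX_BODY_BYTES):
    """Group data lines into bodies of at most `limit` bytes,
    each starting with the target-table line (assumes no gzip)."""
    header = table.encode("utf-8")
    bodies, current, size = [], [], len(header)
    for line in lines:
        encoded = line.encode("utf-8")
        extra = len(encoded) + 1  # +1 for the "\n" before this line
        if len(header) + extra > limit:
            raise ValueError("a single row exceeds the body size limit")
        if current and size + extra > limit:
            bodies.append(b"\n".join([header] + current))
            current, size = [], len(header)
        current.append(encoded)
        size += extra
    if current:
        bodies.append(b"\n".join([header] + current))
    return bodies
```

Each returned body can then be sent as its own POST with `Content-Type: text/plain`.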


### 6.4 MatrixGate HTTP API Golang Example

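```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func PostDataToServer(URL string) error {
	// First line: target table; following lines: Timestamp|TagID|C1|C2|C3
	data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303`
	resp, err := http.Post(URL, "text/plain", bytes.NewBufferString(data))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return err
	}
	switch resp.StatusCode {
	case http.StatusNoContent: // 204: all rows loaded
		return nil
	case http.StatusOK: // 200: some rows had format errors; the body lists them
		return fmt.Errorf("some rows were rejected: %s", body)
	default:
		return fmt.Errorf("mxgate returned %d: %s", resp.StatusCode, body)
	}
}

func main() {
	err := PostDataToServer("http://127.0.0.1:8086")
	if err != nil {
		panic(err)
	}
}
```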

## 7 Loading special formats and types with MatrixGate
### 7.1 Example: loading a CSV file
- Create the table csvtable in the demo database:

```sql
CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT) DISTRIBUTED BY (tagid);
```

- Create the data file data.csv with the following content:

```
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
```

- Start mxgate with `--source stdin`, loading into the existing table csvtable with a parallelism of 2:

```shell
mxgate \
  --source stdin \
  --db-database demo \
  --db-master-host 127.0.0.1 \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format unix-second \
  --delimiter "|" \
  --target csvtable \
  --parallel 2 < data.csv
```

- Connect to the database and check that the data was loaded:

```
demo=# SELECT * FROM csvtable ;
          time          | tagid | c1  | c2  | c3
------------------------+-------+-----+-----+-----
 2020-10-27 05:50:23+08 |     3 | 103 | 203 | 303
 2020-10-27 05:50:22+08 |     2 | 102 | 202 | 302
 2020-10-27 05:50:21+08 |     1 | 101 | 201 | 301
(3 rows)
```
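
Because `--source stdin` reads rows from standard input, a program can also stream rows to mxgate directly instead of going through a file. A minimal Python sketch, assuming `mxgate` is on `PATH`; `stdin_argv` and `feed` are hypothetical helpers that mirror the command above:

```python
import subprocess


def stdin_argv(database, table, host="127.0.0.1", port=5432, user="mxadmin", parallel=2):
    """The stdin-mode mxgate command from the example above, as an argument list."""
    return [
        "mxgate", "--source", "stdin",
        "--db-database", database,
        "--db-master-host", host,
        "--db-master-port", str(port),
        "--db-user", user,
        "--time-format", "unix-second",
        "--delimiter", "|",
        "--target", table,
        "--parallel", str(parallel),
    ]


def feed(argv, rows):
    """Run `argv`, write one '|'-delimited line per row to its stdin, return its stdout."""
    data = "".join("|".join(str(v) for v in row) + "\n" for row in rows)
    done = subprocess.run(argv, input=data, text=True, capture_output=True, check=True)
    return done.stdout


if __name__ == "__main__":
    print(" ".join(stdin_argv("demo", "csvtable")))
```

For example, `feed(stdin_argv("demo", "csvtable"), [(1603777821, 1, 101, 201, 301)])`. Since `feed` only writes to the child's standard input, it can be tried out first with a stand-in command such as `cat`.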

### 7.2 Example: loading JSON columns

#### 7.2.1 JSON
- Create the table:

```sql
create table json_test(id int, j json);
```

- Create the data file `~/json.csv` with the following content:

```
1|"{""a"":10, ""b"":""xyz""}"
```
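
The quoting in this line is standard CSV: the JSON value contains double quotes, so the whole field is wrapped in quotes and each inner quote is doubled. Rather than escaping by hand, you can let Python's `csv` module produce such lines; a sketch using `|` as the delimiter to match `--delimiter "|"` (`csv_line` is a hypothetical helper):

```python
import csv
import io


def csv_line(values, delimiter="|"):
    """Encode one row as CSV, quoting fields that contain quotes or the delimiter."""
    buf = io.StringIO()
    csv.writer(buf, delimiter=delimiter, lineterminator="\n").writerow(values)
    return buf.getvalue()


if __name__ == "__main__":
    # Writes the same line as ~/json.csv
    print(csv_line([1, '{"a":10, "b":"xyz"}']), end="")
```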

- Load the data. This example uses stdin mode; the other modes work the same way. The key option is `--format csv`:

```shell
mxgate \
  --source stdin \
  --db-database postgres \
  --db-master-host 127.0.0.1 \
  --db-master-port 7000 \
  --db-user mxadmin \
  --time-format raw \
  --format csv \
  --delimiter "|" \
  --target json_test < ~/json.csv
```

- Check the loaded data:

```
postgres=# select * from json_test;
 id |          j
----+---------------------
  1 | {"a":10, "b":"xyz"}
(1 row)
```

#### 7.2.2 JSON array

- Create the table:

```sql
create table json_array_test(id int, j _json);
```

- Create the data file `~/json_array.csv` with the following content:

```
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
```
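
This line is built in two layers. First, each JSON value becomes an element of a PostgreSQL array literal: the element is wrapped in double quotes, and any `"` or `\` inside it is backslash-escaped. Then the whole array literal is CSV-quoted (wrapped in quotes, with every `"` doubled). A sketch of both steps; `pg_array_literal` and `csv_line` are hypothetical helpers:

```python
import csv
import io


def pg_array_literal(elements):
    """Build a PostgreSQL array literal, quoting every element and
    backslash-escaping backslashes and double quotes inside it."""
    quoted = []
    for e in elements:
        escaped = e.replace("\\", "\\\\").replace('"', '\\"')
        quoted.append('"' + escaped + '"')
    return "{" + ",".join(quoted) + "}"


def csv_line(values, delimiter="|"):
    """Encode one row as CSV with the given delimiter (quotes doubled)."""
    buf = io.StringIO()
    csv.writer(buf, delimiter=delimiter, lineterminator="\n").writerow(values)
    return buf.getvalue()


if __name__ == "__main__":
    arr = pg_array_literal(['{"a":10, "b":"xyz"}', '{"c": 10}'])
    print(arr)                         # the array value as stored in column j
    print(csv_line([1, arr]), end="")  # the line in ~/json_array.csv
```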

- Load the data:

```shell
mxgate \
  --source stdin \
  --db-database postgres \
  --db-master-host 127.0.0.1 \
  --db-master-port 7000 \
  --db-user mxadmin \
  --time-format raw \
  --format csv \
  --delimiter "|" \
  --target json_array_test < ~/json_array.csv
```

- Verify the result:

```
postgres=# select * from json_array_test ;
 id |                      j
----+---------------------------------------------
  1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
(1 row)
```

> Note: Because JSON values contain special characters such as quotes, mxgate must be run with `--format csv`.

## 8 Observing mxgate runtime metrics
`watch` is an mxgate subcommand that reports a set of metrics describing how the mxgate daemon is running.
It has two modes:
- Real-time mode: prints mxgate's metrics every 3 seconds in a sar-like format.
- Historical mode: computes import speed statistics over any time range and at any granularity (for example, hourly for yesterday, daily for last month, or monthly for last year).

### 8.1 Real-time observation

```shell
mxgate watch
```

Metrics are collected every 3 seconds. The output looks like this:

```
               Time          WCount          ICount        WSpeed/s        ISpeed/s  WBandWidth MB/S     BlocakItems
2022-04-28 15:20:58        14478858        14527011         2598081         2627887             2395               0
2022-04-28 15:21:01        22231035        22633254         2584059         2702081             2222               0
2022-04-28 15:21:04        30494310        30500874         2754425         2622540             3551               0
2022-04-28 15:21:07        38004210        38032956         2503300         2510694             2862               0
2022-04-28 15:21:10        46188696        46298223         2728162         2755089             2227               0
...
```
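
In the sample above, each WSpeed/s value equals the increase in WCount since the previous sample divided by the 3-second interval (for example, (22231035 - 14478858) / 3 = 2584059), and ISpeed/s relates to ICount in the same way. If you capture `mxgate watch` output for your own monitoring, the following Python sketch parses rows in the column layout shown above and recomputes the rates from the counters. The parsing assumes exactly this layout, which may differ between mxgate versions:

```python
INTERVAL_S = 3  # mxgate watch prints one sample every 3 seconds


def parse_rows(lines):
    """Parse data rows of the real-time watch table.

    Assumed layout: date, time, WCount, ICount, WSpeed/s, ISpeed/s,
    WBandWidth MB/S, BlocakItems. Header and other lines are skipped.
    """
    rows = []
    for line in lines:
        f = line.split()
        if len(f) != 8 or not f[2].isdigit():
            continue
        rows.append({
            "time": f"{f[0]} {f[1]}",
            "wcount": int(f[2]),
            "icount": int(f[3]),
            "wspeed": int(f[4]),
            "ispeed": int(f[5]),
            "bandwidth_mb_s": float(f[6]),
            "block_items": int(f[7]),
        })
    return rows


def rates_from_counts(rows, key):
    """Per-sample rate: counter increase since the previous row / interval."""
    return [(cur[key] - prev[key]) / INTERVAL_S for prev, cur in zip(rows, rows[1:])]


if __name__ == "__main__":
    sample = [
        "Time WCount ICount WSpeed/s ISpeed/s WBandWidth MB/S BlocakItems",
        "2022-04-28 15:20:58 14478858 14527011 2598081 2627887 2395 0",
        "2022-04-28 15:21:01 22231035 22633254 2584059 2702081 2222 0",
    ]
    print(rates_from_counts(parse_rows(sample), "wcount"))
```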

Run with `--info` to see a description of each metric:

```shell
mxgate watch --info
```

By default, only speed metrics are shown. To help analyze problems, add `--watch-latency` to also show time (latency) metrics:

```shell
mxgate watch --watch-latency
```

### 8.2 Historical data observation

```shell
mxgate watch --history
```

This computes the average speed for each hour of the last 24 hours. The output looks like this:

```
              TIME RANGE                | SPEED/S  | BANDWIDTH MB/S  | BLOCK ITEMS
2022-04-28 16:00:00-2022-04-28 17:00:00 | 2208010  | 1254.48         | 0
2022-04-28 17:00:00-2022-04-28 18:00:00 | 1157920  | 1327.00         | 0
2022-04-28 18:00:00-2022-04-28 19:00:00 | 2228666  | 2162.32         | 0
2022-04-28 19:00:00-2022-04-28 20:00:00 | 1371092  | 2881.30         | 0
2022-04-28 20:00:00-2022-04-28 21:00:00 | 1575320  | 2608.20         | 0
```

SPEED/S and BANDWIDTH MB/S are the import rate (entries per second) and the import bandwidth (in MB/s).
BLOCK ITEMS is the amount of data backed up inside mxgate. It rises when the database cannot consume data as fast as the data sources (HTTP, Kafka, etc.) produce it.

Use `--watch-start`, `--watch-end`, and `--watch-duration` to set the time range and the aggregation period of the historical data.
For example:

```shell
mxgate watch --history --watch-start '2022-03-27 00:00:00' --watch-end '2022-04-27 00:00:00' --watch-duration '168h'
```

This computes the average import speed for each week (every 168h) from March 27 to April 27.
`--watch-duration` supports three units, including `h` and `m`.
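
To see how a range and a duration translate into output rows, the sketch below splits a range into consecutive windows. It only approximates mxgate's behavior: it assumes windows start at `--watch-start` and the last window is cut off at `--watch-end` (the default hourly output above is aligned to whole hours instead), and it handles only the `h` unit. For the example command it yields four full weeks plus a final 3-day window. The helper names are hypothetical:

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"


def parse_hours(text):
    """'168h' -> timedelta(hours=168); only the 'h' unit is handled here."""
    if not text.endswith("h") or not text[:-1].isdigit():
        raise ValueError(f"unsupported duration: {text!r}")
    return timedelta(hours=int(text[:-1]))


def history_windows(start, end, duration):
    """Split [start, end) into consecutive windows of `duration`.

    Assumption: windows begin at `start` and the last one is cut at `end`.
    """
    cur, stop = datetime.strptime(start, FMT), datetime.strptime(end, FMT)
    step = parse_hours(duration)
    windows = []
    while cur < stop:
        nxt = min(cur + step, stop)
        windows.append((cur, nxt))
        cur = nxt
    return windows


if __name__ == "__main__":
    for a, b in history_windows("2022-03-27 00:00:00", "2022-04-27 00:00:00", "168h"):
        print(a, "-", b)
```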

## 9 Updating parallel write parameters without stopping mxgate
mxgate lets you change two parallel-loading parameters while it keeps running: "interval" and "stream-prepared". "interval" is the working time of each write connection from mxgate to a database table, and "stream-prepared" is the number of active write connections. In mxgate, only one write connection per table executes a write transaction at any given moment. Each table therefore uses multiple connections that take turns executing their own write transactions in successive time intervals, which keeps data flowing into the table quickly and efficiently. Adjusting "interval" changes how long each write connection works, which lets you tune the write rate and loading performance for your workload. Usage examples:
- `mxgate set --stream-prepared-cli 3` sets the number of write connections per table to 3.
![mxgate](https://image.ymatrix.cn/jsdelivr-web/web/doc-images/tools/mxgate_4.6/set_stream_prepared_cli.png)
- `mxgate get --stream-prepared-get` gets the number of active write connections per table.
![mxgate](https://image.ymatrix.cn/jsdelivr-web/web/doc-images/tools/mxgate_4.6/get_stream_prepared_get.png)
- `mxgate set --job-interval 200` sets the write-connection interval for all tables to 200 ms.
![mxgate](https://image.ymatrix.cn/jsdelivr-web/web/doc-images/tools/mxgate_4.6/set_job_interval.png)
- `mxgate get --job-interval-get` gets the current write-connection interval for all tables.
![mxgate](https://image.ymatrix.cn/jsdelivr-web/web/doc-images/tools/mxgate_4.6/get_job_interval_get.png)

> ***Notes!***  
To set or get the number of write connections or the interval for one specific table, append `--job <name>` to the commands above. Each job corresponds to one database table, and the job name consists of the schema name and the table name. For example, for the table "test_table" in the schema "public", append `--job public.test_table` to the command.
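
If you script these adjustments, a small Python wrapper can build the command lines above and run them with `subprocess`. It uses only the subcommands and options documented in this section; the function names are hypothetical, and `mxgate` is assumed to be on `PATH` with a running mxgate process:

```python
import subprocess


def _with_job(argv, job):
    """Append --job schema.table when one specific table is targeted."""
    return argv + ["--job", job] if job else argv


def set_stream_prepared(n, job=None):
    return _with_job(["mxgate", "set", "--stream-prepared-cli", str(n)], job)


def get_stream_prepared(job=None):
    return _with_job(["mxgate", "get", "--stream-prepared-get"], job)


def set_job_interval(ms, job=None):
    return _with_job(["mxgate", "set", "--job-interval", str(ms)], job)


def get_job_interval(job=None):
    return _with_job(["mxgate", "get", "--job-interval-get"], job)


def run(argv):
    """Run one mxgate command; raise if it exits non-zero, else return its stdout."""
    return subprocess.run(argv, check=True, capture_output=True, text=True).stdout


if __name__ == "__main__":
    # Only prints the command; pass it to run() to execute it.
    print(" ".join(set_job_interval(200, job="public.test_table")))
```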


## 10 Changing table structure without stopping mxgate
While data is being loaded, a changing time-series data source may make the existing table structure unsuitable, so the table needs to be modified. mxgate supports this without stopping. This section explains how to pause data writing, reload the modified table metadata, and resume data writing while mxgate keeps running. The steps are as follows:



 * First, run `mxgate pause -X` to interrupt the write connections of all tables before modifying the table structure. The `-X` option is required: it closes the connections between mxgate and the database, and the table cannot be modified while those connections remain open. You can also add `-S` so that the pause command waits until all connections have been closed before it returns.
 ![mxgate](https://image.ymatrix.cn/jsdelivr-web/web/doc-images/tools/mxgate_4.6/pause_X.png)

 * Next, once all write connections to the table have been interrupted, modify the table structure in the database, for example by adding or dropping columns, or by dropping the table and re-creating a new one with the same name.

> ***Notes!***  
The re-created table may have a different structure, but its name must stay the same.

* Finally, run `mxgate resume -R` to restore the write connections of all tables and reload the table metadata. The `-R` option is required: `resume` combined with `-R` performs the reload.
![mxgate](https://image.ymatrix.cn/jsdelivr-web/web/doc-images/tools/mxgate_4.6/reload_laststep.png)


 * When multiple mxgate processes are running at the same time, every command above also requires the `-p` option to specify the process number of the target mxgate process.
 ![mxgate](https://image.ymatrix.cn/jsdelivr-web/web/doc-images/tools/mxgate_4.6/_p.png)


 > ***Notes!***  
 Before running the reload command, all write connections of mxgate to the corresponding table must first be paused; otherwise the following error occurs:
![mxgate](https://image.ymatrix.cn/jsdelivr-web/web/doc-images/tools/mxgate_4.6/Reload.png)
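
The whole procedure can be scripted. The sketch below runs the documented commands in order (`mxgate pause -X -S`, your DDL, `mxgate resume -R`) and always resumes, even if the DDL fails. It assumes `psql` is used to run the DDL and that `-p` takes the mxgate process number as its next argument; both assumptions, like the helper names, are illustrative. With `dry_run=True` it only returns the commands:

```python
import subprocess


def reload_plan(ddl, database, process=None):
    """pause -> DDL -> resume commands, following the steps above.

    Assumptions: the DDL is run with psql, and -p takes the mxgate
    process number as its next argument when several mxgate processes run.
    """
    p = ["-p", str(process)] if process is not None else []
    pause = ["mxgate", "pause", "-X", "-S"] + p   # close connections, wait until done
    alter = ["psql", "-d", database, "-c", ddl]   # change the table structure
    resume = ["mxgate", "resume", "-R"] + p       # reopen connections, reload metadata
    return [pause, alter, resume]


def change_table(ddl, database, process=None, dry_run=False):
    """Run the plan; the resume step runs even if the DDL step fails."""
    plan = reload_plan(ddl, database, process)
    if dry_run:
        return plan
    pause, alter, resume = plan
    subprocess.run(pause, check=True)
    try:
        subprocess.run(alter, check=True)
    finally:
        subprocess.run(resume, check=True)
    return plan


if __name__ == "__main__":
    for cmd in change_table("ALTER TABLE public.testtable ADD COLUMN c4 INT", "demo", dry_run=True):
        print(" ".join(cmd))
```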