MatrixGate, сокращённо mxgate — это высокопроизводительный сервер потоковой загрузки данных, расположенный в каталоге установки MatrixDB по пути bin/mxgate. В настоящее время MatrixGate поддерживает прием данных через интерфейсы HTTP и STDIN, а также форматы данных TEXT и CSV.
Логика загрузки данных в MatrixGate показана ниже:
mxgate) пакетами малого размера с параллельной обработкой. mxgate эффективно взаимодействует с главным процессом MatrixDB для обмена информацией о транзакциях и управления. 
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf
Эта команда создает конфигурационный файл с именем mxgate.conf. Он позволяет пользователям настраивать поведение при загрузке данных для testtable и testtable2, а также поддерживает глобальные настройки по умолчанию для загрузки в другие таблицы.
mxgate.conf (например, задайте разделители полей). Этот шаг можно пропустить, если используются значения по умолчанию. В созданный конфигурационный файл включены такие параметры: [[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
Если testtable использует @ в качестве разделителя, а testtable2 использует %, соответствующим образом обновите конфигурацию:
[[job.target]]
delimiter = "@"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
delimiter = "%"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
По умолчанию mxgate прослушивает порт 8086. Это можно увидеть в параметре mxgate.conf внутри секции [source.http]:
[source]
## Source plugin is the data entrance to MatrixGate
## Types restricted to: http
source = "http"
[source.http]
## Port of http push
# http-port = 8086
## Maximum request body size (after gzip)
## The server rejects requests with bodies exceeding this limit.
# max-body-bytes = 4194304
## The maximum number of concurrent HTTP connections to the server
## The server response with 503 after exceed this limit.
# max-concurrency = 40000
При необходимости вы можете изменить номер порта.
demo, чтобы подготовиться к загрузке данных:mxgate start --config mxgate.conf
mxgate status
mxgate stop
Для принудительной остановки в случае таймаута или других проблем:
mxgate stop --force
| Параметр | Значение по умолчанию | Описание |
|---|---|---|
--db-database |
postgres | Имя базы данных, к которой подключается MatrixGate в MatrixDB |
--db-master-host |
localhost hostname | Имя хоста главного узла MatrixDB |
--db-master-port |
5432 | Номер порта главного узла MatrixDB |
--db-user |
текущее имя пользователя ОС | Имя пользователя, используемое MatrixGate для подключения к MatrixDB Примечание: этот пользователь должен иметь разрешение на создание внешних таблиц. Для несуперпользователей предоставьте права с помощью: alter user {username} CREATEEXTTABLE; |
--db-password |
пусто | Пароль пользователя MatrixDB |
--db-max-conn |
10 | Максимальное количество соединений от MatrixGate к MatrixDB |
--interval |
100 миллисекунд | Интервал, с которым MatrixGate выполняет пакетную загрузку данных |
--source |
http | Тип источника данных; поддерживаются http и stdin |
--target |
schemaName.tableName | Целевая таблица. Указание схемы необязательно, по умолчанию используется public. Можно указать несколько таблиц с помощью нескольких опций --target. Если параметр не указан, используйте --allow-dynamic, чтобы разрешить динамическое определение таблицы на основе входящих данных. |
--allow-dynamic |
false | При установке в true включается динамическое сопоставление данных POST с целевыми таблицами на основе первой строки входных данных. Используйте только тогда, когда целевые таблицы неизвестны при запуске. Для фиксированных целей укажите явно с помощью --target. |
--format |
text | Формат входных данных: text или csv. Формат text быстрее, но не поддерживает символы новой строки внутри полей. Формат csv более гибкий; строковые поля должны быть заключены в двойные кавычки. |
--delimiter |
| | Символ, используемый для разделения столбцов в каждой строке |
--null-as |
пустая строка | Строка, представляющая значения NULL. По умолчанию — неэкранированная пустая строка. Если столбец имеет ограничение NOT NULL и получает значение null, загрузка завершится ошибкой. Чтобы использовать \N как null, экранируйте обратную косую черту: --null-as \\N |
--time-format |
unix-second | Единица измерения времени: unix-second, unix-ms, unix-nano или raw.MatrixGate предполагает, что первый столбец — это временная метка Unix, и преобразует её в формат времени базы данных. Используйте raw, если временная метка не в первом столбце или уже отформатирована. |
--use-auto-increment |
true | Пропускать ли столбцы с автоинкрементом при загрузке и использовать системные значения |
--exclude-columns |
пусто | Список столбцов, исключаемых при загрузке. Оставшиеся столбцы должны соответствовать порядку определения таблицы. Столбцы с автоинкрементом, пропущенные через --use-auto-increment, здесь указывать не нужно. |
--upsert-key |
пусто | Ключ(и) для операций upsert. Целевая таблица должна иметь уникальное ограничение, и все ключи ограничения должны быть указаны. |
--help |
— | Отображение справки и списка параметров |
MatrixGate предоставляет HTTP API, позволяющее загружать данные в MatrixDB из любого языка программирования.
| Тип протокола | Формат | Использование и пример |
|---|---|---|
| URL | http://mxgate-host:port |
Адрес для подключения к mxgate |
| PATH | / |
Поддерживается только корневой путь /; любые суффиксы игнорируются |
| HTTP-метод | POST | Поддерживается только метод POST |
| HTTP-заголовок | Content-Encoding: gzip |
Поддерживает сжатие тела запроса с помощью gzip |
Content-Type: text/plain |
Поддерживается только text/plain |
|
| Тело HTTP | SchemaName.TableNameTimestamp\|ID\|C1\|C2\|...\|Cn |
Первая строка указывает целевую таблицу. Указание схемы необязательно (по умолчанию public). Последующие строки содержат данные временных рядов. Каждая строка соответствует одной строке в целевой таблице, столбцы разделяются символом |, строки — символом \n. Первое поле — временная метка Unix в секундах (см. --time-format). Второе поле — TagID (целое число). Остальные поля соответствуют столбцам таблицы. Рекомендуется, чтобы DDL целевой таблицы следовал порядку столбцов: (Timestamp, TagID, C1, C2, ..., Cn) |
| Код ответа | Значение | Примечания |
|---|---|---|
| 204 | StatusNoContent | Данные успешно загружены в MatrixGate |
| 400 | StatusBadRequest | Некорректный запрос: например, неверный формат тела POST, таблица не найдена, несоответствие кодировки содержимого |
| 405 | StatusMethodNotAllowed | Метод запроса не является POST |
| 500 | StatusInternalServerError | Ошибка на стороне базы данных; загрузка не удалась. В теле ответа содержится подробное сообщение об ошибке |
| 503 | StatusServiceUnavailable | MatrixGate отклоняет запрос: например, превышено максимальное количество соединений, сервис завершает работу |
testtable в базе данных demo:CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
data.txt:public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
mxgate --config mxgate.conf
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
date_part | time | tagid | c1 | c2 | c3
------------+------------------------+-------+-----+-----+-----
1603777821 | 2020-10-27 13:50:21+08 | 1 | 101 | 201 | 301
1603777822 | 2020-10-27 13:50:22+08 | 2 | 102 | 202 | 302
1603777823 | 2020-10-27 13:50:23+08 | 3 | 103 | 203 | 303
(3 rows)
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class MxgateExample {
public static void main(String[] args) throws Exception {
MxgateExample http = new MxgateExample();
http.sendingPostRequest();
}
// HTTP Post request
private void sendingPostRequest() throws Exception {
// mxgate listens on port 8086 of localhost
String url = "http://localhost:8086/";
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
// Setting basic post request
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type","text/plain");
String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
// Encode using UTF-8 if data contains Chinese characters
wr.write(postJsonData.toString().getBytes("UTF-8"));
wr.flush();
wr.close();
int responseCode = con.getResponseCode();
System.out.println("Sending 'POST' request to URL : " + url);
System.out.println("Post Data : " + postJsonData);
System.out.println("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String output;
StringBuffer response = new StringBuffer();
while ((output = in.readLine()) != null) {
response.append(output);
}
in.close();
System.out.println(response.toString());
}
}
import http.client
class MxgateExample(object):
def __init__(self):
# mxgate listens on port 8086 of localhost
self.url = "localhost:8086"
self.postData = "public.testtable\n/" \
"1603777821|1|101|201|301\n/" \
"1603777822|2|102|202|302\n/" \
"1603777823|3|103|203|303"
self.headers = {"Content-Type": "text/plain"}
# HTTP Post request
def sending_post_request(self):
conn = http.client.HTTPConnection(self.url)
conn.request("POST", "/", self.postData, self.headers)
response = conn.getresponse()
response_code = response.getcode()
print(f"Sending 'POST' request to URL : {self.url}")
print(f"Post Data : {self.postData}")
print(f"Response Code : {response_code}")
output = response.read()
print(output)
if __name__ == '__main__':
gate_post = MxgateExample()
gate_post.sending_post_request()
Рекомендуется разработка в среде C# Core
using System;
using System.IO;
using System.Net;
using System.Text;
namespace HttpPostTest
{
class Program
{
static void Main(string[] args)
{
var url = "http://10.13.2.177:8086/";
var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";
HttpPost(url,txt);
}
public static string HttpPost(string url, string content){
string result = "";
HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
req.Method = "POST";
req.ContentType = "text/plain";
#region Add Post Parameters
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
reqStream.Write(data, 0, data.Length);
reqStream.Close();
}
#endregion
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
// Get response content
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
result = reader.ReadToEnd();
}
return result;
}
}
}
Если возникает ошибка "error when serving connection ***** body size exceeds the given limit", увеличьте значение
max-body-bytesвmxgate.conf.
package main
import (
"bytes"
"net/http"
)
func PostDataToServer(URL string) error {
data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
if err != nil {
return err
}
if resp.StatusCode != 200 {
// Обработка тела ответа
return nil
}
// Обработка тела ответа
return nil
}
func main() {
err := PostDataToServer("http://127.0.0.1:8086")
if err != nil{
panic(err)
}
}
csvtable в базе данных demo:CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
data.csv:1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
stdin, направляя данные в существующую таблицу csvtable, с уровнем параллелизма 2:mxgate \
--source stdin \
--db-database demo \
--db-master-host 127.0.0.1 \
--db-master-port 5432 \
--db-user mxadmin \
--time-format unix-second \
--delimiter "|" \
--target csvtable \
--parallel 2 < data.csv
demo=# SELECT * FROM csvtable ;
time | tagid | c1 | c2 | c3
------------------------+-------+-----+-----+-----
2020-10-27 05:50:23+08 | 3 | 103 | 203 | 303
2020-10-27 05:50:22+08 | 2 | 102 | 202 | 302
2020-10-27 05:50:21+08 | 1 | 101 | 201 | 301
(3 rows)
create table json_test(id int, j json);
1|"{""a"":10, ""b"":""xyz""}"
stdin (другие режимы аналогичны):--format csv из-за наличия специальных символов в JSONmxgated \
--source stdin \
--db-database postgres \
--db-master-host 127.0.0.1 \
--db-master-port 7000 \
--db-user mxadmin \
--time-format raw \
--format csv \
--delimiter "|" \
--target json_test < ~/json.csv
postgres=# select * from json_test;
id | j
----+-----------------------
1 | {"a":10, "b":"xyz"}
(1 row)
create table json_array_test(id int, j _json);
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
mxgate \
--source stdin \
--db-database postgres \
--db-master-host 127.0.0.1 \
--db-master-port 7000 \
--db-user mxadmin \
--time-format raw \
--format csv \
--delimiter "|" \
--target json_array_test < ~/json_array.csv
postgres=# select * from json_array_test ;
id | j
----+---------------------------------------------
1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
(1 row)
Примечание: поскольку столбцы JSON содержат кавычки и другие специальные символы, параметр
--formatдолжен быть установлен в значениеcsv.