MatrixGate, сокращённо mxgate, — это высокопроизводительный сервер потоковой загрузки данных, расположенный в каталоге bin/mxgate установки MatrixDB. В настоящее время MatrixGate поддерживает прием данных через интерфейсы HTTP и STDIN, а также форматы данных TEXT и CSV.
Логика загрузки данных в MatrixGate показана ниже:

Создайте конфигурационный файл mxgate, указав целевую базу данных и таблицу:
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf
Эта команда создает конфигурационный файл с именем mxgate.conf. Она позволяет пользователям настраивать загрузку данных для testtable и testtable2, а также поддерживает глобальные настройки по умолчанию для загрузки в другие таблицы.
При необходимости измените файл mxgate.conf (например, задайте разделители полей). Этот шаг можно пропустить, если используются настройки по умолчанию. В созданный конфигурационный файл включены параметры testtable и testtable2, как показано ниже:
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
Если в testtable используется "@" в качестве разделителя, а в testtable2 — "%", соответствующим образом обновите конфигурацию:
[[job.target]]
delimiter = "@"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
delimiter = "%"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
По умолчанию mxgate прослушивает порт 8086 для входящих данных. Это можно увидеть в параметре mxgate.conf внутри секции [source.http] как http-port = 8086. При необходимости вы можете изменить его на другой порт:
[source]
## Source plugin is the data entrance to MatrixGate
## Types restricted to: http
source = "http"
[source.http]
## Port of http push
# http-port = 8086
## Maximum request body size (after gzip)
## The server rejects requests with bodies exceeding this limit.
# max-body-bytes = 4194304
## The maximum number of concurrent HTTP connections to the server
## The server response with 503 after exceed this limit.
# max-concurrency = 40000
Запустите mxgate с конфигурационным файлом, подключитесь к базе данных demo и подготовьтесь к приему запросов на загрузку данных:
mxgate start --config mxgate.conf
Проверьте состояние фонового сервиса:
mxgate status
Остановите фоновый сервис:
mxgate stop
Принудительно остановите при возникновении таймаутов или других проблем:
mxgate stop --force
| Параметр | Значение по умолчанию | Описание |
|---|---|---|
| [database] | ||
--db-database |
postgres | Имя базы данных MatrixDB, к которой подключается MatrixGate |
--db-master-host |
локальное имя хоста | Имя хоста главного узла MatrixDB |
--db-master-port |
5432 | Номер порта главного узла MatrixDB |
--db-user |
текущий пользователь ОС | Имя пользователя для подключения к MatrixDB Примечание: этот пользователь должен иметь разрешение на создание внешних таблиц. Для несуперпользователей предоставьте права с помощью: alter user {username} CREATEEXTTABLE; |
--db-password |
пусто | Пароль для подключения к MatrixDB |
--db-max-conn |
10 | Максимальное количество соединений от MatrixGate к MatrixDB |
| [job] | ||
--allow-dynamic |
false | При значении true включается динамическое определение целевой таблицы на основе содержимого POST-данных (первая строка). Используйте только когда имя целевой таблицы неизвестно при запуске. Для фиксированных таблиц используйте --target, чтобы явно указать имена таблиц |
--delimiter |
| | Символ, используемый для разделения столбцов в каждой строке |
--error-handling |
accurate | Как обрабатывать некорректные строкиaccurate: пропускать недопустимые строки, записывать ошибки; остальные корректные строки в пакете продолжают обработкуlegacy: весь пакет отклоняется при любой ошибке |
--exclude-columns |
пусто | Список имен столбцов, исключаемых при загрузке. Обязательные столбцы должны соответствовать порядку определения таблицы. Примечание: автоинкрементные столбцы, пропущенные через --use-auto-increment, здесь указывать не нужно |
--format |
text | Формат входных данных: text или csv. text быстрее, но не допускает переводов строк в текстовых полях. csv более гибкий; текстовые поля должны быть заключены в кавычки |
--null-as |
пустая строка | Строковое представление значений NULL. По умолчанию — незаключённая в кавычки пустая строка. Если столбец NOT NULL содержит это значение, загрузка завершится ошибкой. Чтобы использовать \N, экранируйте обратный слэш: --null-as \\N |
--time-format |
unix-second | Единица измерения временной метки: unix-second|unix-ms|unix-nano|raw.MatrixGate предполагает, что первый столбец — временная метка Unix, и преобразует её в тип времени БД. Используйте raw, если временная метка не в первом столбце или уже отформатирована |
--upsert-key |
пусто | Ключ(и) для операций upsert. Целевая таблица должна иметь UNIQUE-ограничение, и все ключи ограничения должны быть указаны |
--use-auto-increment |
true | Пропускать ли автоинкрементные столбцы во входных данных и использовать системные значения |
--target |
schemaName.tableName | Имя целевой таблицы. Схема по умолчанию — public. Поддерживаются несколько таблиц: --target table1 --target table2 .... Без этого параметра используйте --allow-dynamic для динамического определения таблицы |
| [misc] | ||
--log-archive-hours |
72 | Журналы, не изменявшиеся дольше указанного периода (в часах), автоматически сжимаются |
--log-compress |
true | Глобальный переключатель включения сжатия журналов |
--log-dir |
/home/mxadmin/gpAdminLogs | Каталог для файлов журналов |
--log-max-archive-files |
0 | Максимальное количество сжатых файлов журналов для хранения. Самые старые удаляются при превышении. 0 означает отсутствие удаления |
--log-remove-after-days |
0 | Количество дней, после которых сжатые журналы удаляются. 0 означает, что удаление никогда не происходит |
--log-rotate-size-mb |
100 | Ротация файла журнала при превышении размера (в МБ); старый файл немедленно сжимается |
| [source] | ||
--source |
http | Тип источника данных: поддерживает http, stdin, kafka, transfer |
| [source][http] | ||
--http-port |
8086 | Порт HTTP, на который клиенты отправляют данные |
--max-body-bytes |
4194304 | Максимальный размер (в байтах) тела одного HTTP-запроса |
--max-concurrency |
40000 | Максимальное количество одновременных HTTP-соединений |
| [source][transfer] | ||
--src-host |
IP-адрес главного узла исходной базы данных | |
--src-port |
Порт главного узла исходной базы данных | |
--src-user |
Имя пользователя для подключения к исходной базе данных (рекомендуется суперпользователь) | |
--src-password |
Пароль | |
--src-schema |
Имя схемы исходной таблицы | |
--src-table |
Имя исходной таблицы | |
--src-sql |
SQL-фильтр для миграции данных | |
--compress |
Метод сжатия данных с исходных сегментных хостов: "" (пусто): без сжатия, обычный текст gzip: требует установленного gzip на исходных сегментах lz4: требует установленного lz4 на исходных сегментах Рекомендация: lz4 > gzip > без сжатия |
|
--port-base |
Базовый порт для передачи; диапазон начинается с 9129 | |
--local-ip |
IP-адрес, доступный из исходной базы данных | |
| [writer] | ||
--interval |
100ms | Интервал (в миллисекундах) между операциями массовой вставки |
--stream-prepared |
10 | Уровень параллелизма рабочих процессов вставки |
--use-gzip |
auto | Сжимать ли данные, отправляемые на сегменты: auto/yes/no |
--timing |
false | Включать информацию о времени выполнения в журнал для каждого оператора INSERT |
| Другое | ||
--help |
Отображение справки и списка параметров |
MatrixGate предоставляет HTTP API, позволяющий языкам программирования импортировать данные в MatrixDB по протоколу HTTP.
Формат протокола HTTP API MatrixGate
| Элемент протокола | Формат | Использование и пример |
|---|---|---|
| URL | http://mxgate-host:port | Адрес для подключения к mxgate |
| PATH | / | Поддерживается только /; суффиксы пути игнорируются |
| HTTP-метод | POST | На данный момент поддерживается только POST |
| HTTP-заголовок | Content-Encoding: gzip | Необязательно: сжатие тела запроса с помощью gzip |
| Content-Type: text/plain | Обязательно: MIME-тип | |
| Тело HTTP | SchemaName.TableName Timestamp|ID|C1|C2|...|Cn |
Первая строка указывает целевую таблицу (SchemaName необязательно, по умолчанию public). Последующие строки содержат данные временного ряда. Каждая строка соответствует одной строке в целевой таблице, столбцы разделены символом |, строки — символом \n. Первое поле — временная метка Unix в секундах (см. --time-format). Второе поле — TagID (целое число). Остальные поля соответствуют столбцам таблицы. Порядок столбцов в DDL таблицы должен соответствовать (Timestamp, TagID, C1, C2, ..., Cn) |
Коды ответов HTTP API MatrixGate
| Код | Значение | Примечания |
|---|---|---|
| 200 | StatusOK | Частичный успех: некоторые строки не прошли проверку формата. Тело ответа содержит подробности об ошибках, например:At line: 2missing data for column "c3" |
| 204 | StatusNoContent | Все данные успешно приняты MatrixGate |
| 400 | StatusBadRequest | Некорректный запрос: неверное тело POST, таблица не найдена, несоответствие заголовков и содержимого и т.д. |
| 405 | StatusMethodNotAllowed | Метод запроса не является POST |
| 500 | StatusInternalServerError | Ошибка на стороне базы данных; загрузка не удалась. Тело ответа содержит подробное сообщение об ошибке |
| 503 | StatusServiceUnavailable | MatrixGate отклонил запрос (например, достигнуто максимальное количество соединений, служба завершает работу) |
Создайте таблицу testtable в базе данных demo:
CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
Отредактируйте файл данных data.txt:
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
Запустите mxgate с конфигурационным файлом:
mxgate --config mxgate.conf
Отправьте HTTP-запрос для загрузки данных:
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
Выполните запрос к базе данных для проверки загрузки:
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
date_part | time | tagid | c1 | c2 | c3
------------+------------------------+-------+-----+-----+-----
1603777821 | 2020-10-27 13:50:21+08 | 1 | 101 | 201 | 301
1603777822 | 2020-10-27 13:50:22+08 | 2 | 102 | 202 | 302
1603777823 | 2020-10-27 13:50:23+08 | 3 | 103 | 203 | 303
(3 rows)
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class MxgateExample {
public static void main(String[] args) throws Exception {
MxgateExample http = new MxgateExample();
http.sendingPostRequest();
}
// HTTP Post request
private void sendingPostRequest() throws Exception {
// mxgate listens on port 8086 of localhost
String url = "http://localhost:8086/";
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
// Setting basic post request
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type","text/plain");
String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
// Encode Chinese characters using UTF-8
wr.write(postJsonData.toString().getBytes("UTF-8"));
wr.flush();
wr.close();
int responseCode = con.getResponseCode();
System.out.println("Sending 'POST' request to URL : " + url);
System.out.println("Post Data : " + postJsonData);
System.out.println("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String output;
StringBuffer response = new StringBuffer();
while ((output = in.readLine()) != null) {
response.append(output);
}
in.close();
System.out.println(response.toString());
}
}
import http.client
class MxgateExample(object):
def __init__(self):
# mxgate listens on port 8086 of localhost
self.url = "localhost:8086"
self.postData = "public.testtable\n/" \
"1603777821|1|101|201|301\n/" \
"1603777822|2|102|202|302\n/" \
"1603777823|3|103|203|303"
self.headers = {"Content-Type": "text/plain"}
# HTTP Post request
def sending_post_request(self):
conn = http.client.HTTPConnection(self.url)
conn.request("POST", "/", self.postData, self.headers)
response = conn.getresponse()
response_code = response.getcode()
print(f"Sending 'POST' request to URL : {self.url}")
print(f"Post Data : {self.postData}")
print(f"Response Code : {response_code}")
output = response.read()
print(output)
if __name__ == '__main__':
gate_post = MxgateExample()
gate_post.sending_post_request()
Рекомендуется разработка в среде C# Core
using System;
using System.IO;
using System.Net;
using System.Text;
namespace HttpPostTest
{
class Program
{
static void Main(string[] args)
{
var url = "http://10.13.2.177:8086/";
var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";
HttpPost(url,txt);
}
public static string HttpPost(string url, string content){
string result = "";
HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
req.Method = "POST";
req.ContentType = "text/plain";
#region 添加Post 参数
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
reqStream.Write(data, 0, data.Length);
reqStream.Close();
}
#endregion
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
//获取响应内容
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
result = reader.ReadToEnd();
}
return result;
}
}
}
Если вы столкнулись с ошибкой "error when serving connection ***** body size exceeds the given limit", увеличьте значение
max-body-bytesвmxgate.conf.
package main
import (
"bytes"
"net/http"
)
func PostDataToServer(URL string) error {
data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
if err != nil {
return err
}
if resp.StatusCode != 200 {
// Обработка тела ответа.
return nil
}
// Обработка тела ответа.
return nil
}
func main() {
err := PostDataToServer("http://127.0.0.1:8086")
if err != nil{
panic(err)
}
}
Создайте таблицу csvtable в базе данных demo:
CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
Отредактируйте файл данных data.csv:
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
Запустите mxgate с источником stdin, направляя данные в существующую таблицу csvtable, используя 2 параллельных потока:
mxgate \
--source stdin \
--db-database demo \
--db-master-host 127.0.0.1 \
--db-master-port 5432 \
--db-user mxadmin \
--time-format unix-second \
--delimiter "|" \
--target csvtable \
--parallel 2 < data.csv
Выполните запрос к базе данных, чтобы подтвердить успешную загрузку:
demo=# SELECT * FROM csvtable ;
time | tagid | c1 | c2 | c3
------------------------+-------+-----+-----+-----
2020-10-27 05:50:23+08 | 3 | 103 | 203 | 303
2020-10-27 05:50:22+08 | 2 | 102 | 202 | 302
2020-10-27 05:50:21+08 | 1 | 101 | 201 | 301
(3 rows)
Создайте таблицу
create table json_test(id int, j json);
Создайте файл данных
~/json.csv
1|"{""a"":10, ""b"":""xyz""}"
Загрузите данные
В качестве примера используется режим stdin; другие режимы аналогичны.
Ключевой момент — --format csv
mxgated \
--source stdin \
--db-database postgres \
--db-master-host 127.0.0.1 \
--db-master-port 7000 \
--db-user mxadmin \
--time-format raw \
--format csv \
--delimiter "|" \
--target json_test < ~/json.csv
Просмотрите загруженные данные
postgres=# select * from json_test;
id | j
----+-----------------------
1 | {"a":10, "b":"xyz"}
(1 row)
Создайте таблицу
create table json_array_test(id int, j _json);
Создайте файл данных
~/json_array.csv
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
Загрузите данные
mxgate \
--source stdin \
--db-database postgres \
--db-master-host 127.0.0.1 \
--db-master-port 7000 \
--db-user mxadmin \
--time-format raw \
--format csv \
--delimiter "|" \
--target json_array_test < ~/json_array.csv
Проверьте данные
postgres=# select * from json_array_test ;
id | j
----+---------------------------------------------
1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
(1 row)
Примечание: поскольку столбец JSON содержит специальные символы, такие как кавычки, параметр
--formatв mxgate должен быть установлен вcsv.