MatrixGate, сокращённо mxgate — это высокопроизводительный сервер потоковой загрузки данных, расположенный в каталоге установки MatrixDB по адресу bin/mxgate. В настоящее время MatrixGate поддерживает прием данных через интерфейсы HTTP и STDIN, а также форматы данных TEXT и CSV.
Логика загрузки данных в MatrixGate показана ниже:
mxgate) пакетами малого размера с параллельной обработкой. mxgate эффективно взаимодействует с главным процессом MatrixDB для обмена информацией о транзакциях и управления. 
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf
Эта команда создает конфигурационный файл с именем mxgate.conf. Он позволяет задать пользовательские параметры загрузки для testtable и testtable2, а также включает глобальные параметры по умолчанию для загрузки в другие таблицы.
mxgate.conf (например, установите разделители полей). Этот шаг можно пропустить, если используются значения по умолчанию. В созданный конфигурационный файл включены такие записи: [[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
Если testtable использует @ в качестве разделителя, а testtable2 использует %, соответствующим образом обновите конфигурацию:
[[job.target]]
delimiter = "@"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
delimiter = "%"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
По умолчанию mxgate прослушивает порт 8086. Это можно увидеть в параметре mxgate.conf внутри секции [source.http]:
[source]
## Source plugin is the data entrance to MatrixGate
## Types restricted to: http
source = "http"
[source.http]
## Port of http push
# http-port = 8086
## Maximum request body size (after gzip)
## The server rejects requests with bodies exceeding this limit.
# max-body-bytes = 4194304
## The maximum number of concurrent HTTP connections to the server
## The server response with 503 after exceed this limit.
# max-concurrency = 40000
Чтобы изменить порт прослушивания, отредактируйте значение http-port.
demo, чтобы подготовиться к загрузке данных:mxgate start --config mxgate.conf
mxgate status
mxgate stop
Для принудительной остановки в случае таймаута или других проблем:
mxgate stop --force
| Параметр | Значение по умолчанию | Описание |
|---|---|---|
| [general] | ||
--job-interval-get |
false | Получить интервал заданий для операций записи |
--job-list |
false | Перечислить все задания записи |
--job-state |
false | Получить состояние всех заданий записи |
--stream-prepared-get |
11 | Получить количество активных соединений записи в транзакции |
--stream-status-get |
false | Получить статус соединений записи в транзакции |
--stream-prepared-cli int |
0 | Количество активных соединений записи в транзакции |
--pause |
false | Приостановить запись данных |
--resume |
false | Возобновить приостановленную запись |
| [database] | ||
--db-database |
postgres | Имя целевой базы данных MatrixDB |
--db-master-host |
localhost hostname | Имя хоста главного узла MatrixDB |
--db-master-port |
5432 | Порт главного узла MatrixDB |
--db-user |
текущий пользователь ОС | Имя пользователя для подключения к MatrixDB Примечание: этот пользователь должен иметь разрешение на создание внешних таблиц. Для несуперпользователей предоставьте права с помощью: GRANT CREATE ON DATABASE demo TO username; |
--db-password |
пусто | Пароль пользователя MatrixDB |
--db-max-conn |
10 | Максимальное количество соединений от MatrixGate к MatrixDB |
| [job] | ||
--allow-dynamic |
false | При значении true включает динамическое сопоставление таблиц на основе содержимого POST-запроса (первая строка). Используйте только когда целевая таблица неизвестна при запуске. Для известных таблиц используйте --target. |
--delimiter |
| | Разделитель полей в каждой строке |
--error-handling |
accurate | Режим обработки ошибок:accurate: пропускать некорректные строки, записывать ошибки в лог, продолжать обработку.legacy: отклонять весь пакет при любой ошибке. |
--exclude-columns |
пусто | Столбцы, исключенные из входных данных. Входные столбцы должны соответствовать порядку столбцов таблицы для включенных полей. Автоинкрементные столбцы, пропущенные через --use-auto-increment, здесь указывать не нужно. |
--format |
text | Формат данных: text или csv. Формат text быстрее, но не допускает переводов строк в текстовых полях. Формат csv более надёжен; текстовые поля должны быть заключены в кавычки. |
--null-as |
пустая строка | Строка, представляющая значения NULL. По умолчанию — незаключённая в кавычки пустая строка. Если столбец имеет ограничение NOT NULL и получает null, загрузка завершится с ошибкой. Чтобы использовать \N, экранируйте обратный слэш: --null-as \\N. |
--time-format |
unix-second | Единица измерения времени: unix-second, unix-ms, unix-nano или raw. По умолчанию MatrixGate считает первый столбец временной меткой Unix. Используйте raw, если временная метка не в первом столбце или уже в формате БД. |
--upsert-key |
пусто | Ключ(и) для операций upsert. Таблица должна иметь уникальное ограничение (UNIQUE) по указанным ключам. |
--deduplicate-key |
пусто | Аналогично --upsert-key, но обновляет только поля со значением NULL. Новые значения игнорируются, если старое значение не NULL. Взаимоисключающий с --upsert-key. |
--use-auto-increment |
true | Пропускать автоинкрементные столбцы во входных данных и использовать системные значения. |
--target |
schemaName.tableName | Имя целевой таблицы. Схема по умолчанию — public. Поддерживается несколько таблиц: --target table1 --target table2 .... Если опущено, используйте --allow-dynamic для динамического определения таблицы. |
| [misc] | ||
--log-archive-hours |
72 | Часы, после которых неизменённые файлы журналов сжимаются |
--log-compress |
true | Включить автоматическое сжатие журналов |
--log-dir |
/home/mxadmin/gpAdminLogs | Каталог журналов |
--log-max-archive-files |
0 | Максимальное количество архивных журналов для хранения. Самые старые удаляются при превышении. 0 означает отсутствие удаления. |
--log-remove-after-days |
0 | Дни после сжатия, по истечении которых файлы журналов удаляются. 0 означает, что удаление никогда не происходит. |
--log-rotate-size-mb |
100 | Ротация файла журнала при превышении указанного размера (в МБ) |
| [source] | ||
--source |
http | Тип источника данных: http, stdin, kafka, transfer |
| [source][http] | ||
--http-port |
8086 | Порт HTTP для приема данных |
--max-body-bytes |
4194304 | Максимальный размер тела HTTP-запроса (в байтах) |
--max-concurrency |
40000 | Максимальное количество одновременных HTTP-соединений |
--request-timeout |
0 | Таймаут запроса в миллисекундах. 0 означает отсутствие таймаута. При ненулевом значении по истечении времени возвращается HTTP 408. |
--disable-keep-alive |
false | Закрывать соединение после каждого HTTP-запроса |
--http-debug |
false | Включить подробное диагностическое логирование HTTP |
| [source][transfer] | ||
--src-host |
— | IP-адрес главного узла исходной базы данных |
--src-port |
— | Порт главного узла исходной базы данных |
--src-user |
— | Имя пользователя для подключения к источнику (рекомендуется суперпользователь) |
--src-password |
— | Пароль для подключения к источнику |
--src-schema |
— | Схема исходной таблицы |
--src-table |
— | Имя исходной таблицы |
--src-sql |
— | SQL-фильтр для миграции данных |
--compress |
— | Метод сжатия при передаче: "" (без сжатия) gzip (требуется установленный gzip на сегментах)lz4 (требуется установленный lz4 на сегментах)Рекомендуется: lz4 > gzip > none |
--port-base |
— | Базовый порт для передачи (диапазон начинается с 9129) |
--local-ip |
— | Локальный IP-адрес, доступный из исходной базы данных |
| [writer] | ||
--interval |
100ms | Интервал пакетной записи (в миллисекундах) |
--stream-prepared |
10 | Уровень параллелизма рабочих процессов вставки |
--use-gzip |
auto | Сжимать ли данные, отправляемые на сегменты: auto, yes или no |
--max-seg-conn |
128 | Количество соединений с сегментами, используемых внешними таблицами для получения данных. Увеличение потребляет больше сетевых ресурсов. |
--timing |
false | Включать информацию о времени выполнения для каждой операции INSERT в логах |
--insert-timeout |
0 | Таймаут выполнения INSERT (в мс). 0 означает отсутствие таймаута. При ненулевом значении операция прерывается по истечении указанного времени. |
| Другое | ||
--help |
— | Показать справку и список параметров |
MatrixGate предоставляет HTTP API, позволяющее загружать данные в MatrixDB из любого языка программирования.
| Поле | Формат | Использование и пример |
|---|---|---|
| URL | http://mxgate-host:port |
Адрес для подключения к mxgate |
| PATH | / |
Поддерживается только корневой путь /; любые суффиксы игнорируются |
| HTTP-метод | POST | Поддерживается только POST для загрузки данных |
| HTTP-заголовок | Content-Encoding: gzip |
Поддерживает сжатие тела запроса с помощью gzip |
Content-Type: text/plain |
Поддерживается только text/plain |
|
| Тело HTTP | SchemaName.TableNameTimestamp\|ID\|C1\|C2\|...\|Cn |
Первая строка указывает целевую таблицу. Схему можно опустить (по умолчанию public). Последующие строки содержат данные временных рядов. Каждая строка соответствует одной строке таблицы. Поля разделяются |, строки — \n. Первое поле — временная метка Unix (в секундах), см. --time-format. Второе поле — TagID (целое число). Остальные поля сопоставляются со столбцами таблицы. Рекомендуемый DDL таблицы следует порядку (Timestamp, TagID, C1, C2, ..., Cn). |
| Код | Значение | Примечания |
|---|---|---|
| 200 | StatusOK | Частичный успех. Тело ответа может содержать детали ошибок для неудачных строк, например:{"error":"invalid format","line":2}{"error":"column mismatch","line":3} |
| 204 | StatusNoContent | Все данные успешно загружены |
| 400 | StatusBadRequest | Некорректный запрос: неверное тело POST, таблица не найдена, несоответствие заголовка и содержимого и т.д. |
| 405 | StatusMethodNotAllowed | Использован метод HTTP, отличный от POST |
| 408 | StatusTimeout | Превышен таймаут запроса |
| 500 | StatusInternalServerError | Ошибка на стороне базы данных; загрузка не удалась. Тело ответа содержит подробное сообщение об ошибке. |
| 503 | StatusServiceUnavailable | Сервис отклонил запрос: слишком много соединений, завершение работы и т.д. |
testtable в базе данных demo:CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
data.txt:public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
mxgate --config mxgate.conf
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
date_part | time | tagid | c1 | c2 | c3
------------+------------------------+-------+-----+-----+-----
1603777821 | 2020-10-27 13:50:21+08 | 1 | 101 | 201 | 301
1603777822 | 2020-10-27 13:50:22+08 | 2 | 102 | 202 | 302
1603777823 | 2020-10-27 13:50:23+08 | 3 | 103 | 203 | 303
(3 rows)
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class MxgateExample {
public static void main(String[] args) throws Exception {
MxgateExample http = new MxgateExample();
http.sendingPostRequest();
}
// HTTP Post request
private void sendingPostRequest() throws Exception {
// mxgate listens on port 8086 of localhost
String url = "http://localhost:8086/";
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
// Setting basic post request
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type","text/plain");
String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
// Encode using UTF-8 if data contains Chinese characters
wr.write(postJsonData.toString().getBytes("UTF-8"));
wr.flush();
wr.close();
int responseCode = con.getResponseCode();
System.out.println("Sending 'POST' request to URL : " + url);
System.out.println("Post Data : " + postJsonData);
System.out.println("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String output;
StringBuffer response = new StringBuffer();
while ((output = in.readLine()) != null) {
response.append(output);
}
in.close();
System.out.println(response.toString());
}
}
import http.client
class MxgateExample(object):
def __init__(self):
# mxgate listens on port 8086 of localhost
self.url = "localhost:8086"
self.postData = "public.testtable\n/" \
"1603777821|1|101|201|301\n/" \
"1603777822|2|102|202|302\n/" \
"1603777823|3|103|203|303"
self.headers = {"Content-Type": "text/plain"}
# HTTP Post request
def sending_post_request(self):
conn = http.client.HTTPConnection(self.url)
conn.request("POST", "/", self.postData, self.headers)
response = conn.getresponse()
response_code = response.getcode()
print(f"Sending 'POST' request to URL : {self.url}")
print(f"Post Data : {self.postData}")
print(f"Response Code : {response_code}")
output = response.read()
print(output)
if __name__ == '__main__':
gate_post = MxgateExample()
gate_post.sending_post_request()
Рекомендуется использовать среду разработки C# Core.
using System;
using System.IO;
using System.Net;
using System.Text;
namespace HttpPostTest
{
class Program
{
static void Main(string[] args)
{
var url = "http://10.13.2.177:8086/";
var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";
HttpPost(url,txt);
}
public static string HttpPost(string url, string content){
string result = "";
HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
req.Method = "POST";
req.ContentType = "text/plain";
#region Add Post Parameters
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
reqStream.Write(data, 0, data.Length);
reqStream.Close();
}
#endregion
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
// Get response content
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
result = reader.ReadToEnd();
}
return result;
}
}
}
Если вы столкнулись с
error when serving connection ***** body size exceeds the given limit, увеличьте значениеmax-body-bytesвmxgate.conf.
package main
import (
"bytes"
"net/http"
)
func PostDataToServer(URL string) error {
data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
if err != nil {
return err
}
if resp.StatusCode != 200 {
// Обработка тела ответа
return nil
}
// Обработка тела ответа
return nil
}
func main() {
err := PostDataToServer("http://127.0.0.1:8086")
if err != nil{
panic(err)
}
}
Создайте таблицу csvtable в базе данных demo:
CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
Отредактируйте файл данных data.csv со следующим содержимым:
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
Запустите mxgate, установите источник stdin, целевую таблицу как существующую csvtable и уровень параллелизма 2:
mxgate \
--source stdin \
--db-database demo \
--db-master-host 127.0.0.1 \
--db-master-port 5432 \
--db-user mxadmin \
--time-format unix-second \
--delimiter "|" \
--target csvtable \
--parallel 2 < data.csv
Подключитесь к базе данных и проверьте успешность загрузки данных:
demo=# SELECT * FROM csvtable ;
time | tagid | c1 | c2 | c3
------------------------+-------+-----+-----+-----
2020-10-27 05:50:23+08 | 3 | 103 | 203 | 303
2020-10-27 05:50:22+08 | 2 | 102 | 202 | 302
2020-10-27 05:50:21+08 | 1 | 101 | 201 | 301
(3 rows)
### 7.2 Пример загрузки полей JSON в MatrixGate
#### 7.2.1 json
- Создайте таблицу:
```sql
create table json_test(id int, j json);
Подготовьте файл данных:
1|"{""a"":10, ""b"":""xyz""}"
Загрузите данные
В качестве примера используется режим stdin; другие режимы аналогичны.
Ключевой момент — использование параметра --format csv.
mxgated \
--source stdin \
--db-database postgres \
--db-master-host 127.0.0.1 \
--db-master-port 7000 \
--db-user mxadmin \
--time-format raw \
--format csv \
--delimiter "|" \
--target json_test < ~/json.csv
Проверьте загруженные данные:
postgres=# select * from json_test;
id | j
----+-----------------------
1 | {"a":10, "b":"xyz"}
(1 row)
Создайте таблицу:
create table json_array_test(id int, j _json);
Подготовьте файл данных:
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
Загрузите данные:
mxgate \
--source stdin \
--db-database postgres \
--db-master-host 127.0.0.1 \
--db-master-port 7000 \
--db-user mxadmin \
--time-format raw \
--format csv \
--delimiter "|" \
--target json_array_test < ~/json_array.csv
Проверьте результат:
postgres=# select * from json_array_test ;
id | j
----+---------------------------------------------
1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
(1 row)
Примечание: Поскольку столбцы JSON содержат специальные символы, такие как кавычки, параметр
--formatmxgate должен быть установлен в значениеcsv.
watch — это подкоманда mxgate, отображающая различные метрики, описывающие состояние демона mxgate.
Поддерживает два режима:
sar. mxgate watch
Собирает метрики времени выполнения mxgate каждые три секунды. Пример вывода:
Time WCount ICount WSpeed/s ISpeed/s WBandWidth MB/S BlocakItems
2022-04-28 15:20:58 14478858 14527011 2598081 2627887 2395 0
2022-04-28 15:21:01 22231035 22633254 2584059 2702081 2222 0
2022-04-28 15:21:04 30494310 30500874 2754425 2622540 3551 0
2022-04-28 15:21:07 38004210 38032956 2503300 2510694 2862 0
2022-04-28 15:21:10 46188696 46298223 2728162 2755089 2227 0
...
Используйте параметр --info, чтобы получить описания каждой метрики:
mxgate watch --info
По умолчанию отображаются только метрики скорости. Используйте --watch-latency для мониторинга метрик задержки при отладке:
mxgate watch --watch-latency
mxgate watch --history
Вычисляет среднюю скорость загрузки в час за последние 24 часа. Пример вывода:
TIME RANGE | SPEED/S | BANDWIDTH MB/S | BLOCK ITEMS
2022-04-28 16:00:00-2022-04-28 17:00:00 | 2208010 | 1254.48 | 0
2022-04-28 17:00:00-2022-04-28 18:00:00 | 1157920 | 1327.00 | 0
2022-04-28 18:00:00-2022-04-28 19:00:00 | 2228666 | 2162.32 | 0
2022-04-28 19:00:00-2022-04-28 20:00:00 | 1371092 | 2881.30 | 0
2022-04-28 20:00:00-2022-04-28 21:00:00 | 1575320 | 2608.20 | 0
SPEED/S: количество записей, загружаемых в секунду. BANDWIDTH MB/S: пропускная способность загрузки в МБ/с. BLOCK ITEMS: объем данных, заблокированных в mxgate. Это значение возрастает, когда скорость потребления базы данных не успевает за скоростью генерации данных источником (например, HTTP, Kafka).Можно использовать параметры --watch-start, --watch-end и --watch-duration для управления диапазоном и интервалом исторических наблюдений.
Например:
mxgate watch --history --watch-start '2022-03-27 00:00:00' --watch-end '2022-04-27 00:00:00' --watch-duration '168h'
Это возвращает среднюю скорость загрузки за неделю (каждые 168 часов) с 27 марта по 27 апреля.
Параметр --watch-duration поддерживает единицы: s (секунды), m (минуты), h (часы).
mxgate позволяет динамически обновлять параметры параллельной записи без остановки службы. Поддерживаемые параметры включают "interval" и "stream-prepared".
interval: Продолжительность каждой транзакции записи из mxgate в целевую таблицу. stream-prepared: Количество активных соединений для записи. В mxgate только одно соединение для записи выполняет транзакции для заданной таблицы в любой момент времени. Для достижения высокой скорости и эффективной загрузки данных используются несколько соединений в разные временные интервалы. Вы можете настроить параметр interval, чтобы оптимизировать производительность записи.
Примеры:
Установить количество соединений записи на одну таблицу равным 3:
mxgate set --stream-prepared-cli 3

Получить текущее количество активных соединений записи на таблицу:
mxgate get --stream-prepared-get

Установить интервал записи для всех таблиц в 200 мс:
mxgate set --job-interval 200

Получить текущий интервал записи для всех таблиц:
mxgate get --job-interval-get

Примечание!
Чтобы установить или получить параметры для конкретной таблицы, добавьте к команде--job <schema.table_name>. Каждая задача соответствует таблице базы данных. Например, если имя таблицы —test_tableв схемеpublic, используйте--job public.test_table.
Во время загрузки данных может оказаться, что исходная схема таблицы больше не соответствует изменяющимся шаблонам временных рядов. mxgate поддерживает обновление схемы без остановки службы. В этом разделе описано, как приостановить запись, перезагрузить обновлённые метаданные таблицы и возобновить вставку данных.
Шаги:
Сначала выполните mxgate pause -X, чтобы остановить все соединения записи и подготовиться к изменению схемы. Флаг -X является обязательным — он завершает соединения между mxgate и базой данных. Без этого изменения схемы завершатся ошибкой. Используйте флаг -S, чтобы команда pause ожидала синхронно завершения всех соединений перед возвратом результата.

После приостановки записи измените структуру целевой таблицы — добавьте или удалите столбцы, либо удалите и повторно создайте таблицу с тем же именем.
Примечание!
Новая структура таблицы может отличаться, но имя таблицы должно остаться прежним.
Наконец, выполните mxgate resume -R, чтобы возобновить операции записи и перезагрузить обновлённые метаданные таблицы. Флаг -R является обязательным. Вместе resume и -R запускают перезагрузку метаданных.

В средах с несколькими запущенными процессами mxgate используйте параметр -p, чтобы указать идентификатор процесса. Это относится ко всем командам выше.

Примечание!
Для перезагрузки метаданных необходимо сначала приостановить все соединения записи для целевой таблицы. В противном случае возникнет следующая ошибка: