MatrixGate сокращённо называется mxgate. Это высокопроизводительный сервер потоковой загрузки данных, расположенный в каталоге bin/mxgate установочного директория MatrixDB. В настоящее время MatrixGate предоставляет интерфейсы HTTP и STDIN для загрузки данных, форматы данных поддерживают TEXT и CSV.
Логика загрузки данных через MatrixGate показана на рисунке ниже:

Укажите целевую базу данных и целевую таблицу для генерации конфигурационного файла mxgate
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf
Вышеуказанные параметры создадут конфигурационный файл mxgate.conf, позволяя пользователям настраивать загрузку для testtable и testtable2, а также использовать глобальные настройки по умолчанию для загрузки данных в другие таблицы.
При необходимости измените конфигурационный файл mxgate, например, задайте разделитель данных и т.д., шаг можно пропустить, если используются настройки по умолчанию. Ниже приведены настройки для testtable и testtable2 в этом конфигурационном файле:
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
Если разделителем для testtable является @, а для testtable2 — %, вышеуказанную конфигурацию можно изменить следующим образом:
[[job.target]]
delimiter = "@"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
delimiter = "%"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
По умолчанию mxgate прослушивает порт 8086 для приема данных. В файле mxgate.conf можно увидеть, что подпункт http-port в source.http установлен в 8086. При необходимости его можно изменить на другой порт:
[source]
## Source plugin is the data entrance to MatrixGate
## Types restricted to: http
source = "http"
[source.http]
## Port of http push
# http-port = 8086
## Maximum request body size (after gzip)
## The server rejects requests with bodies exceeding this limit.
# max-body-bytes = 4194304
## The maximum number of concurrent HTTP connections to the server
## The server response with 503 after exceed this limit.
# max-concurrency = 40000
Запустите mxgate, загрузите конфигурационные файлы, подключитесь к базе данных demo и подготовьтесь к приему запросов на загрузку данных
mxgate start --config mxgate.conf
Проверьте состояние фонового сервиса
mxgate status
Остановите фоновый сервис
mxgate stop
При возникновении таймаута или других проблем необходимо принудительно остановить процесс, это можно сделать так:
mxgate stop --force
| Имя параметра | Значение параметра | Описание параметра |
|---|---|---|
| [database] категория | ||
| --db-database | по умолчанию postgres | Имя базы данных MatrixDB, к которой подключается MatrixGate |
| --db-master-host | по умолчанию имя локального хоста | Имя хоста MatrixDB, к которому подключается MatrixGate |
| --db-master-port | по умолчанию 5432 | Порт хоста MatrixDB, к которому подключается MatrixGate |
| --db-user | по умолчанию текущее имя пользователя системы | Имя пользователя MatrixDB, к которому подключается MatrixGate Примечание: этот пользователь должен иметь права на создание внешних таблиц. Если используется не суперпользователь, добавьте права следующей командой: alter user {username} CREATEEXTTABLE; |
| --db-password | по умолчанию пусто | Пароль пользователя MatrixDB, к которому подключается MatrixGate |
| --db-max-conn | по умолчанию 10 | Максимальное количество соединений MatrixGate с MatrixDB |
| [job] категория | ||
| --allow-dynamic | по умолчанию false | При указании --allow-dynamic=true разрешается динамически определять целевую таблицу на основе содержимого данных POST (первая строка). Эта опция должна использоваться только в случаях, когда имя целевой таблицы неизвестно при запуске MatrixGate. Если вставка осуществляется в конкретную известную таблицу, рекомендуется явно указать имя таблицы с помощью --target |
| --delimiter | по умолчанию | | Символ, используемый для разделения столбцов в каждой строке данных |
| --error-handling | по умолчанию 'accurate' | Способ обработки строк с ошибками формата 'accurate': ошибочные данные не попадают в БД, но фиксируются в журнале ошибок; остальные данные в пакете не затрагиваются 'legacy': весь пакет отклоняется при наличии ошибки |
| --exclude-columns | по умолчанию пусто | По умолчанию количество и порядок столбцов в загружаемых данных должны соответствовать определению таблицы. Когда загружаются только некоторые столбцы, --exclude-columns указывает имена исключаемых столбцов; остальные столбцы должны сохранять порядок, как в определении таблицы. Примечание: если уже включен --use-auto-increment для пропуска автоинкрементных полей, их не нужно перечислять здесь. Этот параметр нужен только для указания других столбцов, которые следует исключить |
| --format | по умолчанию text | Формат исходных данных: text или csv. Формат text самый быстрый, но не поддерживает символы перевода строки внутри текстовых полей. Формат csv более универсален, текстовые поля должны быть заключены в двойные кавычки |
| --null-as | по умолчанию пустая строка | Строка, обозначающая значение NULL. По умолчанию — пустая строка без кавычек. Если столбец таблицы имеет ограничение NOT NULL, а в данных указано пустое значение, загрузка завершится ошибкой. Примечание: чтобы использовать \N как NULL, необходимо экранировать обратный слэш, например: --null-as \N |
| --time-format | по умолчанию unix-second | Единица измерения временной метки: unix-second|unix-ms|unix-nano|raw. MatrixGate по умолчанию считает первый столбец каждой строки временной меткой в формате Unix и автоматически преобразует её в формат времени базы данных. Если временная метка находится не в первом столбце или уже преобразована в формат БД, используйте raw — тогда MatrixGate не будет выполнять преобразование типа времени |
| --upsert-key | по умолчанию пусто | Ключ(и) для операции upsert. Для таблицы, требующей upsert, должно быть создано ограничение UNIQUE, и все ключи ограничения должны быть указаны в параметре |
| --deduplicate-key | по умолчанию пусто | Аналогично upsert, но обновляет только пустые значения: если старое значение не NULL, новое игнорируется. Взаимоисключающий с --upsert-key, можно использовать только один из них |
| --use-auto-increment | по умолчанию true | При наличии автоинкрементного поля в целевой таблице пропускать ли его при загрузке данных и использовать системное автоинкрементное значение |
| --target | schemaName.tableName | Целевая таблица. schemaName можно опустить, по умолчанию public. Можно указать несколько таблиц: "--target таблица1 --target таблица2 …". Если параметр не указан, можно использовать --allow-dynamic для динамического определения имени таблицы |
| [misc] категория | ||
| --log-archive-hours | по умолчанию 72 | В каталоге журналов файлы matrixgate, не изменявшиеся более указанного времени, автоматически сжимаются |
| --log-compress | по умолчанию true | Глобальный переключатель включения автоматического сжатия журналов |
| --log-dir | по умолчанию /home/mxadmin/gpAdminLogs | Каталог журналов |
| --log-max-archive-files | по умолчанию 0 | Максимальное количество сжатых файлов журналов. При превышении самые старые файлы удаляются. 0 — не удалять |
| --log-remove-after-days | по умолчанию 0 | Через сколько дней после сжатия файлы журналов будут автоматически удалены. 0 — не удалять |
| --log-rotate-size-mb | по умолчанию 100 | Текущий файл журнала автоматически заменяется новым при превышении указанного размера, старый файл немедленно сжимается |
| [source] категория | ||
| --source | по умолчанию http | Источник данных для MatrixGate: поддерживается http, stdin, kafka, transfer |
| [source] категория | [http] | |
| --http-port | по умолчанию 8086 | HTTP-интерфейс для отправки данных пользователями в MatrixGate |
| --max-body-bytes | по умолчанию 4194304 | Максимальный размер тела каждого HTTP-запроса |
| --max-concurrency | по умолчанию 40000 | Максимальное количество одновременных HTTP-соединений |
| --request-timeout | по умолчанию 0 | Таймаут запроса, по умолчанию 0 — бесконечное ожидание. При значении >0 запрос завершится по истечении указанного количества миллисекунд с кодом ответа HTTP408 |
| --disable-keep-alive | по умолчанию false | Принудительное закрытие соединения после каждого HTTP-запроса |
| --http-debug | по умолчанию false | Вывод дополнительной диагностической информации HTTP |
| [source] категория | [transfer] | |
| --src-host | IP-адрес master-узла исходной базы данных | |
| --src-port | Порт master-узла исходной базы данных | |
| --src-user | Имя пользователя для подключения к исходной базе данных (рекомендуется использовать суперпользователя) | |
| --src-password | Пароль подключения | |
| --src-schema | Имя схемы исходной таблицы | |
| --src-table | Имя исходной таблицы | |
| --src-sql | SQL-запрос для фильтрации мигрируемых данных | |
| --compress | Метод передачи данных с сегментов исходной базы данных: пустая строка "" — без сжатия, передача в открытом виде gzip — сжатие gzip, требует наличия команды gzip на сегментах исходной базы данных lz4 — сжатие lz4, требует наличия команды lz4 на сегментах исходной базы данных Рекомендуется: lz4 > gzip > без сжатия |
|
| --port-base | Диапазон портов, используемых при передаче: 9129~ | |
| --local-ip | Необходимо указать IP-адрес, по которому исходная база данных может подключиться к этому хосту | |
| [writer] категория | ||
| --interval | по умолчанию 100 мс | Период выполнения пакетной загрузки данных MatrixGate |
| --stream-prepared | по умолчанию 10 | Степень параллелизма рабочих процессов вставки |
| --use-gzip | по умолчанию 'auto' | Включать ли сжатие при отправке данных в сегменты. Возможные значения: auto/yes/no |
| --max-seg-conn | по умолчанию 128 | Количество сегментов, запускаемых при выборке данных из MatrixGate через внешнюю таблицу. Увеличение этого параметра повышает нагрузку на сетевые соединения |
| --timing | по умолчанию false | При включении в логах MatrixGate для каждой операции INSERT будет выводиться информация о времени выполнения |
| --insert-timeout | по умолчанию 0 | Таймаут выполнения оператора INSERT в MatrixGate, по умолчанию 0 — бесконечное ожидание. При значении >0 операция завершится по истечении указанного количества миллисекунд |
|Другие| | --help | | Отображение справки и списка параметров |
MatrixGate предоставляет HTTP API, позволяя языкам программирования импортировать данные в базу данных MatrixDB через HTTP-интерфейс.
Формат протокола HTTP MatrixGate
| Тип протокола | Формат протокола | Использование и примеры |
|---|---|---|
| URL | http://mxgate-host:port | Укажите адрес подключения mxgate |
| PATH | / | Поддерживаются /, любые пути после / игнорируются |
| HTTP Method | POST | Поддерживается только метод POST для загрузки данных |
| HTTP Header | Content-Encoding: gzip | Поддерживается сжатие тела HTTP с помощью gzip |
| Content-Type: text/plain | Поддерживается text/plain | |
| HTTP Body | SchemaName.TableName Timestamp|ID|C1|C2|..|Cn |
Первая строка тела — целевая таблица для загрузки данных. SchemaName можно опустить, по умолчанию public. TableName — обязательный параметр. Начиная со второй строки — строки временных рядов. Каждая строка соответствует строке целевой таблицы. Между столбцами используется разделитель |, между строками — \n. Первое поле каждой строки — временная метка, формат — UNIX-временная метка в секундах (см. описание --time-format). Второе поле каждой строки — TagID (целое число). С третьего до последнего поля — столбцы, соответствующие целевой таблице. Рекомендуется, чтобы DDL-определение целевой таблицы также следовало порядку столбцов (Timestamp, TagID, C1, C2,…, Cn) |
Коды ответа HTTP MatrixGate
| Код ответа | Значение кода ответа | Примечания |
|---|---|---|
| 200 | StatusOK | Некоторые данные имеют неверный формат, тело ответа содержит строки с ошибками и сообщениями, например:At line: 2missing data for column "c3" |
| 204 | StatusNoContent | Данные успешно загружены в MatrixGate |
| 400 | StatusBadRequest | Ошибка запроса данных, например, неверный формат тела POST, отсутствие целевой таблицы, несоответствие формата сжатия данным в заголовке HTTP и т.д. |
| 405 | StatusMethodNotAllowed | HTTP-запрос не POST |
| 408 | StatusTimeout | Таймаут запроса |
| 500 | StatusIntervalServerError | Ошибка базы данных, загрузка данных не удалась, тело ответа содержит подробную информацию об ошибке |
| 503 | StatusServiceUnavailable | MatrixGate отклоняет запросы, например, превышено максимальное количество соединений или MatrixGate завершает работу и т.д. |
CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
mxgate --config mxgate.conf
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
date_part | time | tagid | c1 | c2 | c3
----------------------------------------------------------------------------------------------------------------------------------
1603777821 | 2020-10-27 13:50:21+08 | 1 | 101 | 201 | 301
1603777822 | 2020-10-27 13:50:22+08 | 2 | 102 | 202 | 302
1603777823 | 2020-10-27 13:50:23+08 | 3 | 103 | 203 | 303
(3 rows)
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class MxgateExample {
public static void main(String[] args) throws Exception {
MxgateExample http = new MxgateExample();
http.sendingPostRequest();
}
// HTTP Post request
private void sendingPostRequest() throws Exception {
// mxgate listens on port 8086 of localhost
String url = "http://localhost:8086/";
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
// Setting basic post request
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type","text/plain");
String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
// When the data is in Chinese, it can be encoded by postJsonData.getBytes("UTF-8")
wr.write(postJsonData.toString().getBytes("UTF-8"));
wr.flush();
wr.close();
int responseCode = con.getResponseCode();
System.out.println("Sending 'POST' request to URL : " + url);
System.out.println("Post Data : " + postJsonData);
System.out.println("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String output;
StringBuffer response = new StringBuffer();
while ((output = in.readLine()) != null) {
response.append(output);
}
in.close();
System.out.println(response.toString());
}
}
import http.client
class MxgateExample(object):
def __init__(self):
# mxgate listens on port 8086 of localhost
self.url = "localhost:8086"
self.postData = "public.testtable\n/" \
"1603777821|1|101|201|301\n/" \
"1603777822|2|102|202|302\n/" \
"1603777823|3|103|203|303"
self.headers = {"Content-Type": "text/plain"}
# HTTP Post request
def sending_post_request(self):
conn = http.client.HTTPConnection(self.url)
conn.request("POST", "/", self.postData, self.headers)
response = conn.getresponse()
response_code = response.getcode()
print(f"Sending 'POST' request to URL : {self.url}")
print(f"Post Data : {self.postData}")
print(f"Response Code : {response_code}")
output = response.read()
print(output)
if __name__ == '__main__':
gate_post = MxgateExample()
gate_post.sending_post_request()
Рекомендуется использовать среду разработки C# Core
using System; using System.IO; using System.Net; using System.Text;
namespace HttpPostTest { class Program { static void Main(string[] args) { var url = "http://10.13.2.177:8086/"; var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";
HttpPost(url,txt);
}
public static string HttpPost(string url, string content){ string result = ""; HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url); req.Method = "POST"; req.ContentType = "text/plain";
#region Add Post parameters
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
reqStream.Write(data, 0, data.Length);
reqStream.Close();
}
#endregion
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
//Get the response content
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
result = reader.ReadToEnd();
}
return result;
}
} }
> Если возникает ошибка "connection ***** body size exceeds the given limit", увеличьте параметр max-body-bytes в файле mxgate.conf
### 6.4 Пример HTTP API MatrixGate на Golang
package main
import ( "bytes" "net/http" )
func PostDataToServer(URL string) error {
data := public.testtable 1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303
resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
if err != nil {
return err
}
if resp.StatusCode != 200 {
// Обработка тела ответа.
return nil
}
// Обработка тела ответа.
return nil
}
func main() { err := PostDataToServer("http://127.0.0.1:8086") if err != nil{ panic(err) }
}
## 7 MatrixGate Loading Special Types
### 7.1 MatrixGate loading CSV file example
- Create table csvtable in demo database
CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT) DISTRIBUTED BY (tagid);
- Edit the data load file data.csv, the content is as follows
1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303
- Start mxgate, specify the source parameter to stdin, the target table is an existing csvtable, and the loading parallelism is 2
mxgate \ --source stdin \ --db-database demo \ --db-master-host 127.0.0.1 \ --db-master-port 5432 \ --db-user mxadmin \ --time-format unix-second \ --delimiter "|" \ --target csvtable \ --parallel 2 < data.csv
- Connect to the database to query whether the data is loaded successfully
demo=# SELECT * FROM csvtable ; time | tagid | c1 | c2 | c3 -----------------------+-------+-------------------------------------------------------------------------------------------------- 2020-10-27 05:50:23+08 | 3 | 103 | 203 | 303 2020-10-27 05:50:22+08 | 2 | 102 | 202 | 302 2020-10-27 05:50:21+08 | 1 | 101 | 201 | 301
(3 rows)
### 7.2 MatrixGate loading json field example
### 7.2.1 json
- Create table
create table json_test(id int, j json);
- Создайте файлы данных
`~/json.csv`
1|"{""a"":10, ""b"":""xyz""}"
- Загрузка
Здесь в качестве примера используется режим stdin, другие режимы аналогичны.
Ключевой момент — `--format csv`
mxgated \ --source stdin \ --db-database postgres \ --db-master-host 127.0.0.1 \ --db-master-port 7000 \ --db-user mxadmin \ --time-format raw \ --format csv \ --delimiter "|" \ --target json_test < ~/json.csv
- Просмотр загруженных данных
postgres=# select * from json_test; id | j ----+----------------------------------------------------------------------------------------------------------------------------- 1 | {"a":10, "b":"xyz"} (1 row)
### 7.2.2 Массив JSON
- Создание таблицы
create table json_array_test(id int, j _json);
- Создание файлов данных
`~/json_array.csv`
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
- Загрузка
mxgate \ --source stdin \ --db-database postgres \ --db-master-host 127.0.0.1 \ --db-master-port 7000 \ --db-user mxadmin \ --time-format raw \ --format csv \ --delimiter "|" \ --target json_array_test < ~/json_array.csv
- Проверка
postgres=# select * from json_array_test ; id | j ----+----------------------------------------------------------------------------------------------------------------------------- 1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"} (1 row)
> Примечание: Поскольку столбец JSON содержит специальные символы, такие как кавычки, параметр --format в mxgate должен быть установлен в csv
## 8 Наблюдение за показателями работы mxgate:
watch — это подкоманда mxgate, которая использует ряд показателей для описания работы демона mxgate.
Существует два режима watch:
- Режим реального времени: каждые 3 секунды выводятся показатели gate в формате, аналогичном sar.
- Режим исторических наблюдений: можно указать любой период времени (например, каждый час вчера, каждый день в прошлом месяце, каждый месяц в прошлом году), чтобы статистически оценить скорость импорта.
### 8.1 Наблюдение в реальном времени
mxgate watch
Показатели работы mxgate будут собираться каждые три секунды, результат вывода следующий:
Time WCount ICount WSpeed/s ISpeed/s WBandWidth MB/S BlocakItems
2022-04-28 15:20:58 14478858 14527011 2598081 2627887 2395 0 2022-04-28 15:21:01 22231035 22633254 2584059 2702081 2222 0 2022-04-28 15:21:04 30494310 30500874 2754425 2622540 3551 0 2022-04-28 15:21:07 38004210 38032956 2503300 2510694 2862 0 2022-04-28 15:21:10 46188696 46298223 2728162 2755089 2227 0 ...
Описание вышеуказанных показателей можно получить с помощью параметра --info
mxgate watch --info
По умолчанию выводятся только показатели скорости. Показатели времени можно наблюдать с помощью параметра --watch-lateency для анализа проблем.
mxgate watch --watch-latency
### 8.2 Наблюдение за историческими данными
mxgate watch --history
Будет рассчитана средняя скорость за каждый час в диапазоне от текущего времени минус 24 часа до текущего времени. Результат вывода следующий:
TIME RANGE | SPEED/S | BANDWIDTH MB/S | BLOCK ITEMS
2022-04-28 16:00:00-2022-04-28 17:00:00 | 2208010 | 1254.48 | 0 2022-04-28 17:00:00-2022-04-28 18:00:00 | 1157920 | 1327.00 | 0 2022-04-28 18:00:00-2022-04-28 19:00:00 | 2228666 | 2162.32 | 0 2022-04-28 19:00:00-2022-04-28 20:00:00 | 1371092 | 2881.30 | 0 2022-04-28 20:00:00-2022-04-28 21:00:00 | 1575320 | 2608.20 | 0
SPEED/S, BANDWIDTH MB/S — скорость и пропускная способность импорта (в МБ/с),
BLOCK ITEMS — объем блокированных данных в mxgate. Это значение возрастает, когда скорость потребления данных базой данных не успевает за скоростью генерации источников данных (http, kafka и т.д.).
Можно добавить параметры `--watch-start`, `--watch-end`, `--watch-duration` для управления интервалом и периодом наблюдения за историческими данными.
Например:
mxgate watch --history --watch-start '2022-03-27 00:00:00' --watch-end '2022-04-27 00:00:00' --watch-duration '168h'
Средняя скорость импорта по неделям (каждые 168 ч) с 27 марта по 27 апреля
Параметр `--watch-duration` поддерживает три единицы измерения: `h``` `m``