MatrixGate сокращённо называется mxgate. Это высокопроизводительный сервер потоковой загрузки данных, расположенный в каталоге bin/mxgate установочного каталога MatrixDB. В настоящее время MatrixGate предоставляет интерфейсы HTTP и STDIN для загрузки данных, форматы данных поддерживают TEXT и CSV.
Логика загрузки данных через MatrixGate показана на рисунке ниже:

Укажите целевую базу данных и целевую таблицу для генерации конфигурационного файла mxgate
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf
Вышеприведённые параметры создадут конфигурационный файл mxgate.conf, позволяя пользователям настраивать загрузку данных в testtable и testtable2, а также использовать глобальные настройки по умолчанию для загрузки данных в другие таблицы.
При необходимости измените конфигурационный файл mxgate, например, задайте разделитель данных и т.д., шаг можно пропустить, если используются настройки по умолчанию. Ниже приведены параметры для testtable и testtable2 из этого файла конфигурации:
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
# delimiter = "|"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
Если разделителем для testtable является @, а для testtable2 — %, то вышеуказанную конфигурацию можно изменить следующим образом:
[[job.target]]
delimiter = "@"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable"
# null-as = ""
table = "public.testtable"
# time-format = "unix-second"
# use-auto-increment = true
[[job.target]]
delimiter = "%"
# exclude-columns = []
# format = "text"
name = "job_text_to_public.testtable2"
# null-as = ""
table = "public.testtable2"
# time-format = "unix-second"
# use-auto-increment = true
По умолчанию mxgate прослушивает порт 8086 для приёма данных. В файле mxgate.conf можно увидеть, что подпункт http-port в source.http установлен в 8086. При необходимости его можно изменить на другой порт:
[source]
## Source plugin is the data entrance to MatrixGate
## Types restricted to: http
source = "http"
[source.http]
## Port of http push
# http-port = 8086
## Maximum request body size (after gzip)
## The server rejects requests with bodies exceeding this limit.
# max-body-bytes = 4194304
## The maximum number of concurrent HTTP connections to the server
## The server response with 503 after exceed this limit.
# max-concurrency = 40000
Запустите mxgate, загрузите конфигурационный файл, подключитесь к базе данных demo и подготовьтесь к приёму запросов на загрузку данных
mxgate start --config mxgate.conf
Проверьте состояние фонового сервиса
mxgate status
Остановите фоновый сервис
mxgate stop
При возникновении таймаута или других проблем необходимо принудительно остановить процесс, это можно сделать так:
mxgate stop --force
| Имя параметра | Значение параметра | Описание параметра |
|---|---|---|
| [database] категория | ||
| --db-database | по умолчанию postgres | Имя базы данных MatrixDB, к которой подключается MatrixGate |
| --db-master-host | по умолчанию имя локального хоста | Имя хоста MatrixDB, к которому подключается MatrixGate |
| --db-master-port | по умолчанию 5432 | Порт хоста MatrixDB, к которому подключается MatrixGate |
| --db-user | по умолчанию текущее имя пользователя системы | Имя пользователя для подключения MatrixGate к MatrixDB Примечание: этот пользователь должен иметь права на создание внешних таблиц. Если используется не суперпользователь, добавьте права с помощью следующей команды: alter user {username} CREATEEXTTABLE; |
| --db-password | по умолчанию пусто | Пароль пользователя MatrixDB для подключения MatrixGate |
| --db-max-conn | по умолчанию 10 | Максимальное количество соединений MatrixGate с MatrixDB |
| [job] категория | ||
| --allow-dynamic | по умолчанию false | При указании --allow-dynamic=true разрешается динамически определять целевую таблицу на основе содержимого данных POST (первая строка). Эта опция должна использоваться только в тех случаях, когда имя целевой таблицы заранее неизвестно при запуске MatrixGate. Если вставка осуществляется в конкретную известную таблицу, рекомендуется явно указать имя таблицы с помощью параметра --target |
| --delimiter | по умолчанию | | Символ, используемый для разделения столбцов в каждой строке данных |
| --error-handling | по умолчанию 'accurate' | Способ обработки строк с ошибками формата 'accurate': ошибочные данные не попадают в БД, но записываются в журнал ошибок; остальные данные в пакете загружаются нормально 'legacy': весь пакет отклоняется при наличии ошибки |
| --exclude-columns | по умолчанию пусто | По умолчанию количество и порядок столбцов в загружаемых данных должны соответствовать определению таблицы. Когда загружаются только некоторые столбцы, --exclude-columns указывает имена исключаемых столбцов, остальные столбцы должны сохранять порядок, как в определении таблицы. Примечание: если уже включен --use-auto-increment для пропуска автоинкрементных полей, их не нужно перечислять здесь. Этот параметр нужен только для указания других столбцов, которые следует исключить |
| --format | по умолчанию text | Формат исходных данных: text или csv. Формат text самый быстрый, но не поддерживает переводы строк внутри текстовых значений. Формат csv более универсален, текстовые поля должны быть заключены в двойные кавычки. |
| --null-as | по умолчанию пустая строка | Строка, обозначающая значение NULL. По умолчанию — пустая строка без кавычек. Если столбец таблицы имеет ограничение NOT NULL, а в данных указано пустое значение, произойдёт ошибка загрузки. Примечание: чтобы использовать \N как NULL, необходимо экранировать обратный слэш, например: --null-as \N |
| --time-format | по умолчанию unix-second | Единица измерения временной метки: unix-second|unix-ms|unix-nano|raw. MatrixGate по умолчанию считает первый столбец каждой строки временной меткой в формате Unix и автоматически преобразует её в формат времени базы данных. Если временная метка не в первом столбце или уже преобразована в формат БД, используйте raw — тогда MatrixGate не будет выполнять преобразование типа времени. |
| --upsert-key | по умолчанию пусто | Ключ(и) для операции upsert. Таблица, для которой выполняется upsert, должна иметь UNIQUE-ограничение, и все ключи ограничения должны быть указаны в параметре. |
| --deduplicate-key | по умолчанию пусто | Аналогично upsert, но обновляет только значения NULL. Если старое значение не NULL, новое игнорируется. Взаимоисключающий с --upsert-key, можно использовать только один из них. |
| --use-auto-increment | по умолчанию true | Пропускать ли автоинкрементные поля при загрузке данных и использовать системные значения по умолчанию, если целевая таблица содержит такие поля |
| --target | schemaName.tableName | Целевая таблица. schemaName можно опустить, по умолчанию public. Можно указать несколько таблиц: "--target таблица1 --target таблица2 …". Если параметр не указан, можно использовать --allow-dynamic для динамического определения имени таблицы. |
| [misc] категория | ||
| --log-archive-hours | по умолчанию 72 | В каталоге логов файлы matrixgate, не изменявшиеся более заданного времени, автоматически сжимаются |
| --log-compress | по умолчанию true | Глобальный переключатель автоматического сжатия логов |
| --log-dir | по умолчанию /home/mxadmin/gpAdminLogs | Каталог логов |
| --log-max-archive-files | по умолчанию 0 | Максимальное количество архивных файлов логов. При превышении самые старые файлы удаляются. 0 — не удалять |
| --log-remove-after-days | по умолчанию 0 | Через сколько дней после сжатия файлы логов будут удалены. 0 — не удалять |
| --log-rotate-size-mb | по умолчанию 100 | Текущий файл лога автоматически заменяется новым при превышении указанного размера, старый файл немедленно сжимается |
| [source] категория | ||
| --source | по умолчанию http | Источник данных для MatrixGate: поддерживаются http, stdin, kafka, transfer |
| [source] категория | [http] | |
| --http-port | по умолчанию 8086 | HTTP-интерфейс для отправки данных пользователями в MatrixGate |
| --max-body-bytes | по умолчанию 4194304 | Максимальный размер тела каждого HTTP-запроса |
| --max-concurrency | по умолчанию 40000 | Максимальное количество одновременных HTTP-соединений |
| --request-timeout | по умолчанию 0 | Таймаут запроса, по умолчанию 0 — бесконечное ожидание. При значении >0 запрос завершится по истечении указанного количества миллисекунд с кодом HTTP 408. |
| --disable-keep-alive | по умолчанию false | Принудительное разъединение после каждого HTTP-запроса |
| --http-debug | по умолчанию false | Вывод дополнительной диагностической информации HTTP |
| [source] категория | [transfer] | |
| --src-host | IP-адрес master-узла исходной базы данных | |
| --src-port | Порт master-узла исходной базы данных | |
| --src-user | Имя пользователя для подключения к исходной базе данных (рекомендуется использовать суперпользователя) | |
| --src-password | Пароль подключения | |
| --src-schema | Имя схемы исходной таблицы | |
| --src-table | Имя исходной таблицы | |
| --src-sql | SQL-фильтр для миграции данных | |
| --compress | Метод передачи данных с сегментов исходной БД: пустая строка "" — без сжатия, передача в открытом виде gzip — сжатие gzip, требует наличия команды gzip на сегментах исходной БД lz4 — сжатие lz4, требует наличия команды lz4 на сегментах исходной БД рекомендуется: lz4 > gzip > без сжатия |
|
| --port-base | Диапазон портов, используемых при передаче: 9129~ | |
| --local-ip | Необходимо указать IP-адрес, по которому исходная БД может подключиться к этому хосту | |
| [writer] категория | ||
| --interval | по умолчанию 100 мс | Период выполнения пакетной загрузки данных MatrixGate |
| --stream-prepared | по умолчанию 10 | Степень параллелизма рабочих процессов вставки |
| --use-gzip | по умолчанию 'auto' | Включать ли сжатие при отправке данных в segment, возможные значения: auto/yes/no |
| --max-seg-conn | по умолчанию 128 | Количество сегментов, запускаемых при выборке данных из MatrixGate через внешнюю таблицу. Увеличение параметра повышает нагрузку на сетевые соединения |
| --timing | по умолчанию false | При включении в логах MatrixGate для каждой операции INSERT будет указываться время выполнения |
| --insert-timeout | по умолчанию 0 | Таймаут выполнения оператора INSERT в MatrixGate, по умолчанию 0 — бесконечное ожидание. При значении >0 операция завершится по истечении указанного количества миллисекунд. |
| Другие | ||
| --help | Отображение справки и списка параметров |
MatrixGate предоставляет HTTP API, позволяя языкам программирования импортировать данные в базу данных MatrixDB через HTTP-интерфейс.
Формат протокола HTTP MatrixGate
| Тип протокола | Формат протокола | Использование и примеры |
|---|---|---|
| URL | http://mxgate-host:port | Указание адреса подключения к mxgate |
| PATH | / | Поддерживаются /, любые пути после / игнорируются |
| HTTP Method | POST | Поддерживается только метод POST для загрузки данных |
| HTTP Header | Content-Encoding: gzip | Поддерживается сжатие содержимого тела HTTP с помощью gzip |
| Content-Type: text/plain | Поддерживается text/plain | |
| HTTP Body | SchemaName.TableName Timestamp|ID|C1|C2|..|Cn |
Первая строка тела — целевая таблица для загрузки данных. SchemaName можно опустить, по умолчанию public. TableName — обязательный параметр. Начиная со второй строки — строки временных рядов. Каждая строка соответствует строке целевой таблицы. Между столбцами используется разделитель |, между строками — \n. Первое поле каждой строки — временная метка в формате Unix (в секундах), см. описание --time-format. Второе поле каждой строки — TagID (целое число). С третьего до последнего поля — столбцы, соответствующие целевой таблице. Рекомендуется, чтобы DDL-определение целевой таблицы также следовало порядку столбцов (Timestamp, TagID, C1, C2,…, Cn) |
Коды ответов HTTP MatrixGate
| Код ответа | Значение кода ответа | Примечания |
|---|---|---|
| 200 | StatusOK | Некоторые данные имеют неверный формат, в теле ответа будет указаны строки с ошибками и сообщениями, например:At line: 2missing data for column "c3" |
| 204 | StatusNoContent | Данные успешно загружены в MatrixGate |
| 400 | StatusBadRequest | Ошибка запроса данных, например: ошибка формата тела POST, таблица не существует, формат сжатия данных не соответствует заголовку HTTP-запроса и т.д. |
| 405 | StatusMethodNotAllowed | HTTP-запрос не POST |
| 408 | StatusTimeout | Таймаут запроса |
| 500 | StatusIntervalServerError | Ошибка базы данных, загрузка данных не удалась, в теле ответа содержится подробная информация об ошибке |
| 503 | StatusServiceUnavailable | MatrixGate отклоняет запросы, например: превышено максимальное количество соединений, MatrixGate завершает работу и т.д. |
CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
mxgate --config mxgate.conf
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
date_part | time | tagid | c1 | c2 | c3
----------------------------------------------------------------------------------------------------------------------------------
1603777821 | 2020-10-27 13:50:21+08 | 1 | 101 | 201 | 301
1603777822 | 2020-10-27 13:50:22+08 | 2 | 102 | 202 | 302
1603777823 | 2020-10-27 13:50:23+08 | 3 | 103 | 203 | 303
(3 rows)
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class MxgateExample {
public static void main(String[] args) throws Exception {
MxgateExample http = new MxgateExample();
http.sendingPostRequest();
}
// HTTP Post request
private void sendingPostRequest() throws Exception {
// mxgate listens on port 8086 of localhost
String url = "http://localhost:8086/";
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
// Setting basic post request
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type","text/plain");
String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
// When the data is in Chinese, it can be encoded by postJsonData.getBytes("UTF-8")
wr.write(postJsonData.toString().getBytes("UTF-8"));
wr.flush();
wr.close();
int responseCode = con.getResponseCode();
System.out.println("Sending 'POST' request to URL : " + url);
System.out.println("Post Data : " + postJsonData);
System.out.println("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String output;
StringBuffer response = new StringBuffer();
while ((output = in.readLine()) != null) {
response.append(output);
}
in.close();
System.out.println(response.toString());
}
}
import http.client
class MxgateExample(object):
def __init__(self):
# mxgate listens on port 8086 of localhost
self.url = "localhost:8086"
self.postData = "public.testtable\n/" \
"1603777821|1|101|201|301\n/" \
"1603777822|2|102|202|302\n/" \
"1603777823|3|103|203|303"
self.headers = {"Content-Type": "text/plain"}
# HTTP Post request
def sending_post_request(self):
conn = http.client.HTTPConnection(self.url)
conn.request("POST", "/", self.postData, self.headers)
response = conn.getresponse()
response_code = response.getcode()
print(f"Sending 'POST' request to URL : {self.url}")
print(f"Post Data : {self.postData}")
print(f"Response Code : {response_code}")
output = response.read()
print(output)
if __name__ == '__main__':
gate_post = MxgateExample()
gate_post.sending_post_request()
Рекомендуется использовать среду разработки C# Core
using System; using System.IO; using System.Net; using System.Text;
namespace HttpPostTest { class Program { static void Main(string[] args) { var url = "http://10.13.2.177:8086/"; var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";
HttpPost(url,txt);
}
public static string HttpPost(string url, string content){ string result = ""; HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url); req.Method = "POST"; req.ContentType = "text/plain";
#region Add Post parameters
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
reqStream.Write(data, 0, data.Length);
reqStream.Close();
}
#endregion
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
//Get the response content
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
result = reader.ReadToEnd();
}
return result;
}
} }
> Если возникает ошибка "connection ***** body size exceeds the given limit", увеличьте параметр max-body-bytes в файле mxgate.conf
### 6.4 Пример HTTP API MatrixGate на Golang
package main
import ( "bytes" "net/http" )
func PostDataToServer(URL string) error {
data := public.testtable 1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303
resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
if err != nil {
return err
}
if resp.StatusCode != 200 {
// Обработка тела ответа.
return nil
}
// Обработка тела ответа.
return nil
}
func main() { err := PostDataToServer("http://127.0.0.1:8086") if err != nil{ panic(err) }
}
## 7 MatrixGate Loading Special Types
### 7.1 MatrixGate loading CSV file example
- Create table csvtable in demo database
CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT) DISTRIBUTED BY (tagid);
- Edit the data load file data.csv, the content is as follows
1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303
- Start mxgate, specify the source parameter to stdin, the target table is an existing csvtable, and the loading parallelism is 2
mxgate \ --source stdin \ --db-database demo \ --db-master-host 127.0.0.1 \ --db-master-port 5432 \ --db-user mxadmin \ --time-format unix-second \ --delimiter "|" \ --target csvtable \ --parallel 2 < data.csv
- Connect to the database to query whether the data is loaded successfully
demo=# SELECT * FROM csvtable ; time | tagid | c1 | c2 | c3 -----------------------+-------+-------------------------------------------------------------------------------------------------- 2020-10-27 05:50:23+08 | 3 | 103 | 203 | 303 2020-10-27 05:50:22+08 | 2 | 102 | 202 | 302 2020-10-27 05:50:21+08 | 1 | 101 | 201 | 301
(3 rows)
### 7.2 MatrixGate loading json field example
### 7.2.1 json
- Create table
create table json_test(id int, j json);
- Создание файлов данных
`~/json.csv`
1|"{""a"":10, ""b"":""xyz""}"
- Загрузка
Здесь в качестве примера используется режим stdin, другие режимы аналогичны.
Ключевой момент — `--format csv`
mxgated \ --source stdin \ --db-database postgres \ --db-master-host 127.0.0.1 \ --db-master-port 7000 \ --db-user mxadmin \ --time-format raw \ --format csv \ --delimiter "|" \ --target json_test < ~/json.csv
- Просмотр загруженных данных
postgres=# select * from json_test; id | j ----+----------------------------------------------------------------------------------------------------------------------------- 1 | {"a":10, "b":"xyz"} (1 row)
### 7.2.2 Массив JSON
- Создание таблицы
create table json_array_test(id int, j _json);
- Создание файлов данных
`~/json_array.csv`
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
- Загрузка
mxgate \ --source stdin \ --db-database postgres \ --db-master-host 127.0.0.1 \ --db-master-port 7000 \ --db-user mxadmin \ --time-format raw \ --format csv \ --delimiter "|" \ --target json_array_test < ~/json_array.csv
- Проверка
postgres=# select * from json_array_test ; id | j ----+----------------------------------------------------------------------------------------------------------------------------- 1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"} (1 row)
> Примечание: Поскольку столбец JSON содержит специальные символы, такие как кавычки, параметр --format в mxgate должен быть установлен в csv