Сервер загрузки данных MatrixGate

MatrixGate сокращённо называется mxgate. Это высокопроизводительный сервер потоковой загрузки данных, расположенный в каталоге bin/mxgate установочного каталога MatrixDB. В настоящее время MatrixGate предоставляет интерфейсы HTTP и STDIN для загрузки данных, форматы данных поддерживают TEXT и CSV.

1 Принцип работы MatrixGate

Логика загрузки данных через MatrixGate показана на рисунке ниже:

  1. Система сбора данных собирает данные устройств или принимает данные, отправленные устройствами
  2. Система сбора непрерывно передаёт данные в процесс службы mxgate MatrixGate в режиме параллельных микропакетов
  3. Процесс mxgate эффективно взаимодействует с основным процессом MatrixDB, обмениваясь информацией о транзакциях и управлении
  4. Данные напрямую отправляются на сегментные узлы и параллельно записываются с высокой скоростью.

Схема MatrixGate

2 Использование MatrixGate

  • Укажите целевую базу данных и целевую таблицу для генерации конфигурационного файла mxgate

    mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf

    Вышеприведённые параметры создадут конфигурационный файл mxgate.conf, позволяя пользователям настраивать загрузку данных в testtable и testtable2, а также использовать глобальные настройки по умолчанию для загрузки данных в другие таблицы.

  • При необходимости измените конфигурационный файл mxgate, например, задайте разделитель данных и т.д., шаг можно пропустить, если используются настройки по умолчанию. Ниже приведены параметры для testtable и testtable2 из этого файла конфигурации:

      [[job.target]]
        # delimiter = "|"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable"
        # null-as = ""
        table = "public.testtable"
        # time-format = "unix-second"
        # use-auto-increment = true
    
      [[job.target]]
        # delimiter = "|"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable2"
        # null-as = ""
        table = "public.testtable2"
        # time-format = "unix-second"
        # use-auto-increment = true

    Если разделителем для testtable является @, а для testtable2 — %, то вышеуказанную конфигурацию можно изменить следующим образом:

      [[job.target]]
        delimiter = "@"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable"
        # null-as = ""
        table = "public.testtable"
        # time-format = "unix-second"
        # use-auto-increment = true
    
      [[job.target]]
        delimiter = "%"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable2"
        # null-as = ""
        table = "public.testtable2"
        # time-format = "unix-second"
        # use-auto-increment = true

    По умолчанию mxgate прослушивает порт 8086 для приёма данных. В файле mxgate.conf можно увидеть, что подпункт http-port в source.http установлен в 8086. При необходимости его можно изменить на другой порт:

    [source]
    
    ## Source plugin is the data entrance to MatrixGate
    ## Types restricted to: http
    source = "http"
    
    [source.http]
    
      ## Port of http push
      # http-port = 8086
    
      ## Maximum request body size (after gzip)
      ## The server rejects requests with bodies exceeding this limit.
      # max-body-bytes = 4194304
    
      ## The maximum number of concurrent HTTP connections to the server
      ## The server response with 503 after exceed this limit.
      # max-concurrency = 40000
  • Запустите mxgate, загрузите конфигурационный файл, подключитесь к базе данных demo и подготовьтесь к приёму запросов на загрузку данных

    mxgate start --config mxgate.conf
  • Проверьте состояние фонового сервиса

    mxgate status
  • Остановите фоновый сервис

    mxgate stop

    При возникновении таймаута или других проблем необходимо принудительно остановить процесс, это можно сделать так:

    mxgate stop --force

3 Подробное описание параметров командной строки MatrixGate

Имя параметра Значение параметра Описание параметра
[database] категория
--db-database по умолчанию postgres Имя базы данных MatrixDB, к которой подключается MatrixGate
--db-master-host по умолчанию имя локального хоста Имя хоста MatrixDB, к которому подключается MatrixGate
--db-master-port по умолчанию 5432 Порт хоста MatrixDB, к которому подключается MatrixGate
--db-user по умолчанию текущее имя пользователя системы Имя пользователя для подключения MatrixGate к MatrixDB
Примечание: этот пользователь должен иметь права на создание внешних таблиц. Если используется не суперпользователь, добавьте права с помощью следующей команды:
alter user {username} CREATEEXTTABLE;
--db-password по умолчанию пусто Пароль пользователя MatrixDB для подключения MatrixGate
--db-max-conn по умолчанию 10 Максимальное количество соединений MatrixGate с MatrixDB
[job] категория
--allow-dynamic по умолчанию false При указании --allow-dynamic=true разрешается динамически определять целевую таблицу на основе содержимого данных POST (первая строка). Эта опция должна использоваться только в тех случаях, когда имя целевой таблицы заранее неизвестно при запуске MatrixGate. Если вставка осуществляется в конкретную известную таблицу, рекомендуется явно указать имя таблицы с помощью параметра --target
--delimiter по умолчанию | Символ, используемый для разделения столбцов в каждой строке данных
--error-handling по умолчанию 'accurate' Способ обработки строк с ошибками формата
'accurate': ошибочные данные не попадают в БД, но записываются в журнал ошибок; остальные данные в пакете загружаются нормально
'legacy': весь пакет отклоняется при наличии ошибки
--exclude-columns по умолчанию пусто По умолчанию количество и порядок столбцов в загружаемых данных должны соответствовать определению таблицы. Когда загружаются только некоторые столбцы, --exclude-columns указывает имена исключаемых столбцов, остальные столбцы должны сохранять порядок, как в определении таблицы. Примечание: если уже включен --use-auto-increment для пропуска автоинкрементных полей, их не нужно перечислять здесь. Этот параметр нужен только для указания других столбцов, которые следует исключить
--format по умолчанию text Формат исходных данных: text или csv. Формат text самый быстрый, но не поддерживает переводы строк внутри текстовых значений. Формат csv более универсален, текстовые поля должны быть заключены в двойные кавычки.
--null-as по умолчанию пустая строка Строка, обозначающая значение NULL. По умолчанию — пустая строка без кавычек. Если столбец таблицы имеет ограничение NOT NULL, а в данных указано пустое значение, произойдёт ошибка загрузки. Примечание: чтобы использовать \N как NULL, необходимо экранировать обратный слэш, например: --null-as \N
--time-format по умолчанию unix-second Единица измерения временной метки: unix-second|unix-ms|unix-nano|raw.
MatrixGate по умолчанию считает первый столбец каждой строки временной меткой в формате Unix и автоматически преобразует её в формат времени базы данных. Если временная метка не в первом столбце или уже преобразована в формат БД, используйте raw — тогда MatrixGate не будет выполнять преобразование типа времени.
--upsert-key по умолчанию пусто Ключ(и) для операции upsert.
Таблица, для которой выполняется upsert, должна иметь UNIQUE-ограничение, и все ключи ограничения должны быть указаны в параметре.
--deduplicate-key по умолчанию пусто Аналогично upsert, но обновляет только значения NULL. Если старое значение не NULL, новое игнорируется.
Взаимоисключающий с --upsert-key, можно использовать только один из них.
--use-auto-increment по умолчанию true Пропускать ли автоинкрементные поля при загрузке данных и использовать системные значения по умолчанию, если целевая таблица содержит такие поля
--target schemaName.tableName Целевая таблица. schemaName можно опустить, по умолчанию public. Можно указать несколько таблиц: "--target таблица1 --target таблица2 …". Если параметр не указан, можно использовать --allow-dynamic для динамического определения имени таблицы.
[misc] категория
--log-archive-hours по умолчанию 72 В каталоге логов файлы matrixgate, не изменявшиеся более заданного времени, автоматически сжимаются
--log-compress по умолчанию true Глобальный переключатель автоматического сжатия логов
--log-dir по умолчанию /home/mxadmin/gpAdminLogs Каталог логов
--log-max-archive-files по умолчанию 0 Максимальное количество архивных файлов логов. При превышении самые старые файлы удаляются. 0 — не удалять
--log-remove-after-days по умолчанию 0 Через сколько дней после сжатия файлы логов будут удалены. 0 — не удалять
--log-rotate-size-mb по умолчанию 100 Текущий файл лога автоматически заменяется новым при превышении указанного размера, старый файл немедленно сжимается
[source] категория
--source по умолчанию http Источник данных для MatrixGate: поддерживаются http, stdin, kafka, transfer
[source] категория [http]
--http-port по умолчанию 8086 HTTP-интерфейс для отправки данных пользователями в MatrixGate
--max-body-bytes по умолчанию 4194304 Максимальный размер тела каждого HTTP-запроса
--max-concurrency по умолчанию 40000 Максимальное количество одновременных HTTP-соединений
--request-timeout по умолчанию 0 Таймаут запроса, по умолчанию 0 — бесконечное ожидание. При значении >0 запрос завершится по истечении указанного количества миллисекунд с кодом HTTP 408.
--disable-keep-alive по умолчанию false Принудительное разъединение после каждого HTTP-запроса
--http-debug по умолчанию false Вывод дополнительной диагностической информации HTTP
[source] категория [transfer]
--src-host IP-адрес master-узла исходной базы данных
--src-port Порт master-узла исходной базы данных
--src-user Имя пользователя для подключения к исходной базе данных (рекомендуется использовать суперпользователя)
--src-password Пароль подключения
--src-schema Имя схемы исходной таблицы
--src-table Имя исходной таблицы
--src-sql SQL-фильтр для миграции данных
--compress Метод передачи данных с сегментов исходной БД:
пустая строка "" — без сжатия, передача в открытом виде
gzip — сжатие gzip, требует наличия команды gzip на сегментах исходной БД
lz4 — сжатие lz4, требует наличия команды lz4 на сегментах исходной БД
рекомендуется: lz4 > gzip > без сжатия
--port-base Диапазон портов, используемых при передаче: 9129~
--local-ip Необходимо указать IP-адрес, по которому исходная БД может подключиться к этому хосту
[writer] категория
--interval по умолчанию 100 мс Период выполнения пакетной загрузки данных MatrixGate
--stream-prepared по умолчанию 10 Степень параллелизма рабочих процессов вставки
--use-gzip по умолчанию 'auto' Включать ли сжатие при отправке данных в segment, возможные значения: auto/yes/no
--max-seg-conn по умолчанию 128 Количество сегментов, запускаемых при выборке данных из MatrixGate через внешнюю таблицу. Увеличение параметра повышает нагрузку на сетевые соединения
--timing по умолчанию false При включении в логах MatrixGate для каждой операции INSERT будет указываться время выполнения
--insert-timeout по умолчанию 0 Таймаут выполнения оператора INSERT в MatrixGate, по умолчанию 0 — бесконечное ожидание.
При значении >0 операция завершится по истечении указанного количества миллисекунд.
Другие
--help Отображение справки и списка параметров


4 API MatrixGate

MatrixGate предоставляет HTTP API, позволяя языкам программирования импортировать данные в базу данных MatrixDB через HTTP-интерфейс.

Формат протокола HTTP MatrixGate

Тип протокола Формат протокола Использование и примеры
URL http://mxgate-host:port Указание адреса подключения к mxgate
PATH / Поддерживаются /, любые пути после / игнорируются
HTTP Method POST Поддерживается только метод POST для загрузки данных
HTTP Header Content-Encoding: gzip Поддерживается сжатие содержимого тела HTTP с помощью gzip
Content-Type: text/plain Поддерживается text/plain
HTTP Body SchemaName.TableName
Timestamp|ID|C1|C2|..|Cn
Первая строка тела — целевая таблица для загрузки данных. SchemaName можно опустить, по умолчанию public. TableName — обязательный параметр. Начиная со второй строки — строки временных рядов. Каждая строка соответствует строке целевой таблицы. Между столбцами используется разделитель |, между строками — \n. Первое поле каждой строки — временная метка в формате Unix (в секундах), см. описание --time-format. Второе поле каждой строки — TagID (целое число). С третьего до последнего поля — столбцы, соответствующие целевой таблице. Рекомендуется, чтобы DDL-определение целевой таблицы также следовало порядку столбцов (Timestamp, TagID, C1, C2,…, Cn)

Коды ответов HTTP MatrixGate

Код ответа Значение кода ответа Примечания
200 StatusOK Некоторые данные имеют неверный формат, в теле ответа будет указаны строки с ошибками и сообщениями, например:
At line: 2
missing data for column "c3"
204 StatusNoContent Данные успешно загружены в MatrixGate
400 StatusBadRequest Ошибка запроса данных, например: ошибка формата тела POST, таблица не существует, формат сжатия данных не соответствует заголовку HTTP-запроса и т.д.
405 StatusMethodNotAllowed HTTP-запрос не POST
408 StatusTimeout Таймаут запроса
500 StatusIntervalServerError Ошибка базы данных, загрузка данных не удалась, в теле ответа содержится подробная информация об ошибке
503 StatusServiceUnavailable MatrixGate отклоняет запросы, например: превышено максимальное количество соединений, MatrixGate завершает работу и т.д.

5 Пример командной строки для HTTP API MatrixGate

  • Создайте таблицу testtable в базе данных demo
    CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
    DISTRIBUTED BY (tagid);
  • Отредактируйте файл загрузки данных data.txt, содержимое следующее
    public.testtable
    1603777821|1|101|201|301
    1603777822|2|102|202|302
    1603777823|3|103|203|303
  • Запустите mxgate и укажите сгенерированный конфигурационный файл mxgate.conf
    mxgate --config mxgate.conf
  • Отправьте HTTP-запрос для загрузки данных
    curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
  • Подключитесь к базе данных и проверьте успешность загрузки данных
    demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
    date_part  |          time          | tagid | c1  | c2  | c3
    ----------------------------------------------------------------------------------------------------------------------------------
    1603777821 | 2020-10-27 13:50:21+08 |     1 | 101 | 201 | 301
    1603777822 | 2020-10-27 13:50:22+08 |     2 | 102 | 202 | 302
    1603777823 | 2020-10-27 13:50:23+08 |     3 | 103 | 203 | 303
    (3 rows)

6 Подключение языков программирования к MatrixGate


6.1 Пример HTTP API MatrixGate на Java

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class MxgateExample {
    public static void main(String[] args) throws Exception {
        MxgateExample http = new MxgateExample();
        http.sendingPostRequest();
    }

    // HTTP Post request
    private void sendingPostRequest() throws Exception {
        // mxgate listens on port 8086 of localhost
        String url = "http://localhost:8086/";
        URL obj = new URL(url);
        HttpURLConnection con = (HttpURLConnection) obj.openConnection();

        // Setting basic post request
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type","text/plain");
        String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";

        con.setDoOutput(true);
        DataOutputStream wr = new DataOutputStream(con.getOutputStream());
        // When the data is in Chinese, it can be encoded by postJsonData.getBytes("UTF-8")
        wr.write(postJsonData.toString().getBytes("UTF-8"));
        wr.flush();
        wr.close();

        int responseCode = con.getResponseCode();
        System.out.println("Sending 'POST' request to URL : " + url);
        System.out.println("Post Data : " + postJsonData);
        System.out.println("Response Code : " + responseCode);

        BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
        String output;
        StringBuffer response = new StringBuffer();

        while ((output = in.readLine()) != null) {
            response.append(output);
        }
        in.close();

        System.out.println(response.toString());
    }
}


6.2 Пример HTTP API MatrixGate на Python

import http.client

class MxgateExample(object):
    def __init__(self):
        # mxgate listens on port 8086 of localhost
        self.url = "localhost:8086"

        self.postData = "public.testtable\n/" \
                        "1603777821|1|101|201|301\n/" \
                        "1603777822|2|102|202|302\n/" \
                        "1603777823|3|103|203|303"
        self.headers = {"Content-Type": "text/plain"}

    # HTTP Post request
    def sending_post_request(self):

        conn = http.client.HTTPConnection(self.url)
        conn.request("POST", "/", self.postData, self.headers)

        response = conn.getresponse()
        response_code = response.getcode()
        print(f"Sending 'POST' request to URL : {self.url}")
        print(f"Post Data : {self.postData}")
        print(f"Response Code : {response_code}")

        output = response.read()
        print(output)


if __name__ == '__main__':
    gate_post = MxgateExample()
    gate_post.sending_post_request()


6.3 Пример HTTP API MatrixGate на C

Рекомендуется использовать среду разработки C# Core


using System;
using System.IO;
using System.Net;
using System.Text;

namespace HttpPostTest { class Program { static void Main(string[] args) { var url = "http://10.13.2.177:8086/"; var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";

       HttpPost(url,txt);
    }

public static string HttpPost(string url, string content){ string result = ""; HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url); req.Method = "POST"; req.ContentType = "text/plain";

#region Add Post parameters
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
    reqStream.Write(data, 0, data.Length);
    reqStream.Close();
}
#endregion

HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
//Get the response content
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
    result = reader.ReadToEnd();
    }
    return result;
}

} }

> Если возникает ошибка "connection ***** body size exceeds the given limit", увеличьте параметр max-body-bytes в файле mxgate.conf

### 6.4 Пример HTTP API MatrixGate на Golang

package main

import ( "bytes" "net/http" )

func PostDataToServer(URL string) error { data := public.testtable 1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303 resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data))) if err != nil { return err } if resp.StatusCode != 200 { // Обработка тела ответа. return nil }

// Обработка тела ответа.
return nil

}

func main() { err := PostDataToServer("http://127.0.0.1:8086") if err != nil{ panic(err) }

}

## 7 MatrixGate Loading Special Types
### 7.1 MatrixGate loading CSV file example
- Create table csvtable in demo database

CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT) DISTRIBUTED BY (tagid);

- Edit the data load file data.csv, the content is as follows

1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303

- Start mxgate, specify the source parameter to stdin, the target table is an existing csvtable, and the loading parallelism is 2

mxgate \ --source stdin \ --db-database demo \ --db-master-host 127.0.0.1 \ --db-master-port 5432 \ --db-user mxadmin \ --time-format unix-second \ --delimiter "|" \ --target csvtable \ --parallel 2 < data.csv

- Connect to the database to query whether the data is loaded successfully

demo=# SELECT * FROM csvtable ; time | tagid | c1 | c2 | c3 -----------------------+-------+-------------------------------------------------------------------------------------------------- 2020-10-27 05:50:23+08 | 3 | 103 | 203 | 303 2020-10-27 05:50:22+08 | 2 | 102 | 202 | 302 2020-10-27 05:50:21+08 | 1 | 101 | 201 | 301

(3 rows)

### 7.2 MatrixGate loading json field example

### 7.2.1 json
- Create table

create table json_test(id int, j json);

- Создание файлов данных
`~/json.csv`

1|"{""a"":10, ""b"":""xyz""}"

- Загрузка
Здесь в качестве примера используется режим stdin, другие режимы аналогичны.
Ключевой момент — `--format csv`

mxgated \ --source stdin \ --db-database postgres \ --db-master-host 127.0.0.1 \ --db-master-port 7000 \ --db-user mxadmin \ --time-format raw \ --format csv \ --delimiter "|" \ --target json_test < ~/json.csv

- Просмотр загруженных данных

postgres=# select * from json_test; id | j ----+----------------------------------------------------------------------------------------------------------------------------- 1 | {"a":10, "b":"xyz"} (1 row)

### 7.2.2 Массив JSON

- Создание таблицы

create table json_array_test(id int, j _json);

- Создание файлов данных
`~/json_array.csv`

1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"

- Загрузка

mxgate \ --source stdin \ --db-database postgres \ --db-master-host 127.0.0.1 \ --db-master-port 7000 \ --db-user mxadmin \ --time-format raw \ --format csv \ --delimiter "|" \ --target json_array_test < ~/json_array.csv

- Проверка

postgres=# select * from json_array_test ; id | j ----+----------------------------------------------------------------------------------------------------------------------------- 1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"} (1 row)


> Примечание: Поскольку столбец JSON содержит специальные символы, такие как кавычки, параметр --format в mxgate должен быть установлен в csv