Сервер загрузки данных MatrixGate

MatrixGate, сокращённо mxgate — это высокопроизводительный сервер потоковой загрузки данных, расположенный в каталоге установки MatrixDB по пути bin/mxgate. В настоящее время MatrixGate поддерживает прием данных через интерфейсы HTTP и STDIN, а также форматы данных TEXT и CSV.

1 Принцип работы MatrixGate

Логика загрузки данных в MatrixGate показана ниже:

  1. Система сбора данных собирает данные с устройств или принимает данные, отправленные устройствами.
  2. Система сбора непрерывно отправляет данные в процесс службы MatrixGate (mxgate) пакетами малого размера с параллельной обработкой.
  3. Процесс mxgate эффективно взаимодействует с главным процессом MatrixDB для обмена информацией о транзакциях и управления.
  4. Данные напрямую передаются на сегментные узлы для параллельной высокоскоростной записи.

Архитектура MatrixGate

2 Использование MatrixGate

  • Создайте конфигурационный файл MatrixGate, указав целевую базу данных и таблицу:
mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf

Эта команда создает конфигурационный файл с именем mxgate.conf. Он позволяет пользователям настраивать поведение при загрузке данных для testtable и testtable2, а также поддерживает глобальные настройки по умолчанию для загрузки в другие таблицы.

  • При необходимости измените файл mxgate.conf (например, задайте разделители полей). Этот шаг можно пропустить, если используются значения по умолчанию. В созданный конфигурационный файл включены такие параметры:
    [[job.target]]
      # delimiter = "|"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable"
      # null-as = ""
      table = "public.testtable"
      # time-format = "unix-second"
      # use-auto-increment = true

    [[job.target]]
      # delimiter = "|"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable2"
      # null-as = ""
      table = "public.testtable2"
      # time-format = "unix-second"
      # use-auto-increment = true

Если testtable использует @ в качестве разделителя, а testtable2 использует %, соответствующим образом обновите конфигурацию:

    [[job.target]]
      delimiter = "@"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable"
      # null-as = ""
      table = "public.testtable"
      # time-format = "unix-second"
      # use-auto-increment = true

    [[job.target]]
      delimiter = "%"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable2"
      # null-as = ""
      table = "public.testtable2"
      # time-format = "unix-second"
      # use-auto-increment = true

По умолчанию mxgate прослушивает порт 8086. Это можно увидеть в параметре mxgate.conf внутри секции [source.http]:

[source]

  ## Source plugin is the data entrance to MatrixGate
  ## Types restricted to: http
  source = "http"

  [source.http]

    ## Port of http push
    # http-port = 8086

    ## Maximum request body size (after gzip)
    ## The server rejects requests with bodies exceeding this limit.
    # max-body-bytes = 4194304

    ## The maximum number of concurrent HTTP connections to the server
    ## The server response with 503 after exceed this limit.
    # max-concurrency = 40000

При необходимости вы можете изменить номер порта.

  • Запустите mxgate с конфигурационным файлом и подключитесь к базе данных demo, чтобы подготовиться к загрузке данных:
mxgate start --config mxgate.conf
  • Проверьте состояние фонового сервиса:
mxgate status
  • Остановите фоновый сервис:
mxgate stop

Для принудительной остановки в случае таймаута или других проблем:

mxgate stop --force

3 Параметры командной строки для MatrixGate

Параметр Значение по умолчанию Описание
--db-database postgres Имя базы данных, к которой подключается MatrixGate в MatrixDB
--db-master-host localhost hostname Имя хоста главного узла MatrixDB
--db-master-port 5432 Номер порта главного узла MatrixDB
--db-user текущее имя пользователя ОС Имя пользователя, используемое MatrixGate для подключения к MatrixDB
Примечание: этот пользователь должен иметь разрешение на создание внешних таблиц. Для несуперпользователей предоставьте права с помощью:
alter user {username} CREATEEXTTABLE;
--db-password пусто Пароль пользователя MatrixDB
--db-max-conn 10 Максимальное количество соединений от MatrixGate к MatrixDB
--interval 100 миллисекунд Интервал, с которым MatrixGate выполняет пакетную загрузку данных
--source http Тип источника данных; поддерживаются http и stdin
--target schemaName.tableName Целевая таблица. Указание схемы необязательно, по умолчанию используется public. Можно указать несколько таблиц с помощью нескольких опций --target. Если параметр не указан, используйте --allow-dynamic, чтобы разрешить динамическое определение таблицы на основе входящих данных.
--allow-dynamic false При установке в true включается динамическое сопоставление данных POST с целевыми таблицами на основе первой строки входных данных. Используйте только тогда, когда целевые таблицы неизвестны при запуске. Для фиксированных целей укажите явно с помощью --target.
--format text Формат входных данных: text или csv. Формат text быстрее, но не поддерживает символы новой строки внутри полей. Формат csv более гибкий; строковые поля должны быть заключены в двойные кавычки.
--delimiter | Символ, используемый для разделения столбцов в каждой строке
--null-as пустая строка Строка, представляющая значения NULL. По умолчанию — неэкранированная пустая строка. Если столбец имеет ограничение NOT NULL и получает значение null, загрузка завершится ошибкой. Чтобы использовать \N как null, экранируйте обратную косую черту: --null-as \\N
--time-format unix-second Единица измерения времени: unix-second, unix-ms, unix-nano или raw.
MatrixGate предполагает, что первый столбец — это временная метка Unix, и преобразует её в формат времени базы данных. Используйте raw, если временная метка не в первом столбце или уже отформатирована.
--use-auto-increment true Пропускать ли столбцы с автоинкрементом при загрузке и использовать системные значения
--exclude-columns пусто Список столбцов, исключаемых при загрузке. Оставшиеся столбцы должны соответствовать порядку определения таблицы. Столбцы с автоинкрементом, пропущенные через --use-auto-increment, здесь указывать не нужно.
--upsert-key пусто Ключ(и) для операций upsert.
Целевая таблица должна иметь уникальное ограничение, и все ключи ограничения должны быть указаны.
--help Отображение справки и списка параметров


4 API MatrixGate

MatrixGate предоставляет HTTP API, позволяющее загружать данные в MatrixDB из любого языка программирования.

Формат протокола HTTP MatrixGate

Тип протокола Формат Использование и пример
URL http://mxgate-host:port Адрес для подключения к mxgate
PATH / Поддерживается только корневой путь /; любые суффиксы игнорируются
HTTP-метод POST Поддерживается только метод POST
HTTP-заголовок Content-Encoding: gzip Поддерживает сжатие тела запроса с помощью gzip
Content-Type: text/plain Поддерживается только text/plain
Тело HTTP SchemaName.TableName
Timestamp\|ID\|C1\|C2\|...\|Cn
Первая строка указывает целевую таблицу. Указание схемы необязательно (по умолчанию public). Последующие строки содержат данные временных рядов. Каждая строка соответствует одной строке в целевой таблице, столбцы разделяются символом |, строки — символом \n. Первое поле — временная метка Unix в секундах (см. --time-format). Второе поле — TagID (целое число). Остальные поля соответствуют столбцам таблицы. Рекомендуется, чтобы DDL целевой таблицы следовал порядку столбцов: (Timestamp, TagID, C1, C2, ..., Cn)

Коды ответов HTTP API MatrixGate

Код ответа Значение Примечания
204 StatusNoContent Данные успешно загружены в MatrixGate
400 StatusBadRequest Некорректный запрос: например, неверный формат тела POST, таблица не найдена, несоответствие кодировки содержимого
405 StatusMethodNotAllowed Метод запроса не является POST
500 StatusInternalServerError Ошибка на стороне базы данных; загрузка не удалась. В теле ответа содержится подробное сообщение об ошибке
503 StatusServiceUnavailable MatrixGate отклоняет запрос: например, превышено максимальное количество соединений, сервис завершает работу

5 Пример использования HTTP API MatrixGate из командной строки

  • Создайте таблицу testtable в базе данных demo:
CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
  • Отредактируйте файл данных data.txt:
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
  • Запустите mxgate с конфигурационным файлом:
mxgate --config mxgate.conf
  • Отправьте HTTP-запрос для загрузки данных:
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
  • Выполните запрос к базе данных, чтобы проверить успешность загрузки:
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
 date_part  |          time          | tagid | c1  | c2  | c3
------------+------------------------+-------+-----+-----+-----
 1603777821 | 2020-10-27 13:50:21+08 |     1 | 101 | 201 | 301
 1603777822 | 2020-10-27 13:50:22+08 |     2 | 102 | 202 | 302
 1603777823 | 2020-10-27 13:50:23+08 |     3 | 103 | 203 | 303
(3 rows)

6 Подключение к MatrixGate из языков программирования


6.1 Пример на Java для HTTP API MatrixGate

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class MxgateExample {
    public static void main(String[] args) throws Exception {
        MxgateExample http = new MxgateExample();
        http.sendingPostRequest();
    }

    // HTTP Post request
    private void sendingPostRequest() throws Exception {
        // mxgate listens on port 8086 of localhost
        String url = "http://localhost:8086/";
        URL obj = new URL(url);
        HttpURLConnection con = (HttpURLConnection) obj.openConnection();

        // Setting basic post request
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type","text/plain");
        String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";

        con.setDoOutput(true);
        DataOutputStream wr = new DataOutputStream(con.getOutputStream());
        // Encode using UTF-8 if data contains Chinese characters
        wr.write(postJsonData.toString().getBytes("UTF-8"));
        wr.flush();
        wr.close();

        int responseCode = con.getResponseCode();
        System.out.println("Sending 'POST' request to URL : " + url);
        System.out.println("Post Data : " + postJsonData);
        System.out.println("Response Code : " + responseCode);

        BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
        String output;
        StringBuffer response = new StringBuffer();

        while ((output = in.readLine()) != null) {
            response.append(output);
        }
        in.close();

        System.out.println(response.toString());
    }
}


6.2 Пример на Python для HTTP API MatrixGate

import http.client

class MxgateExample(object):
    def __init__(self):
        # mxgate listens on port 8086 of localhost
        self.url = "localhost:8086"

        self.postData = "public.testtable\n/" \
                        "1603777821|1|101|201|301\n/" \
                        "1603777822|2|102|202|302\n/" \
                        "1603777823|3|103|203|303"
        self.headers = {"Content-Type": "text/plain"}

    # HTTP Post request
    def sending_post_request(self):

        conn = http.client.HTTPConnection(self.url)
        conn.request("POST", "/", self.postData, self.headers)

        response = conn.getresponse()
        response_code = response.getcode()
        print(f"Sending 'POST' request to URL : {self.url}")
        print(f"Post Data : {self.postData}")
        print(f"Response Code : {response_code}")

        output = response.read()
        print(output)


if __name__ == '__main__':
    gate_post = MxgateExample()
    gate_post.sending_post_request()


6.3 Пример на C# для HTTP API MatrixGate

Рекомендуется разработка в среде C# Core

using System;
using System.IO;
using System.Net;
using System.Text;

namespace HttpPostTest
{
class Program
    {
        static void Main(string[] args)
        {
            var url = "http://10.13.2.177:8086/";
            var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";

           HttpPost(url,txt);
        }

public static string HttpPost(string url, string content){
    string result = "";
    HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
    req.Method = "POST";
    req.ContentType = "text/plain";

    #region Add Post Parameters
    byte[] data = Encoding.UTF8.GetBytes(content);
    req.ContentLength = data.Length;
    using (Stream reqStream = req.GetRequestStream()){
        reqStream.Write(data, 0, data.Length);
        reqStream.Close();
    }
    #endregion

    HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
    Stream stream = resp.GetResponseStream();
    // Get response content
    using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
        result = reader.ReadToEnd();
        }
        return result;
    }

  }
}

Если возникает ошибка "error when serving connection ***** body size exceeds the given limit", увеличьте значение max-body-bytes в mxgate.conf.

6.4 Пример на Golang для HTTP API MatrixGate

package main

import (
    "bytes"
    "net/http"
)

func PostDataToServer(URL string) error {
    data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
    resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
    if err != nil {
        return err
    }
    if resp.StatusCode != 200 {
        // Обработка тела ответа
        return nil
    }

    // Обработка тела ответа
    return nil
}

func main()  {
    err := PostDataToServer("http://127.0.0.1:8086")
    if err != nil{
        panic(err)
    }

}

7 Загрузка специальных типов данных с помощью MatrixGate

7.1 Пример: загрузка CSV-файлов через MatrixGate

  • Создайте таблицу csvtable в базе данных demo:
CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
  • Отредактируйте файл данных data.csv:
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
  • Запустите mxgate с источником stdin, направляя данные в существующую таблицу csvtable, с уровнем параллелизма 2:
mxgate \
  --source stdin \
  --db-database demo \
  --db-master-host 127.0.0.1 \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format unix-second \
  --delimiter "|" \
  --target csvtable \
  --parallel 2 < data.csv
  • Выполните запрос к базе данных, чтобы проверить успешность загрузки:
demo=# SELECT * FROM csvtable ;
          time          | tagid | c1  | c2  | c3
------------------------+-------+-----+-----+-----
 2020-10-27 05:50:23+08 |     3 | 103 | 203 | 303
 2020-10-27 05:50:22+08 |     2 | 102 | 202 | 302
 2020-10-27 05:50:21+08 |     1 | 101 | 201 | 301
(3 rows)

7.2 Пример: загрузка полей JSON

7.2.1 JSON

  • Создайте таблицу:
create table json_test(id int, j json);
  • Создайте файл данных:
1|"{""a"":10, ""b"":""xyz""}"
  • Загрузите данные
    В качестве примера используется режим stdin (другие режимы аналогичны):
    Ключевой момент: используйте --format csv из-за наличия специальных символов в JSON
mxgated \
  --source stdin \
  --db-database postgres \
  --db-master-host 127.0.0.1 \
  --db-master-port 7000 \
  --db-user mxadmin \
  --time-format raw \
  --format csv \
  --delimiter "|" \
  --target json_test < ~/json.csv
  • Проверьте загруженные данные:
postgres=# select * from json_test;
 id |           j
----+-----------------------
  1 | {"a":10, "b":"xyz"}
(1 row)

7.2.2 Массивы JSON

  • Создайте таблицу:
create table json_array_test(id int, j _json);
  • Создайте файл данных:
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
  • Загрузите данные:
mxgate \
  --source stdin \
  --db-database postgres \
  --db-master-host 127.0.0.1 \
  --db-master-port 7000 \
  --db-user mxadmin \
  --time-format raw \
  --format csv \
  --delimiter "|" \
  --target json_array_test < ~/json_array.csv
  • Проверьте результат:
postgres=# select * from json_array_test ;
 id |                      j
----+---------------------------------------------
  1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
(1 row)

Примечание: поскольку столбцы JSON содержат кавычки и другие специальные символы, параметр --format должен быть установлен в значение csv.