Сервер загрузки данных MatrixGate

MatrixGate, сокращённо mxgate — это высокопроизводительный сервер потоковой загрузки данных, расположенный в каталоге bin/mxgate установки MatrixDB. В настоящее время MatrixGate поддерживает прием данных через интерфейсы HTTP и STDIN, а также форматы данных TEXT и CSV.

1 Принцип работы MatrixGate

Логика загрузки данных в MatrixGate показана ниже:

  1. Система сбора данных собирает данные с устройств или принимает данные, отправленные устройствами.
  2. Система сбора непрерывно передаёт данные процессу службы MatrixGate (mxgate) в виде параллельных микропакетов.
  3. Процесс mxgate эффективно взаимодействует с мастер-процессом MatrixDB для обмена информацией о транзакциях и управления.
  4. Данные напрямую передаются на сегменты и параллельно записываются с высокой скоростью.

Архитектура MatrixGate

2 Использование MatrixGate

  • Создайте конфигурационный файл, указав целевую базу данных и таблицу: ``
    mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf

    ``

Эта команда создаст конфигурационный файл с именем mxgate.conf. Он позволяет пользователям настраивать поведение загрузки данных для testtable и testtable2, а также поддерживает глобальные параметры по умолчанию для загрузки в другие таблицы.

  • При необходимости измените файл mxgate.conf (например, задайте разделитель полей). Этот шаг можно пропустить, если используются значения по умолчанию. В сгенерированной конфигурации содержатся записи следующего вида:

``

    [[job.target]]
      # delimiter = "|"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable"
      # null-as = ""
      table = "public.testtable"
      # time-format = "unix-second"
      # use-auto-increment = true

    [[job.target]]
      # delimiter = "|"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable2"
      # null-as = ""
      table = "public.testtable2"
      # time-format = "unix-second"
      # use-auto-increment = true

``

Если в testtable используется разделитель @, а в testtable2%, соответствующим образом обновите конфигурацию:

``

    [[job.target]]
      delimiter = "@"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable"
      # null-as = ""
      table = "public.testtable"
      # time-format = "unix-second"
      # use-auto-increment = true

    [[job.target]]
      delimiter = "%"
      # exclude-columns = []
      # format = "text"
      name = "job_text_to_public.testtable2"
      # null-as = ""
      table = "public.testtable2"
      # time-format = "unix-second"
      # use-auto-increment = true

``

По умолчанию mxgate прослушивает порт 8086. Это значение указано в параметре mxgate.conf внутри секции source.http.http-port. Его можно изменить при необходимости:

``

[source]

  ## Source plugin is the data entrance to MatrixGate
  ## Types restricted to: http
  source = "http"

  [source.http]

    ## Port of http push
    # http-port = 8086

    ## Maximum request body size (after gzip)
    ## The server rejects requests with bodies exceeding this limit.
    # max-body-bytes = 4194304

    ## The maximum number of concurrent HTTP connections to the server
    ## The server response with 503 after exceed this limit.
    # max-concurrency = 40000

``

  • Запустите mxgate с конфигурационным файлом, подключитесь к базе данных demo и подготовьтесь к приему данных:

``

mxgate start --config mxgate.conf

``

  • Проверьте состояние службы:

``

mxgate status

``

  • Остановите фоновую службу:

``

mxgate stop

``

Для принудительной остановки в случае таймаута или других проблем:

``

mxgate stop --force

``

3 Параметры командной строки для MatrixGate

Параметр Значение по умолчанию Описание
--db-database postgres Имя базы данных, к которой подключается MatrixGate в MatrixDB
--db-master-host localhost Имя хоста мастер-узла MatrixDB
--db-master-port 5432 Номер порта мастер-узла MatrixDB
--db-user текущий пользователь ОС Имя пользователя, используемое MatrixGate для подключения к MatrixDB
Примечание: этот пользователь должен иметь разрешение на создание внешних таблиц. Для несуперпользователей предоставьте права с помощью:
alter user {username} CREATEEXTTABLE;
--db-password пусто Пароль пользователя MatrixDB
--db-max-conn 10 Максимальное количество соединений от MatrixGate к MatrixDB
--interval 100 мс Интервал (в миллисекундах) выполнения пакетной загрузки данных
--source http Тип источника данных; поддерживает http и stdin
--target schemaName.tableName Имя целевой таблицы. Имя схемы необязательно и по умолчанию равно public. Можно указать несколько таблиц с помощью нескольких опций --target. Если не указано, используйте --allow-dynamic для включения динамического определения таблицы
--allow-dynamic false При значении true включается динамическое сопоставление данных POST с целевыми таблицами на основе первой строки входных данных. Используйте только тогда, когда целевые таблицы неизвестны при запуске. Для фиксированных целей укажите явно с помощью --target
--format text Формат входных данных: text или csv. text работает быстрее, но не допускает переводов строк внутри полей. csv более гибкий; строковые поля должны быть заключены в двойные кавычки
--delimiter | Символ-разделитель полей в каждой строке
--null-as пустая строка Строковое представление значений NULL. По умолчанию — не заключённая в кавычки пустая строка. Для \N экранируйте обратную косую черту: --null-as \\N
--time-format unix-second Единица измерения времени: unix-second, unix-ms, unix-nano или raw. По умолчанию MatrixGate рассматривает первый столбец как временную метку Unix. Используйте raw, если временная метка не в первом столбце или уже имеет формат БД
--use-auto-increment true Пропускать ли столбцы с автоинкрементом при загрузке и использовать системные значения
--exclude-columns пусто Список имён столбцов, исключаемых при загрузке. Оставшиеся столбцы должны соответствовать порядку определения таблицы. Столбцы с автоинкрементом, пропущенные через --use-auto-increment, здесь указывать не нужно
--help Отобразить справку и список параметров


4 API MatrixGate

MatrixGate предоставляет HTTP API, позволяя загружать данные в MatrixDB из любого языка программирования по протоколу HTTP.

Формат протокола HTTP MatrixGate

Элемент протокола Формат Использование и пример
URL http://mxgate-host:port Адрес для подключения к mxgate
PATH / Поддерживается только корневой путь /; любые суффиксы игнорируются
HTTP-метод POST Поддерживается только метод POST для загрузки данных
HTTP-заголовок Content-Encoding: gzip Поддерживает сжатие тела запроса с помощью gzip
Content-Type: text/plain Поддерживается только тип содержимого text/plain
Тело HTTP SchemaName.TableName
Timestamp\|ID\|C1\|C2\|...\|Cn
Первая строка указывает целевую таблицу (SchemaName необязательно, по умолчанию public). Последующие строки содержат данные временных рядов. Каждая строка соответствует одной строке в целевой таблице. Столбцы разделяются символом |, строки — \n. Первое поле — временная метка Unix (см. --time-format). Второе поле — TagID (целое число). Остальные поля сопоставляются со столбцами таблицы. Рекомендуется, чтобы DDL целевой таблицы соответствовал порядку столбцов (Timestamp, TagID, C1, C2, ..., Cn)

Коды ответов HTTP API MatrixGate

Код Значение Примечания
204 StatusNoContent Данные успешно загружены в MatrixGate
400 StatusBadRequest Некорректный запрос: например, неверный формат тела POST, таблица не найдена, несоответствие кодировки содержимого
405 StatusMethodNotAllowed Использован метод HTTP, отличный от POST
500 StatusInternalServerError Ошибка на стороне базы данных; загрузка не удалась. Тело ответа содержит подробное сообщение об ошибке
503 StatusServiceUnavailable Запрос отклонён: например, превышено максимальное количество соединений или MatrixGate завершает работу

5 Пример использования HTTP API MatrixGate из командной строки

  • Создайте таблицу testtable в базе данных demo:

``

CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);

``

  • Отредактируйте файл данных data.txt:

``

public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303

``

  • Запустите mxgate с конфигурационным файлом:

``

mxgate --config mxgate.conf

``

  • Отправьте HTTP-запрос для загрузки данных:

``

curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"

``

  • Выполните запрос к базе данных, чтобы проверить успешность загрузки:

``

demo=# SELECT * FROM testtable ;
          time          | tagid | c1  | c2  | c3
------------------------+-------+-----+-----+-----
 2020-10-27 05:50:23+08 |     3 | 103 | 203 | 303
 2020-10-27 05:50:22+08 |     2 | 102 | 202 | 302
 2020-10-27 05:50:21+08 |     1 | 101 | 201 | 301

 (3 rows)

``

6 Подключение к MatrixGate из языков программирования


6.1 Пример на Java для HTTP API MatrixGate

``

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class MxgateExample {
    public static void main(String[] args) throws Exception {
        MxgateExample http = new MxgateExample();
        http.sendingPostRequest();
    }

    // HTTP Post request
    private void sendingPostRequest() throws Exception {
        // mxgate listens on port 8086 of localhost
        String url = "http://localhost:8086/";
        URL obj = new URL(url);
        HttpURLConnection con = (HttpURLConnection) obj.openConnection();

        // Setting basic post request
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type","text/plain");
        String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";

        con.setDoOutput(true);
        DataOutputStream wr = new DataOutputStream(con.getOutputStream());
        // Encode using UTF-8 if data contains Chinese characters
        wr.write(postJsonData.toString().getBytes("UTF-8"));
        wr.flush();
        wr.close();

        int responseCode = con.getResponseCode();
        System.out.println("Sending 'POST' request to URL : " + url);
        System.out.println("Post Data : " + postJsonData);
        System.out.println("Response Code : " + responseCode);

        BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
        String output;
        StringBuffer response = new StringBuffer();

        while ((output = in.readLine()) != null) {
            response.append(output);
        }
        in.close();

        System.out.println(response.toString());
    }
}

``


6.2 Пример на Python для HTTP API MatrixGate

``

import http.client

class MxgateExample(object):
    def __init__(self):
        # mxgate listens on port 8086 of localhost
        self.url = "localhost:8086"

        self.postData = "public.testtable\n/" \
                        "1603777821|1|101|201|301\n/" \
                        "1603777822|2|102|202|302\n/" \
                        "1603777823|3|103|203|303"
        self.headers = {"Content-Type": "text/plain"}

    # HTTP Post request
    def sending_post_request(self):

        conn = http.client.HTTPConnection(self.url)
        conn.request("POST", "/", self.postData, self.headers)

        response = conn.getresponse()
        response_code = response.getcode()
        print(f"Sending 'POST' request to URL : {self.url}")
        print(f"Post Data : {self.postData}")
        print(f"Response Code : {response_code}")

        output = response.read()
        print(output)


if __name__ == '__main__':
    gate_post = MxgateExample()
    gate_post.sending_post_request()

``


6.3 Пример на C# для HTTP API MatrixGate

Рекомендуется разработка в среде C# Core

``

using System;
using System.IO;
using System.Net;
using System.Text;

namespace HttpPostTest
{
class Program
    {
        static void Main(string[] args)
        {
            var url = "http://10.13.2.177:8086/";
            var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";

           HttpPost(url,txt);
        }

public static string HttpPost(string url, string content){
    string result = "";
    HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
    req.Method = "POST";
    req.ContentType = "text/plain";

    #region Add Post Parameters
    byte[] data = Encoding.UTF8.GetBytes(content);
    req.ContentLength = data.Length;
    using (Stream reqStream = req.GetRequestStream()){
        reqStream.Write(data, 0, data.Length);
        reqStream.Close();
    }
    #endregion

    HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
    Stream stream = resp.GetResponseStream();
    // Get response content
    using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
        result = reader.ReadToEnd();
        }
        return result;
    }

  }
}

``

Если возникает ошибка "error when serving connection ***** body size exceeds the given limit", увеличьте значение max-body-bytes в mxgate.conf.

6.4 Пример на Golang для HTTP API MatrixGate

``

package main

import (
    "bytes"
    "net/http"
)

func PostDataToServer(URL string) error {
    data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
    resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
    if err != nil {
        return err
    }
    if resp.StatusCode != 200 {
        // Обработка тела ответа
        return nil
    }

    // Обработка тела ответа
    return nil
}

func main()  {
    err := PostDataToServer("http://127.0.0.1:8086")
    if err != nil{
        panic(err)
    }

}

``

7 Загрузка специальных типов данных с помощью MatrixGate

7.1 Пример: загрузка CSV-файлов с помощью MatrixGate

  • Создайте таблицу csvtable в базе данных demo:

``

CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);

``

  • Создайте файл данных data.csv:

``

1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303

``

  • Запустите mxgate с источником stdin, направляя данные в существующую таблицу csvtable, с уровнем параллелизма 2:

``

mxgate \
  --source stdin \
  --db-database demo \
  --db-master-host 127.0.0.1 \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format unix-second \
  --delimiter "|" \
  --target csvtable \
  --parallel 2 < data.csv

``

  • Выполните запрос к базе данных, чтобы проверить успешность загрузки:

``

demo=# SELECT * FROM csvtable ;
          time          | tagid | c1  | c2  | c3
------------------------+-------+-----+-----+-----
 2020-10-27 05:50:23+08 |     3 | 103 | 203 | 303
 2020-10-27 05:50:22+08 |     2 | 102 | 202 | 302
 2020-10-27 05:50:21+08 |     1 | 101 | 201 | 301

(3 rows)

``

7.2 Пример: загрузка полей JSON с помощью MatrixGate

7.2.1 JSON

  • Создайте таблицу:

``

create table json_test(id int, j json);

``

  • Создайте файл данных:

~/json.csv ``

1|"{""a"":10, ""b"":""xyz""}"

``

  • Загрузите данные (используя режим stdin; другие режимы аналогичны):
    Ключевой момент: используйте --format csv из-за наличия специальных символов в JSON.

--format csv ``

mxgated \
  --source stdin \
  --db-database postgres \
  --db-master-host 127.0.0.1 \
  --db-master-port 7000 \
  --db-user mxadmin \
  --time-format raw \
  --format csv \
  --delimiter "|" \
  --target json_test < ~/json.csv

``

  • Проверьте загруженные данные:

``

postgres=# select * from json_test;
 id |           j
----+-----------------------
  1 | {"a":10, "b":"xyz"}
(1 row)

``

7.2.2 Массивы JSON

  • Создайте таблицу:

``

create table json_array_test(id int, j _json);

``

  • Создайте файл данных:

~/json_array.csv ``

1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"

``

  • Загрузите данные:

``

mxgate \
  --source stdin \
  --db-database postgres \
  --db-master-host 127.0.0.1 \
  --db-master-port 7000 \
  --db-user mxadmin \
  --time-format raw \
  --format csv \
  --delimiter "|" \
  --target json_array_test < ~/json_array.csv

``

  • Проверьте результат:

``

postgres=# select * from json_array_test ;
 id |                      j
----+---------------------------------------------
  1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
(1 row)

``

Примечание: поскольку поля JSON содержат кавычки и другие специальные символы, параметр --format должен быть установлен в значение csv.