Сервер загрузки данных MatrixGate

MatrixGate, сокращённо mxgate, — это высокопроизводительный сервер потоковой загрузки данных, расположенный в каталоге bin/mxgate установки MatrixDB. В настоящее время MatrixGate поддерживает прием данных через интерфейсы HTTP и STDIN, а также форматы данных TEXT и CSV.

1 Принцип работы MatrixGate

Логика загрузки данных в MatrixGate показана ниже:

  1. Система сбора данных собирает данные с устройств или принимает данные, отправленные с устройств.
  2. Система сбора непрерывно передаёт данные в процесс службы MatrixGate (mxgate) в виде параллельных микропакетов.
  3. Процесс mxgate эффективно взаимодействует с главным процессом MatrixDB для обмена информацией о транзакциях и управления.
  4. Данные напрямую передаются на сегментные узлы и параллельно записываются с высокой скоростью.

Архитектура MatrixGate

2 Использование MatrixGate

  • Создайте конфигурационный файл mxgate, указав целевую базу данных и таблицу:

    mxgate config --db-database demo --target public.testtable --target public.testtable2 --allow-dynamic > mxgate.conf

    Эта команда создает конфигурационный файл с именем mxgate.conf. Она позволяет пользователям настраивать загрузку данных для testtable и testtable2, а также поддерживает глобальные настройки по умолчанию для загрузки в другие таблицы.

  • При необходимости измените файл mxgate.conf (например, задайте разделители полей). Этот шаг можно пропустить, если используются настройки по умолчанию. В созданный конфигурационный файл включены параметры testtable и testtable2, как показано ниже:

      [[job.target]]
        # delimiter = "|"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable"
        # null-as = ""
        table = "public.testtable"
        # time-format = "unix-second"
        # use-auto-increment = true
    
      [[job.target]]
        # delimiter = "|"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable2"
        # null-as = ""
        table = "public.testtable2"
        # time-format = "unix-second"
        # use-auto-increment = true

    Если в testtable используется "@" в качестве разделителя, а в testtable2 — "%", соответствующим образом обновите конфигурацию:

      [[job.target]]
        delimiter = "@"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable"
        # null-as = ""
        table = "public.testtable"
        # time-format = "unix-second"
        # use-auto-increment = true
    
      [[job.target]]
        delimiter = "%"
        # exclude-columns = []
        # format = "text"
        name = "job_text_to_public.testtable2"
        # null-as = ""
        table = "public.testtable2"
        # time-format = "unix-second"
        # use-auto-increment = true

    По умолчанию mxgate прослушивает порт 8086 для входящих данных. Это можно увидеть в параметре mxgate.conf внутри секции [source.http] как http-port = 8086. При необходимости вы можете изменить его на другой порт:

    [source]
    
    ## Source plugin is the data entrance to MatrixGate
    ## Types restricted to: http
    source = "http"
    
    [source.http]
    
      ## Port of http push
      # http-port = 8086
    
      ## Maximum request body size (after gzip)
      ## The server rejects requests with bodies exceeding this limit.
      # max-body-bytes = 4194304
    
      ## The maximum number of concurrent HTTP connections to the server
      ## The server response with 503 after exceed this limit.
      # max-concurrency = 40000
  • Запустите mxgate с конфигурационным файлом, подключитесь к базе данных demo и подготовьтесь к приему запросов на загрузку данных:

    mxgate start --config mxgate.conf
  • Проверьте состояние фонового сервиса:

    mxgate status
  • Остановите фоновый сервис:

    mxgate stop

    Принудительно остановите при возникновении таймаутов или других проблем:

    mxgate stop --force

3 Параметры командной строки MatrixGate

Параметр Значение по умолчанию Описание
[database]
--db-database postgres Имя базы данных MatrixDB, к которой подключается MatrixGate
--db-master-host локальное имя хоста Имя хоста главного узла MatrixDB
--db-master-port 5432 Номер порта главного узла MatrixDB
--db-user текущий пользователь ОС Имя пользователя для подключения к MatrixDB
Примечание: этот пользователь должен иметь разрешение на создание внешних таблиц. Для несуперпользователей предоставьте права с помощью:
alter user {username} CREATEEXTTABLE;
--db-password пусто Пароль для подключения к MatrixDB
--db-max-conn 10 Максимальное количество соединений от MatrixGate к MatrixDB
[job]
--allow-dynamic false При значении true включается динамическое определение целевой таблицы на основе содержимого POST-данных (первая строка). Используйте только когда имя целевой таблицы неизвестно при запуске. Для фиксированных таблиц используйте --target, чтобы явно указать имена таблиц
--delimiter | Символ, используемый для разделения столбцов в каждой строке
--error-handling accurate Как обрабатывать некорректные строки
accurate: пропускать недопустимые строки, записывать ошибки; остальные корректные строки в пакете продолжают обработку
legacy: весь пакет отклоняется при любой ошибке
--exclude-columns пусто Список имен столбцов, исключаемых при загрузке. Обязательные столбцы должны соответствовать порядку определения таблицы. Примечание: автоинкрементные столбцы, пропущенные через --use-auto-increment, здесь указывать не нужно
--format text Формат входных данных: text или csv. text быстрее, но не допускает переводов строк в текстовых полях. csv более гибкий; текстовые поля должны быть заключены в кавычки
--null-as пустая строка Строковое представление значений NULL. По умолчанию — незаключённая в кавычки пустая строка. Если столбец NOT NULL содержит это значение, загрузка завершится ошибкой. Чтобы использовать \N, экранируйте обратный слэш: --null-as \\N
--time-format unix-second Единица измерения временной метки: unix-second|unix-ms|unix-nano|raw.
MatrixGate предполагает, что первый столбец — временная метка Unix, и преобразует её в тип времени БД. Используйте raw, если временная метка не в первом столбце или уже отформатирована
--upsert-key пусто Ключ(и) для операций upsert.
Целевая таблица должна иметь UNIQUE-ограничение, и все ключи ограничения должны быть указаны
--use-auto-increment true Пропускать ли автоинкрементные столбцы во входных данных и использовать системные значения
--target schemaName.tableName Имя целевой таблицы. Схема по умолчанию — public. Поддерживаются несколько таблиц: --target table1 --target table2 .... Без этого параметра используйте --allow-dynamic для динамического определения таблицы
[misc]
--log-archive-hours 72 Журналы, не изменявшиеся дольше указанного периода (в часах), автоматически сжимаются
--log-compress true Глобальный переключатель включения сжатия журналов
--log-dir /home/mxadmin/gpAdminLogs Каталог для файлов журналов
--log-max-archive-files 0 Максимальное количество сжатых файлов журналов для хранения. Самые старые удаляются при превышении. 0 означает отсутствие удаления
--log-remove-after-days 0 Количество дней, после которых сжатые журналы удаляются. 0 означает, что удаление никогда не происходит
--log-rotate-size-mb 100 Ротация файла журнала при превышении размера (в МБ); старый файл немедленно сжимается
[source]
--source http Тип источника данных: поддерживает http, stdin, kafka, transfer
[source][http]
--http-port 8086 Порт HTTP, на который клиенты отправляют данные
--max-body-bytes 4194304 Максимальный размер (в байтах) тела одного HTTP-запроса
--max-concurrency 40000 Максимальное количество одновременных HTTP-соединений
[source][transfer]
--src-host IP-адрес главного узла исходной базы данных
--src-port Порт главного узла исходной базы данных
--src-user Имя пользователя для подключения к исходной базе данных (рекомендуется суперпользователь)
--src-password Пароль
--src-schema Имя схемы исходной таблицы
--src-table Имя исходной таблицы
--src-sql SQL-фильтр для миграции данных
--compress Метод сжатия данных с исходных сегментных хостов:
"" (пусто): без сжатия, обычный текст
gzip: требует установленного gzip на исходных сегментах
lz4: требует установленного lz4 на исходных сегментах
Рекомендация: lz4 > gzip > без сжатия
--port-base Базовый порт для передачи; диапазон начинается с 9129
--local-ip IP-адрес, доступный из исходной базы данных
[writer]
--interval 100ms Интервал (в миллисекундах) между операциями массовой вставки
--stream-prepared 10 Уровень параллелизма рабочих процессов вставки
--use-gzip auto Сжимать ли данные, отправляемые на сегменты: auto/yes/no
--timing false Включать информацию о времени выполнения в журнал для каждого оператора INSERT
Другое
--help Отображение справки и списка параметров


4 API MatrixGate

MatrixGate предоставляет HTTP API, позволяющий языкам программирования импортировать данные в MatrixDB по протоколу HTTP.

Формат протокола HTTP API MatrixGate

Элемент протокола Формат Использование и пример
URL http://mxgate-host:port Адрес для подключения к mxgate
PATH / Поддерживается только /; суффиксы пути игнорируются
HTTP-метод POST На данный момент поддерживается только POST
HTTP-заголовок Content-Encoding: gzip Необязательно: сжатие тела запроса с помощью gzip
Content-Type: text/plain Обязательно: MIME-тип
Тело HTTP SchemaName.TableName
Timestamp|ID|C1|C2|...|Cn
Первая строка указывает целевую таблицу (SchemaName необязательно, по умолчанию public). Последующие строки содержат данные временного ряда. Каждая строка соответствует одной строке в целевой таблице, столбцы разделены символом |, строки — символом \n. Первое поле — временная метка Unix в секундах (см. --time-format). Второе поле — TagID (целое число). Остальные поля соответствуют столбцам таблицы. Порядок столбцов в DDL таблицы должен соответствовать (Timestamp, TagID, C1, C2, ..., Cn)

Коды ответов HTTP API MatrixGate

Код Значение Примечания
200 StatusOK Частичный успех: некоторые строки не прошли проверку формата. Тело ответа содержит подробности об ошибках, например:
At line: 2
missing data for column "c3"
204 StatusNoContent Все данные успешно приняты MatrixGate
400 StatusBadRequest Некорректный запрос: неверное тело POST, таблица не найдена, несоответствие заголовков и содержимого и т.д.
405 StatusMethodNotAllowed Метод запроса не является POST
500 StatusInternalServerError Ошибка на стороне базы данных; загрузка не удалась. Тело ответа содержит подробное сообщение об ошибке
503 StatusServiceUnavailable MatrixGate отклонил запрос (например, достигнуто максимальное количество соединений, служба завершает работу)

5 Примеры командной строки для HTTP API MatrixGate

  • Создайте таблицу testtable в базе данных demo:

    CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
    DISTRIBUTED BY (tagid);
  • Отредактируйте файл данных data.txt:

    public.testtable
    1603777821|1|101|201|301
    1603777822|2|102|202|302
    1603777823|3|103|203|303
  • Запустите mxgate с конфигурационным файлом:

    mxgate --config mxgate.conf
  • Отправьте HTTP-запрос для загрузки данных:

    curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
  • Выполните запрос к базе данных для проверки загрузки:

    demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
    date_part  |          time          | tagid | c1  | c2  | c3
    ------------+------------------------+-------+-----+-----+-----
    1603777821 | 2020-10-27 13:50:21+08 |     1 | 101 | 201 | 301
    1603777822 | 2020-10-27 13:50:22+08 |     2 | 102 | 202 | 302
    1603777823 | 2020-10-27 13:50:23+08 |     3 | 103 | 203 | 303
    (3 rows)

6 Интеграция языков программирования с MatrixGate


6.1 Пример использования HTTP API MatrixGate на Java

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class MxgateExample {
    public static void main(String[] args) throws Exception {
        MxgateExample http = new MxgateExample();
        http.sendingPostRequest();
    }

    // HTTP Post request
    private void sendingPostRequest() throws Exception {
        // mxgate listens on port 8086 of localhost
        String url = "http://localhost:8086/";
        URL obj = new URL(url);
        HttpURLConnection con = (HttpURLConnection) obj.openConnection();

        // Setting basic post request
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type","text/plain");
        String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";

        con.setDoOutput(true);
        DataOutputStream wr = new DataOutputStream(con.getOutputStream());
        // Encode Chinese characters using UTF-8
        wr.write(postJsonData.toString().getBytes("UTF-8"));
        wr.flush();
        wr.close();

        int responseCode = con.getResponseCode();
        System.out.println("Sending 'POST' request to URL : " + url);
        System.out.println("Post Data : " + postJsonData);
        System.out.println("Response Code : " + responseCode);

        BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
        String output;
        StringBuffer response = new StringBuffer();

        while ((output = in.readLine()) != null) {
            response.append(output);
        }
        in.close();

        System.out.println(response.toString());
    }
}


6.2 Пример использования HTTP API MatrixGate на Python

import http.client

class MxgateExample(object):
    def __init__(self):
        # mxgate listens on port 8086 of localhost
        self.url = "localhost:8086"

        self.postData = "public.testtable\n/" \
                        "1603777821|1|101|201|301\n/" \
                        "1603777822|2|102|202|302\n/" \
                        "1603777823|3|103|203|303"
        self.headers = {"Content-Type": "text/plain"}

    # HTTP Post request
    def sending_post_request(self):

        conn = http.client.HTTPConnection(self.url)
        conn.request("POST", "/", self.postData, self.headers)

        response = conn.getresponse()
        response_code = response.getcode()
        print(f"Sending 'POST' request to URL : {self.url}")
        print(f"Post Data : {self.postData}")
        print(f"Response Code : {response_code}")

        output = response.read()
        print(output)


if __name__ == '__main__':
    gate_post = MxgateExample()
    gate_post.sending_post_request()


6.3 Пример использования HTTP API MatrixGate на C

Рекомендуется разработка в среде C# Core

using System;
using System.IO;
using System.Net;
using System.Text;

namespace HttpPostTest
{
class Program
    {
        static void Main(string[] args)
        {
            var url = "http://10.13.2.177:8086/";
            var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";

           HttpPost(url,txt);
        }

public static string HttpPost(string url, string content){
    string result = "";
    HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
    req.Method = "POST";
    req.ContentType = "text/plain";

    #region 添加Post 参数
    byte[] data = Encoding.UTF8.GetBytes(content);
    req.ContentLength = data.Length;
    using (Stream reqStream = req.GetRequestStream()){
        reqStream.Write(data, 0, data.Length);
        reqStream.Close();
    }
    #endregion

    HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
    Stream stream = resp.GetResponseStream();
    //获取响应内容
    using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
        result = reader.ReadToEnd();
        }
        return result;
    }

  }
}

Если вы столкнулись с ошибкой "error when serving connection ***** body size exceeds the given limit", увеличьте значение max-body-bytes в mxgate.conf.

6.4 Пример использования HTTP API MatrixGate на Golang

package main

import (
    "bytes"
    "net/http"
)

func PostDataToServer(URL string) error {
    data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
    resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
    if err != nil {
        return err
    }
    if resp.StatusCode != 200 {
        // Обработка тела ответа.
        return nil
    }

    // Обработка тела ответа.
    return nil
}

func main()  {
    err := PostDataToServer("http://127.0.0.1:8086")
    if err != nil{
        panic(err)
    }

}

7 Загрузка специальных типов данных с помощью MatrixGate

7.1 Пример: загрузка CSV-файлов через MatrixGate

  • Создайте таблицу csvtable в базе данных demo:

    CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
    DISTRIBUTED BY (tagid);
  • Отредактируйте файл данных data.csv:

    1603777821|1|101|201|301
    1603777822|2|102|202|302
    1603777823|3|103|203|303
  • Запустите mxgate с источником stdin, направляя данные в существующую таблицу csvtable, используя 2 параллельных потока:

    mxgate \
    --source stdin \
    --db-database demo \
    --db-master-host 127.0.0.1 \
    --db-master-port 5432 \
    --db-user mxadmin \
    --time-format unix-second \
    --delimiter "|" \
    --target csvtable \
    --parallel 2 < data.csv
  • Выполните запрос к базе данных, чтобы подтвердить успешную загрузку:

demo=# SELECT * FROM csvtable ;
          time          | tagid | c1  | c2  | c3
------------------------+-------+-----+-----+-----
 2020-10-27 05:50:23+08 |     3 | 103 | 203 | 303
 2020-10-27 05:50:22+08 |     2 | 102 | 202 | 302
 2020-10-27 05:50:21+08 |     1 | 101 | 201 | 301

(3 rows)

7.2 Примеры загрузки JSON-полей с помощью MatrixGate

7.2.1 json

  • Создайте таблицу

    create table json_test(id int, j json);
  • Создайте файл данных ~/json.csv

    1|"{""a"":10, ""b"":""xyz""}"
  • Загрузите данные В качестве примера используется режим stdin; другие режимы аналогичны. Ключевой момент — --format csv

    mxgated \
    --source stdin \
    --db-database postgres \
    --db-master-host 127.0.0.1 \
    --db-master-port 7000 \
    --db-user mxadmin \
    --time-format raw \
    --format csv \
    --delimiter "|" \
    --target json_test < ~/json.csv
  • Просмотрите загруженные данные

    postgres=# select * from json_test;
    id |           j
    ----+-----------------------
    1 | {"a":10, "b":"xyz"}
    (1 row)

7.2.2 массив json

  • Создайте таблицу

    create table json_array_test(id int, j _json);
  • Создайте файл данных ~/json_array.csv

    1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
  • Загрузите данные

    mxgate \
    --source stdin \
    --db-database postgres \
    --db-master-host 127.0.0.1 \
    --db-master-port 7000 \
    --db-user mxadmin \
    --time-format raw \
    --format csv \
    --delimiter "|" \
    --target json_array_test < ~/json_array.csv
  • Проверьте данные

    postgres=# select * from json_array_test ;
    id |                      j
    ----+---------------------------------------------
    1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"}
    (1 row)

    Примечание: поскольку столбец JSON содержит специальные символы, такие как кавычки, параметр --format в mxgate должен быть установлен в csv.