Подключение языков программирования к MatrixGate

В этом документе описывается, как использовать API-интерфейс для подключения к MatrixGate и обеспечения высокоскоростного ввода данных.


1 API MatrixGate

MatrixGate предоставляет HTTP API, позволяющее различным языкам программирования импортировать данные в базу данных YMatrix по протоколу HTTP. Ниже приведены формат протокола и коды ответов HTTP от MatrixGate.

Формат протокола HTTP MatrixGate

Тип протокола Формат протокола Использование и пример
URL http://\:\ Указывает URL-адрес подключения к mxgate
PATH / В настоящее время поддерживается только /; любой путь после / игнорируется
HTTP-метод POST Поддерживается только метод POST для загрузки данных
HTTP-заголовок Content-Encoding: gzip Поддерживает сжатие содержимого тела HTTP с помощью gzip
Content-Type: text/plain Поддерживается только text/plain
Тело HTTP SchemaName.TableName
Timestamp|ID|C1|C2|..|Cn
Первая строка тела указывает целевую таблицу для загрузки данных. Имя схемы (SchemaName) является необязательным и по умолчанию равно public. Имя таблицы (TableName) обязательно. Начиная со второй строки, каждая строка представляет собой строку временных рядов. Столбцы разделяются символом \|, а строки — символом \n. Первое поле каждой строки — это временная метка в формате Unix time (в секундах). Подробности см. в --time-format. Второе поле — TagID (целое число). Остальные поля соответствуют столбцам целевой таблицы. Рекомендуется, чтобы порядок столбцов в DDL целевой таблицы был следующим: (Timestamp, TagID, C1, C2, ..., Cn).

Коды ответа HTTP MatrixGate

Код ответа Значение Примечания
200 StatusOK Произошли частичные ошибки формата данных. Подробности об ошибках, включая проблемные строки и сообщения, возвращаются в теле ответа, например:
At line: 2
missing data for column "c3"
204 StatusNoContent Данные успешно загружены в MatrixGate
400 StatusBadRequest Некорректный запрос, например, неправильный формат тела POST, отсутствует целевая таблица или несоответствие формата сжатия заголовку HTTP
405 StatusMethodNotAllowed Запрос с использованием метода, отличного от POST
408 StatusTimeout Превышено время ожидания запроса
500 StatusInternalServerErrror Ошибка на стороне базы данных; загрузка данных не удалась. Подробное сообщение об ошибке возвращается в теле ответа
503 StatusServiceUnavailable MatrixGate отклонил запрос, например, превышено максимальное количество подключений или MatrixGate завершает работу


2 Примеры командной строки для HTTP API MatrixGate

Сначала создайте таблицу testtable в базе данных demo.

=# CREATE TABLE testtable (
    time TIMESTAMP WITH TIME ZONE,
    tagid INT,
    c1 INT,
    c2 INT,
    c3 INT
    )USING MARS3
     DISTRIBUTED BY (tagid)
     ORDER BY (time,tagid);

2.1 Отправка CSV-данных через HTTP

Создайте файл конфигурации mxgate.conf.

$ mxgate config --db-database testdb \
              --db-master-host localhost \
              --db-master-port 5432 \
              --db-user mxadmin \
              --db-password 123123 \
              --target public.testtable \
              --format csv \
              --time-format unix-second \
              --delimiter '|' \
              --parallel 256 \
              --stream-prepared 3 \
              --interval 250 \
              --transform plain \
              > mxgate.conf

Отредактируйте файл загрузки данных data.txt.

$ vi data.txt
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303

Запустите mxgate с использованием созданного файла конфигурации mxgate.conf.

$ mxgate --config mxgate.conf

Отправьте HTTP-запрос для загрузки данных.

$ curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"

Подключитесь к базе данных и проверьте успешность загрузки данных.

$ psql demo
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
 date_part  |          time          | tagid | c1  | c2  | c3
------------+------------------------+-------+-----+-----+-----
 1603777821 | 2020-10-27 13:50:21+08 |     1 | 101 | 201 | 301
 1603777822 | 2020-10-27 13:50:22+08 |     2 | 102 | 202 | 302
 1603777823 | 2020-10-27 13:50:23+08 |     3 | 103 | 203 | 303
(3 rows)

2.2 Отправка JSON-данных через HTTP

Добавьте следующие две пары ключ-значение в заголовки HTTP POST-запроса при отправке данных в mxgate:

"Batch-Type" = json
"Job-Name" = public.t1
  • "Batch-Type" = json указывает, что формат данных в теле POST-запроса — JSON.
  • Job-Name задаёт имя целевой таблицы для вставки данных в формате: schemaname.tablename.

Добавьте следующую конфигурацию преобразования в файл конфигурации mxgate:

[transform]

  ## Overall parallel level for transform, only for non-strict mode
  # parallel = 256

  ## Transform decodes input data and perform type/format conversion
  ## Types restricted to: plain/json/nil/mxmon/hanangdbc
  ## transform = "plain"
  transform = "json"
    [transform.json]
      mapping = [{
                  table-name = "public.t1",
                  field-map = [{dest="id", source="$.id", enabled=true}, {dest="content", source="$.content", enable=false}]
                }]
  • Здесь source = "$.id" — это выражение JsonPath. Подробнее см. JsonPath.

Предположим, структура целевой таблицы следующая:

                 Table "public.t1"
 Column  |  Type   | Collation | Nullable | Default
---------+---------+-----------+----------+---------
 id      | integer |           |          |
 content | text    |           |          |
Distributed by: (id)

Пример структуры JSON-данных:

{"id": 12345, "content": "this is a sample json"}

Функция преобразования JSON в mxgate извлекает значения для id и content из JSON-данных на основе конфигурации [transform.json], затем форматирует их как CSV или text для загрузки в базу данных.

Результат:

=# SELECT * FROM t1;
  id   |        content
-------+-----------------------
 12345 | this is a simple json
(1 row)

На данный момент mxgate поддерживает только однострочные JSON-строки при передаче JSON-данных по HTTP. Если объект JSON занимает несколько строк, загрузка завершится ошибкой.

Например, следующий JSON можно передать:

{"employees": [{"id": 1, "name": "John Doe", "position": "Manager", "department": "Sales"},{"id": 2,"name": "Jane Smith","position": "Engineer","department": "Engineering"}]}

А вот следующий вызовет ошибку:

{
  "employees": [
    {
      "id": 1,
      "name": "John Doe",
      "position": "Manager",
      "department": "Sales"
    },
    {
      "id": 2,
      "name": "Jane Smith",
      "position": "Engineer",
      "department": "Engineering"
    }
  ]
}


3 Подключение языков программирования к MatrixGate

3.1 Подключение Java к MatrixGate

3.1.1 SDK Java для MatrixGate

SDK (комплект разработки программного обеспечения) освобождает разработчиков от необходимости реализации нефункциональных компонентов, значительно повышая эффективность разработки и удобство использования.

  1. Добавление зависимости SDK

Вы можете добавить JAR-файл SDK одним из следующих способов:
(1). Получить напрямую из удалённого репозитория Maven
(2). Получить напрямую из удалённого репозитория Gradle
(3). Вручную скачать JAR и импортировать локально

Примечание!
Выберите только один из перечисленных способов. Рекомендуется использовать (1) или (2), чтобы получить SDK из Maven или Gradle — это более эффективно и удобно. Для способа (3) обратитесь к MatrixGate FAQ: 21. Поищите по ключевому слову JAVA SDK, так как этот способ используется редко и здесь подробно не рассматривается.

(1). Используйте Maven для автоматической загрузки SDK
Добавьте следующую зависимость в файл pom.xml вашего Java-проекта:

<dependencies>
    <dependency>
        <groupId>cn.ymatrix</groupId>
        <artifactId>mxgate-sdk-java</artifactId>
        <version>1.1.2</version>
    </dependency>
</dependencies>

(2). Используйте Gradle для добавления зависимости SDK

repositories {
    mavenCentral()
}

dependencies {
    implementation 'cn.ymatrix:mxgate-sdk-java:1.0.20'
}
  1. Запуск mxgate

YMatrix поддерживает ввод данных через gRPC и HTTP. Подробности ниже:

  • Запуск mxgate (источник gRPC)

Примечание!
Требуется версия YMatrix, поддерживающая gRPC, то есть 4.6.1 или выше.

На узле Master создайте файл конфигурации mxgate mxgate_grpc.conf и запустите mxgate.

Примечание!
Перед использованием mxgate убедитесь, что целевая таблица уже создана в базе данных. В данном примере хост Master — mdw, база данных — demo, таблица — test_table_transfer.

Пример команд:

# Create mxgate config file
[mxadmin@mdw ~]$ mxgate config \
    --source grpc \
    --db-database demo \
    --target public.test_table_transfer \
    --time-format raw \
    --grpc-port 8087 \
    --format csv \
    > mxgate_grpc.conf

# Start mxgate
[mxadmin@mdw ~]$ mxgate start --config mxgate_grpc.conf

Как показано, при записи данных в mxgate с помощью SDK установите --source равным grpc и --format равным csv. Параметр --grpc-port должен быть указан в конфигурации, чтобы SDK знал, куда отправлять данные; в данном примере используется порт 8087.

  • Запуск mxgate (источник HTTP)

Примечание!
Даже при запуске mxgate с использованием HTTP необходимо указывать порт gRPC, поскольку SDK использует gRPC для получения метаданных таблиц из mxgate — только путь загрузки данных переключается на HTTP.

На узле Master создайте файл конфигурации с именем mxgate_http.conf и запустите mxgate. В данном примере имя хоста Master — mdw:

# Create mxgate config file
[mxadmin@mdw ~]$ mxgate config \
    --source http \
    --db-database demo \
    --target public.test_table_transfer \
    --time-format raw \
    --grpc-port 8087 \
    --format csv \
    > mxgate_http.conf

# Start mxgate
[mxadmin@mdw ~]$ mxgate start --config mxgate_http.conf
  1. Отправка данных в mxgate с помощью SDK

В этом разделе описаны асинхронный и синхронный способы отправки данных. Выберите подходящий в зависимости от ваших требований.

Пример схемы таблицы:

            Partitioned table "public.test_table_transfer"
 Column |            Type             | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+---------
 ts     | timestamp without time zone |           |          |
 tag    | integer                     |           | not null |
 c1     | double precision            |           |          |
 c2     | double precision            |           |          |
 c3     | double precision            |           |          |
 c4     | double precision            |           |          |
 c5     | text                        |           |          |
 c6     | text                        |           |          |
 c7     | text                        |           |          |
 c8     | text                        |           |          |
Partition key: RANGE (ts)

Сначала выполните глобальную инициализацию:

// Set log level (default: INFO)
MxLogger.loggerLevel(LoggerLevel.INFO);
// Logs are output to stdout by default. Pass false to disable.
MxLogger.enableStdout(true);
// Default log file path and name: /tmp/mxgate_sdk_java_2022-08-26_133403.log
// Users can customize the log file path and name.
MxLogger.writeToFile("/tmp/mxgate_sdk_java.log");
  • Асинхронная отправка данных с помощью SDK

Асинхронная отправка добавляет кортежи (Tuples) во внутреннюю очередь SDK, которые затем отправляются посредством асинхронных HTTP-запросов.
Начните с инициализации MxBuilder. Этот билдер следует шаблону «одиночка» и является глобально уникальным.

Примечание!
Инициализируйте MxBuilder только один раз в проекте. Конфигурацию можно задать до создания экземпляра.

MxBuilder builder = MxBuilder.newBuilder()
        .withDropAll(false) // Set to true for testing (drops data instead of sending); set to false for production
        .withCacheCapacity(100000) // Queue size for micro-batch tuple storage
        .withCacheEnqueueTimeout(2000) // Timeout (ms) for enqueueing tuples if queue is full; throws IllegalStateException on timeout
        .withConcurrency(10) // Number of threads concurrently writing to mxgate
        .withRequestTimeoutMillis(3000) // Request timeout per thread (milliseconds)
        .withMaxRetryAttempts(3) // Retry attempts per failed write request
        .withRetryWaitDurationMillis(3000) // Fixed interval between retries
        .withRequestType(RequestType.WithHTTP) // Supported types: RequestType.WithHTTP, RequestType.WithGRPC
        .withCircuitBreaker() // Enable built-in circuit breaker
        .withMinimumNumberOfCalls(1) // Minimum calls before circuit breaker activates (>=1, default 10)
        .withSlidingWindowSize(10) // Sliding window size for failure rate calculation (>=1, default 100)
        .withFailureRateThreshold(60.0f) // Failure rate threshold (%) to trigger circuit breaker
        .withSlowCallDurationThresholdMillis(1000) // Duration threshold for slow requests (ms), should be less than request timeout
        .withSlowCallRateThreshold(80.0f) // Slow call rate threshold (%) to trigger circuit breaker
        // .withRequestAsync(true)// Enable async worker threads (uncomment to use)
        // .withConcurrency(20)// Async mode typically achieves high throughput with moderate concurrency (uncomment to use)
        // New API in MxBuilder for CSV construction parallelism (MxClient Group level)
        // .withCSVConstructionParallel(100)// Available from v1.1.0 (uncomment to use)
        .build();
        // New singleton API in MxBuilder: after successful build, retrieve the global instance anywhere
        // MxBuilder.instance();// Available from v1.1.2 (uncomment to use)

    // builder.getTupleCacheSize(); // Get current number of remaining tuples in SDK cache (uncomment to use)

Метод connect класса MxBuilder принимает четыре параметра:
(1). Имя хоста (или IP) и порт, где mxgate предоставляет сервисы для загрузки данных и получения метаданных.
Для HTTP: http://localhost:8086/; для gRPC: localhost:8087.
(2). Имя схемы.
(3). Имя таблицы.
(4). Коллбэк: при успехе onSuccess возвращает экземпляр MxClient для записи данных; при ошибке onFailure возвращает сообщение об ошибке.

Пример кода (замените порт, имя схемы и таблицы по необходимости):

// Asynchronous method (available from v1.0.13)
builder.connect("http://localhost:8086/", "localhost:8087", "public", "test_table", new ConnectionListener() {
    @Override
    public void onSuccess(MxClient client) {
        sendData(client);
    }

    @Override
    public void onFailure(String failureMsg) {

    }
});

Через коллбэк onSuccess() класса ConnectionListener получите экземпляр MxClient и используйте его API для записи данных в mxgate.

// Synchronous method (available from v1.0.13)
MxClient client = builder.connect(httpHost, gRPCHost, schema, table);
/* 
 * v1.1.0 enhances MxClient extensibility with new MxBuilder APIs.
 * These allow customizing MxClient Group count, tuple cache capacity, and consumer concurrency.
 * (Available from v1.1.0)
 */
// MxClient client = builder.connectWithGroup(dataSendingHost, metadataHost, schema, table, 10);
// MxClient client = builder.connectWithGroup(dataSendingHost, metadataHost, schema, table, 1, 1000, 3000, 10);// (Available from v1.1.0, uncomment to use)

/* 
 * skip-prefixed APIs skip backend connection for metadata retrieval.
 * Such MxClient instances can only generate lightweight Tuples via generateEmptyTupleLite().
 * (Available from v1.1.0) 
 */
// MxClient client = mxBuilder.skipConnectWithGroup(dataSendingHost, metadataHost, schema, table, delimiter, 1);
// MxClient client = mxBuilder.skipConnectWithGroup(dataSendingHost, metadataHost, schema, table, delimiter, 1, 1000, 3000, 1);

Примечание!
MxClient потокобезопасен. Однако для достижения оптимальной производительности в многопоточной среде рекомендуется иметь один экземпляр MxClient на поток — получаемый путём многократного вызова connect на одном и том же MxBuilder.

private void sendData(MxClient client) {
    if (client != null) {
        /*
         * MxClient batches tuples into micro-batches for sending.
         * This sets the flush interval in milliseconds (default: 2000).
         * A batch is sent every 2 seconds, but only if data has been written.
         */
        client.withIntervalToFlushMillis(5000);

        /*
         * Sets the byte size threshold for flushing a micro-batch.
         * A flush occurs when either the byte limit is reached or the time interval expires.
         */
        client.withEnoughBytesToFlush(500);

        /*
         * Improves append performance by avoiding byte counting.
         * Use based on scenario. If withEnoughBytesToFlush meets performance needs,
         * data per flush will be more uniform.
         * withEnoughLinesToFlush takes precedence over withEnoughBytesToFlush.
         */
        client.withEnoughLinesToFlush(10000);

        /*
         * New API in MxClient to control CSV conversion subtask size.
         * E.g., 10,000 tuples in a flush, with BatchSize=2000 → 5 concurrent subtasks.
         * (Available from v1.1.0)
         */
        client.withCSVConstructionBatchSize(2000);

        /*
         * Each MxClient has a private object pool.
         * Size should be tuned based on typical flush volume.
         */
        // client.useTuplesPool(poolSize);

        /*
         * Compression support requires mxgate v4.7.6 or higher.
         */
        // client.withCompress();

        /* 
         * Base64 encoding is optional for HTTP, required for gRPC.
         */
        // client.withBase64Encode4Compress(); 

        /*
         * Register a DataPostListener to receive callbacks on batch success/failure.
         * You can track which tuples succeeded or failed.
         */
        client.registerDataPostListener(new DataPostListener() {
            @Override
            public void onSuccess(Result result) {
                System.out.println(CUSTOMER_LOG_TAG + "Send tuples success: " + result.getMsg());
                System.out.println(CUSTOMER_LOG_TAG + "Succeed lines onSuccess callback " + result.getSucceedLines());
            }

            @Override
            public void onFailure(Result result) {
                /* 
                 * result.getErrorTuplesMap() contains map of failed tuples and error messages.
                 * Key: failed tuple, Value: error reason.
                 */
                for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
                    l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
                    for (Column c : entry.getKey().getColumns()) {
                        l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
            }
        }
        System.out.println(result.getSuccessLines());
    }
}

Используйте API generateEmptyTuple класса MxClient, чтобы получить пустой кортеж (Tuple) для заполнения значений.

Tuple tuple1 = client.generateEmptyTuple();
Tuple tuple2 = client.generateEmptyTuple();

Заполняйте значения парами Ключ -> Значение. Ключ — это имя столбца в таблице базы данных; Значение — значение поля. Поля с NULL или значениями по умолчанию можно опустить — SDK заполнит их автоматически.

Кортежи, полученные через этот API, не хранят метаданные таблицы внутри, поэтому они легковесны. При вызове MxClient.appendTuple() пропуск проверки метаданных ускоряет операцию flush.

// Tuple tuple = client.generateEmptyTupleLite();// (Available from v1.1.0, uncomment to use)

Примечание!
При добавлении столбцов в такие легковесные кортежи вручную поддерживайте порядок addColumn() key -> value, чтобы он точно соответствовал порядку столбцов в таблице базы данных.

Например, если порядок столбцов в таблице:

  Column |            Type             | Collation | Nullable | Default
 --------+-----------------------------+-----------+----------+---------
  ts     | timestamp without time zone |           |          |
  tag    | integer                     |           | not null |
  c1     | double precision            |           |          |
  c2     | double precision            |           |          |
  c3     | double precision            |           |          |
  c4     | double precision            |           |          |
  c5     | text                        |           |          |
  c6     | text                        |           |          |
  c7     | text                        |           |          |
  c8     | text                        |           |          |

Тогда добавление столбцов должно выполняться в этом порядке, как в примере:

        tuple1.addColumn("ts", "2022-05-18 16:30:06");
        tuple1.addColumn("tag", 102020030);
        tuple1.addColumn("c1", 1.1);
        tuple1.addColumn("c2", 2.2);
        tuple1.addColumn("c3", 3.3);
        tuple1.addColumn("c4", 4.4);
        tuple1.addColumn("c5", "中文字符测试-1");
        tuple1.addColumn("c6", "lTxFCVLwcDTKbNbjau_c6");
        tuple1.addColumn("c7", "lTxFCVLwcDTKbNbjau_c7");
        tuple1.addColumn("c8", "lTxFCVLwcDTKbNbjau_c8");

        tuple2.addColumn("ts", "2022-05-18 16:30:06");
        tuple2.addColumn("tag", 102020030);
        tuple2.addColumn("c1", 1.1);
        tuple2.addColumn("c2", 2.2);
        tuple2.addColumn("c3", 3.3);
        tuple2.addColumn("c4", 4.4);
        tuple2.addColumn("c5", "中文字符测试-2");
        tuple2.addColumn("c6", "lTxFCVLwcDTKbNbjau_c26");
        tuple2.addColumn("c7", "lTxFCVLwcDTKbNbjau_c27");
        tuple2.addColumn("c8", "lTxFCVLwcDTKbNbjau_c28");

Наконец, добавьте заполненные кортежи в MxClient.

client.appendTuples(tuple1, tuple2);

Примечание!
MxClient предоставляет несколько API: добавление одного кортежа, нескольких кортежей или списка кортежей (например, client.appendTuple();, client.appendTupleList();). Вы также можете вручную вызвать flush с помощью flush(), независимо от количества кортежей в очереди, например: client.flush();.

  • Синхронная отправка данных с помощью MxClient

Синхронная отправка блокирует выполнение до тех пор, пока кортеж не будет отправлен в mxgate, что позволяет выполнять последующие действия (например, фиксировать смещения Kafka).

Tuple tuple1 = client.generateEmptyTuple();
tuple1.addColumn("ts", "2022-05-18 16:30:06");
tuple1.addColumn("tag", 102020030);
tuple1.addColumn("c1", 1.1);
tuple1.addColumn("c2", 2.2);
tuple1.addColumn("c3", 3.3);
tuple1.addColumn("c4", 4.4);
tuple1.addColumn("c5", "lTxFCVLwcDTKbNbjau_c5");
tuple1.addColumn("c6", "lTxFCVLwcDTKbNbjau_c6");
tuple1.addColumn("c7", "lTxFCVLwcDTKbNbjau_c7");
tuple1.addColumn("c8", "lTxFCVLwcDTKbNbjau_c8");

/* 
 * appendTupleBlocking returns boolean:
 * true: MxClient buffer is full (ready to send)
 * false: Buffer not yet full
 */
try {
    if (client.appendTupleBlocking(tuple1)) {
        l.info("append tuples enough");
        // Manually trigger flush       
        client.flushBlocking();
    }
/* 
 * If flushBlocking throws exception, entire batch failed. Handle accordingly.
 */
} catch (AllTuplesFailException e) {
    l.error("Tuples fail and catch the exception return.", e);
    for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
        l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
        for (Column c : entry.getKey().getColumns()) {
            l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
  }
}                  
/*
 * If flushBlocking throws exception, some tuples failed. Handle accordingly.
 */
} catch (PartiallyTuplesFailException e) {
    for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
    l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
        for (Column c : entry.getKey().getColumns()) {
            l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
  }
 }
}

3.1.2 Пример использования HTTP API MatrixGate на Java

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class MxgateExample {
    public static void main(String[] args) throws Exception {
        MxgateExample http = new MxgateExample();
        http.sendingPostRequest();
    }

    /* 
     * HTTP Post request
     */
    private void sendingPostRequest() throws Exception {

        /* 
         * mxgate listens on port 8086 of localhost
         */
        String url = "http://localhost:8086/";
        URL obj = new URL(url);
        HttpURLConnection con = (HttpURLConnection) obj.openConnection();

        /* 
         * Setting basic post request
         */
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type","text/plain");
        String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";

        con.setDoOutput(true);
        DataOutputStream wr = new DataOutputStream(con.getOutputStream());

        /*
         * When data contains Chinese characters, encode using postJsonData.getBytes("UTF-8")
         */
        wr.write(postJsonData.toString().getBytes("UTF-8"));
        wr.flush();
        wr.close();

        int responseCode = con.getResponseCode();
        System.out.println("Sending 'POST' request to URL : " + url);
        System.out.println("Post Data : " + postJsonData);
        System.out.println("Response Code : " + responseCode);

        BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
        String output;
        StringBuffer response = new StringBuffer();

        while ((output = in.readLine()) != null) {
            response.append(output);
        }
        in.close();

        System.out.println(response.toString());
    }
}


3.2 Пример использования HTTP API MatrixGate на Python

import http.client

class MxgateExample(object):
    def __init__(self):

        /*
         * mxgate listens on port 8086 of localhost
         */
        self.url = "localhost:8086"

        self.postData = "public.testtable\n/" \
                        "1603777821|1|101|201|301\n/" \
                        "1603777822|2|102|202|302\n/" \
                        "1603777823|3|103|203|303"
        self.headers = {"Content-Type": "text/plain"}

    /* 
     * HTTP Post request
     */
    def sending_post_request(self):

        conn = http.client.HTTPConnection(self.url)
        conn.request("POST", "/", self.postData, self.headers)

        response = conn.getresponse()
        response_code = response.getcode()
        print(f"Sending 'POST' request to URL : {self.url}")
        print(f"Post Data : {self.postData}")
        print(f"Response Code : {response_code}")

        output = response.read()
        print(output)

if __name__ == '__main__':
    gate_post = MxgateExample()
    gate_post.sending_post_request()


3.3 Пример использования HTTP API MatrixGate на C

Рекомендуется использовать среду разработки C# Core.

using System;
using System.IO;
using System.Net;
using System.Text;

namespace HttpPostTest
{
class Program
    {
        static void Main(string[] args)
        {
            var url = "http://10.13.2.177:8086/";
            var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";

           HttpPost(url,txt);
        }

public static string HttpPost(string url, string content){
    string result = "";
    HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
    req.Method = "POST";
    req.ContentType = "text/plain";

    /*
     * region Add Post parameters
     */
    byte[] data = Encoding.UTF8.GetBytes(content);
    req.ContentLength = data.Length;
    using (Stream reqStream = req.GetRequestStream()){
        reqStream.Write(data, 0, data.Length);
        reqStream.Close();
    }

    HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
    Stream stream = resp.GetResponseStream();

    /* 
     * Get response content
     */
    using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
        result = reader.ReadToEnd();
        }
        return result;
    }
  }
}

Если вы столкнулись с ошибкой error when serving connection ***** body size exceeds the given limit, увеличьте значение max-body-bytes в разделе mxgate.conf.

3.4 Пример использования HTTP API MatrixGate на Golang

package main

import (
    "bytes"
    "net/http"
)

func PostDataToServer(URL string) error {
    data := `public.testtable
            1603777821|1|101|201|301
            1603777822|2|102|202|302
            1603777823|3|103|203|303
            `
    resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
    if err != nil {
        return err
    }
    if resp.StatusCode != 200 {
        /* 
         * Обработка тела ответа.
         */
        return nil
    }

    /*
     * Обработка тела ответа.
     */
    return nil
}

func main()  {
    err := PostDataToServer("http://127.0.0.1:8086")
    if err != nil{
        panic(err)
    }

}

Примечание!
Дополнительные возможности MatrixGate см. в Руководстве по инструментам — mxgate.