Доступ к MatrixGate с помощью языков программирования

В этом документе описывается, как подключаться к MatrixGate с использованием API-интерфейсов для достижения высокоскоростного ввода данных.

1 API MatrixGate

MatrixGate предоставляет HTTP API, поддерживающий различные языки программирования для импорта данных в базу данных YMatrix через HTTP-интерфейсы. Ниже приведены формат протокола и коды ответов HTTP MatrixGate.

Формат протокола HTTP MatrixGate

Тип протокола Формат протокола Использование и пример
URL http://\:\ Указывает URL-адрес подключения mxgate
PATH / В настоящее время поддерживается только /; любой путь после / игнорируется
HTTP-метод POST В настоящее время поддерживается загрузка данных только методом POST
HTTP-заголовок Content-Encoding: gzip В настоящее время поддерживается сжатие gzip содержимого тела HTTP
Content-Type: text/plain В настоящее время поддерживается тип text/plain
Тело HTTP SchemaName.TableName
Timestamp|ID|C1|C2|..|Cn
Первая строка формата тела указывает целевую таблицу для загрузки данных. Имя схемы можно опустить (по умолчанию Public), имя таблицы обязательно. Начиная со второй строки — строки данных временных рядов, каждая строка соответствует строке в целевой таблице. Столбцы разделяются символом |, строки — символом \n. Первое поле каждой строки — временная метка в формате UNIX (в секундах), как указано в --time-format. Второе поле — TagID, целое число. Поля с третьего до последнего соответствуют столбцам целевой таблицы. Рекомендуется, чтобы определение DDL целевой таблицы также следовало порядку столбцов (Timestamp, TagID, C1, C2, …, Cn)

Коды ответов HTTP MatrixGate

Код ответа Значение Примечания
200 StatusOK Частичные ошибки формата данных. В теле ответа будут указаны строки с ошибками и сообщениями об ошибках, например:
At line: 2
missing data for column "c3"
204 StatusNoContent Данные успешно загружены в MatrixGate
400 StatusBadRequest Ошибка запроса данных, например, ошибка формата тела POST, таблица не найдена, несоответствие формата сжатия данных и заголовка HTTP-запроса и т.д.
405 StatusMethodNotAllowed Запрос с использованием метода, отличного от POST
408 StatusTimeout Таймаут запроса
500 StatusIntervalServerError Ошибка на стороне базы данных, загрузка данных не удалась. В теле ответа содержится подробная информация об ошибке
503 StatusServiceUnavailable MatrixGate отклоняет запрос, например, превышено максимальное количество подключений или MatrixGate завершает работу и т.д.

2 Пример командной строки HTTP API MatrixGate

Сначала создайте таблицу testtable в базе данных demo.

=# CREATE TABLE testtable (
    time TIMESTAMP WITH TIME ZONE,
    tagid INT,
    c1 INT,
    c2 INT,
    c3 INT
    )USING MARS3
     DISTRIBUTED BY (tagid)
     ORDER BY (time,tagid);

2.1 Отправка CSV-данных по HTTP

Создайте файл конфигурации mxgate.conf.

$ mxgate config --db-database testdb \
              --db-master-host localhost \
              --db-master-port 5432 \
              --db-user mxadmin \
              --db-password 123123 \
              --target public.testtable \
              --format csv \
              --time-format unix-second \
              --delimiter '|' \
              --parallel 256 \
              --stream-prepared 3 \
              --interval 250 \
              --transform plain \
              > mxgate.conf

Отредактируйте файл загрузки данных data.txt.

$ vi data.txt
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303

Запустите mxgate с использованием созданного файла конфигурации mxgate.conf.

$ mxgate --config mxgate.conf

Отправьте HTTP-запрос для загрузки данных.

$ curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"

Подключитесь к базе данных, чтобы проверить успешность загрузки данных.

$ psql demo
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
 date_part  |          time          | tagid | c1  | c2  | c3
------------+------------------------+-------+-----+-----+-----
 1603777821 | 2020-10-27 13:50:21+08 |     1 | 101 | 201 | 301
 1603777822 | 2020-10-27 13:50:22+08 |     2 | 102 | 202 | 302
 1603777823 | 2020-10-27 13:50:23+08 |     3 | 103 | 203 | 303
(3 rows)

2.2 Отправка JSON-данных по HTTP

Добавьте следующие две пары «ключ-значение» в заголовок каждого HTTP POST-запроса, отправляемого в mxgate:

"Batch-Type" = json
"Job-Name" = public.t1
  • "Batch-Type" = json указывает, что формат данных в теле POST-запроса — JSON.
  • Job-Name задаёт имя соответствующей таблицы для вставки данных в формате: schemaname.tablename.

Добавьте следующую конфигурацию, связанную с преобразованием, в файл конфигурации mxgate:

[transform]

  ## Overall parallel level for transform, only for non - strict mode
  # parallel = 256

  ## Transform decodes input data and performs type/format conversion
  ## Types restricted to: plain/json/nil/mxmon/hanangdbc
  ## transform = "plain"
  transform = "json"
    [transform.json]
      mapping = [{
                  table-name = "public.t1",
                  field-map = [{dest="id", source="$.id", enabled=true}, {dest="content", source="$.content", enable=false}]
                }]
  • Здесь source = "$.id" — выражение Json Path. Подробнее см. JsonPath.

Предположим, структура целевой таблицы следующая:

                 Table "public.t1"
 Column  |  Type   | Collation | Nullable | Default
---------+---------+-----------+----------+---------
 id      | integer |           |          |
 content | text    |           |          |
Distributed by: (id)

Пример структуры JSON-данных:

{"id": 12345, "content": "this is a sample json"}

Функция преобразования JSON в mxgate извлечёт значения id и content из соответствующих JSON-данных согласно конфигурации [transform.json], а затем объединит их в формат CSV или text для загрузки в базу данных.

Результат будет следующим:

=# SELECT * FROM t1;
  id   |        content
-------+-----------------------
 12345 | this is a simple json
(1 row)

В настоящее время при записи JSON по HTTP mxgate поддерживает только формат, в котором каждая строка — это полная JSON-строка. Если внутри строки есть перенос, операция записи завершится ошибкой.

Например, следующие JSON-данные могут быть переданы:

{"employees": [{"id": 1, "name": "John Doe", "position": "Manager", "department": "Sales"},{"id": 2,"name": "Jane Smith","position": "Engineer","department": "Engineering"}]}

Следующий вариант завершится ошибкой:

{
  "employees": [
    {
      "id": 1,
      "name": "John Doe",
      "position": "Manager",
      "department": "Sales"
    },
    {
      "id": 2,
      "name": "Jane Smith",
      "position": "Engineer",
      "department": "Engineering"
    }
  ]
}

3 Интеграция языков программирования с MatrixGate

3.1 Интеграция Java с MatrixGate

3.1.1 SDK Java для MatrixGate

SDK (комплект разработки программного обеспечения) — это набор инструментов, позволяющих разработчикам сосредоточиться на бизнес-логике, значительно повышая эффективность и удобство разработки.

  1. Подключение зависимости SDK

Вы можете подключить JAR-пакет SDK следующими способами:

  • (1) Непосредственно из удалённого репозитория Maven
  • (2) Непосредственно из удалённого репозитория Gradle
  • (3) Вручную скачать JAR-файл и импортировать его локально

Примечание!
Достаточно выбрать один из вышеперечисленных способов. Рекомендуется использовать (1) или (2), чтобы подключить SDK непосредственно из удалённого репозитория Maven или Gradle — это эффективно и удобно. Для способа (3) — локального импорта — см. MatrixGate FAQ: 21. Он используется редко, поэтому здесь подробно не рассматривается.

(1) Автоматическое получение пакета SDK из удалённого репозитория Maven

Настройте следующую зависимость в файле pom.xml вашего проекта:

<dependencies>
    <dependency>
        <groupId>cn.ymatrix</groupId>
        <artifactId>mxgate-sdk-java</artifactId>
        <version>1.1.2</version>
    </dependency>
</dependencies>

(2) Подключение зависимости SDK через удалённый репозиторий Gradle

repositories {
    mavenCentral()
}

dependencies {
    implementation 'cn.ymatrix:mxgate-sdk-java:1.0.20'
}
  1. Запуск mxgate

YMatrix поддерживает запись данных через gRPC и HTTP. Подробности ниже:

  • Запуск mxgate (источник gRPC)

Примечание!
Требуется версия YMatrix, поддерживающая gRPC, то есть версия 4.6.1 и выше.

Создайте файл конфигурации mxgate_grpc.conf на узле Master и запустите mxgate.

Примечание!
Перед использованием mxgate необходимо сначала создать таблицу в базе данных. В примере хост Master — mdw, база данных — demo, таблица данных — test_table_transfer.

Код следующий:

# Create mxgate config file
[mxadmin@mdw ~]$ mxgate config \
    --source grpc \
    --db-database demo \
    --target public.test_table_transfer \
    --time-format raw \
    --grpc-port 8087 \
    --format csv \
    > mxgate_grpc.conf

# Start mxgate
[mxadmin@mdw ~]$ mxgate start --config mxgate_grpc.conf

Как показано в примере, при использовании SDK для записи данных в mxgate необходимо указать параметр source mxgate как grpc, а format — как csv. Поскольку SDK должен знать, куда записывать данные, нужно также указать номер порта gRPC в файле конфигурации, в примере — 8087.

  • Запуск mxgate (источник HTTP)

Примечание!
Даже при запуске mxgate через HTTP всё равно необходимо указать порт gRPC mxgate. Это связано с тем, что SDK использует gRPC для получения метаданных таблиц базы данных из mxgate, хотя метод записи данных переключается на HTTP.

Создайте файл конфигурации mxgate_http.conf на узле Master и запустите mxgate. В примере имя хоста Master — mdw. Код следующий:

# Create mxgate config file
[mxadmin@mdw ~]$ mxgate config \
    --source http \
    --db-database demo \
    --target public.test_table_transfer \
    --time-format raw \
    --grpc-port 8087 \
    --format csv \
    > mxgate_http.conf

# Start mxgate
[mxadmin@mdw ~]$ mxgate start --config mxgate_http.conf
  1. Отправка данных в mxgate с помощью SDK

В этом разделе рассматриваются асинхронный и синхронный способы отправки. Выберите подходящий по необходимости.

Структура примера таблицы:

            Partitioned table "public.test_table_transfer"
 Column |            Type             | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+---------
 ts     | timestamp without time zone |           |          |
 tag    | integer                     |           | not null |
 c1     | double precision            |           |          |
 c2     | double precision            |           |          |
 c3     | double precision            |           |          |
 c4     | double precision            |           |          |
 c5     | text                        |           |          |
 c6     | text                        |           |          |
 c7     | text                        |           |          |
 c8     | text                        |           |          |
Partition key: RANGE (ts)

Сначала выполните глобальные настройки инициализации:

// Set log level, default is INFO.
MxLogger.loggerLevel(LoggerLevel.INFO);
// Logs will be output to stdout by default. If you do not want to output, you can pass false in the following API.
MxLogger.enableStdout(true);
// The default log file path and naming format of the SDK is /tmp/mxgate_sdk_java_2022-08-26_133403.log,
// Users can customize the output log file path and file name.
MxLogger.writeToFile("/tmp/mxgate_sdk_java.log");
  • Асинхронная отправка данных с помощью SDK

Асинхронная отправка добавляет кортеж (Tuple) во внутреннюю очередь SDK, а затем отправляет данные асинхронными HTTP-запросами. Сначала необходимо запустить MxBuilder, который является синглтоном и глобально уникальным.

Примечание!
В одном проекте достаточно создать его один раз. Вы можете выполнить соответствующие настройки до инициализации MxBuilder.

MxBuilder builder = MxBuilder.newBuilder()
        .withDropAll(false) // If used for testing, set to true to drop data instead of sending to mxgate; set to false to send data to mxgate
        .withCacheCapacity(100000) // Size of the queue for temporarily storing tuples in batches
        .withCacheEnqueueTimeout(2000) // Timeout for tuples to be enqueued if the queue is full. Throws IllegalStateException if exceeded
        .withConcurrency(10) // Number of threads writing data to mxgate concurrently
        .withRequestTimeoutMillis(3000) // Timeout for each data write request (in milliseconds)
        .withMaxRetryAttempts(3) // Number of retries for each write request if an issue occurs
        .withRetryWaitDurationMillis(3000) // Interval between retries (currently fixed)
        .withRequestType(RequestType.WithHTTP) // SDK supports sending data to mxgate via HTTP and gRPC. Corresponding configurations are: RequestType.WithHTTP, RequestType.WithGRPC
        .withCircuitBreaker() // Use the built - in circuit breaker. If the failure rate or slow request rate reaches the threshold, it will be activated. After activation, data sending to mxgate will be paused for 30 seconds, and tuples cannot be appended
        .withMinimumNumberOfCalls(1) // Minimum number of calls for the circuit breaker to take effect (must be >= 1, default is 10)
        .withSlidingWindowSize(10) // Size of the sliding window for calculating the failure rate (must be >= 1, default is 100)
        .withFailureRateThreshold(60.0f) // Failure rate threshold (must be >0 and <= 100). If the failure rate reaches this threshold, the circuit breaker will be activated
        .withSlowCallDurationThresholdMillis(1000) // Threshold for slow request duration (milliseconds). Requests exceeding this duration are considered slow (note that this duration should be less than the request timeout)
        .withSlowCallRateThreshold(80.0f) // Slow request rate threshold. If the slow request rate reaches this threshold, the circuit breaker will be activated
        // .withRequestAsync(true) // Uncomment this line to enable asynchronous mode for sending data to mxgate (optional)
        // .withConcurrency(20) // Typically, asynchronous mode only requires tens of concurrency to achieve the same or even higher throughput as synchronous mode (optional)
        // MxBuilder's builder pattern has added the following API to adjust the concurrency of CSV parallel transformation (at the MxClient Group level)
        // .withCSVConstructionParallel(100) // (Supported from v1.1.0, uncomment this line to use)
        .build();
        // MxBuilder has added a singleton API. After MxBuilder is successfully built, you can obtain the globally unique singleton instance of MxBuilder at any location using the following API (supported from v1.1.2)
        // MxBuilder.instance(); // (Supported from v1.1.2, uncomment this line to use)
        // builder.getTupleCacheSize(); // Uncomment this line to get the number of remaining Tuples in the SDK's internal Tuple cache in real - time (optional)

Метод connect Builder принимает четыре параметра: (1) Имя хоста (IP-адрес) и номер порта процесса mxgate, предоставляющего сервис для приёма данных и метаданных таблиц базы данных. Например, для HTTP — http://localhost:8086/; для gRPC — localhost:8087. (2) Схема. (3) Имя таблицы. (4) Обратный вызов (Callback). Если Builder успешно подключился к mxgate, будет вызван метод onSuccess, возвращающий экземпляр MxClient для записи данных. Если подключение не удалось, будет вызван метод onFailure, а failureMsg объяснит причину сбоя.

Конкретный код приведён ниже. Не забудьте заменить номер порта, имя схемы и имя таблицы на фактические значения.

// Asynchronous method (supported from v1.0.13)
builder.connect("http://localhost:8086/", "localhost:8087", "public", "test_table", new ConnectionListener() {
    @Override
    public void onSuccess(MxClient client) {
        sendData(client);
    }

    @Override
    public void onFailure(String failureMsg) {
        // Handle failure
    }
});

Через функцию обратного вызова ConnectionListener метода onSuccess можно получить экземпляр MxClient. После этого можно использовать API MxClient для записи данных в mxgate.

// Synchronous method (supported from v1.0.13)
MxClient client = builder.connect(httpHost, gRPCHost, schema, table);
/* 
 * v1.1.0 has enhanced the scalability of MxClient. MxBuilder has added several new APIs.
 * The MxClient obtained through these APIs belongs to a specific MxClient Group.
 * You can freely define parameters such as the MxClient Group Number, Tuples cache capacity, and concurrency of TuplesConsumer using these APIs.
 * (Supported from v1.1.0)
 */
// MxClient client = builder.connectWithGroup(dataSendingHost, metadataHost, schema, table, 10);
// MxClient client = builder.connectWithGroup(dataSendingHost, metadataHost, schema, table, 1, 1000, 3000, 10); // (Supported from v1.1.0, uncomment this line to use)

/* 
 * APIs starting with "skip" will skip the process of connecting to the backend service to obtain database table metadata.
 * Such MxClient can only obtain a lightweight Tuple through generateEmptyTupleLite().
 * (Supported from v1.1.0)
 */
// MxClient client = mxBuilder.skipConnectWithGroup(dataSendingHost, metadataHost, schema, table, delimiter, 1);
// MxClient client = mxBuilder.skipConnectWithGroup(dataSendingHost, metadataHost, schema, table, delimiter, 1, 1000, 3000, 1);

Примечание!
MxClient потокобезопасен. Однако для достижения наилучшей производительности при многопоточном использовании рекомендуется иметь отдельный MxClient в каждом потоке, то есть получать несколько экземпляров MxClient через многократные вызовы метода connect MxBuilder.

private void sendData(MxClient client) {
    if (client != null) {
        /*
         * MxClient accumulates a batch of Tuples and sends them to mxgate as a micro - batch.
         * This API sets the waiting time for each micro - batch accumulation. The default is 2000 milliseconds,
         * meaning that every 2 seconds, it will attempt to send a batch of data to mxgate. If no data is written within this 2 - second period,
         * no data will be sent.
         */
        client.withIntervalToFlushMillis(5000);

        /*
         * Sets the number of Tuple bytes to accumulate as a micro - batch for sending. Even if the flush interval time has not been reached,
         * if the accumulated micro - batch byte count reaches this threshold, it will be sent. If the flush interval time is reached but the
         * accumulated byte count has not reached the set threshold, it will still be sent.
         */
        client.withEnoughBytesToFlush(500);

        /*
         * Compared to withEnoughBytesToFlush, this API improves performance when appending Tuples because it avoids calculating byte counts.
         * Depending on the specific use case, if withEnoughBytesToFlush also meets the performance requirements, then each flush will have a more
         * uniform data volume. The priority of withEnoughLinesToFlush is higher than withEnoughBytesToFlush.
         */
        client.withEnoughLinesToFlush(10000);

        /*
         * MxClient has added the following API to adjust the number of Tuples for each CSV conversion sub - task.
         * For example, if a flush has a total of 10,000 Tuples, and the following API sets BatchSize to 2000,
         * then the CSV conversion of these 10,000 Tuples will be split into 5 sub - tasks, each handling 2000 Tuples and executed concurrently.
         * (Supported from v1.1.0)
         */
        client.withCSVConstructionBatchSize(2000);

        /*
         * Each MxClient has its own private object pool.
         * When using it, you need to set the object pool size reasonably based on the number of Tuples flushed by each MxClient each time.
         */
        // client.useTuplesPool(poolSize);

        /*
         * MxClient supports compression and needs to be used with mxgate v4.7.6 and higher versions.
         */
        // client.withCompress();

        /* 
         * For HTTP requests, base64 encoding is not required. For gRPC, base64 encoding is necessary.
         */
        // client.withBase64Encode4Compress(); 

        /*
         * MxClient can register a DataPostListener. The success and failure of each batch of data sent will be
         * called back in onSuccess and onFailure, respectively. You can understand which Tuples were successfully written and which failed.
         */
        client.registerDataPostListener(new DataPostListener() {
            @Override
            public void onSuccess(Result result) {
                System.out.println(CUSTOMER_LOG_TAG + "Send tuples success: " + result.getMsg());
                System.out.println(CUSTOMER_LOG_TAG + "Succeed lines onSuccess callback " + result.getSucceedLines());
            }

            @Override
            public void onFailure(Result result) {
                /* 
                 * result.getErrorTuplesMap() contains a map of error lines and their reasons Tuple -> String.
                 * Key is the error line, and value is the reason.
                 */
                for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
                    l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
                    for (Column c : entry.getKey().getColumns()) {
                        l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
                    }
                }
                System.out.println(result.getSuccessLines());
            }
        });
    }
}

Получите пустой объект Tuple для заполнения с помощью API generateEmptyTuple MxClient.

Tuple tuple1 = client.generateEmptyTuple();
Tuple tuple2 = client.generateEmptyTuple();

Заполните пустой Tuple парами «ключ-значение». Ключ — имя столбца соответствующей таблицы базы данных; значение — значение поля. Если некоторые поля могут быть null или имеют значения по умолчанию, вы можете не указывать значения для этих полей, и SDK автоматически заполнит значения по умолчанию или null.

Кортеж, полученный через этот API, больше не хранит метаданные таблицы базы данных и представляет собой более лёгкий Tuple. Поэтому при вызове MxClient.appendTuple() для добавления этого лёгкого Tuple в MxClient пропускается проверка достоверности данных на основе метаданных таблицы, что позволяет максимально быстро отправить данные.

// Tuple tuple = client.generateEmptyTupleLite(); // (Supported from v1.1.0, uncomment this line to use)

Примечание!
При добавлении столбцов в этот лёгкий Tuple необходимо вручную поддерживать порядок пар addColumn() key -> value, чтобы он точно соответствовал порядку столбцов в таблице базы данных.

Например, если порядок столбцов таблицы следующий:

  Column |            Type             | Collation | Nullable | Default
 --------+-----------------------------+-----------+----------+---------
  ts     | timestamp without time zone |           |          |
  tag    | integer                     |           | not null |
  c1     | double precision            |           |          |
  c2     | double precision            |           |          |
  c3     | double precision            |           |          |
  c4     | double precision            |           |          |
  c5     | text                        |           |          |
  c6     | text                        |           |          |
  c7     | text                        |           |          |
  c8     | text                        |           |          |

Тогда порядок добавления столбцов должен соответствовать порядку столбцов в таблице базы данных, как показано в примере ниже:

        tuple1.addColumn("ts", "2022-05-18 16:30:06");
        tuple1.addColumn("tag", 102020030);
        tuple1.addColumn("c1", 1.1);
        tuple1.addColumn("c2", 2.2);
        tuple1.addColumn("c3", 3.3);
        tuple1.addColumn("c4", 4.4);
        tuple1.addColumn("c5", "Chinese character test - 1");
        tuple1.addColumn("c6", "lTxFCVLwcDTKbNbjau_c6");
        tuple1.addColumn("c7", "lTxFCVLwcDTKbNbjau_c7");
        tuple1.addColumn("c8", "lTxFCVLwcDTKbNbjau_c8");

        tuple2.addColumn("ts", "2022-05-18 16:30:06");
        tuple2.addColumn("tag", 102020030);
        tuple2.addColumn("c1", 1.1);
        tuple2.addColumn("c2", 2.2);
        tuple2.addColumn("c3", 3.3);
        tuple2.addColumn("c4", 4.4);
        tuple2.addColumn("c5", "Chinese character test - 2");
        tuple2.addColumn("c6", "lTxFCVLwcDTKbNbjau_c26");
        tuple2.addColumn("c7", "lTxFCVLwcDTKbNbjau_c27");
        tuple2.addColumn("c8", "lTxFCVLwcDTKbNbjau_c28");

Наконец, добавьте заполненный Tuple в MxClient.

client.appendTuples(tuple1, tuple2);

Примечание!
MxClient предоставляет несколько API, позволяющих добавлять один Tuple за раз, несколько Tuple одновременно или список Tuple. Например: client.appendTuple();, client.appendTupleList();. Вы также можете вызвать метод flush() MxClient для ручной отправки данных независимо от количества записанных Tuple, например: client.flush();.

  • Синхронная отправка данных с помощью MxClient

Синхронная отправка означает синхронную отправку Tuple в mxgate для последующей обработки (например, фиксации смещений Kafka).

Tuple tuple1 = client.generateEmptyTuple();
tuple1.addColumn("ts", "2022-05-18 16:30:06");
tuple1.addColumn("tag", 102020030);
tuple1.addColumn("c1", 1.1);
tuple1.addColumn("c2", 2.2);
tuple1.addColumn("c3", 3.3);
tuple1.addColumn("c4", 4.4);
tuple1.addColumn("c5", "lTxFCVLwcDTKbNbjau_c5");
tuple1.addColumn("c6", "lTxFCVLwcDTKbNbjau_c6");
tuple1.addColumn("c7", "lTxFCVLwcDTKbNbjau_c7");
tuple1.addColumn("c8", "lTxFCVLwcDTKbNbjau_c8");

/* 
 * appendTupleBlocking returns a boolean value:
 * true: The MxClient has written the set bytes size and can send a request;
 * false: The set bytes size has not been reached yet.
 */
try {
    if (client.appendTupleBlocking(tuple1)) {
        l.info("append tuples enough");
        // Manually trigger tuples flush       
        client.flushBlocking();
    }
/* 
 * If the following flushBlocking throws an exception, it means that the entire batch of Tuples failed to write to mxgate. The caller can handle this exception accordingly.
 */
} catch (AllTuplesFailException e) {
    l.error("Tuples fail and catch the exception return.", e);
    for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
        l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
        for (Column c : entry.getKey().getColumns()) {
            l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
        }
    }
}                  
/*
 * If the following flushBlocking throws an exception, it means that some Tuples failed to write to mxgate. The caller can handle this exception accordingly.
 */
} catch (PartiallyTuplesFailException e) {
    for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
        l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
        for (Column c : entry.getKey().getColumns()) {
            l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
        }
    }
}

3.1.2 Пример использования HTTP API MatrixGate на Java

import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class MxgateExample {
    public static void main(String[] args) throws Exception {
        MxgateExample http = new MxgateExample();
        http.sendingPostRequest();
    }

    /* 
     * HTTP Post request
     */
    private void sendingPostRequest() throws Exception {

        /* 
         * mxgate listens on port 8086 of localhost
         */
        String url = "http://localhost:8086/";
        URL obj = new URL(url);
        HttpURLConnection con = (HttpURLConnection) obj.openConnection();

        /* 
         * Setting basic post request
         */
        con.setRequestMethod("POST");
        con.setRequestProperty("Content-Type","text/plain");
        String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";

        con.setDoOutput(true);
        DataOutputStream wr = new DataOutputStream(con.getOutputStream());

        /*
         * When data contains Chinese characters, you can encode it using postJsonData.getBytes("UTF-8")
         */
        wr.write(postJsonData.toString().getBytes("UTF-8"));
        wr.flush();
        wr.close();

        int responseCode = con.getResponseCode();
        System.out.println("Sending 'POST' request to URL : " + url);
        System.out.println("Post Data : " + postJsonData);
        System.out.println("Response Code : " + responseCode);

        BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
        String output;
        StringBuffer response = new StringBuffer();

        while ((output = in.readLine()) != null) {
            response.append(output);
        }
        in.close();

        System.out.println(response.toString());
    }
}

3.2 Пример использования HTTP API MatrixGate на Python

import http.client

class MxgateExample(object):
    def __init__(self):

        /*
         * mxgate listens on port 8086 of localhost
         */
        self.url = "localhost:8086"

        self.postData = "public.testtable\n/" \
                        "1603777821|1|101|201|301\n/" \
                        "1603777822|2|102|202|302\n/" \
                        "1603777823|3|103|203|303"
        self.headers = {"Content-Type": "text/plain"}

    /* 
     * HTTP Post request
     */
    def sending_post_request(self):

        conn = http.client.HTTPConnection(self.url)
        conn.request("POST", "/", self.postData, self.headers)

        response = conn.getresponse()
        response_code = response.getcode()
        print(f"Sending 'POST' request to URL : {self.url}")
        print(f"Post Data : {self.postData}")
        print(f"Response Code : {response_code}")

        output = response.read()
        print(output)

if __name__ == '__main__':
    gate_post = MxgateExample()
    gate_post.sending_post_request()

3.3 Пример использования HTTP API MatrixGate на C

Рекомендуется использовать среду разработки C# Core.

using System;
using System.IO;
using System.Net;
using System.Text;

namespace HttpPostTest
{
    class Program
    {
        static void Main(string[] args)
        {
            var url = "http://10.13.2.177:8086/";
            var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";

            HttpPost(url,txt);
        }

        public static string HttpPost(string url, string content){
            string result = "";
            HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
            req.Method = "POST";
            req.ContentType = "text/plain";

            /*
             * region Add Post parameters
             */
            byte[] data = Encoding.UTF8.GetBytes(content);
            req.ContentLength = data.Length;
            using (Stream reqStream = req.GetRequestStream()){
                reqStream.Write(data, 0, data.Length);
                reqStream.Close();
            }

            HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
            Stream stream = resp.GetResponseStream();

            /* 
             * Get the response content
             */
            using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
                result = reader.ReadToEnd();
            }
            return result;
        }
    }
}

Если возникает ошибка "error when serving connection ***** body size exceeds the given limit", необходимо увеличить значение max-body-bytes в mxgate.conf.

3.4 Пример использования HTTP API MatrixGate на Golang

package main

import (
    "bytes"
    "net/http"
)

func PostDataToServer(URL string) error {
    data := `public.testtable
            1603777821|1|101|201|301
            1603777822|2|102|202|302
            1603777823|3|103|203|303
            `
    resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
    if err != nil {
        return err
    }
    if resp.StatusCode != 200 {
        /* 
         * Обработка тела ответа.
         */
        return nil
    }

    /*
     * Обработка тела ответа.
     */
    return nil
}

func main()  {
    err := PostDataToServer("http://127.0.0.1:8086")
    if err != nil{
        panic(err)
    }

}

Примечание!
Дополнительные возможности MatrixGate см. в Руководстве по инструментам — mxgate.