В этом документе описывается, как использовать API-интерфейс для подключения к MatrixGate и обеспечения высокоскоростного ввода данных.
MatrixGate предоставляет HTTP API, позволяющее различным языкам программирования импортировать данные в базу данных YMatrix по протоколу HTTP. Ниже приведены формат протокола и коды ответов HTTP от MatrixGate.
Формат протокола HTTP MatrixGate
| Тип протокола | Формат протокола | Использование и пример |
|---|---|---|
| URL | http://\ |
Указывает URL-адрес подключения к mxgate |
| PATH | / | В настоящее время поддерживается только /; любой путь после / игнорируется |
| HTTP-метод | POST | Поддерживается только метод POST для загрузки данных |
| HTTP-заголовок | Content-Encoding: gzip | Поддерживает сжатие содержимого тела HTTP с помощью gzip |
| Content-Type: text/plain | Поддерживается только text/plain | |
| Тело HTTP | SchemaName.TableName Timestamp|ID|C1|C2|..|Cn |
Первая строка тела указывает целевую таблицу для загрузки данных. Имя схемы (SchemaName) является необязательным и по умолчанию равно public. Имя таблицы (TableName) обязательно. Начиная со второй строки, каждая строка представляет собой строку временных рядов. Столбцы разделяются символом \|, а строки — символом \n. Первое поле каждой строки — это временная метка в формате Unix time (в секундах). Подробности см. в --time-format. Второе поле — TagID (целое число). Остальные поля соответствуют столбцам целевой таблицы. Рекомендуется, чтобы порядок столбцов в DDL целевой таблицы был следующим: (Timestamp, TagID, C1, C2, ..., Cn). |
Коды ответа HTTP MatrixGate
| Код ответа | Значение | Примечания |
|---|---|---|
| 200 | StatusOK | Произошли частичные ошибки формата данных. Подробности об ошибках, включая проблемные строки и сообщения, возвращаются в теле ответа, например:At line: 2missing data for column "c3" |
| 204 | StatusNoContent | Данные успешно загружены в MatrixGate |
| 400 | StatusBadRequest | Некорректный запрос, например, неправильный формат тела POST, отсутствует целевая таблица или несоответствие формата сжатия заголовку HTTP |
| 405 | StatusMethodNotAllowed | Запрос с использованием метода, отличного от POST |
| 408 | StatusTimeout | Превышено время ожидания запроса |
| 500 | StatusInternalServerErrror | Ошибка на стороне базы данных; загрузка данных не удалась. Подробное сообщение об ошибке возвращается в теле ответа |
| 503 | StatusServiceUnavailable | MatrixGate отклонил запрос, например, превышено максимальное количество подключений или MatrixGate завершает работу |
Сначала создайте таблицу testtable в базе данных demo.
=# CREATE TABLE testtable (
time TIMESTAMP WITH TIME ZONE,
tagid INT,
c1 INT,
c2 INT,
c3 INT
)USING MARS3
DISTRIBUTED BY (tagid)
ORDER BY (time,tagid);
Создайте файл конфигурации mxgate.conf.
$ mxgate config --db-database testdb \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--db-password 123123 \
--target public.testtable \
--format csv \
--time-format unix-second \
--delimiter '|' \
--parallel 256 \
--stream-prepared 3 \
--interval 250 \
--transform plain \
> mxgate.conf
Отредактируйте файл загрузки данных data.txt.
$ vi data.txt
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
Запустите mxgate с использованием созданного файла конфигурации mxgate.conf.
$ mxgate --config mxgate.conf
Отправьте HTTP-запрос для загрузки данных.
$ curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
Подключитесь к базе данных и проверьте успешность загрузки данных.
$ psql demo
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
date_part | time | tagid | c1 | c2 | c3
------------+------------------------+-------+-----+-----+-----
1603777821 | 2020-10-27 13:50:21+08 | 1 | 101 | 201 | 301
1603777822 | 2020-10-27 13:50:22+08 | 2 | 102 | 202 | 302
1603777823 | 2020-10-27 13:50:23+08 | 3 | 103 | 203 | 303
(3 rows)
Добавьте следующие две пары ключ-значение в заголовки HTTP POST-запроса при отправке данных в mxgate:
"Batch-Type" = json
"Job-Name" = public.t1
"Batch-Type" = json указывает, что формат данных в теле POST-запроса — JSON.Job-Name задаёт имя целевой таблицы для вставки данных в формате: schemaname.tablename.Добавьте следующую конфигурацию преобразования в файл конфигурации mxgate:
[transform]
## Overall parallel level for transform, only for non-strict mode
# parallel = 256
## Transform decodes input data and perform type/format conversion
## Types restricted to: plain/json/nil/mxmon/hanangdbc
## transform = "plain"
transform = "json"
[transform.json]
mapping = [{
table-name = "public.t1",
field-map = [{dest="id", source="$.id", enabled=true}, {dest="content", source="$.content", enable=false}]
}]
source = "$.id" — это выражение JsonPath. Подробнее см. JsonPath.Предположим, структура целевой таблицы следующая:
Table "public.t1"
Column | Type | Collation | Nullable | Default
---------+---------+-----------+----------+---------
id | integer | | |
content | text | | |
Distributed by: (id)
Пример структуры JSON-данных:
{"id": 12345, "content": "this is a sample json"}
Функция преобразования JSON в mxgate извлекает значения для id и content из JSON-данных на основе конфигурации [transform.json], затем форматирует их как CSV или text для загрузки в базу данных.
Результат:
=# SELECT * FROM t1;
id | content
-------+-----------------------
12345 | this is a simple json
(1 row)
На данный момент mxgate поддерживает только однострочные JSON-строки при передаче JSON-данных по HTTP. Если объект JSON занимает несколько строк, загрузка завершится ошибкой.
Например, следующий JSON можно передать:
{"employees": [{"id": 1, "name": "John Doe", "position": "Manager", "department": "Sales"},{"id": 2,"name": "Jane Smith","position": "Engineer","department": "Engineering"}]}
А вот следующий вызовет ошибку:
{
"employees": [
{
"id": 1,
"name": "John Doe",
"position": "Manager",
"department": "Sales"
},
{
"id": 2,
"name": "Jane Smith",
"position": "Engineer",
"department": "Engineering"
}
]
}
SDK (комплект разработки программного обеспечения) освобождает разработчиков от необходимости реализации нефункциональных компонентов, значительно повышая эффективность разработки и удобство использования.
Вы можете добавить JAR-файл SDK одним из следующих способов:
(1). Получить напрямую из удалённого репозитория Maven
(2). Получить напрямую из удалённого репозитория Gradle
(3). Вручную скачать JAR и импортировать локально
Примечание!
Выберите только один из перечисленных способов. Рекомендуется использовать (1) или (2), чтобы получить SDK из Maven или Gradle — это более эффективно и удобно. Для способа (3) обратитесь к MatrixGate FAQ: 21. Поищите по ключевому словуJAVA SDK, так как этот способ используется редко и здесь подробно не рассматривается.
(1). Используйте Maven для автоматической загрузки SDK
Добавьте следующую зависимость в файл pom.xml вашего Java-проекта:
<dependencies>
<dependency>
<groupId>cn.ymatrix</groupId>
<artifactId>mxgate-sdk-java</artifactId>
<version>1.1.2</version>
</dependency>
</dependencies>
(2). Используйте Gradle для добавления зависимости SDK
repositories {
mavenCentral()
}
dependencies {
implementation 'cn.ymatrix:mxgate-sdk-java:1.0.20'
}
YMatrix поддерживает ввод данных через gRPC и HTTP. Подробности ниже:
Примечание!
Требуется версия YMatrix, поддерживающая gRPC, то есть 4.6.1 или выше.
На узле Master создайте файл конфигурации mxgate mxgate_grpc.conf и запустите mxgate.
Примечание!
Перед использованием mxgate убедитесь, что целевая таблица уже создана в базе данных. В данном примере хост Master —mdw, база данных —demo, таблица —test_table_transfer.
Пример команд:
# Create mxgate config file
[mxadmin@mdw ~]$ mxgate config \
--source grpc \
--db-database demo \
--target public.test_table_transfer \
--time-format raw \
--grpc-port 8087 \
--format csv \
> mxgate_grpc.conf
# Start mxgate
[mxadmin@mdw ~]$ mxgate start --config mxgate_grpc.conf
Как показано, при записи данных в mxgate с помощью SDK установите --source равным grpc и --format равным csv. Параметр --grpc-port должен быть указан в конфигурации, чтобы SDK знал, куда отправлять данные; в данном примере используется порт 8087.
Примечание!
Даже при запуске mxgate с использованием HTTP необходимо указывать порт gRPC, поскольку SDK использует gRPC для получения метаданных таблиц из mxgate — только путь загрузки данных переключается на HTTP.
На узле Master создайте файл конфигурации с именем mxgate_http.conf и запустите mxgate. В данном примере имя хоста Master — mdw:
# Create mxgate config file
[mxadmin@mdw ~]$ mxgate config \
--source http \
--db-database demo \
--target public.test_table_transfer \
--time-format raw \
--grpc-port 8087 \
--format csv \
> mxgate_http.conf
# Start mxgate
[mxadmin@mdw ~]$ mxgate start --config mxgate_http.conf
В этом разделе описаны асинхронный и синхронный способы отправки данных. Выберите подходящий в зависимости от ваших требований.
Пример схемы таблицы:
Partitioned table "public.test_table_transfer"
Column | Type | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+---------
ts | timestamp without time zone | | |
tag | integer | | not null |
c1 | double precision | | |
c2 | double precision | | |
c3 | double precision | | |
c4 | double precision | | |
c5 | text | | |
c6 | text | | |
c7 | text | | |
c8 | text | | |
Partition key: RANGE (ts)
Сначала выполните глобальную инициализацию:
// Set log level (default: INFO)
MxLogger.loggerLevel(LoggerLevel.INFO);
// Logs are output to stdout by default. Pass false to disable.
MxLogger.enableStdout(true);
// Default log file path and name: /tmp/mxgate_sdk_java_2022-08-26_133403.log
// Users can customize the log file path and name.
MxLogger.writeToFile("/tmp/mxgate_sdk_java.log");
Асинхронная отправка добавляет кортежи (Tuples) во внутреннюю очередь SDK, которые затем отправляются посредством асинхронных HTTP-запросов.
Начните с инициализации MxBuilder. Этот билдер следует шаблону «одиночка» и является глобально уникальным.
Примечание!
ИнициализируйтеMxBuilderтолько один раз в проекте. Конфигурацию можно задать до создания экземпляра.
MxBuilder builder = MxBuilder.newBuilder()
.withDropAll(false) // Set to true for testing (drops data instead of sending); set to false for production
.withCacheCapacity(100000) // Queue size for micro-batch tuple storage
.withCacheEnqueueTimeout(2000) // Timeout (ms) for enqueueing tuples if queue is full; throws IllegalStateException on timeout
.withConcurrency(10) // Number of threads concurrently writing to mxgate
.withRequestTimeoutMillis(3000) // Request timeout per thread (milliseconds)
.withMaxRetryAttempts(3) // Retry attempts per failed write request
.withRetryWaitDurationMillis(3000) // Fixed interval between retries
.withRequestType(RequestType.WithHTTP) // Supported types: RequestType.WithHTTP, RequestType.WithGRPC
.withCircuitBreaker() // Enable built-in circuit breaker
.withMinimumNumberOfCalls(1) // Minimum calls before circuit breaker activates (>=1, default 10)
.withSlidingWindowSize(10) // Sliding window size for failure rate calculation (>=1, default 100)
.withFailureRateThreshold(60.0f) // Failure rate threshold (%) to trigger circuit breaker
.withSlowCallDurationThresholdMillis(1000) // Duration threshold for slow requests (ms), should be less than request timeout
.withSlowCallRateThreshold(80.0f) // Slow call rate threshold (%) to trigger circuit breaker
// .withRequestAsync(true)// Enable async worker threads (uncomment to use)
// .withConcurrency(20)// Async mode typically achieves high throughput with moderate concurrency (uncomment to use)
// New API in MxBuilder for CSV construction parallelism (MxClient Group level)
// .withCSVConstructionParallel(100)// Available from v1.1.0 (uncomment to use)
.build();
// New singleton API in MxBuilder: after successful build, retrieve the global instance anywhere
// MxBuilder.instance();// Available from v1.1.2 (uncomment to use)
// builder.getTupleCacheSize(); // Get current number of remaining tuples in SDK cache (uncomment to use)
Метод connect класса MxBuilder принимает четыре параметра:
(1). Имя хоста (или IP) и порт, где mxgate предоставляет сервисы для загрузки данных и получения метаданных.
Для HTTP: http://localhost:8086/; для gRPC: localhost:8087.
(2). Имя схемы.
(3). Имя таблицы.
(4). Коллбэк: при успехе onSuccess возвращает экземпляр MxClient для записи данных; при ошибке onFailure возвращает сообщение об ошибке.
Пример кода (замените порт, имя схемы и таблицы по необходимости):
// Asynchronous method (available from v1.0.13)
builder.connect("http://localhost:8086/", "localhost:8087", "public", "test_table", new ConnectionListener() {
@Override
public void onSuccess(MxClient client) {
sendData(client);
}
@Override
public void onFailure(String failureMsg) {
}
});
Через коллбэк onSuccess() класса ConnectionListener получите экземпляр MxClient и используйте его API для записи данных в mxgate.
// Synchronous method (available from v1.0.13)
MxClient client = builder.connect(httpHost, gRPCHost, schema, table);
/*
* v1.1.0 enhances MxClient extensibility with new MxBuilder APIs.
* These allow customizing MxClient Group count, tuple cache capacity, and consumer concurrency.
* (Available from v1.1.0)
*/
// MxClient client = builder.connectWithGroup(dataSendingHost, metadataHost, schema, table, 10);
// MxClient client = builder.connectWithGroup(dataSendingHost, metadataHost, schema, table, 1, 1000, 3000, 10);// (Available from v1.1.0, uncomment to use)
/*
* skip-prefixed APIs skip backend connection for metadata retrieval.
* Such MxClient instances can only generate lightweight Tuples via generateEmptyTupleLite().
* (Available from v1.1.0)
*/
// MxClient client = mxBuilder.skipConnectWithGroup(dataSendingHost, metadataHost, schema, table, delimiter, 1);
// MxClient client = mxBuilder.skipConnectWithGroup(dataSendingHost, metadataHost, schema, table, delimiter, 1, 1000, 3000, 1);
Примечание!
MxClientпотокобезопасен. Однако для достижения оптимальной производительности в многопоточной среде рекомендуется иметь один экземплярMxClientна поток — получаемый путём многократного вызоваconnectна одном и том жеMxBuilder.
private void sendData(MxClient client) {
if (client != null) {
/*
* MxClient batches tuples into micro-batches for sending.
* This sets the flush interval in milliseconds (default: 2000).
* A batch is sent every 2 seconds, but only if data has been written.
*/
client.withIntervalToFlushMillis(5000);
/*
* Sets the byte size threshold for flushing a micro-batch.
* A flush occurs when either the byte limit is reached or the time interval expires.
*/
client.withEnoughBytesToFlush(500);
/*
* Improves append performance by avoiding byte counting.
* Use based on scenario. If withEnoughBytesToFlush meets performance needs,
* data per flush will be more uniform.
* withEnoughLinesToFlush takes precedence over withEnoughBytesToFlush.
*/
client.withEnoughLinesToFlush(10000);
/*
* New API in MxClient to control CSV conversion subtask size.
* E.g., 10,000 tuples in a flush, with BatchSize=2000 → 5 concurrent subtasks.
* (Available from v1.1.0)
*/
client.withCSVConstructionBatchSize(2000);
/*
* Each MxClient has a private object pool.
* Size should be tuned based on typical flush volume.
*/
// client.useTuplesPool(poolSize);
/*
* Compression support requires mxgate v4.7.6 or higher.
*/
// client.withCompress();
/*
* Base64 encoding is optional for HTTP, required for gRPC.
*/
// client.withBase64Encode4Compress();
/*
* Register a DataPostListener to receive callbacks on batch success/failure.
* You can track which tuples succeeded or failed.
*/
client.registerDataPostListener(new DataPostListener() {
@Override
public void onSuccess(Result result) {
System.out.println(CUSTOMER_LOG_TAG + "Send tuples success: " + result.getMsg());
System.out.println(CUSTOMER_LOG_TAG + "Succeed lines onSuccess callback " + result.getSucceedLines());
}
@Override
public void onFailure(Result result) {
/*
* result.getErrorTuplesMap() contains map of failed tuples and error messages.
* Key: failed tuple, Value: error reason.
*/
for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
for (Column c : entry.getKey().getColumns()) {
l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
}
}
System.out.println(result.getSuccessLines());
}
}
Используйте API generateEmptyTuple класса MxClient, чтобы получить пустой кортеж (Tuple) для заполнения значений.
Tuple tuple1 = client.generateEmptyTuple();
Tuple tuple2 = client.generateEmptyTuple();
Заполняйте значения парами Ключ -> Значение. Ключ — это имя столбца в таблице базы данных; Значение — значение поля. Поля с NULL или значениями по умолчанию можно опустить — SDK заполнит их автоматически.
Кортежи, полученные через этот API, не хранят метаданные таблицы внутри, поэтому они легковесны. При вызове MxClient.appendTuple() пропуск проверки метаданных ускоряет операцию flush.
// Tuple tuple = client.generateEmptyTupleLite();// (Available from v1.1.0, uncomment to use)
Примечание!
При добавлении столбцов в такие легковесные кортежи вручную поддерживайте порядокaddColumn() key -> value, чтобы он точно соответствовал порядку столбцов в таблице базы данных.
Например, если порядок столбцов в таблице:
Column | Type | Collation | Nullable | Default
--------+-----------------------------+-----------+----------+---------
ts | timestamp without time zone | | |
tag | integer | | not null |
c1 | double precision | | |
c2 | double precision | | |
c3 | double precision | | |
c4 | double precision | | |
c5 | text | | |
c6 | text | | |
c7 | text | | |
c8 | text | | |
Тогда добавление столбцов должно выполняться в этом порядке, как в примере:
tuple1.addColumn("ts", "2022-05-18 16:30:06");
tuple1.addColumn("tag", 102020030);
tuple1.addColumn("c1", 1.1);
tuple1.addColumn("c2", 2.2);
tuple1.addColumn("c3", 3.3);
tuple1.addColumn("c4", 4.4);
tuple1.addColumn("c5", "中文字符测试-1");
tuple1.addColumn("c6", "lTxFCVLwcDTKbNbjau_c6");
tuple1.addColumn("c7", "lTxFCVLwcDTKbNbjau_c7");
tuple1.addColumn("c8", "lTxFCVLwcDTKbNbjau_c8");
tuple2.addColumn("ts", "2022-05-18 16:30:06");
tuple2.addColumn("tag", 102020030);
tuple2.addColumn("c1", 1.1);
tuple2.addColumn("c2", 2.2);
tuple2.addColumn("c3", 3.3);
tuple2.addColumn("c4", 4.4);
tuple2.addColumn("c5", "中文字符测试-2");
tuple2.addColumn("c6", "lTxFCVLwcDTKbNbjau_c26");
tuple2.addColumn("c7", "lTxFCVLwcDTKbNbjau_c27");
tuple2.addColumn("c8", "lTxFCVLwcDTKbNbjau_c28");
Наконец, добавьте заполненные кортежи в MxClient.
client.appendTuples(tuple1, tuple2);
Примечание!
MxClientпредоставляет несколько API: добавление одного кортежа, нескольких кортежей или списка кортежей (например,client.appendTuple();,client.appendTupleList();). Вы также можете вручную вызвать flush с помощьюflush(), независимо от количества кортежей в очереди, например:client.flush();.
Синхронная отправка блокирует выполнение до тех пор, пока кортеж не будет отправлен в mxgate, что позволяет выполнять последующие действия (например, фиксировать смещения Kafka).
Tuple tuple1 = client.generateEmptyTuple();
tuple1.addColumn("ts", "2022-05-18 16:30:06");
tuple1.addColumn("tag", 102020030);
tuple1.addColumn("c1", 1.1);
tuple1.addColumn("c2", 2.2);
tuple1.addColumn("c3", 3.3);
tuple1.addColumn("c4", 4.4);
tuple1.addColumn("c5", "lTxFCVLwcDTKbNbjau_c5");
tuple1.addColumn("c6", "lTxFCVLwcDTKbNbjau_c6");
tuple1.addColumn("c7", "lTxFCVLwcDTKbNbjau_c7");
tuple1.addColumn("c8", "lTxFCVLwcDTKbNbjau_c8");
/*
* appendTupleBlocking returns boolean:
* true: MxClient buffer is full (ready to send)
* false: Buffer not yet full
*/
try {
if (client.appendTupleBlocking(tuple1)) {
l.info("append tuples enough");
// Manually trigger flush
client.flushBlocking();
}
/*
* If flushBlocking throws exception, entire batch failed. Handle accordingly.
*/
} catch (AllTuplesFailException e) {
l.error("Tuples fail and catch the exception return.", e);
for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
for (Column c : entry.getKey().getColumns()) {
l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
}
}
/*
* If flushBlocking throws exception, some tuples failed. Handle accordingly.
*/
} catch (PartiallyTuplesFailException e) {
for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
for (Column c : entry.getKey().getColumns()) {
l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
}
}
}
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
public class MxgateExample {
public static void main(String[] args) throws Exception {
MxgateExample http = new MxgateExample();
http.sendingPostRequest();
}
/*
* HTTP Post request
*/
private void sendingPostRequest() throws Exception {
/*
* mxgate listens on port 8086 of localhost
*/
String url = "http://localhost:8086/";
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
/*
* Setting basic post request
*/
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type","text/plain");
String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
/*
* When data contains Chinese characters, encode using postJsonData.getBytes("UTF-8")
*/
wr.write(postJsonData.toString().getBytes("UTF-8"));
wr.flush();
wr.close();
int responseCode = con.getResponseCode();
System.out.println("Sending 'POST' request to URL : " + url);
System.out.println("Post Data : " + postJsonData);
System.out.println("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String output;
StringBuffer response = new StringBuffer();
while ((output = in.readLine()) != null) {
response.append(output);
}
in.close();
System.out.println(response.toString());
}
}
import http.client
class MxgateExample(object):
def __init__(self):
/*
* mxgate listens on port 8086 of localhost
*/
self.url = "localhost:8086"
self.postData = "public.testtable\n/" \
"1603777821|1|101|201|301\n/" \
"1603777822|2|102|202|302\n/" \
"1603777823|3|103|203|303"
self.headers = {"Content-Type": "text/plain"}
/*
* HTTP Post request
*/
def sending_post_request(self):
conn = http.client.HTTPConnection(self.url)
conn.request("POST", "/", self.postData, self.headers)
response = conn.getresponse()
response_code = response.getcode()
print(f"Sending 'POST' request to URL : {self.url}")
print(f"Post Data : {self.postData}")
print(f"Response Code : {response_code}")
output = response.read()
print(output)
if __name__ == '__main__':
gate_post = MxgateExample()
gate_post.sending_post_request()
Рекомендуется использовать среду разработки C# Core.
using System;
using System.IO;
using System.Net;
using System.Text;
namespace HttpPostTest
{
class Program
{
static void Main(string[] args)
{
var url = "http://10.13.2.177:8086/";
var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";
HttpPost(url,txt);
}
public static string HttpPost(string url, string content){
string result = "";
HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url);
req.Method = "POST";
req.ContentType = "text/plain";
/*
* region Add Post parameters
*/
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
reqStream.Write(data, 0, data.Length);
reqStream.Close();
}
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
/*
* Get response content
*/
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
result = reader.ReadToEnd();
}
return result;
}
}
}
Если вы столкнулись с ошибкой
error when serving connection ***** body size exceeds the given limit, увеличьте значениеmax-body-bytesв разделеmxgate.conf.
package main
import (
"bytes"
"net/http"
)
func PostDataToServer(URL string) error {
data := `public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
`
resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
if err != nil {
return err
}
if resp.StatusCode != 200 {
/*
* Обработка тела ответа.
*/
return nil
}
/*
* Обработка тела ответа.
*/
return nil
}
func main() {
err := PostDataToServer("http://127.0.0.1:8086")
if err != nil{
panic(err)
}
}
Примечание!
Дополнительные возможности MatrixGate см. в Руководстве по инструментам — mxgate.