В этом документе описываются основные функции MatrixGate.
MatrixGate предоставляет внешнему миру HTTP API, позволяя различным языкам программирования импортировать данные в базы данных MatrixDB через HTTP-интерфейс. Ниже приведены формат протокола и коды ответов HTTP от MatrixGate.
Формат протокола HTTP MatrixGate
| Тип протокола | Формат протокола | Использование и примеры |
| --- | --- |
| URL | http://\
Timestamp|ID|C1|C2|..|Cn | Формат тела: первая строка — целевая таблица для загрузки данных. SchemeName можно опустить (по умолчанию Public), TableName — обязательный параметр. Начиная со второй строки — строки временных рядов. Каждая строка соответствует одной строке целевой таблицы. Колонки разделяются символом |, строки — \n. Первое поле каждой строки — временная метка (UNIX timestamp, точность до секунд, см. описание --time-format). Второе поле — TagID (целое число). С третьего поля и далее — значения столбцов, соответствующих целевой таблице. Рекомендуется, чтобы DDL-определение целевой таблицы также следовало порядку столбцов (Timestamp, TagID, C1, C2,…, Cn) |
Коды ответов HTTP MatrixGate
| Код ответа | Значение кода ответа | Примечания |
| --- | --- |
| 200 | StatusOK | Часть данных имеет неправильный формат; в теле ответа будет указан номер строки с ошибкой и сообщение об ошибке, например: At line: 2missing data for column "c3" |
| 204 | StatusNoContent | Данные успешно загружены в MatrixGate |
| 400 | StatusBadRequest | Ошибка запроса данных: неверный формат тела POST, таблица не существует, формат сжатия данных не соответствует заголовку HTTP-запроса и т.д. |
| 405 | StatusMethodNotAllowed | Использован не POST-запрос |
| 408 | StatusTimeout | Превышено время ожидания запроса |
| 500 | StatusIntervalServerError | Ошибка базы данных, сбой при загрузке данных; тело ответа содержит подробную информацию об ошибке |
| 503 | StatusServiceUnavailable | MatrixGate отклоняет запросы, например, превышено максимальное количество подключений или MatrixGate завершает работу |
CREATE TABLE testtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT)
DISTRIBUTED BY (tagid);
mxgate config --db-database testdb \
--db-master-host localhost \
--db-master-port 5432 \
--db-user mxadmin \
--db-password 123123 \
--target public.testtable \
--format csv \
--time-format unix-second \
--delimiter '|' \
--parallel 256 \
--stream-prepared 3 \
--interval 250 \
--transform plain \
> mxgate.conf
public.testtable
1603777821|1|101|201|301
1603777822|2|102|202|302
1603777823|3|103|203|303
mxgate --config mxgate.conf
curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@data.txt"
demo=# SELECT extract(epoch FROM "time"), * FROM testtable;
date_part | time | tagid | c1 | c2 | c3
----------------------------------------------------------------------------------------------------------------------------------
1603777821 | 2020-10-27 13:50:21+08 | 1 | 101 | 201 | 301
1603777822 | 2020-10-27 13:50:22+08 | 2 | 102 | 202 | 302
1603777823 | 2020-10-27 13:50:23+08 | 3 | 103 | 203 | 303
(3 rows)
SDK (Software Development Kit) — это комплект инструментов разработки программного обеспечения, который освобождает разработчиков от необходимости реализации нефункциональных задач, значительно повышая эффективность и удобство разработки.
Примечание!
Выберите любой из вышеперечисленных способов. Рекомендуется использовать (1) или (2), чтобы напрямую подключить SDK из удалённых репозиториев Maven или Gradle — это эффективно и удобно. По способу (3) см. MatrixGate FAQ:19. Рекомендуется выполнить поиск по ключевому словуJAVA SDK, так как этот способ используется редко, здесь он подробно не рассматривается.
(1). Вызов удалённого репозитория Maven для автоматической загрузки пакетов SDK
Настройте следующие зависимости в файле pom.xml вашего Java-проекта.
<dependencies>
<dependency>
<groupId>cn.ymatrix</groupId>
<artifactId>mxgate-sdk-java</artifactId>
<version>1.1.2</version>
</dependency>
</dependencies>
(2). Использование удалённого репозитория Gradle для подключения зависимостей SDK
repositories {
mavenCentral()
}
dependencies {
implementation 'cn.ymatrix:mxgate-sdk-java:1.0.20'
}
Примечание!
Необходимо использовать версию MatrixDB, поддерживающую gRPC, то есть 4.6.1 и выше.
Создайте файл конфигурации mxgate_grpc.conf на Master и запустите mxgate.
Примечание!
Перед использованием mxgate необходимо предварительно создать таблицы в базе данных. В примере хост Master — mdw, база данных — demo, таблица данных — test_table_transfer.
Код следующий:
# Create mxgate config file
[mxadmin@mdw ~]$ mxgate config \
--source grpc \
--db-database demo \
--target public.test_table_transfer \
--time-format raw \
--grpc-port 8087 \
--format csv \
> mxgate_grpc.conf
# Start mxgate
[mxadmin@mdw ~]$ mxgate start --config mxgate_grpc.conf
Как показано в примере, для записи данных в mxgate с помощью SDK необходимо указать, что параметр source mxgate — grpc, а format — csv. Также, поскольку SDK должен знать, куда отправлять данные, необходимо указать номер порта grpc-port в конфигурационном файле (в примере — 8087).
Примечание!
При запуске mxgate через HTTP всё равно требуется указывать номер порта gRPC mxgate, поскольку SDK по-прежнему получает метаданные таблиц базы данных из mxgate по протоколу gRPC, но способ записи данных переключается на HTTP.
Создайте файл конфигурации mxgate_http.conf на Master и запустите mxgate. В примере имя хоста Master — mdw, код следующий:
# Create the mxgate config file.
[mxadmin@mdw ~]$ mxgate config \
--source http \
--db-database demo \
--target public.test_table_transfer \
--time-format raw \
--grpc-port 8087 \
--format csv \
> mxgate_http.conf
# Start mxgate
[mxadmin@mdw ~]$ mxgate start --config mxgate_http.conf
Partitioned table "public.test_table_transfer"
Column | Type | Collation | Nullable | Default
--------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
ts | timestamp without time zone | | |
tag | integer | | not null |
c1 | double precision | | |
c2 | double precision | | |
c3 | double precision | | |
c4 | double precision | | |
c5 | text | | |
c6 | text | | |
c7 | text | | |
c8 | text | | |
Partition key: RANGE (ts)
Сначала выполните глобальную инициализацию:
// Set the log level, default INFO.
MxLogger.loggerLevel(LoggerLevel.INFO);
// The log will be output to stdout by default. If you do not want to output, you can pass false in the following API.
MxLogger.enableStdout(true);
// The default log file path and naming format of SDK is /tmp/mxgate_sdk_java_2022-08-26_133403.log,
// Users can customize the file path and file name of the output log file.
MxLogger.writeToFile("/tmp/mxgate_sdk_java.log");
Примечание!
Создавайте MxBuilder только один раз в проекте. Соответствующую конфигурацию можно выполнить до инициализации MxBuilder.MxBuilder builder = MxBuilder.newBuilder() .withDropAll(false) // 如果需用于测试,则可设置为 true,不发送数据给 mxgate,直接 drop;发送数据到 mxgate 需设置为 false .withCacheCapacity(100000) // 用于暂存 tuples 微批的 queue的大小 .withCacheEnqueueTimeout(2000) // 若queue满,tuples 存入 queue 的等待超时时间。若超时,会抛出IllegalStateException .withConcurrency(10) // 同时向 mxgate 写入数据的线程数量。 .withRequestTimeoutMillis(3000) // 每个线程每次数据写入请求的超时时间(毫秒)。 .withMaxRetryAttempts(3) // 每个线程每次写入请求遇到问题后的重试次数。 .withRetryWaitDurationMillis(3000) // 每次重试的时间间隔(当前的实现,每次重试的时间间隔是固定的)。 .withRequestType(RequestType.WithHTTP) // SDK 支持通过 HTTP 和 gRPC 两种方式向 mxgate post 数据,对应的配置为:RequestType.WithHTTP,RequestType.WithGRPC .withCircuitBreaker() // 使用内置熔断器。若失败率或者慢请求率达到阈值,则会开启,开启后持续30秒,暂停向 mxgate 发送数据,亦无法 append tuple .withMinimumNumberOfCalls(1) // 熔断器生效的最小请求数(要求 >= 1,默认 10) .withSlidingWindowSize(10) // 用以计算失败率的滑动窗口大小(要求 >= 1,默认 100) .withFailureRateThreshold(60.0f) // 失败率阈值(要求 >0 且 <= 100),若失败率达到阈值则会开启熔断器 .withSlowCallDurationThresholdMillis(1000) // 慢请求时长阈值(毫秒),超过该时长则认为是慢请求(注意该时长应小于请求超时时间) .withSlowCallRateThreshold(80.0f)// 慢请求阈值,若慢请求率达到阈值则会开启熔断器 // .withRequestAsync(true)// Turn on asynchronous mode to send data to mxgate (if you use this function to remove the comment content of this line) // .withConcurrency(20)// Usually, asynchronous mode only requires dozens of concurrency to achieve the same or even higher throughput as synchronous mode (such as using this function to remove the comment content of this line) // MxBuilder's builder mode has added the following API to adjust the concurrency of CSV parallel conversions (MxClient Group level) // .withCSVConstructionParallel(100)// (This function starts with v1.1.0, if you use it, just remove the comment content of this line) .build(); // MxBuilder has added a singleton API. After MxBuilder is successfully built, you can obtain the instance object of the globally unique singleton of MxBuilder at any location through the following API. There is no need for the user to manually write code to maintain the global reference of MxBuilder. // MxBuilder.instance();// (This function starts with v1.1.2, if you use it, just remove the comment content of this line)
// builder.getTupleCacheSize(); // Used to get the number of remaining Tuples in the Tuple cache inside the SDK in real time (if you use this function to remove the comment content of this line)
Метод connect объекта Builder принимает четыре параметра:
(1). Имя хоста (IP-адрес) и номер порта процесса mxgate, где предоставляется конкретная служба, используемая для получения данных и метаданных таблиц базы данных.
Например, для метода HTTP — `http://localhost:8086/`; для метода gRPC — `localhost:8087`
(2). Схема.
(3). Имя таблицы.
(4). Обратный вызов (callback). Если Builder успешно подключится к mxgate, будет вызван метод onSuccess и возвращён экземпляр MxClient для записи данных. При неудачном подключении будет вызван onFailure, а failureMsg объяснит причину сбоя.
Конкретный код следующий. Не забудьте заменить номер порта, имя схемы и имя таблицы на фактические значения.
// Asynchronous method (this feature is supported starting from v1.0.13) builder.connect("http://localhost:8086/", "localhost:8087", "public", "test_table", new ConnectionListener() { @Override public void onSuccess(MxClient client) { sendData(client); }
@Override
public void onFailure(String failureMsg) {
}
});
Через функцию обратного вызова onSuccess() интерфейса ConnectionListener можно получить экземпляр MxClient, а затем использовать API MxClient для реализации записи данных в mxgate.
// Synchronization method (this feature is supported starting from v1.0.13) MxClient client = builder.connect(httpHost, gRPCHost, schema, table);
// v1.1.0 improves the scalability of MxClient, and MxBuilder provides several new APIs // The MxClient obtained through this API belongs to a MxClient Group // You can freely define the concurrency parameters of MxClient Group Number, Tuples cache capability, and TuplesConsumer through these APIs. // MxClient client = builder.connectWithGroup(dataSendingHost, metadataHost, schema, table, 10);// (This function starts from v1.1.0, if you use to remove the comment content of this line) // MxClient client = builder.connectWithGroup(dataSendingHost, metadataHost, schema, table, 1, 1000, 3000, 10);// (This function starts with v1.1.0, if you use to remove the comment content of this line) // APIs starting with skip will skip the process of connecting to back-end services to obtain database table meta information when calling. // This type of MxClient can only obtain lightweight Tuples through generateEmptyTupleLite() // MxClient client = mxBuilder.skipConnectWithGroup(dataSendingHost, metadataHost, schema, table, delimiter, 1);// (This function starts with v1.1.0, if you use the comment content of this line, you can just remove the comment content) // MxClient client = mxBuilder.skipConnectWithGroup(dataSendingHost, metadataHost, schema, table, delimiter, 1, 1000, 3000, 1);// (This function starts with v1.1.0, if you use the comment content of this line, you can just remove the comment content)
> ***Примечание!***
MxClient потокобезопасен, однако при использовании в многопоточной среде для обеспечения оптимальной производительности рекомендуется иметь отдельный MxClient в каждом потоке, то есть возвращать несколько экземпляров MxClient через несколько вызовов connect MxBuilder.
private void sendData(MxClient client) { if (client != null) { // MxClient will accumulate a batch of Tuples as a micro batch and send them to mxgate. // This API sets the waiting time accumulated for each microbatch, with a default of 2000 million seconds. // That is, each 2s will try to send a batch of data to mxgate if the 2s time period is // No data is written, no sending. client.withIntervalToFlushMillis(5000); // Set how many Tuple bytes accumulated to be sent as a microbatch, even if the time has not been reached // The set flush interval, the accumulated micro-batch bytes will also be sent if it reaches the number of micro-batch bytes. // The time of flush interval has reached, and the number of bytes that are not accumulated enough to be set will also be sent. client.withEnoughBytesToFlush(500); // Compared withEnoughBytesToFlush, when appendTuple // Performance is improved because it avoids calculating bytes // Depending on the specific use scenario, if withEnoughBytesToFlush // It can also meet the performance requirements, so each flush, the data volume will be more uniform. // withEnoughLinesToFlush will have higher priority than withEnoughBytesToFlush client.withEnoughLinesToFlush(10000); // MxClient has added the following API to adjust the number of Tuples for each CSV conversion subtask // For example, there are 10,000 Tuples in Flush at a time. Set BatchSize = 2000 through the following API // Then the CSV conversion of these 10,000 Tuples will be divided into 5 subtasks, each subtask handles 2,000 Tuples and executes concurrently. client.withCSVConstructionBatchSize(2000);//(此功能从 v1.1.0 开始支持) // Each MxClient has a private object pool // When using it, you need to set the size of the object pool according to the number of Tuples in each MxClient flush. // client.useTuplesPool(poolSize); // (If you use this function to remove the comment content of this line) // MxClient supports compression and needs to be used with mxgate v4.7.6 and higher // client.withCompress(); // (If you use this function to remove the comment content of this line) // For HTTP requests, base64 encoding can be used without using base64 encoding, gRPC needs to perform base64 encoding. // client.withBase64Encode4Compress(); // (If you use this function to remove the comment content of this line) // MxClient can register a DataPostListener, and the success and failure of each batch of data are sent. // They will all callback in onSuccess and onFailure. You can understand which Tuple writes succeed and which Tuple writes fail client.registerDataPostListener(new DataPostListener() { @Override public void onSuccess(Result result) { System.out.println(CUSTOMER_LOG_TAG + "Send tuples success: " + result.getMsg()); System.out.println(CUSTOMER_LOG_TAG + "Succeed lines onSuccess callback " + result.getSucceedLines()); }
@Override
public void onFailure(Result result) {
// result.getErrorTuplesMap() key-value pair containing the line of error and the cause of the error Tuple -> String
// Key is the wrong line, value is the reason for the error;
for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
for (Column c : entry.getKey().getColumns()) {
l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
}
}
System.out.println(result.getSuccessLines());
}
});
Возвращает пустой объект Tuple, который можно заполнить с помощью API generateEmptyTuple MxClinet.
Tuple tuple1 = client.generateEmptyTuple(); Tuple tuple2 = client.generateEmptyTuple();
Заполните пустой Tuple в формате «ключ -> значение». Ключ — имя столбца в таблице базы данных; значение — соответствующее значение поля. Если некоторые поля могут быть null или имеют значения по умолчанию, их можно не указывать — SDK автоматически заполнит значениями по умолчанию или пустыми значениями.
Метаданные таблиц базы данных, полученные через этот API, больше не поддерживаются внутри Tuple. Это более легковесный Tuple. Поэтому при вызове MxClient.appendTuple() для добавления в MxClient исключается работа по проверке легитимности данных Tuple на основе метаданных таблицы, что позволяет максимально быстро отправлять данные Tuple.
// Tuple tuple = client.generateEmptyTupleLite();// (This function starts with v1.1.0, if you use it to remove the comment content of this line)
>***Примечание!***
При добавлении столбцов в этот легковесный Tuple необходимо вручную поддерживать порядок пар `addColumn() key -> value`, чтобы он соответствовал порядку столбцов в таблице базы данных.
Например, порядок столбцов в таблице данных следующий:
Column | Type | Collation | Nullable | Default --------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ts | timestamp without time zone | | | tag | integer | | not null | c1 | double precision | | | c2 | double precision | | | c3 | double precision | | | c4 | double precision | | | c5 | text | | | c6 | text | | | c7 | text | | | c8 | text | | |
Тогда порядок добавления столбцов должен соответствовать порядку столбцов в таблице базы данных. В данном примере он следующий:
tuple1.addColumn("ts", "2022-05-18 16:30:06");
tuple1.addColumn("tag", 102020030);
tuple1.addColumn("c1", 1.1);
tuple1.addColumn("c2", 2.2);
tuple1.addColumn("c3", 3.3);
tuple1.addColumn("c4", 4.4);
tuple1.addColumn("c5", "中文字符测试-1");
tuple1.addColumn("c6", "lTxFCVLwcDTKbNbjau_c6");
tuple1.addColumn("c7", "lTxFCVLwcDTKbNbjau_c7");
tuple1.addColumn("c8", "lTxFCVLwcDTKbNbjau_c8");
tuple2.addColumn("ts", "2022-05-18 16:30:06");
tuple2.addColumn("tag", 102020030);
tuple2.addColumn("c1", 1.1);
tuple2.addColumn("c2", 2.2);
tuple2.addColumn("c3", 3.3);
tuple2.addColumn("c4", 4.4);
tuple2.addColumn("c5", "中文字符测试-2");
tuple2.addColumn("c6", "lTxFCVLwcDTKbNbjau_c26");
tuple2.addColumn("c7", "lTxFCVLwcDTKbNbjau_c27");
tuple2.addColumn("c8", "lTxFCVLwcDTKbNbjau_c28");
Наконец, добавьте заполненный Tuple в MxClient.
client.appendTuples(tuple1, tuple2);
> ***Примечание!***
MxClient предоставляет несколько API: можно добавлять один Tuple за раз, несколько Tuple за раз или список Tuples. Например: `client.appendTuple();`, `client.appendTupleList();`; также можно вызвать метод flush() MxClient для ручной отправки данных, независимо от количества записанных Tuple, например: `client.flush();`.
- Синхронная отправка данных с использованием MxClient
Синхронная отправка означает синхронную передачу Tuple в mxgate, что упрощает последующую обработку (например, фиксацию смещения Kafka).
Tuple tuple1 = client.generateEmptyTuple(); tuple1.addColumn("ts", "2022-05-18 16:30:06"); tuple1.addColumn("tag", 102020030); tuple1.addColumn("c1", 1.1); tuple1.addColumn("c2", 2.2); tuple1.addColumn("c3", 3.3); tuple1.addColumn("c4", 4.4); tuple1.addColumn("c5", "lTxFCVLwcDTKbNbjau_c5"); tuple1.addColumn("c6", "lTxFCVLwcDTKbNbjau_c6"); tuple1.addColumn("c7", "lTxFCVLwcDTKbNbjau_c7"); tuple1.addColumn("c8", "lTxFCVLwcDTKbNbjau_c8");
// appendTupleBlocking will return the return value of boolean type
// true: MxClient is already full of the set bytes size, and you can send a request
// false: The set bytes size has not been fully written yet
try {
if (client.appendTupleBlocking(tuple1)) {
l.info("append tuples enough");
// Manual trigger tuples flush
client.flushBlocking();
}
// If the flushBlocking throws an exception as follows, it means that the entire batch of Tuples cannot be written to mxgate, and the caller can handle the exception accordingly
} catch (AllTuplesFailException e) {
l.error("Tuples fail and catch the exception return.", e);
for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
for (Column c : entry.getKey().getColumns()) {
l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
}
}
// If the flushBlocking throws an exception as follows, it means that some Tuples cannot be written to mxgate, and the caller can handle the exception accordingly
} catch (PartiallyTuplesFailException e) {
for (Map.Entry<Tuple, String> entry : result.getErrorTuplesMap().entrySet()) {
l.error(CUSTOMER_LOG_TAG + "error tuple of table={}, tuple={}, reason={}", entry.getKey().getTableName(), entry.getKey(), entry.getValue());
for (Column c : entry.getKey().getColumns()) {
l.error(CUSTOMER_LOG_TAG + "error entry columns {} = {}", c.getColumnName(), c.getValue());
}
}
}
#### 3.1.2 Пример HTTP API MatrixGate на Java
import java.io.BufferedReader; import java.io.DataOutputStream; import java.io.InputStreamReader; import java.net.HttpURLConnection; import java.net.URL;
public class MxgateExample { public static void main(String[] args) throws Exception { MxgateExample http = new MxgateExample(); http.sendingPostRequest(); }
// HTTP Post request
private void sendingPostRequest() throws Exception {
// mxgate listens on port 8086 of localhost
String url = "http://localhost:8086/";
URL obj = new URL(url);
HttpURLConnection con = (HttpURLConnection) obj.openConnection();
// Setting basic post request
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type","text/plain");
String postJsonData = "public.testtable\n1603777821|1|101|201|301\n1603777822|2|102|202|302\n1603777823|3|103|203|303";
con.setDoOutput(true);
DataOutputStream wr = new DataOutputStream(con.getOutputStream());
// When the data is in Chinese, it can be encoded by postJsonData.getBytes("UTF-8")
wr.write(postJsonData.toString().getBytes("UTF-8"));
wr.flush();
wr.close();
int responseCode = con.getResponseCode();
System.out.println("Sending 'POST' request to URL : " + url);
System.out.println("Post Data : " + postJsonData);
System.out.println("Response Code : " + responseCode);
BufferedReader in = new BufferedReader(new InputStreamReader(con.getInputStream()));
String output;
StringBuffer response = new StringBuffer();
while ((output = in.readLine()) != null) {
response.append(output);
}
in.close();
System.out.println(response.toString());
}
}
<a name="python"><br/></a>
### 3.2 Пример HTTP API MatrixGate на Python
import http.client
class MxgateExample(object): def init(self):
self.url = "localhost:8086"
self.postData = "public.testtable\n/" \
"1603777821|1|101|201|301\n/" \
"1603777822|2|102|202|302\n/" \
"1603777823|3|103|203|303"
self.headers = {"Content-Type": "text/plain"}
# HTTP Post request
def sending_post_request(self):
conn = http.client.HTTPConnection(self.url)
conn.request("POST", "/", self.postData, self.headers)
response = conn.getresponse()
response_code = response.getcode()
print(f"Sending 'POST' request to URL : {self.url}")
print(f"Post Data : {self.postData}")
print(f"Response Code : {response_code}")
output = response.read()
print(output)
if name == 'main': gate_post = MxgateExample() gate_post.sending_post_request()
<a name="C#"><br/></a>
### 3.3 Пример HTTP API MatrixGate на C#
> Рекомендуется использовать среду разработки C# Core
using System; using System.IO; using System.Net; using System.Text;
namespace HttpPostTest { class Program { static void Main(string[] args) { var url = "http://10.13.2.177:8086/"; var txt = "public.dest\n2021-01-01 00:00:00,1,a1\n2021-01-01 00:00:00,2,a2\n2021-01-01 00:00:00,3,a3";
HttpPost(url,txt);
}
public static string HttpPost(string url, string content){ string result = ""; HttpWebRequest req = (HttpWebRequest)WebRequest.Create(url); req.Method = "POST"; req.ContentType = "text/plain";
#region Add Post parameters
byte[] data = Encoding.UTF8.GetBytes(content);
req.ContentLength = data.Length;
using (Stream reqStream = req.GetRequestStream()){
reqStream.Write(data, 0, data.Length);
reqStream.Close();
}
#endregion
HttpWebResponse resp = (HttpWebResponse)req.GetResponse();
Stream stream = resp.GetResponseStream();
//Get the response content
using (StreamReader reader = new StreamReader(stream, Encoding.UTF8)){
result = reader.ReadToEnd();
}
return result;
}
} }
> Если возникает ошибка "connection ***** body size exceeds the given limit", увеличьте параметр max-body-bytes в файле mxgate.conf
### 3.4 Пример HTTP API MatrixGate на Golang
package main
import ( "bytes" "net/http" )
func PostDataToServer(URL string) error {
data := public.testtable 1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303
resp, err := http.Post(URL, "application/text", bytes.NewBuffer([]byte(data)))
if err != nil {
return err
}
if resp.StatusCode != 200 {
// Обработка тела ответа.
return nil
}
// Обработка тела ответа.
return nil
}
func main() { err := PostDataToServer("http://127.0.0.1:8086") if err != nil{ panic(err) }
}
<a name="special_type"><br/></a>
## 4 MatrixGate Loading Special Types
### 4.1 Example of MatrixGate loading CSV files
- Create table csvtable in demo database.
demo=# CREATE TABLE csvtable (time TIMESTAMP WITH TIME ZONE, tagid INT, c1 INT, c2 INT, c3 INT) DISTRIBUTED BY (tagid);
- Edit the data load file data.csv, the content is as follows:
1603777821|1|101|201|301 1603777822|2|102|202|302 1603777823|3|103|203|303
- Start mxgate, specify the source parameter to stdin, the target table is an existing csvtable, and the loading parallelism is 2. In the example, the host is mdw.
[mxadmin@mdw ~]$ mxgate \ --source stdin \ --db-database demo \ --db-master-host 127.0.0.1 \ --db-master-port 5432 \ --db-user mxadmin \ --time-format unix-second \ --delimiter "|" \ --target csvtable \ --parallel 2 < data.csv
- Connect to the database to query whether the data is loaded successfully.
demo=# SELECT * FROM csvtable ; time | tagid | c1 | c2 | c3 -----------------------+-------+-------------------------------------------------------------------------------------------------- 2020-10-27 05:50:23+08 | 3 | 103 | 203 | 303 2020-10-27 05:50:22+08 | 2 | 102 | 202 | 302 2020-10-27 05:50:21+08 | 1 | 101 | 201 | 301
(3 rows)
### 4.2 Example of MatrixGate loading JSON fields
#### 4.2.1 JSON
- Create table.
demo=# CREATE TABLE json_test(id int, j json);
- Создание файлов данных.
`~/json.csv`
1|"{""a"":10, ""b"":""xyz""}"
- Загрузка
Здесь в качестве примера используется режим stdin, другие режимы аналогичны.
Ключевой параметр — `--format csv`.
[mxadmin@mdw ~]$ mxgate \ --source stdin \ --db-database postgres \ --db-master-host 127.0.0.1 \ --db-master-port 7000 \ --db-user mxadmin \ --time-format raw \ --format csv \ --delimiter "|" \ --target json_test < ~/json.csv
- Просмотр загруженных данных.
demo=# SELECT * FROM json_test; id | j ----+----------------------------------------------------------------------------------------------------------------------------- 1 | {"a":10, "b":"xyz"} (1 row)
#### 4.2.2 Массив JSON
- Создание таблицы.
demo=# CREATE TABLE json_array_test(id int, j _json);
- Создание файлов данных
`~/json_array.csv`.
1|"{""{\""a\"":10, \""b\"":\""xyz\""}"",""{\""c\"": 10}""}"
- Загрузка через mxgate.
[mxadmin@mdw ~]$ mxgate \ --source stdin \ --db-database postgres \ --db-master-host 127.0.0.1 \ --db-master-port 7000 \ --db-user mxadmin \ --time-format raw \ --format csv \ --delimiter "|" \ --target json_array_test < ~/json_array.csv
- Проверка.
demo=# SELECT * FROM json_array_test ; id | j ----+----------------------------------------------------------------------------------------------------------------------------- 1 | {"{\"a\":10, \"b\":\"xyz\"}","{\"c\": 10}"} (1 row)
> ***Примечание!***
Поскольку столбец JSON содержит специальные символы, такие как кавычки, параметр --format mxgate должен быть установлен в CSV.
<a name="watch"><br/></a>
## 5 Наблюдение за показателями работы mxgate
`watch` — это подкоманда mxgate, использующая ряд показателей для описания работы демона mxgate.
Существуют два режима `watch`:
- Режим реального времени: каждые 3 секунды выводятся метрики gate в формате, аналогичном sar.
- Режим исторических наблюдений: можно указать любой период времени (например, каждый час вчера, каждый день прошлого месяца, каждый месяц прошлого года) для статистики скорости импорта.
### 5.1 Наблюдение в реальном времени
[mxadmin@mdw ~]$ mxgate watch
Показатели работы mxgate будут собираться каждые три секунды, результат вывода следующий:
Time WCount ICount WSpeed/s ISpeed/s WBandWidth MB/S BlocakItems
2022-04-28 15:20:58 14478858 14527011 2598081 2627887 2395 0 2022-04-28 15:21:01 22231035 22633254 2584059 2702081 2222 0 2022-04-28 15:21:04 30494310 30500874 2754425 2622540 3551 0 2022-04-28 15:21:07 38004210 38032956 2503300 2510694 2862 0 2022-04-28 15:21:10 46188696 46298223 2728162 2755089 2227 0 ...
Параметр --info можно использовать для получения описания каждого из приведённых выше показателей.
[mxadmin@mdw ~]$ mxgate watch --info
По умолчанию выводятся только показатели скорости. Для анализа проблем можно наблюдать за показателями времени с помощью параметра --watch-lateency.
[mxadmin@mdw ~]$ mxgate watch --watch-latency
### 5.2 Наблюдение за историческими данными
[mxadmin@mdw ~]$ mxgate watch --history
Будет рассчитана средняя скорость за каждый час в течение 24 часов с текущего момента, результат вывода следующий:
TIME RANGE | SPEED/S | BANDWIDTH MB/S | BLOCK ITEMS
2022-04-28 16:00:00-2022-04-28 17:00:00 | 2208010 | 1254.48 | 0 2022-04-28 17:00:00-2022-04-28 18:00:00 | 1157920 | 1327.00 | 0 2022-04-28 18:00:00-2022-04-28 19:00:00 | 2228666 | 2162.32 | 0 2022-04-28 19:00:00-2022-04-28 20:00:00 | 1371092 | 2881.30 | 0 2022-04-28 20:00:00-2022-04-28 21:00:00 | 1575320 | 2608.20 | 0
SPEED/S, BANDWIDTH MB/S — скорость импорта записей и пропускная способность импорта (в МБ/с),
BLOCK ITEMS — объем данных, заблокированных в mxgate. Это значение возрастает, когда скорость потребления базы данных не успевает за скоростью генерации данных источниками (HTTP, Kafka и т.д.).
Можно добавить параметры `--watch-start`, `--watch-end`, `--watch-duration` для управления интервалом и периодом наблюдения за историческими данными.
Например:
[mxadmin@mdw ~]$ mxgate watch --history --watch-start '2022-03-27 00:00:00' --watch-end '2022-04-27 00:00:00' --watch-duration '168h'
Средняя скорость импорта в неделю (каждые 168 ч) с 27 марта по 27 апреля
Параметр `--watch-duration` поддерживает три единицы измерения: `h`` `m`` `s`.
### 5.3 Наблюдение за полным процессом вставки данных [**Экспериментальные функции**, поддерживаются с версии v4.8.2]
>***Примечание!***
Экспериментальные функции — это функции, присутствующие в определённой версии, но ещё не официально выпущенные. Они могут быть изменены по синтаксису или реализации либо удалены без предварительного уведомления. Используйте их с осторожностью.
YMatrix поддерживает использование параметра `--instrumentation (-I)` для более удобного выявления проблем при записи данных или узких мест производительности. При включении этого параметра в лог mxgate выводится информация о времени выполнения процесса от начала вставки до отправки задачи.
Полное инструментирование одной операции вставки выглядит примерно следующим образом:
2023-04-26:02:51:43.679 xxx-[INFO]:-[Writer.Instrumentation] start_insert_txn [slotid:7] [insert_seq:123] 2023-04-26:02:51:43.683 xxx-[INFO]:-[Writer.Instrumentation] wait_singleconn [slotid:7] [insert_seq:123] [cost: 4ms] [seg:0] [ssid:13346] [seg_cnt:2] 2023-04-26:02:51:43.683 xxx-[INFO]:-[Writer.Instrumentation] wait_singleconn [slotid:7] [insert_seq:123] [cost: 4ms] [seg:1] [ssid:13346] [seg_cnt:2] 2023-04-26:02:51:44.730 xxx-[INFO]:-[Writer.Instrumentation] wait_allconn [slotid:7] [insert_seq:123] [cost:1051ms] 2023-04-26:02:51:44.747 xxx-[INFO]:-[Writer.Instrumentation] wait_flow [slotid:7] [insert_seq:123] [cost: 17ms] [seg:1] [ssid:13346] [seg_cnt:2] 2023-04-26:02:51:44.747 xxx-[INFO]:-[Writer.Instrumentation] write_header [slotid:7] [insert_seq:123] [cost: 0ms] [seg:1] [ssid:13346] [seg_cnt:2] [bytes:5] 2023-04-26:02:51:44.747 xxx-[INFO]:-[Writer.Instrumentation] write_body [slotid:7] [insert_seq:123] [cost: 0ms] [seg:1] [ssid:13346] [seg_cnt:2] [bytes:45] 2023-04-26:02:51:44.831 xxx-[INFO]:-[Writer.Instrumentation] write_eof [slotid:7] [insert_seq:123] [cost: 0ms] [seg:1] [ssid:13346] [seg_cnt:2] 2023-04-26:02:51:44.831 xxx-[INFO]:-[Writer.Instrumentation] write_eof [slotid:7] [insert_seq:123] [cost: 0ms] [seg:0] [ssid:13346] [seg_cnt:2] 2023-04-26:02:51:44.832 xxx-[INFO]:-[Writer.Instrumentation] wait_insertdone [slotid:7] [insert_seq:123] [cost: 1ms] [rows:17] 2023-04-26:02:51:44.836 xxx-[INFO]:-[Writer.Instrumentation] commit [slotid:7] [insert_seq:123] [cost: 3ms] [rows:17] 2023-04-26:02:51:44.836 xxx-[INFO]:-[Writer.Instrumentation] complete_insert_txn [slotid:7] [insert_seq:123]
Данный лог содержит следующую ключевую информацию:
- `INSERT` Время начала: `start_insert_txn`
- Время ожидания подключения к Segment: `wait_singleconn`
- Время, когда все Segment подключились к слоту (Slot), и началась отправка: `wait_allconn`
- Время ожидания входящих данных: `wait_flow`
- Время записи каждого фрагмента данных: `write_header` (запись заголовка данных) / `write_body` (запись содержимого данных) / `write_eof` (запись завершения данных)
- Время от получения EOF до завершения вставки: `wait_insertdone`
- Время выполнения отправки: `commit`
- Время окончания данной операции вставки: `complete_insert_txn`
Вы можете добавить его в параметры запуска mxgate (или изменить этот параметр в конфигурационном файле). Примеры:
Отключение наблюдения за процессом вставки:
$ mxgate start --instrumentation disable
Включение наблюдения для слота 0 (Slot):
$ mxgate start --instrumentation single
Включение наблюдения для всех слотов:
$ mxgate start --instrumentation all
>***Примечание!***
`--instrumentation` Подробную информацию о подпараметрах см. в разделе [Параметры командной строки](/ru/doc/4.8/tools/mxgate/feature#instrumentation).
<a name="non-stop_update_parameters"><br/></a>
## 6 Обновление параметров параллельной записи без остановки
mxgate поддерживает изменение параметров параллельной загрузки во время работы без остановки: «interval» и «stream-prepared». Параметр «interval» определяет рабочее время каждого соединения записи от mxgate к таблице базы данных, а «stream-prepared» — количество активных соединений записи. С точки зрения логики mxgate, одновременно только одно соединение записи может выполнять задачу записи для одной и той же таблицы базы данных. Поэтому для каждой таблицы требуется несколько соединений, чтобы непрерывно выполнять свои задачи записи в разные временные интервалы, обеспечивая тем самым высокоскоростную и эффективную запись данных. В этом процессе вы можете использовать параметр «interval», чтобы настроить рабочее время каждого соединения записи, целенаправленно повысив скорость записи и производительность загрузки. Конкретные примеры использования:
- `mxgate set --stream-prepared-cli 3` Установить количество соединений записи на таблицу равным 3

- `mxgate get --stream-prepared-get` Получить текущее количество активных соединений записи на таблицу

- `mxgate set --job-interval 200` Установить временной интервал соединений записи для всех таблиц в 200 мс

- `mxgate get --job-interval-get` Получить текущий временной интервал соединений записи для всех таблиц

> ***Примечание!***
Для указанных выше параметров, если вы хотите задать или получить количество соединений записи или рабочее время для конкретной таблицы, добавьте после команды `--job <name>`. Каждое задание (job) соответствует одной таблице базы данных. Структура параметра job состоит из имени схемы и имени таблицы. Например, если ваша конкретная таблица называется test_table, а схема — public, то необходимо добавить `--job public.test_table` после существующей команды.
<a name="non-stop_update_table"><br/></a>
## 7 Изменение структуры таблицы без остановки
Во время процесса загрузки данных может внезапно выясниться, что под воздействием изменяющихся источников временных рядов ранее заданная структура таблицы больше не подходит для текущего сценария, и возникает необходимость её изменения, которую можно удовлетворить с помощью mxgate. В данном разделе объясняется, как mxgate выполняет серию операций — приостановку записи данных, перезагрузку изменённой метаинформации таблицы базы данных и восстановление записи данных — без остановки системы. Конкретные шаги следующие:
* Во-первых, используйте команду `mxgate pause -X`, чтобы прервать соединения записи для всех таблиц, подготовившись к изменению структуры таблицы базы данных. Параметр `-X` является обязательным — он помогает прервать соединение между mxgate и базой данных. Если соединение не будет прервано, изменить таблицу базы данных невозможно. Кроме того, используйте параметр `-S`, чтобы задача приостановки синхронно дождалась завершения прерывания всех соединений перед возвратом результата.

* Во-вторых, после прерывания всех соединений записи к соответствующей таблице можно изменить структуру таблицы базы данных, например, добавить столбцы, удалить столбцы, удалить существующую таблицу и создать новую таблицу с тем же именем.
> ***Примечание!***
Новая структура таблицы может отличаться, но имя таблицы должно оставаться одинаковым.
* В-третьих, используйте команду `mxgate resume -R`, чтобы восстановить соединения записи для всех таблиц и перезагрузить метаинформацию таблицы данных. Параметр `-R` является обязательным; `resume` и `-R` совместно обеспечивают выполнение операции перезагрузки.

* Особо отметим, что при одновременной работе нескольких процессов mxgate необходимо использовать параметр `-p`, обозначающий номер соответствующего процесса mxgate. Это требование распространяется на все вышеуказанные команды.

> ***Примечание!***
Условие выполнения команды перезагрузки — предварительная приостановка всех соединений записи mxgate к соответствующей таблице. В противном случае произойдёт следующая ошибка:

<a name="log_level"><br/></a>
## 8 Динамическое изменение уровня логирования
Иногда нам нужно включить отладочный лог mxgate, чтобы наблюдать за некоторыми ключевыми данными, однако включение или отключение отладочного лога обычно требует перезапуска mxgate, что неудобно при диагностике проблем. Поэтому YMatrix предоставляет возможность динамически изменять уровень логирования mxgate:
* Во время работы gate используйте команду `mxgate set --log-level VERBOSE`, чтобы включить лог уровня `VERBOSE` с относительно полной информацией, или `mxgate set --log-level DEBUG`, чтобы включить лог уровня `DEBUG` с максимально полной информацией. Когда отладочный лог больше не нужен, используйте `mxgate set --log-level INFO`, чтобы вернуть уровень логирования к значению `INFO`.