MatrixGate, инструмент высокопроизводительной загрузки данных

MatrixGate сокращённо называется mxgate — это высокопроизводительный инструмент загрузки данных, поставляемый вместе с MatrixDB.

Использование mxgate для загрузки данных обеспечивает значительно более высокую производительность по сравнению с нативными операторами INSERT. Поскольку mxgate может напрямую взаимодействовать с сегментами, отсутствует узкое место на мастер-узле.

1. Сравнение INSERT и MatrixGate

Метод записи Преимущества Недостатки Применимые сценарии
Прямой INSERT Простой интерфейс Низкая пропускная способность Низкая пропускная способность, сотни тысяч точек данных/секунду
MatrixGate Высокая пропускная способность
Стандартная поддержка реального времени
Требуется дополнительное развертывание, есть эксплуатационные расходы Высокая пропускная способность, десятки миллионов точек данных/секунду

2. Как использовать MatrixGate

MatrixGate предоставляет следующие режимы работы:

  • Режим сервиса
  • Командный режим
  • Режим миграции

Ниже показано, как использовать эти два режима для загрузки данных в таблицу. Схема целевой таблицы dest следующая:

CREATE TABLE dest(
    time timestamp,
    c1 int,
    c2 text
)DISTRIBUTED BY(c1);

2.1 Режим сервиса

Режим сервиса предполагает постоянную работу фонового процесса, предоставляющего HTTP-интерфейс пользователям для отправки временных рядов данных. Это наиболее распространённый способ использования в производственных средах.

2.1.1 Генерация конфигурационных файлов

Для использования режима сервиса сначала необходимо сгенерировать конфигурационный файл и определить параметры подключения к базе данных, целевую таблицу и другие параметры:

mxgate config --db-database test \
            --db-master-host localhost \
            --db-master-port 5432 \
            --db-user mxadmin \
            --target public.dest \
            --time-format raw \
            --delimiter ',' \
            > mxgate.conf

Как указано в команде выше:

Параметр Описание Значение
--db-database База данных test
--db-master-host Хост базы данных localhost
--db-master-port Порт базы данных 5432
--db-user Имя пользователя базы данных mxadmin
--target Целевая таблица public.dest
--time-format Формат времени raw (обычный текст)
--delimiter Разделитель ,

2.1.2 Запуск MatrixGate

Затем запустите MatrixGate, указав в параметрах запуска только что созданный конфигурационный файл:

mxgate start --config mxgate.conf
**********************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
**********************************************************
Launching MatrixGate daemon...
MatrixGate daemon started successfully

2.1.3 Submit data

After the startup is successful, use the curl tool to send HTTP request to submit data.

In production environment, use the HTTP library supported by the programming language to submit data

The test data file rows_header.csv has been prepared, and the content is as follows:

[mxadmin@sdw2 ~]$ cat rows_header.csv
public.dest
2021-01-01 00:00:00,1,a1
2021-01-01 00:00:00,2,a2
2021-01-01 00:00:00,3,a3

When submitting data, the first row must specify the target table name, because the MatrixGate service may have multiple target tables.

Submit data:

curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@rows_header.csv"

MatrixGate binds to port 8086 by default, which can be modified through configuration files.

Query the injected data:

test=# select * from dest;
        time         | c1 | c2
--------------------+-------------------------------------------------------------------------------------------------------------
 2021-01-01 00:00:00 | 11 | a11
 2021-01-01 00:00:00 | 12 | a12
 2021-01-01 00:00:00 | 13 | a13
(3 rows)

For more detailed API parameters, please refer to Document

2.1.4 Operation and maintenance management

MatrixGate also provides other operation and maintenance commands for operation and maintenance management.

View Status

mxgate status
**********************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
**********************************************************
PID          15146 alive
Launched At  2021-09-01 14:59:03
Up For       26 seconds
Binary       /usr/local/matrixdb-4.2.0.community/bin/mxgated
Log          /home/mxadmin/gpAdminLogs/matrixgate.2021-09-01_145904.log
Config       /home/mxadmin/mxgate.conf

You can see the service program running status, configuration files and log paths, which are used to track down problems.

Stop service

mxgate stop
**********************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
**********************************************************
PID 15146 stopped

mxgate stop can stop the service.


2.2 Command Line Mode

Command line mode is used to pour data files into one go, and the process exits after the end.

It's still the data file just now. Remove the first row of the target table, only keep the data rows, and execute the following command:

cat rows.csv \| mxgate --source stdin --db-database test --db-master-host localhost --db-master-port 5432 --db-user mxadmin --time-format raw --target public.dest --parallel 2 --delimiter ','

For more methods of file access, please refer to Document


2.3 Migration Mode

The migration mode is used for high-speed data migration, and supports migration of data tables from other Greenplum5, Greenplum6, and MatrixDB clusters to the current MatrixDB cluster.

The usage is as follows:

mxgate --source transfer \
         --src-host 172.31.41.7 \
         --src-port 5432 \
         --src-db postgres \
         --src-user ec2-user \
         --src-password abc \
         --src-schema public \
         --src-table trans_ao \
         --compress "gzip" \
         --port-base 9129 \
         --local-ip 172.31.33.128 \
         --db-database ttt \
         --target public.trans_ao \
         --format text \
         --time-format raw \
         --use-auto-increment=false

in:

Parameter name Description
--source Function portal, 'transfer' must be specified
--src-host IP address of source library master
--src-port Port number of source library master
--src-user Username for connecting to the source library (superuser is recommended)
--src-password Connection Password
--src-schema schema name of the source table
--src-table Table name of the source table
--compress Transfer method from the source database segment host to this data:
The blank string "" means non-compression, plain text transmission
gzip: Using gzip compression, the linux command gzip that requires the source database must be installed on the segment host
lz4: Using lz4 compression, the linux command lz4 that requires the source database must be installed on the segment host
recommended lz4 > gzip > non-compression
--port-base A batch of ports will be occupied during transmission, and the port range is 9129~
--local-ip The IP address that must be connected to the local machine using the source library
--db-database The database name where the migration target table is located
--target The migration target table name can be in the form of \<schema>.\<table>. If the schema name is not written, the default is public
--format text or csv, CSV is required only if there are complex strings (including newlines, quotes, and separators) in the migrated data. When both text/csv are available in other cases, text mode is preferred
--time-format must be raw in transfer mode
--use-auto-increment When the target table includes a self-increment field of serial type, the fields of this type will be skipped by default in mxgate. This option is added to close the logic of skipping mxgate

Another usage of migration mode is to quickly export data to a file:

mxgate --source transfer \
         --src-host 172.31.41.7 \
         --src-port 5432 \
         --src-db postgres \
         --src-user ec2-user \
         --src-schema public \
         --src-table trans_ao_1 \
         --compress "lz4" \
         --port-base 9129 \
         --local-ip 172.31.33.128 \
         --save-to-dir /tmp/receive/ \
         --db-database ttt \
         --transform nil \
         --writer nil \
         --target trans_ao

Используйте параметр --save-to-dir, чтобы указать путь хранения файла.

Обратите внимание, что даже при экспорте в файл необходимо указывать параметры --db-database и --target для определения целевой базы данных и таблицы, и целевая база данных и таблица должны существовать.

Фильтрация миграции может быть задана с помощью SQL для фильтрации данных, которые необходимо синхронизировать, с использованием параметра --src-sql. Этот параметр может применяться при миграции из таблицы в таблицу и из таблицы в файл:

mxgate --source transfer \
         --src-host 172.31.41.7 \
         --src-port 5432 \
         --src-db postgres \
         --src-user ec2-user \
         --src-sql "select * from demo where c1 = 'xxxx'" \
         --compress "lz4" \
         --port-base 9129 \
         --local-ip 172.31.33.128 \
         --save-to-dir /tmp/receive/ \
         --db-database ttt \
         --transform nil \
         --writer nil \
         --target trans_ao

Более подробную информацию об использовании режима миграции см. в документации


3. Поддержка семантики UPSERT в MatrixGate

При работе с временными данными могут возникнуть следующие сценарии:

  • Данные устройства передаются не сразу, а пакетами. Необходимо объединять данные по номеру устройства и метке времени как по первичному ключу.
  • Данные устройства могут передаваться повторно, и дублирующиеся данные следует обновлять, а не вставлять повторно.

В версии MatrixGate 4.2 добавлена поддержка семантики UPSERT для решения указанных задач.

3.1 Загрузка данных с использованием семантики UPSERT

3.1.1 Создание таблицы данных

CREATE TABLE upsert_demo (
    ts    timestamp
  , tagid int
  , c1    int
  , c2    int
  , UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);

Обратите внимание: для использования функции UPSERT в базе данных необходимо создать UNIQUE-ограничения на комбинацию device id + timestamp в таблице.

3.1.2 Подготовка файлов данных

upsert_demo1.dat:

2020-11-11|1|10|

upsert_demo2.dat:

2020-11-11|1||20
2020-11-11|2||100
2020-11-11|2|200|

3.1.3 Загрузка файлов данных

Загрузка upsert_demo1.dat:

cat upsert_demo1.dat|mxgate --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

Результат запроса:

test=# select * from upsert_demo ;
         ts          | tagid | c1 | c2
--------------------+------+------------------------------------------------------------------------------------------------------
 2020-11-11 00:00:00 |     1 | 10 |
(1 row)

Загрузка upsert_demo2.dat:

cat upsert_demo2.dat|mxgate --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

Результат запроса:

test=# select * from upsert_demo ;
         ts          | tagid | c1  | c2
--------------------+-------------------------------------------------------------------------------------------------------------
 2020-11-11 00:00:00 |     1 |  10 |  20
 2020-11-11 00:00:00 |     2 | 200 | 100
(2 rows)

Из результатов видно, что строки с одинаковыми значениями ts и tagid были объединены.

3.2 Генерация конфигурационного файла для режима сервиса с использованием UPSERT

mxgate config \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --stream-prepared 0 \
  --upsert-key ts \
  --upsert-key tagid > mxgate.conf

Примечание: при использовании функции UPSERT в режиме сервиса необходимо установить параметр stream-prepared в значение 0, иначе может возникнуть взаимоблокировка.

3.3 Особенности использования UPSERT

  • Ограничения UNIQUE и параметр --upsert-key должны полностью совпадать; порядок передачи параметров не важен.
  • Параметр --upsert-key должен соответствовать созданному ограничению UNIQUE.

3.4 Удаление дубликатов через UPSERT

Ещё одна функция UPSERT — удаление дубликатов. В отличие от стандартного UPSERT, который принудительно перезаписывает старое значение новым, при удалении дубликатов новое значение будет записано только если старое значение пустое; в противном случае новое значение отбрасывается.

Удаление дубликатов активируется с помощью параметра --deduplicate-key. Этот параметр взаимоисключителен с параметром --upsert-key, можно выбрать только один из двух.


4. Механизм отказоустойчивости MatrixGate

Поскольку MatrixGate внутренне использует механизм внешних таблиц, данные вставляются пакетами в целевую таблицу. Таким образом, любая строка данных вставляется в базу данных вместе с другими данными пакета. Если формат хотя бы одной строки неверен, весь пакет данных не будет сохранён.

Начиная с версии 4.3, MatrixGate добавил механизм отказоустойчивости. Одиночная ошибка формата данных не повлияет на вставку остальных данных, а информация об ошибочных строках будет возвращена и записана в логи ошибок.

Примечание: отказоустойчивость работает только для ошибок формата. При нарушении правил ограничений (например, уникального индекса) вставка всего пакета всё равно завершится неудачей.

4.1 Сообщения об ошибках

В отличие от прежнего поведения, когда при ошибках в данных возвращался HTTP-код 500, после включения отказоустойчивости возвращается код 200. В теле ответа содержится информация об ошибках, например:

At line: 2
missing data for column "c3"

4.2 Контроль пороговых значений

Конечно, количество допустимых ошибок не бесконечно. Порог отказоустойчивости связан с GUC-параметром: gp_initial_bad_row_limit. Если количество строк с ошибками превышает 5 × gp_initial_bad_row_limit, пакетная запись завершится неудачей.

Более подробную информацию об использовании см. в разделе MatrixGate.