MatrixGate, инструмент высокопроизводительной загрузки данных

MatrixGate сокращённо называется mxgate — это высокопроизводительный инструмент загрузки данных, поставляемый вместе с MatrixDB.

Использование mxgate для загрузки данных обеспечивает значительно более высокую производительность по сравнению с нативными операторами INSERT. Поскольку mxgate может напрямую взаимодействовать с сегментами, отсутствует узкое место на мастер-узле.

1. Сравнение INSERT и MatrixGate

Метод записи Преимущества Недостатки Применимые сценарии
Прямой INSERT Простой интерфейс Низкая пропускная способность Низкая пропускная способность, сотни тысяч точек данных/секунду
MatrixGate Высокая пропускная способность
Стандартная поддержка в реальном времени
Требуется дополнительное развертывание, затраты на эксплуатацию и обслуживание Высокая пропускная способность, десятки миллионов точек данных/секунду

2. Как использовать MatrixGate

MatrixGate предоставляет следующие режимы работы:

  • Режим сервиса
  • Командный режим
  • Режим миграции

Ниже показано, как использовать эти два режима для загрузки данных в таблицу. Схема таблицы dest выглядит следующим образом:

CREATE TABLE dest(
    time timestamp,
    c1 int,
    c2 text
)DISTRIBUTED BY(c1);

2.1 Режим сервиса

Режим сервиса предполагает наличие постоянно работающих фоновых процессов, предоставляющих HTTP-интерфейс для пользователей, через который можно отправлять временные данные. Это наиболее распространённый способ использования в рабочих средах.

2.1.1 Создание конфигурационных файлов

Для использования режима сервиса сначала необходимо создать конфигурационные файлы и определить параметры подключения к базе данных, целевую таблицу и другие параметры:

mxgate config --db-database test \
            --db-master-host localhost \
            --db-master-port 5432 \
            --db-user mxadmin \
            --target public.dest \
            --time-format raw \
            --delimiter ',' \
            > mxgate.conf

Как указано в команде выше:

Параметр Описание Значение
--db-database База данных test
--db-master-host Хост базы данных localhost
--db-master-port Порт базы данных 5432
--db-user Имя пользователя базы данных mxadmin
--target Целевая таблица public.dest
--time-format Формат времени raw (обычный текст)
--delimiter Разделитель ,

2.1.2 Запуск MatrixGate

Затем запустите MatrixGate и укажите в параметрах запуска только что созданный конфигурационный файл:

mxgate start --config mxgate.conf
**********************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
**********************************************************
Launching MatrixGate daemon...
MatrixGate daemon started successfully

2.1.3 Submit data

After the startup is successful, use the curl tool to send HTTP request to submit data.

In production environment, use the HTTP library supported by the programming language to submit data

The test data file rows_header.csv has been prepared, and the content is as follows:

[mxadmin@sdw2 ~]$ cat rows_header.csv
public.dest
2021-01-01 00:00:00,1,a1
2021-01-01 00:00:00,2,a2
2021-01-01 00:00:00,3,a3

When submitting data, the first row must specify the target table name, because the MatrixGate service may have multiple target tables.

Submit data:

curl http://localhost:8086/ -X POST -H 'Content-Type: text/plain' --data-binary "@rows_header.csv"

MatrixGate binds to port 8086 by default, which can be modified through configuration files.

Query the injected data:

test=# select * from dest;
        time         | c1 | c2
--------------------+-------------------------------------------------------------------------------------------------------------
 2021-01-01 00:00:00 | 11 | a11
 2021-01-01 00:00:00 | 12 | a12
 2021-01-01 00:00:00 | 13 | a13
(3 rows)

For more detailed API parameters, please refer to Document

2.1.4 Operation and maintenance management

MatrixGate also provides other operation and maintenance commands for operation and maintenance management.

View Status

mxgate status
**********************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
**********************************************************
PID          15146 alive
Launched At  2021-09-01 14:59:03
Up For       26 seconds
Binary       /usr/local/matrixdb-4.2.0.community/bin/mxgated
Log          /home/mxadmin/gpAdminLogs/matrixgate.2021-09-01_145904.log
Config       /home/mxadmin/mxgate.conf

You can see the service program running status, configuration files and log paths, which are used to track down problems.

Stop service

mxgate stop
**********************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.2.0
  Your Copy is Licensed to: yMatrix.cn; 2022-03-01; any
**********************************************************
PID 15146 stopped

Команда mxgate stop останавливает сервис.

Мониторинг сервиса

Вы можете использовать подкоманду mxgate watch для наблюдения за работой сервиса в реальном времени:

mxgate watch
**********************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.5.0
  Your Copy is Licensed to: yMatrix.cn; 2022-05-14; any
**********************************************************
watch cmd will run forever until killed, you can use watch -T n to change the duration to n seconds;and you can use mxgate watch --info to get info of columns;
                 Time          WCount          ICount        WSpeed/s        ISpeed/s  WBandWidth MB/S     BlocakItems
  2022-04-28 15:20:58        14478858        14527011         2598081         2627887            2395               0
  2022-04-28 15:21:01        22231035        22633254         2584059         2702081            2222               0
  2022-04-28 15:21:04        30494310        30500874         2754425         2622540            3551               0
  2022-04-28 15:21:07        38004210        38032956         2503300         2510694            2862               0
  2022-04-28 15:21:10        46188696        46298223         2728162         2755089            2227               0
  ...

Или используйте mxgate watch --history для просмотра исторических данных:

mxgate watch --history
**********************************************************
 __  __       _        _       ____       _
|  \/  | __ _| |_ _ __(_)_  __/ ___| __ _| |_ ___
| |\/| |/ _` | __| '__| \ \/ / |  _ / _` | __/ _ \
| |  | | (_| | |_| |  | |>  <| |_| | (_| | ||  __/
|_|  |_|\__,_|\__|_|  |_/_/\_\\____|\__,_|\__\___|
  Version: 4.5.0
  Your Copy is Licensed to: yMatrix.cn; 2022-05-14; any
**********************************************************
                TIME RANGE                | SPEED/S  | BANDWIDTH MB/S  | BLOCK ITEMS
  2022-04-28 16:00:00-2022-04-28 17:00:00 |  2208010 |         1254.48 |           0
  2022-04-28 17:00:00-2022-04-28 18:00:00 |  1157920 |         1327.00 |           0
  2022-04-28 18:00:00-2022-04-28 19:00:00 |  2228666 |         2162.32 |           0
  2022-04-28 19:00:00-2022-04-28 20:00:00 |  1371092 |         2881.30 |           0
  2022-04-28 20:00:00-2022-04-28 21:00:00 |  1575320 |         2608.20 |           0


2.2 Command Line Mode

Command line mode is used to pour data files into one go, and the process exits after the end.

It's still the data file just now. Remove the first row of the target table, only keep the data rows, and execute the following command:

cat rows.csv | mxgate --source stdin --db-database test --db-master-host localhost --db-master-port 5432 --db-user mxadmin --time-format raw --target public.dest --parallel 2  --delimiter ',' 

For more methods of file access, please refer to Document


2.3 Migration Mode

The migration mode is used for high-speed data migration, and supports migration of data tables from other Greenplum5, Greenplum6, and MatrixDB clusters to the current MatrixDB cluster.

The usage is as follows:

mxgate --source transfer \
         --src-host 172.31.41.7 \
         --src-port 5432 \
         --src-db postgres \
         --src-user ec2-user \
         --src-password abc \
         --src-schema public \
         --src-table trans_ao \
         --compress "gzip" \
         --port-base 9129 \
         --local-ip 172.31.33.128 \
         --db-database ttt \
         --target public.trans_ao \
         --format text \
         --time-format raw \
         --use-auto-increment=false

in:

Parameter name Description
--source Function portal, 'transfer' must be specified
--src-host IP address of source library master
--src-port Port number of source library master
--src-user Username for connecting to the source library (superuser is recommended)
--src-password Connection Password
--src-schema schema name of the source table
--src-table Table name of the source table
--compress Transfer method from the source database segment host to this data:
The blank string "" means non-compression, plain text transmission
gzip: Using gzip compression, the linux command gzip that requires the source database must be installed on the segment host
lz4: Using lz4 compression, the linux command lz4 that requires the source database must be installed on the segment host
recommended lz4 > gzip > non-compression
--port-base A batch of ports will be occupied during transmission, and the port range is 9129~
--local-ip The IP address that must be connected to the local machine using the source library
--db-database The database name where the migration target table is located
--target The migration target table name can be in the form of \<schema>.\<table>. If the schema name is not written, the default is public
--format text or csv, CSV is required only if there are complex strings (including newlines, quotes, and separators) in the migrated data. When both text/csv are available in other cases, text mode is preferred
--time-format must be raw in transfer mode
--use-auto-increment When the target table includes a self-increment field of serial type, the fields of this type will be skipped by default in mxgate. This option is added to close the logic of skipping mxgate

Another usage of migration mode is to quickly export data to a file:

mxgate --source transfer \
         --src-host 172.31.41.7 \
         --src-port 5432 \
         --src-db postgres \
         --src-user ec2-user \
         --src-schema public \
         --src-table trans_ao_1 \
         --compress "lz4" \
         --port-base 9129 \
         --local-ip 172.31.33.128 \
         --save-to-dir /tmp/receive/ \
         --db-database ttt \
         --transform nil \
         --writer nil \
         --target trans_ao

Используйте параметр --save-to-dir для указания пути хранения файла.

Обратите внимание, что даже при экспорте в файл необходимо указывать параметры --db-database и --target для определения целевой базы данных и таблицы, которые должны существовать.

Фильтрация миграции может быть задана с помощью SQL для выбора данных, которые нужно синхронизировать, с использованием параметра --src-sql. Этот параметр применим при миграции из таблицы в таблицу и из таблицы в файл:

mxgate --source transfer \
         --src-host 172.31.41.7 \
         --src-port 5432 \
         --src-db postgres \
         --src-user ec2-user \
         --src-sql "select * from demo where c1 = 'xxxx'" \
         --compress "lz4" \
         --port-base 9129 \
         --local-ip 172.31.33.128 \
         --save-to-dir /tmp/receive/ \
         --db-database ttt \
         --transform nil \
         --writer nil \
         --target trans_ao

Более подробную информацию о режиме миграции см. в документации


3. Поддержка семантики UPSERT в MatrixGate

При работе с временными данными могут возникнуть следующие ситуации:

  • Данные с устройства передаются не сразу, а пакетами. Необходимо объединять их по идентификатору устройства и метке времени, используемым в качестве первичного ключа.
  • Данные с устройства могут дублироваться, и вместо повторной вставки следует обновлять уже существующие записи.

В версии MatrixGate 4.2 была добавлена семантика UPSERT для решения этих задач.

3.1 Загрузка данных с использованием семантики UPSERT

3.1.1 Создание таблицы данных

CREATE TABLE upsert_demo (
    ts    timestamp
  , tagid int
  , c1    int
  , c2    int
  , UNIQUE(ts, tagid)
) DISTRIBUTED BY (tagid);

Обратите внимание: чтобы база данных могла использовать функцию UPSERT, необходимо создать UNIQUE-ограничения на комбинацию идентификатора устройства и метки времени.

3.1.2 Подготовка файлов данных

upsert_demo1.dat:

2020-11-11|1|10|

upsert_demo2.dat:

2020-11-11|1||20
2020-11-11|2||100
2020-11-11|2|200|

3.1.3 Загрузка файлов данных

Загрузка upsert_demo1.dat:

cat upsert_demo1.dat|mxgate --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

Результат запроса:

test=# select * from upsert_demo ;
         ts          | tagid | c1 | c2
--------------------+------+------------------------------------------------------------------------------------------------------
 2020-11-11 00:00:00 |     1 | 10 |
(1 row)

Загрузка upsert_demo2.dat:

cat upsert_demo2.dat|mxgate --source stdin \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --upsert-key ts \
  --upsert-key tagid

Результат запроса:

test=# select * from upsert_demo ;
         ts          | tagid | c1  | c2
--------------------+-------------------------------------------------------------------------------------------------------------
 2020-11-11 00:00:00 |     1 |  10 |  20
 2020-11-11 00:00:00 |     2 | 200 | 100
(2 rows)

Из результатов видно, что строки с одинаковыми значениями ts и tagid были объединены.

3.2 Генерация конфигурационного файла для режима сервиса с использованием UPSERT

mxgate config \
  --db-database test \
  --db-master-host localhost \
  --db-master-port 5432 \
  --db-user mxadmin \
  --time-format raw \
  --delimiter "|" \
  --target upsert_demo \
  --stream-prepared 0 \
  --upsert-key ts \
  --upsert-key tagid > mxgate.conf

Примечание: при использовании функции UPSERT в режиме сервиса необходимо установить параметр stream-prepared в 0, иначе может возникнуть взаимоблокировка.

3.3 Рекомендации по использованию UPSERT

  • Условия UNIQUE и параметр --upsert-key должны полностью совпадать; порядок указания параметров не важен.
  • Параметр --upsert-key должен соответствовать созданному UNIQUE-ограничению.

3.4 Удаление дубликатов с помощью UPSERT

Ещё одна функция UPSERT — удаление дубликатов. В отличие от обычного UPSERT, который всегда заменяет старое значение новым, режим удаления дубликатов заполняет новым значением только пустые поля, а новые значения для уже заполненных полей игнорируются.

Удаление дубликатов активируется с помощью параметра --deduplicate-key. Этот параметр взаимоисключает параметр --upsert-key, можно выбрать только один из двух.


4. Механизм отказоустойчивости MatrixGate

Поскольку MatrixGate внутренне использует механизм внешних таблиц, данные вставляются пакетами в целевую таблицу. Таким образом, любая строка данных вносится в базу данных вместе с другими данными пакета. Если формат хотя бы одной строки неверен, весь пакет будет отклонён.

Начиная с версии 4.3, MatrixGate добавил механизм отказоустойчивости. Одиночная ошибка формата не повлияет на вставку остальных данных, а информация об ошибочной строке будет возвращена и записана в лог.

Примечание: отказоустойчивость работает только для ошибок формата. При нарушении правил ограничений (например, уникального индекса) пакетная запись всё равно завершится сбоем.

4.1 Сообщения об ошибках

В отличие от прежнего поведения, когда при ошибке в данных возвращался HTTP-код 500, теперь при наличии отказоустойчивости возвращается код 200. В теле ответа содержится информация об ошибках, например:

At line: 2
missing data for column "c3"

4.2 Контроль порога ошибок

Конечно, количество допускаемых ошибок не бесконечно. Порог отказоустойчивости зависит от GUC-параметра: gp_initial_bad_row_limit. Если количество строк с ошибками превышает 5 * gp_initial_bad_row_limit, пакетная запись завершится неудачей.

Более подробную информацию см. в MatrixGate.