Apache Kafka — это открытая распределённая платформа для потоковой передачи событий. Она может использоваться как система обмена сообщениями для чтения и записи потоков данных, обеспечивая публикацию и подписку на сообщения. Также поддерживает создание масштабируемых приложений для обработки потоков в режиме реального времени и интеграцию с базами данных для безопасного хранения потоковых данных в распределённом, реплицированном и отказоустойчивом кластере. Создаваемые ею «потоки событий» выполняют функцию центральной нервной системы передачи данных.
Если вы планируете записывать данные из Kafka в кластер YMatrix, данный документ является обязательным для ознакомления. YMatrix Database поддерживает бесшовную интеграцию с Kafka, позволяя непрерывно и автоматически загружать данные из Kafka в таблицы YMatrix с возможностью управления через графический интерфейс.
В настоящее время поддерживаются форматы данных CSV и JSON. Мы рассмотрим простой пример интеграции данных Kafka с помощью Платформы управления YMatrix.
Сначала настройте работоспособную среду Kafka, следуя официальному руководству: Kafka Quickstart.
После установки Kafka на сервере перейдите в каталог установки:
$ cd packages/kafka_2.13-3.2.0
Запустите службу Kafka и создайте тестовый топик (Topic), используя порт по умолчанию 9092:
$ bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092
Интерактивно запишите несколько тестовых записей, завершив ввод комбинацией Ctrl-C:
$ bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456
>2,Shanghai,112.95,true,1651043583,1651043583123,1651043583123456
>3,Shenzhen,100.0,false,1651043583,1651043583123,1651043583123456
>4,Guangxi,88.5,false,1651043583,1651043583123,1651043583123456
>^C
Приведённые выше команды записывают четыре строки в формате CSV с разделителями-запятыми в только что созданный топик csv_test.
После завершения подготовки перейдём к созданию потока данных Kafka для загрузки данных.
В браузере введите IP-адрес (обычно это IP-адрес master-узла) и номер порта хоста MatrixGate:
http://<IP>:8240
После успешного входа в систему вы попадёте на главный интерфейс, содержащий два представления: Представление процессов и Представление задач.
Нажмите Импорт данных из Kafka.

Выберите Kafka в качестве источника данных. Далее шаги будут показаны на примере формата CSV.

Вы можете выбрать существующую таблицу или создать новую для получения данных из Kafka.
Выберите существующую таблицу и укажите режим импорта данных. Предположим, выбрана таблица postgres в схеме public базы данных test.
Создайте новую таблицу. Предположим, схема — public, имя таблицы — test1.

После успешного создания появится красный индикатор New.
Выберите режим импорта данных и стратегию автоматического секционирования. По умолчанию используется auto_partitioning.

После выбора целевой таблицы настройте подключение к исходным данным. Выполните следующие шаги:

Настройте образец данных и выполните сопоставление полей из источника с полями целевой таблицы (существующей или новой).
Выберите CSV в качестве формата исходных данных и предоставьте образец данных. Необходимо указать или вручную настроить формат образца данных для точного сопоставления.
При настройке образца данных:
Примечание!
Таблицы, созданные через графический интерфейс, по умолчанию используют движок хранения MARS3. Подробнее см. MARS3.
После базовой настройки выполните сопоставление полей источника с полями целевой таблицы. Процедура немного отличается для существующих и новых таблиц.
Сопоставление с существующей таблицей
Выберите поле источника и сопоставьте его с соответствующим полем назначения. Нажмите Сохранить, чтобы применить сопоставление.
Альтернативно, используйте Автоматическое сопоставление для быстрого и точного сопоставления полей.
Сопоставление с новой таблицей
Помимо сопоставления полей, обратите внимание на следующее:
Если поле источника содержит UNIX-метку времени, а тип целевого столбца — timestamptz, рядом со значением вывода появляется флажок 存储为 timestamp. Установите этот флажок, чтобы автоматически преобразовать метку времени в формат timestamp базы данных.
После сохранения правил сопоставления выберите Идентификатор устройства и Столбец времени:
timestamptz.Проверьте все параметры потока данных. Исправьте ошибки, если они есть, затем отправьте конфигурацию.
На странице отображаются все детали конфигурации. Подтвердите и нажмите Отправить.
Отображается дополнительная информация об уникальном идентификаторе, образованном комбинацией Идентификатор устройства + Столбец времени. Подтвердите и нажмите Отправить.
После отправки появится страница подтверждения успешного создания. Поток данных Kafka успешно создан.
В этом разделе описаны отличия от интеграции в формате CSV, которые проявляются в основном на этапе mapping.
Во-первых, выберите JSON в качестве формата данных и настройте образец данных.
Во-вторых, индексация столбцов JSON следует семантике $["Listed"]. За синтаксическими правилами обращайтесь к JSONPath.
В-третьих, JSON поддерживает многоуровневую вложенность. YMatrix позволяет сопоставлять поля из разных уровней вложенности. Нажмите Sample data в столбце ⊕, чтобы раскрыть вложенные структуры.
После создания потока данных Kafka через графический интерфейс YMatrix данные начинают постоянно загружаться в YMatrix. Если необходимо временно приостановить загрузку — например, для отладки или обслуживания — это можно сделать непосредственно через графический интерфейс.
Для работающих потоков данных Kafka операции приостановки и возобновления доступны как в Представлении процессов, так и в Представлении задач.
Примечание!
Приостановка может занять от 1 до 10 секунд. В течение этого времени никакие другие операции над потоком недопустимы. После приостановки загрузка данных прекращается, а статус меняется на «Приостановлен».
Приостановленный поток данных Kafka можно возобновить. После подтверждения действия загрузка немедленно возобновляется. Интерфейс обновляет текущий статус загрузки в течение 5 секунд.
В выпадающем меню в разделе Действия доступны следующие опции:
В выпадающем меню в разделе Действия доступны: