Интеграция Kafka

Apache Kafka — это открытая распределённая платформа для потоковой передачи событий. Она может использоваться как система обмена сообщениями для чтения и записи потоков данных, обеспечивая публикацию и подписку на сообщения. Также поддерживает создание масштабируемых приложений для обработки потоков в реальном времени и интеграцию с базами данных для безопасного хранения потоковых данных в распределённом, реплицированном и отказоустойчивом кластере. Создаваемые ею «потоки событий» выполняют функцию центральной нервной системы передачи данных.

Если вы планируете записывать данные из Kafka в кластер YMatrix, данный документ является обязательным для ознакомления. YMatrix Database поддерживает бесшовную интеграцию с Kafka, позволяя непрерывно и автоматически загружать данные из Kafka в таблицы YMatrix с возможностью управления через графический интерфейс.
В настоящее время поддерживаются форматы данных CSV и JSON. Мы рассмотрим простой пример, демонстрирующий, как интегрировать данные Kafka с помощью Платформы управления YMatrix.

1 Предварительные требования

1.1 Настройка среды Kafka

Сначала настройте работоспособную среду Kafka, следуя официальному руководству: Kafka Quickstart.

1.2 Создание топика Kafka

После установки Kafka на сервере перейдите в каталог установки:

$ cd packages/kafka_2.13-3.2.0

Запустите службу Kafka и создайте тестовый топик (Topic), используя порт по умолчанию 9092:

$ bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092

1.3 Запись тестовых данных

Интерактивно запишите несколько тестовых записей, завершив ввод комбинацией клавиш Ctrl-C:

$ bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456
>2,Shanghai,112.95,true,1651043583,1651043583123,1651043583123456
>3,Shenzhen,100.0,false,1651043583,1651043583123,1651043583123456
>4,Guangxi,88.5,false,1651043583,1651043583123,1651043583123456
>^C

Приведённые выше команды записывают четыре строки в формате CSV с разделителями-запятыми в только что созданный топик csv_test.
После завершения подготовки перейдём к созданию потока данных Kafka для загрузки данных.

2 Создание потока данных Kafka

В браузере введите IP-адрес (обычно это IP-адрес master-узла) и номер порта хоста MatrixGate:

http://<IP>:8240

После успешного входа в систему вы попадёте на главный интерфейс, который включает два представления: Представление процессов и Представление задач.

  • Представление процессов: отображает активные процессы MatrixGate.
  • Представление задач: содержит список всех задач импорта данных.
    Каждый процесс MatrixGate может управлять несколькими задачами.

Нажмите Импорт данных из Kafka.

Выберите Kafka в качестве источника данных. В последующих шагах в качестве примера используется формат CSV.

2.1 Интеграция потока данных CSV

2.1.1 Выбор или создание целевой таблицы

Вы можете выбрать существующую таблицу или создать новую для получения данных из Kafka.

2.1.1.1 Использование существующей таблицы

Выберите существующую таблицу и укажите режим импорта данных. Допустим, выбранная таблица — это postgres в схеме public базы данных test.

2.1.1.2 Создание новой таблицы

Создайте новую таблицу. Допустим, схема — public, имя таблицы — test1.

После успешного создания появится красный индикатор New.

Выберите режим импорта данных и стратегию автоматического секционирования. По умолчанию используется auto_partitioning.

2.1.2 Конфигурация исходных данных

После выбора целевой таблицы настройте подключение к исходным данным. Выполните следующие шаги:

  • В левой панели укажите адрес и порт брокера Kafka в указанном формате. При наличии нескольких URL-адресов разделяйте их запятыми.
  • Укажите метод аутентификации, необходимый для подключения к Kafka, включая имя пользователя и пароль, если требуется.
  • В правой панели выберите или найдите топик(и) Kafka, к которым необходимо подключиться.

2.1.3 Сопоставление полей

Настройте образец данных и выполните сопоставление полей из источника с полями целевой таблицы (существующей или новой).

2.1.3.1 Основная конфигурация

Выберите CSV в качестве формата исходных данных и предоставьте образец данных. Необходимо указать или вручную настроить формат образца данных для обеспечения точного сопоставления.

При настройке образца данных:

  • Если топик Kafka содержит полные данные, выберите одну из записей в топике в качестве образца.
  • Если данные в топике неполные (например, отсутствуют столбцы), выберите опцию «редактировать вручную» и предоставьте строку-образец, соответствующую структуре целевой таблицы. Этот образец используется только для анализа и сопоставления структуры полей и не будет импортирован.

Примечание!
Таблицы, созданные через графический интерфейс, по умолчанию используют механизм хранения MARS3. Подробнее см. MARS3.

2.1.3.2 Выполнение сопоставления

После основной настройки выполните сопоставление полей источника с полями целевой таблицы. Процедура немного отличается для существующих и новых таблиц.

  • Сопоставление с существующей таблицей

    Выберите поле источника и сопоставьте его с соответствующим полем назначения. Нажмите Сохранить, чтобы применить сопоставление.

    В качестве альтернативы используйте Автоматическое сопоставление для быстрого и точного сопоставления полей.

  • Сопоставление с новой таблицей

    Помимо сопоставления полей, обратите внимание на следующее: Если поле источника содержит UNIX-метку времени, а тип целевого столбца — timestamptz, рядом со значением вывода появляется флажок 存储为 timestamp. Установите этот флажок, чтобы автоматически преобразовать метку времени в формат timestamp базы данных.

    После сохранения правил сопоставления выберите Идентификатор устройства и Столбец времени:

    • Идентификатор устройства: выберите поле или комбинацию полей, однозначно идентифицирующих каждое устройство.
    • Столбец времени: выберите столбец типа timestamptz.

2.1.4 Отправка

Проверьте все параметры потока данных. Исправьте ошибки, если они есть, затем отправьте конфигурацию.

2.1.4.1 Существующая таблица

На странице отображаются все детали конфигурации. Подтвердите и нажмите Отправить.

2.1.4.2 Новая таблица

Отображается дополнительная информация об уникальном идентификаторе, образованном комбинацией Идентификатор устройства + Столбец времени. Подтвердите и нажмите Отправить.

После отправки появится страница подтверждения успешного создания. Поток данных Kafka успешно создан.

2.2 Интеграция потока данных JSON

В этом разделе описаны различия по сравнению с интеграцией формата CSV, которые в основном касаются шага mapping.

  • Во-первых, выберите JSON в качестве формата данных и настройте образец данных.

  • Во-вторых, индексация столбцов JSON следует семантике $["Listed"]. За синтаксисом обращайтесь к JSONPath.

  • В-третьих, JSON поддерживает многоуровневую вложенность. YMatrix позволяет сопоставлять поля из разных уровней вложенности. Нажмите Sample data в столбце , чтобы раскрыть вложенные структуры.

3 Приостановка и возобновление потоков данных Kafka

После создания потока данных Kafka через графический интерфейс YMatrix данные начинают постоянно загружаться в YMatrix. Если вам нужно временно приостановить загрузку — например, для отладки или обслуживания — вы можете сделать это непосредственно через графический интерфейс.

Для работающих потоков данных Kafka операции приостановки и возобновления доступны как в Представлении процессов, так и в Представлении задач.

Примечание!
Приостановка может занять от 1 до 10 секунд. В течение этого времени никакие другие операции над потоком недопустимы. После приостановки загрузка данных прекращается, а статус изменяется на «Приостановлено».
Приостановленный поток данных Kafka можно возобновить. После подтверждения действия загрузка немедленно возобновляется. Интерфейс обновляет текущий статус загрузки в течение 5 секунд.

Представление процессов

В выпадающем меню в разделе Действия доступны следующие опции:

  • Приостановить все задачи: приостанавливает все задачи в рамках данного процесса; дальнейшая загрузка не происходит.
  • Возобновить все задачи: возобновляет все приостановленные задачи в рамках данного процесса.
  • Завершить процесс: останавливает все задачи в процессе.
  • Очистить: удаляет процесс из списка (доступно только в определённых состояниях, например, после завершения).

Представление задач

В выпадающем меню в разделе Действия доступны:

  • Приостановить: приостанавливает выбранную задачу; загрузка данных прекращается.
  • Возобновить: возобновляет приостановленную задачу; загрузка продолжается.