Быстрый старт
Развертывание
Моделирование данных
Подключение
Запись данных
Миграция
Запросы
Операции и обслуживание
Типовое обслуживание
Секционирование
Резервное копирование и восстановление
Масштабирование
Зеркалирование
Управление ресурсами
Безопасность
Мониторинг
Настройка производительности
Устранение неполадок
Справочник
Руководство по инструментам
Типы данных
Хранилище данных
Выполняющая система
Потоковая передача
Восстановление после сбоев
Конфигурация
Индексы
Расширения
Справочник по SQL
Часто задаваемые вопросы
Apache Kafka — это открытая распределённая платформа для потоковой передачи событий. Она может использоваться как система обмена сообщениями для чтения и записи потоков данных, обеспечивая публикацию и подписку на сообщения. Также поддерживает создание масштабируемых приложений для обработки потоков в реальном времени и интеграцию с базами данных для безопасного хранения потоковых данных в распределённом, реплицированном и отказоустойчивом кластере. Создаваемые ею «потоки событий» выполняют функцию центральной нервной системы передачи данных.
Если вы планируете записывать данные из Kafka в кластер YMatrix, данный документ является обязательным для ознакомления. YMatrix Database поддерживает бесшовную интеграцию с Kafka, позволяя непрерывно и автоматически загружать данные из Kafka в таблицы YMatrix с возможностью управления через графический интерфейс.
В настоящее время поддерживаются форматы данных CSV и JSON. Мы рассмотрим простой пример, демонстрирующий, как интегрировать данные Kafka с помощью Платформы управления YMatrix.
Сначала настройте работоспособную среду Kafka, следуя официальному руководству: Kafka Quickstart.
После установки Kafka на сервере перейдите в каталог установки:
$ cd packages/kafka_2.13-3.2.0
Запустите службу Kafka и создайте тестовый топик (Topic), используя порт по умолчанию 9092:
$ bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092
Интерактивно запишите несколько тестовых записей, завершив ввод комбинацией клавиш Ctrl-C:
$ bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456
>2,Shanghai,112.95,true,1651043583,1651043583123,1651043583123456
>3,Shenzhen,100.0,false,1651043583,1651043583123,1651043583123456
>4,Guangxi,88.5,false,1651043583,1651043583123,1651043583123456
>^C
Приведённые выше команды записывают четыре строки в формате CSV с разделителями-запятыми в только что созданный топик csv_test.
После завершения подготовки перейдём к созданию потока данных Kafka для загрузки данных.
В браузере введите IP-адрес (обычно это IP-адрес master-узла) и номер порта хоста MatrixGate:
http://<IP>:8240
После успешного входа в систему вы попадёте на главный интерфейс, который включает два представления: Представление процессов и Представление задач.
Нажмите Импорт данных из Kafka.

Выберите Kafka в качестве источника данных. В последующих шагах в качестве примера используется формат CSV.

Вы можете выбрать существующую таблицу или создать новую для получения данных из Kafka.
Выберите существующую таблицу и укажите режим импорта данных. Допустим, выбранная таблица — это postgres в схеме public базы данных test.
Создайте новую таблицу. Допустим, схема — public, имя таблицы — test1.

После успешного создания появится красный индикатор New.
Выберите режим импорта данных и стратегию автоматического секционирования. По умолчанию используется auto_partitioning.

После выбора целевой таблицы настройте подключение к исходным данным. Выполните следующие шаги:

Настройте образец данных и выполните сопоставление полей из источника с полями целевой таблицы (существующей или новой).
Выберите CSV в качестве формата исходных данных и предоставьте образец данных. Необходимо указать или вручную настроить формат образца данных для обеспечения точного сопоставления.
При настройке образца данных:
Примечание!
Таблицы, созданные через графический интерфейс, по умолчанию используют механизм хранения MARS3. Подробнее см. MARS3.
После основной настройки выполните сопоставление полей источника с полями целевой таблицы. Процедура немного отличается для существующих и новых таблиц.
Сопоставление с существующей таблицей
Выберите поле источника и сопоставьте его с соответствующим полем назначения. Нажмите Сохранить, чтобы применить сопоставление.
В качестве альтернативы используйте Автоматическое сопоставление для быстрого и точного сопоставления полей.
Сопоставление с новой таблицей
Помимо сопоставления полей, обратите внимание на следующее:
Если поле источника содержит UNIX-метку времени, а тип целевого столбца — timestamptz, рядом со значением вывода появляется флажок 存储为 timestamp. Установите этот флажок, чтобы автоматически преобразовать метку времени в формат timestamp базы данных.
После сохранения правил сопоставления выберите Идентификатор устройства и Столбец времени:
timestamptz.Проверьте все параметры потока данных. Исправьте ошибки, если они есть, затем отправьте конфигурацию.
На странице отображаются все детали конфигурации. Подтвердите и нажмите Отправить.
Отображается дополнительная информация об уникальном идентификаторе, образованном комбинацией Идентификатор устройства + Столбец времени. Подтвердите и нажмите Отправить.
После отправки появится страница подтверждения успешного создания. Поток данных Kafka успешно создан.
В этом разделе описаны различия по сравнению с интеграцией формата CSV, которые в основном касаются шага mapping.
Во-первых, выберите JSON в качестве формата данных и настройте образец данных.
Во-вторых, индексация столбцов JSON следует семантике $["Listed"]. За синтаксисом обращайтесь к JSONPath.
В-третьих, JSON поддерживает многоуровневую вложенность. YMatrix позволяет сопоставлять поля из разных уровней вложенности. Нажмите Sample data в столбце ⊕, чтобы раскрыть вложенные структуры.
После создания потока данных Kafka через графический интерфейс YMatrix данные начинают постоянно загружаться в YMatrix. Если вам нужно временно приостановить загрузку — например, для отладки или обслуживания — вы можете сделать это непосредственно через графический интерфейс.
Для работающих потоков данных Kafka операции приостановки и возобновления доступны как в Представлении процессов, так и в Представлении задач.
Примечание!
Приостановка может занять от 1 до 10 секунд. В течение этого времени никакие другие операции над потоком недопустимы. После приостановки загрузка данных прекращается, а статус изменяется на «Приостановлено».
Приостановленный поток данных Kafka можно возобновить. После подтверждения действия загрузка немедленно возобновляется. Интерфейс обновляет текущий статус загрузки в течение 5 секунд.
В выпадающем меню в разделе Действия доступны следующие опции:
В выпадающем меню в разделе Действия доступны: