Kafka Integration

Apache Kafka is an open-source distributed event streaming platform. It can serve as a messaging system for reading and writing data streams, letting applications publish and subscribe to messages. It also supports building scalable stream-processing applications for real-time scenarios, and it stores event streams durably in a distributed, replicated, fault-tolerant cluster. The event streams it carries act as a central nervous system for moving data between systems.

If you plan to write data from Kafka into a YMatrix cluster, this document is essential. YMatrix Database supports seamless Kafka integration, allowing continuous and automatic ingestion of Kafka data into YMatrix tables, with support for GUI-based operations.
Currently supported data formats are CSV and JSON. We will walk through a simple example to demonstrate how to integrate Kafka data using the YMatrix Management Platform.

1 Prerequisites

1.1 Set Up Kafka Environment

First, set up a working Kafka environment by following the official guide: Kafka Quickstart.

1.2 Create a Kafka Topic

After setting up Kafka on your server, navigate to the installation directory:

$ cd packages/kafka_2.13-3.2.0
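
If the Kafka broker is not yet running, start it from this directory. The following is a minimal sketch based on the official quickstart, assuming Kafka 3.2.0 in KRaft mode on a single node; adjust the configuration path if you run ZooKeeper mode instead:

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
$ bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties
$ bin/kafka-server-start.sh config/kraft/server.properties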

With the Kafka service running, create a test topic on the default port 9092:

$ bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092
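
Optionally, confirm that the topic was created:

$ bin/kafka-topics.sh --describe --topic csv_test --bootstrap-server localhost:9092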

1.3 Write Test Data

Write several test records interactively, ending with Ctrl-C:

$ bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456
>2,Shanghai,112.95,true,1651043583,1651043583123,1651043583123456
>3,Shenzhen,100.0,false,1651043583,1651043583123,1651043583123456
>4,Guangxi,88.5,false,1651043583,1651043583123,1651043583123456
>^C

The above command writes four comma-separated CSV records to the newly created csv_test topic.
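
If you want to confirm the records were written, you can read them back with the console consumer (stop it with Ctrl-C):

$ bin/kafka-console-consumer.sh --topic csv_test --from-beginning --bootstrap-server localhost:9092
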
With preparation complete, we now proceed to create a Kafka data stream for data ingestion.

2 Creating a Kafka Data Stream

In your browser, enter the IP address (typically the master node's IP) and port number of the MatrixGate host:

http://<IP>:8240

After logging in successfully, you'll reach the main interface, which includes two views: Process View and Task View.

  • Process View: Displays active MatrixGate processes.
  • Task View: Lists all data import tasks.
    Each MatrixGate process can manage multiple tasks.

Click Import Data from Kafka.

Select Kafka as the data source. The following steps use CSV format as an example.

2.1 CSV Data Stream Integration

2.1.1 Select or Create Target Table

You can either select an existing table or create a new one to receive Kafka data.

2.1.1.1 Use Existing Table

Choose an existing table and specify the data import mode. Assume the selected table is postgres in schema public of database test.

2.1.1.2 Create New Table

Create a new table. Assume the schema is public and the table name is test1.

Upon successful creation, a red New indicator appears.

Select a data import mode and auto-partitioning strategy. By default, auto_partitioning is used.

2.1.2 Source Data Configuration

After selecting the target table, configure the source data connection. Complete the following steps to proceed:

  • In the left panel, enter the Kafka broker address and port in the format shown; for multiple brokers, separate the URLs with commas (see the example after this list).
  • Specify the authentication method required to connect to Kafka, including username and password if applicable.
  • In the right panel, select or search for the Kafka topic(s) to connect.
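
For example, a three-broker cluster could be entered as a comma-separated list (hypothetical hostnames):

kafka-node1:9092,kafka-node2:9092,kafka-node3:9092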

2.1.3 Mapping

Configure sample data and map fields from the source to the target table (existing or new).

2.1.3.1 Basic Configuration

Select CSV as the source data format and provide sample data. You must specify or manually configure the sample data format to ensure accurate mapping.

When configuring sample data:

  • If the Kafka topic contains complete data, select a record from the topic as the sample.
  • If the topic data is incomplete (e.g., missing columns), choose the directly edit data option and provide a sample row matching the target table’s structure, as in the example after this list. The sample is used only for parsing and mapping the field structure and will not be imported.
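
For example, any of the test records written to csv_test earlier can serve as the sample row:

1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456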

Note!
Tables newly created via the GUI use the MARS3 storage engine by default. For more information, see MARS3.

2.1.3.2 Perform Mapping

After basic configuration, map source fields to target table fields. Procedures differ slightly between existing and new tables.

  • Mapping to Existing Table

    Select a source field and match it to the corresponding target field. Click Save to establish the mapping.

    Alternatively, use Auto Map for quick and accurate field matching.

  • Mapping to New Table

    In addition to field mapping, note the following: when a source field contains a UNIX timestamp and the target column type is timestamptz, a checkbox labeled Store as timestamp appears next to the output value. Check this box to automatically convert the timestamp to the database’s timestamp format (see the note after this list).

    After saving mapping rules, select the Device ID and Time Column:

    • Device ID: Choose the field or combination of fields that uniquely identifies each device.
    • Time Column: Select the column of type timestamptz.
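
For orientation: in the test data written earlier, the seconds-level value 1651043583 corresponds to 2022-04-27 07:13:03 UTC, and the adjacent columns (1651043583123 and 1651043583123456) express the same instant at millisecond and microsecond precision.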

2.1.4 Submit

Review all data stream configurations. Correct any errors, then submit.

2.1.4.1 Existing Table

The page displays all configuration details. Confirm and click Submit.

2.1.4.2 New Table

The page additionally displays the unique identifier formed by Device ID + Time Column. Confirm and click Submit.

After submission, a success confirmation page appears. The Kafka data stream is now successfully created.
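
Once the stream is running, you can optionally verify that records are arriving in the target table from psql on the YMatrix master. A minimal check, assuming a table public.test1 in database test as in the new-table example above:

$ psql -d test -c "SELECT count(*) FROM public.test1;"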

2.2 JSON Data Stream Integration

This section highlights differences from CSV format integration, primarily in the mapping step.

  • First, select JSON as the data format and configure sample data.

  • Second, JSON fields are indexed using JSONPath bracket notation, for example $["column_name"] for a top-level field. Refer to JSONPath for syntax rules (see the example after this list).

  • Finally, JSON supports multi-level nesting. YMatrix allows mapping fields from different nesting levels. Click Sample data in the column to expand nested structures.
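
For example, given a hypothetical nested record such as:

{"id": 1, "city": "Beijing", "metrics": {"temperature": 123.05}}

the top-level field id is referenced as $["id"], and the nested field as $["metrics"]["temperature"].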

3 Pause and Resume Kafka Data Streams

After creating a Kafka data stream via the YMatrix GUI, data is ingested continuously into YMatrix. If you need to temporarily pause ingestion, for example for debugging or maintenance, you can do so directly from the GUI.

For running Kafka data streams, pause and resume operations are available from both the Process View and Task View.

Note!
Pausing may take 1–10 seconds to take effect. During this time, no other operations are allowed on the stream. Once paused, data ingestion stops and the status changes to "Paused".
A paused Kafka data stream can be resumed. After confirming the resume action, ingestion restarts immediately. The UI updates with the latest ingestion status within 5 seconds.

Process View

The dropdown menu under Actions provides the following options:

  • Pause All Tasks: Pauses all tasks under this process; no further ingestion occurs.
  • Resume All Tasks: Resumes all paused tasks under this process.
  • Terminate Process: Stops all tasks in the process.
  • Clear: Removes the process from the list (only available for certain states, such as terminated).

Task View

The dropdown menu under Actions provides:

  • Pause: Pauses the selected task; ingestion stops.
  • Resume: Resumes the paused task; ingestion continues.