Kafka Access

Apache Kafka is an open-source distributed event streaming platform. It can be viewed as a messaging system that reads and writes streaming data, letting you publish and subscribe to messages; it can also be used to build scalable streaming applications for real-time scenarios; and it can connect to a database to store streaming data safely in a distributed, replicated, fault-tolerant cluster. The event streams it produces can be thought of as the "central nervous system" of data transmission.

If you plan to use Kafka to write data into your YMatrix cluster, this document is for you. YMatrix connects to Kafka seamlessly, continuously and automatically importing Kafka data into YMatrix tables, with the whole process driven from the graphical interface. CSV and JSON are the source data formats currently supported. The following uses the simplest possible example to show how to access Kafka data through the YMatrix management platform.

1 Preparation

1.1 Build a Kafka environment

First, you need a working Kafka environment; refer to the official Kafka Quickstart guide.

1.2 Create a Kafka Topic

With the Kafka environment set up on the server, change into the Kafka installation directory, for example with the following command:

$ cd packages/kafka_2.13-3.2.0

Then start the Kafka service and create a test topic; the broker listens on the default port 9092.
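
If the Kafka broker is not already running, start it first. The commands below are a minimal sketch assuming the ZooKeeper-based quickstart layout shipped with kafka_2.13-3.2.0; run each in its own terminal:

$ bin/zookeeper-server-start.sh config/zookeeper.properties   # terminal 1: ZooKeeper
$ bin/kafka-server-start.sh config/server.properties          # terminal 2: Kafka broker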

$ bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092
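
If you want to confirm that the topic was created, you can optionally describe it:

$ bin/kafka-topics.sh --describe --topic csv_test --bootstrap-server localhost:9092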

1.3 Write test data

Next, write a few lines of test data, finishing with Ctrl-C:

$ bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456
>2,Shanghai,112.95,true,1651043583,1651043583123,1651043583123456
>3,Shenzhen,100.0,false,1651043583,1651043583123,1651043583123456
>4,Guangxi,88.5,false,1651043583,1651043583123,1651043583123456
>^C

The commands above write 4 records, each a comma-separated CSV line, into the newly created csv_test topic. With the preparation complete, the next section shows how to create a Kafka data stream to import the data.
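
Before moving on, you can optionally read the messages back with a console consumer to verify they were written (exit with Ctrl-C):

$ bin/kafka-console-consumer.sh --topic csv_test --from-beginning --bootstrap-server localhost:9092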

2 Create Kafka Data Stream

In a browser, enter the IP of the machine where MatrixGate is located (by default, the master node) and the port number:

http://<IP>:8240
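
If the page does not load, you can optionally check from a shell whether the port is reachable; a reachable MatrixGate web interface should return an HTTP status code (the exact code may vary):

$ curl -sS -o /dev/null -w "%{http_code}\n" http://<IP>:8240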

After successful login, you will enter the main interface, which is divided into a process view and a task view. The process view refers to the running MatrixGate processes, while the task view refers to all data import tasks. One MatrixGate process can contain multiple tasks.

Click on "Import Data from Kafka".

Select the Kafka data source. The following first illustrates the steps using CSV data as an example.

2.1 CSV Data Stream Integration

2.1.1 Select or Create Target Table

First, you can either select an existing table or create a new one to serve as the target table for receiving Kafka data.

2.1.1.1 Select Existing Table

In the target table field, select an existing table along with the data import mode. Here, we assume the selected table is test under the public schema in the postgres database.
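
For reference, a table matching the seven comma-separated fields in the sample data might be created as follows. This is only a hypothetical sketch: the column names are assumptions, and the three trailing numbers are stored as bigint UNIX timestamps at second, millisecond, and microsecond precision.

$ psql -d postgres -c "CREATE TABLE public.test (id int, city text, price float8, flag boolean, ts_sec bigint, ts_ms bigint, ts_us bigint);"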

2.1.1.2 Create New Table

Create a new table. Here, we assume the schema is public and the table name is test1.

After successful table creation, a red "New" label will appear.

Choose the data import mode and automatic partitioning policy for it. Here, the partitioning policy defaults to auto_partitioning.

2.1.2 Source Data

After selecting the target table, the next step is to set up the connection for the source data. To proceed correctly to the next step, you need to complete the following operations:

  • First, in the address box in the left-hand menu, enter the Kafka broker address and port number in the format shown in the figure. If there are multiple URLs, separate them with commas (an optional connectivity check is shown after this list).
  • Then, specify the authentication method required to connect to Kafka and verify the Kafka username and password.
  • Finally, in the right-hand menu, check or search for the Kafka topic you want to connect to.
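
To confirm the broker address before filling in the form, you can optionally list the topics from a shell on a host that can reach Kafka (this quick check assumes no authentication; with SASL enabled you would also need to pass a client configuration file):

$ bin/kafka-topics.sh --list --bootstrap-server <broker-host>:9092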

2.1.3 Mapping

The third step is to configure sample data and map the existing or newly created table to import Kafka data.

2.1.3.1 Basic Configuration

Select CSV as the source data format and provide sample data. The sample data must be specified or configured manually so that the fields can be parsed and mapped accurately.

When configuring sample data, you can either select from the topic or edit data directly.

  • If the Kafka topic contains complete data, simply select it from the topic as sample data.
  • When the data in the topic is incomplete (e.g., there are columns with null values), you can choose to edit data directly and provide a sample data entry according to the target table's field structure. This sample data is only used for field structure parsing and mapping and will not be imported into the target table.

Note!
By default, tables newly created via the graphical interface use the MARS3 storage engine. For more information, see MARS3.

2.1.3.2 Perform Mapping

After completing the basic mapping configuration, map the source data fields to the target table fields. There are differences in this step between existing tables and newly created tables.

  • Mapping for Existing Tables

    First, select a sample data entry. Then, in the mapping relationship, find the corresponding target field and set it up. After that, click Save to establish a mapping relationship.

    You can also use the one-click automatic mapping feature, which is both convenient and accurate.

  • Mapping for New Tables

    In addition to establishing mapping rules, a new table requires a few more settings. When the source field value is a UNIX timestamp and the target field type is set to timestamptz, a checkbox labeled Store as timestamp appears to the right of the output value; check it to automatically convert the value to the database's timestamp format (see the conversion example after this list).

    After saving the mapping rules, you also need to select the "Device Identifier" and "Time Column." The "Device Identifier" should be a field or combination of fields with a unique constraint in the target table, while the "Time Column" should be a field of type timestamptz in the target table.
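
As an illustration of the conversion, the sample value 1651043583 used earlier is a UNIX timestamp in seconds, and 1651043583123 is the same instant in milliseconds. Outside the graphical interface, you could check the equivalent timestamptz values with PostgreSQL's to_timestamp function:

$ psql -d postgres -c "SELECT to_timestamp(1651043583);"             # seconds
$ psql -d postgres -c "SELECT to_timestamp(1651043583123 / 1000.0);" # milliseconds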

2.1.4 Submission

Finally, review all the configuration details of the data stream. If there are any errors, return to make changes. If everything is correct, proceed with the submission.

2.1.4.1 Existing Tables

The page displays all the configuration details of the data stream. After confirming the information, click the "Submit" button.

2.1.4.2 New Tables

Compared to existing tables, this section also displays the unique identifier information composed of "Device Identifier + Time Column." After confirming the information, click the "Submit" button.

Whether for existing or new tables, after successful submission, you will be directed to a success prompt page. At this point, the Kafka data stream has been successfully created.

2.2 JSON Data Stream Integration

This section mainly describes the differences compared to the CSV format integration, which are concentrated in the Mapping step.

  • First, select the JSON data format and configure sample data.

  • Second, JSON columns are referenced using the $["column_name"] syntax, following JSONPath rules.

  • Finally, JSON data supports multi-level nesting. YMatrix allows you to create mapping rules based on source data fields from different levels. Click the expand control in the Sample Data section to expand the nested levels (see the example below).
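
For example, you could write a nested JSON record to a hypothetical json_test topic like this:

$ bin/kafka-console-producer.sh --topic json_test --bootstrap-server localhost:9092
>{"id": 1, "city": "Beijing", "metrics": {"temperature": 123.05, "online": true}}
>^C

Assuming the nested levels follow the same bracket notation, the top-level field would be referenced as $["city"] and the nested field as $["metrics"]["temperature"].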

3 Pausing and Resuming Kafka Data Streams

After you create a Kafka data stream using the YMatrix graphical interface, data will be continuously imported into YMatrix. If you need to temporarily pause the import of Kafka data due to service debugging or other reasons, you can do so directly from the graphical interface.

For running Kafka data streams, you can pause data import from both the Process View and the Task View.

Note!
The pause operation may take 1-10 seconds to take effect. During this period, no other operations can be performed on the data stream. After the pause operation is completed, Kafka data will stop being imported, and the status will switch to "Paused."
For Kafka data streams in the "Paused" state, you can resume data import. After confirming the resume operation, the data stream will immediately start importing again, and the page will refresh to display the latest import information within a maximum of 5 seconds.

Process View

The expand button under the operation menu allows you to pause all tasks, resume all tasks, terminate the process, and clear it from the list.

  • Pause all tasks: Pause all tasks under this process, stopping further imports.
  • Resume all tasks: Resume all tasks under this process, allowing imports to continue.
  • Terminate process: Exit all tasks in this process.
  • Clear: Remove the process from the list and stop displaying it (only processes in specific states, such as terminated processes, can be cleared).

Task View

The expand button under the operation menu can be used to pause and resume operations.

  • Pause: Pause this task and stop importing data.
  • Resume: Resume the paused task and continue importing.