Kafka Access

Apache Kafka is an open-source distributed event streaming platform. It can act as a messaging system that reads and writes streaming data for publish/subscribe workloads; as a foundation for scalable stream-processing applications that respond to events in real time; and, when connected to a database, as durable storage that keeps streaming data safe in a distributed, replicated, fault-tolerant cluster. The event streams it carries can be thought of as the central nervous system of an organization's data.

If you plan to use Kafka to write data into your YMatrix cluster, this document is for you. YMatrix supports seamless Kafka integration: it can continuously and automatically import Kafka data into YMatrix tables, and the whole process can be driven from a graphical interface. The supported data formats are currently CSV and JSON. The following uses the simplest possible example to show how to connect Kafka data through the YMatrix management platform.

1 Preparation

1.1 Build a Kafka environment

First, you need a working Kafka environment; see the official Kafka Quickstart guide.

1.2 Create a Kafka topic

With the Kafka environment set up on the server, change into its installation directory with the following command (or equivalent):

$ cd packages/kafka_2.13-3.2.0

Then start the Kafka service and create a test topic; the broker listens on the default port 9092.

$ bin/kafka-topics.sh --create --topic csv_test --bootstrap-server localhost:9092
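To confirm the topic was created, you can optionally describe it with the standard Kafka CLI:

$ bin/kafka-topics.sh --describe --topic csv_test --bootstrap-server localhost:9092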

1.3 Write test data

Next, write a few records of test data one line at a time, ending input with Ctrl-C:

$ bin/kafka-console-producer.sh --topic csv_test --bootstrap-server localhost:9092
>1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456
>2,Shanghai,112.95,true,1651043583,1651043583123,1651043583123456
>3,Shenzhen,100.0,false,1651043583,1651043583123,1651043583123456
>4,Guangxi,88.5,false,1651043583,1651043583123,1651043583123456
>^C

The commands above write 4 records to the newly created csv_test topic, each a comma-separated CSV line.
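If you want to double-check that the messages landed in the topic, you can read them back with the standard console consumer (exit with Ctrl-C):

$ bin/kafka-console-consumer.sh --topic csv_test --from-beginning --bootstrap-server localhost:9092

With the preparation complete, the following describes how to create a Kafka data stream to import this data.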

2 Create a Kafka Data Stream

In a browser, enter the IP of the machine where MatrixGate runs (by default, the master node) together with the port number:

http://<IP>:8240

After logging in, select the Kafka data source on the main interface. The steps are illustrated first with a CSV example.

2.1 CSV data stream access

2.1.1 Select or create a new target table

First, select an existing table, or create a new one, to serve as the target table that receives the Kafka data.

2.1.1.1 Select an existing table

In the target table panel, select the existing table and a data import mode. This example assumes the selected table is test in the public schema of the postgres database.

![](https://img.ymatrix.cn/ymatrix_home/kafka modification 11_1702004504.jpeg)
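If a suitable table does not exist yet, one way to create it beforehand is with psql. The sketch below matches the seven-column test data written earlier; the column names are illustrative assumptions, not names required by YMatrix:

$ psql -d postgres -c "CREATE TABLE public.test (tag_id int, city text, reading float8, online bool, ts_sec bigint, ts_ms bigint, ts_us bigint);"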

2.1.1.2 Create a new table

Create a new table; this example assumes the schema is public and the table name is test1.

![](https://img.ymatrix.cn/ymatrix_home/kafka changes 8_1700805926.jpeg)

Once the table is created, a red New badge appears.

![](https://img.ymatrix.cn/ymatrix_home/kafka re-creates table 9_1700710008.jpeg)

Select a [Data Import Mode](/doc/5.2/datainput/upsert) and an automatic partitioning policy for the table. Here the partitioning policy defaults to auto_partitioning.

![](https://img.ymatrix.cn/ymatrix_home/kafka re-create table 10_1700710019.jpeg)

2.1.2 Source data

After the target table is selected, the second step is to configure the source data connection. Complete the following before moving on:

  • First, enter the Kafka broker address and port in the address box on the left, following the format shown in the figure. If there are multiple broker addresses, separate them with commas (see the example after the figure).
  • Then, select the authentication method your Kafka cluster requires and enter the Kafka username and password.
  • Finally, check or search for the Kafka topic you want to connect to in the panel on the right.

![](https://img.ymatrix.cn/ymatrix_home/kafka changes 1_1700793890.jpg)
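For example, an address list for two brokers (the IPs below are placeholders) would be entered as:

192.168.1.10:9092,192.168.1.11:9092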

2.1.3 Mapping

The third step is to configure sample data and map it onto an existing or new table so the Kafka data can be imported.

2.1.3.1 Basic configuration

Select CSV as the source data format, then specify or manually configure the sample data; an accurate sample is what makes the field mapping accurate.

![](https://img.ymatrix.cn/ymatrix_home/kafka modification 2_1700709871.jpeg)

When configuring sample data, you can either select it from the topic or edit data directly.

  • If the Kafka topic contains complete data, choose select from the topic to use a record from the topic as the sample.
  • If the data in the topic is incomplete (for example, a column is null), choose Edit data directly and supply a sample record that matches the field structure of the target table (see the example after the figure below). The sample is used only for analyzing and mapping the field structure; it is not imported into the target table.

![](https://img.ymatrix.cn/ymatrix_home/kafka modification 3_1700709879.jpeg)
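For instance, a hand-edited sample record matching the seven-column structure of the target table could simply be one of the lines produced earlier:

1,Beijing,123.05,true,1651043583,1651043583123,1651043583123456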

Note!
Tables created through the graphical interface use the MARS3 storage engine by default. For more information, see MARS3.

2.1.3.2 Execute mapping

After the basic configuration is complete, map the source data fields to the target table fields. This step differs slightly between existing tables and new tables.

  • Map an existing table

    First select a sample data field, then find the corresponding target field in the mapping list. After setting each pair, click Save to establish the mapping.

    ![](https://img.ymatrix.cn/ymatrix_home/kafka modification 5_1702024811.jpeg)

    Alternatively, one-click automatic mapping maps all fields directly, which is convenient and keeps the mapping correct.

    ![](https://img.ymatrix.cn/ymatrix_home/kafka modification 6_1702024858.jpeg)

  • Map a new table

    In addition to establishing mapping rules, pay attention to the following setting when creating a new table: when a source field value is a UNIX timestamp and the target field type is set to timestamptz, a Stored as timestamp checkbox appears to the right of the output value. Checking it automatically converts the output value into the database's timestamp format (see the conversion example after this list).

    ![](https://img.ymatrix.cn/ymatrix_home/kafka re-creates table 15_1702024932.jpeg)

    After the mapping rules are saved, you also need to select a "Device ID" and a "Time Column". "Device ID" is a field (or combination of fields) on which the target table has a unique constraint, and "Time Column" is a field of type timestamptz in the target table.

    ![](https://img.ymatrix.cn/ymatrix_home/kafka re-create table 14_1700710129.jpeg)
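For reference, the effect of the seconds-precision conversion can be pictured with PostgreSQL's to_timestamp(); this is an illustration of the result, not the product's internal implementation. The following returns 2022-04-27 07:13:03+00 when the server time zone is UTC:

$ psql -d postgres -c "SELECT to_timestamp(1651043583);"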

2.1.4 Submit

Finally, review all configuration information for the data stream; go back and modify it if anything is wrong, then submit it once everything is correct.

2.1.4.1 Existing table

The page displays all configuration information for the data stream. After confirming it, click the "Submit" button.

![](https://img.ymatrix.cn/ymatrix_home/kafka changes 7_1700798991.jpg)

2.1.4.2 New table

Compared with an existing table, the page additionally displays the unique identification information composed of "Device ID + Time Column". After confirming it, click the "Submit" button.

![](https://img.ymatrix.cn/ymatrix_home/kafka re-creates table 13_1700710110.jpeg)

Whether you used an existing table or a new table, a creation-success page appears after the submission succeeds. At this point the Kafka data stream has been created and begins importing.
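Once messages start flowing, a quick way to verify the import is to count rows in the target table; the table name here matches the earlier existing-table example:

$ psql -d postgres -c "SELECT count(*) FROM public.test;"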

2.2 JSON data stream access

This section describes only the differences from CSV access, focusing on the mapping step.

  • First, select the JSON data format and configure the sample data.

    ![](https://img.ymatrix.cn/ymatrix_home/kafka modified json17_1700710210.jpeg)

  • Second, JSON columns are referenced using the semantics $["column name"]; the referencing rules follow JSONPath.

  • Finally, JSON data supports multi-level nesting, and YMatrix supports selecting source data fields at different levels when creating mapping rules. Click in the Sample Data column to expand the nesting levels (see the example after the figure).

    ![](https://img.ymatrix.cn/ymatrix_home/kafka modified json18_1700710227.jpeg)
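As an illustration (the topic name json_test and the field names are assumptions), a nested JSON record could be produced like this:

$ bin/kafka-console-producer.sh --topic json_test --bootstrap-server localhost:9092
>{"id": 1, "city": "Beijing", "metrics": {"temperature": 123.05, "online": true}}
>^C

Under the bracket notation above, the top-level fields would be referenced as $["id"] and $["city"], and the nested temperature field as $["metrics"]["temperature"].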

3 Pause and Resume Kafka Data Streams

Once you create a Kafka data stream through the YMatrix graphical interface, data is continuously imported into YMatrix. If you later want to pause the import temporarily, for service debugging or other reasons, you can do that directly in the graphical interface as well.

3.1 Pause a Kafka data stream

For a running Kafka data stream, data import can be paused from the operations entry on the list page.

Note!
The pause operation may take 1 to 10 seconds to take effect, during which no other operations can be performed on the data stream. Once the pause completes, Kafka data stops importing and the status switches to "Paused".

![](https://img.ymatrix.cn/ymatrix_home/kafka pauses 1_1698390318.jpeg)

![](https://img.ymatrix.cn/ymatrix_home/kafka pauses 2_1698390324.jpeg)

3.2 Resume a Kafka data stream

For a Kafka data stream in the "Paused" state, data import can be resumed from the operations entry on the list page. After you confirm the operation, the data stream immediately resumes importing; the page shows the latest import information within at most 5 seconds.

![](https://img.ymatrix.cn/ymatrix_home/kafka pauses 3_1698390331.jpeg)