Deployment and use of Kafka connect 2 (add connector)
hangge 2021-06-04 10:42:01
    The above passage is adopted Kafka Self contained FileStreamSinkConnector and FileStreamSourceConnector these two items. Connector Demonstrated   Kafka Connect Some basic usage of ( Click to see ). On the market Kafka Connector quite a lot , Including open source and commercial versions of , You can choose according to the situation . This passage MQTT Connector Examples show how to add and use a new Connector.

3、 ... and 、 Add and use Connector

1, Installation configuration

(1) This time we still use standalone mode Kafka Connect, The first edit conf/connect-standalone.properties file :
vi conf/connect-standalone.properties

(2) Will last plugin.path Note cancel , And set up /opt/connectors For the plug-in Directory ( Back Connector Will be put in this directory ): 

(3) Because this article demonstrates MQTT -> Kafka, So only integrate MQTT Connect that will do . What I'm using here is Confluent Provided   MQTT Connector, First visit its official website ( Click on the access ) Download .

(4) After decompressing the compressed package , libs That's what we need in the folder jar package :

(5) First, create a... On the server /opt/connectors Catalog :
mkdir /opt/connectors

(6) Then go to the directory and create another kafka-connect-mqtt Catalog :
cd /opt/connectors
mkdir kafka-connect-mqtt

(7) Unzip the front libs All under file jar Bags are uploaded to kafka-connect-mqtt Under the table of contents :

(8) stay Kafka Of conf Create one in the directory connect-mqtt-source.properties file , The contents are as follows :
Tips : According to your actual situation, you can mainly change the highlighted part of the content ( source MQTT Of topic、 The goal is Kafka Of topic、 source MQTT Address 、 The goal is Kafka Address ). The following configuration indicates that all the /mqtt/# The message of the subject is written to Kafka Of my_mqtt In the theme .
name=mqtt-source
connector.class=io.confluent.connect.mqtt.MqttSourceConnector
tasks.max=1
mqtt.topics=/mqtt/#
kafka.topic=my_mqtt
mqtt.server.uri=tcp://192.168.60.2:1883
confluent.topic.bootstrap.servers=192.168.60.2:32410
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

2, Start the service

(1) Execute the following command to start Kafka Connect, Note that the command should be followed by the relevant configuration file :
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-mqtt-source.properties
  • Because the above startup mode will automatically end the program after exiting the terminal , To make sure Kafka Connect Be able to run in the background all the time , You can add -daemon Parameters :
./bin/connect-standalone.sh -daemon config/connect-standalone.properties config/connect-mqtt-source.properties

(2) After startup, it can pass through jps Command to check whether it started successfully , If the following figure appears, it means the startup is successful :
If you are prompted jps No orders found , You can execute the following command to install :
  • yum install java-1.8.0-openjdk-devel.x86_64

(3) Kafka Connect Provides REST API It's convenient for us to manage ( Default port 8083), We go through /connector-plugins You can see that the interface does contain MqttSinkConnector and MqttSourceConnector

(4) adopt /connectors The interface can see mqtt source The configuration of has been added successfully :

3, Start testing

(1) First we use MQTTBox This client tool goes to MQTT Server sends a message :

(2) And then Kafka You can see it here my_mqtt This data has already appeared in this topic , Explain what you just added Connect Has to take effect .
Please bring the original link to reprint ,thank
Similar articles

2021-08-09

2021-08-09