Overview:
In this blog, I am going to discuss how to use Kafka Connect to stream data. I'll take a simple example and elaborate on it: streaming data from a source file, test.txt, to a destination file, test.sink.txt, with the help of Kafka Connect. I will be using standalone mode in the example.
We will use 2 connectors for this:
- FileStreamSource, which reads the data from the test.txt file and publishes it to the Kafka topic connect-test.
- FileStreamSink, which consumes data from the connect-test topic and writes it to the test.sink.txt file.
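The connect-test topic will normally be created automatically the first time the source connector publishes to it, since auto.create.topics.enable is true by default. If auto-creation is disabled on your broker, you can create the topic up front; a small sketch, assuming a local ZooKeeper on port 2181:
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic connect-test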
Now let us see the configuration file for the Source at kafka_2.11-0.10.0.0\config\connect-file-source.properties:
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=test.txt
topic=connect-test
We have to define the connector.class, the maximum number of tasks the connector may create (the file connector only ever uses a single task), the name of the file the connector will read, and the topic where the data will be published.
Below is the configuration file for the Sink at kafka_2.11-0.10.0.0\config\connect-file-sink.properties. Note that the sink uses topics (plural), since a sink connector can consume from more than one topic:
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
There is one more important configuration file, located at kafka_2.11-0.10.0.0\config\connect-standalone.properties. In it, we need to define the address of the Kafka broker and the key and value converters.
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
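Because schemas.enable is true, the JsonConverter wraps every record in a schema/payload envelope, which is what we will see in the consumer output later. If you only want the raw values, you can turn the envelope off; a minimal sketch of the alternative settings:
key.converter.schemas.enable=false
value.converter.schemas.enable=false
With these, the consumer would print plain values such as "hello" instead of the full envelope.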
Time to run the example:
Now we are all set to run the example. Before running it, make sure you have already downloaded and set up Apache Kafka 0.9.x or 0.10.x.
Starting Kafka broker:
First, cd into the Kafka distribution folder:
cd /opt/kafka_2.11-0.10.0.0
Starting ZooKeeper:
./bin/zookeeper-server-start.sh config/zookeeper.properties &
Starting the Kafka server (in a new terminal, or append & to run it in the background):
./bin/kafka-server-start.sh config/server.properties
Starting the Source and Sink connectors:
./bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Once this command is up and running, the source connector starts reading content from the test.txt file, and the sink connector starts writing whatever arrives on the topic to test.sink.txt.
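The standalone worker also exposes the Kafka Connect REST API. As a quick sanity check, you can list the running connectors; a sketch, assuming the default rest.port of 8083:
curl http://localhost:8083/connectors
This should return ["local-file-source","local-file-sink"].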
Feeding the source connector:
The source connector is already running; to give it something to read, write some content to the test.txt file:
echo 'hello' >> test.txt
echo 'halo' >> test.txt
echo 'salut' >> test.txt
Checking whether the Source connector has fed the test.txt content into the connect-test topic:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic connect-test
The output should be the three records, each wrapped in the schema/payload envelope produced by the JsonConverter:
{"schema":{"type":"string","optional":false},"payload":"hello"}
{"schema":{"type":"string","optional":false},"payload":"halo"}
{"schema":{"type":"string","optional":false},"payload":"salut"}
Checking whether the Sink connector writes the content to the test.sink.txt file:
cat test.sink.txt
The output should be:
hello
halo
salut
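Because both connectors keep running, this pipeline is continuous rather than a one-shot copy. You can watch new lines flow through end to end; a small sketch (the appended text is just an arbitrary example):
echo 'bonjour' >> test.txt
tail -f test.sink.txt
Within a moment, bonjour should appear in the sink file.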
Conclusion:
In this blog, we have learned how to stream data from one file to another with the help of Kafka Connect and its file connectors.
For more, you can refer to the Kafka documentation: https://kafka.apache.org/documentation/
For more technical blogs, you can refer to the Knoldus blog: https://blog.knoldus.com/