Overview:
Hello everyone! In this blog, we will look at an example of Kafka Connect in which we take a MySQL table, stream it to a Kafka topic, and from there load it into Elasticsearch and index its content.
Installation:
First of all, we will install MySQL and Elasticsearch on our local system. On a Debian/Ubuntu system, for example, you can install them through the package manager (the exact package names and repositories depend on your distribution; Elasticsearch typically requires adding Elastic's own package repository first):
sudo apt-get install mysql-server
sudo apt-get install elasticsearch
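Before moving on, it is worth confirming that both services are actually up. As a quick check (assuming Elasticsearch is listening on its default port, 9200):
sudo systemctl status mysql
curl http://localhost:9200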
The next step is to make sure you have the connectors. There are a few options:
- Download and install using the Confluent Hub client (https://docs.confluent.io/home/connect/confluent-hub/client.html); an example command is shown after this list.
- Download from the Confluent Hub website (or from any other website where the connector you are interested in is hosted).
- Build from source code. To do this, you'll need to:
  - Clone the connector source:
    git clone https://github.com/confluentinc/kafka-connect-elasticsearch
  - Run mvn install -DskipTests to build the project.
  - Repeat with the JDBC connector.
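If you go the Confluent Hub client route, installing a connector is a single command per connector. The coordinates below are the ones published on Confluent Hub for these two connectors; pin a specific version instead of latest if you need to:
confluent-hub install confluentinc/kafka-connect-elasticsearch:latest
confluent-hub install confluentinc/kafka-connect-jdbc:latest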
Loading the Kafka connectors:
Now we need to load these connectors. Create a directory, such as /opt/connectors, and update config/connect-distributed.properties to include plugin.path=/opt/connectors.
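plugin.path accepts a comma-separated list of directories; in our case a single entry is enough, so the relevant line in config/connect-distributed.properties looks like this:
plugin.path=/opt/connectors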
Then take the jars that were created under the target directory where you built each connector and copy each one, plus their dependencies, to the appropriate subdirectories of the plugin.path:
gwen$ mkdir /opt/connectors/jdbc
gwen$ mkdir /opt/connectors/elastic
gwen$ cp …/kafka-connect-jdbc/target/kafka-connect-jdbc-10.3.x-SNAPSHOT.jar /opt/connectors/jdbc
gwen$ cp ../kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-11.1.0-SNAPSHOT.jar /opt/connectors/elastic
gwen$ cp ../kafka-connect-elasticsearch/target/kafka-connect-elasticsearch-11.1.0-SNAPSHOT-package/share/java/kafka-connect-elasticsearch/* /opt/connectors/elastic
In addition, since we need to connect not just to any database but specifically to MySQL, we also need to download and install a MySQL JDBC driver. The driver doesn't ship with the connector for license reasons. You can download the driver from the MySQL website (https://dev.mysql.com/downloads/connector/j/) and then place the jar in /opt/connectors/jdbc.
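For example, assuming you downloaded Connector/J 8.0.x, copying the driver jar would look something like this (the exact file name depends on the version you downloaded):
gwen$ cp mysql-connector-java-8.0.28.jar /opt/connectors/jdbc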
After that, restart the Kafka Connect workers and check that the new connector plug-ins are
listed:
gwen$ bin/connect-distributed.sh config/connect-distributed.properties &
gwen$ curl http://localhost:8083/connector-plugins
[
{
"class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"type": "sink",
"version": "11.1.0-SNAPSHOT"
},
{
"class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"type": "sink",
"version": "10.3.x-SNAPSHOT"
},
{
"class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"type": "source",
"version": "10.3.x-SNAPSHOT"
}
]
As we can see from the output above, we now have additional connector plug-ins available in our Connect cluster.
Creating table in MySQL:
The next step is to create a table in MySQL that we can stream into Kafka using our JDBC connector:
gwen$ mysql.server restart
gwen$ mysql --user=root
mysql> create database test;
Query OK, 1 row affected (0.00 sec)
mysql> use test;
Database changed
mysql> create table login (username varchar(30), login_time datetime);
Query OK, 0 rows affected (0.02 sec)
mysql> insert into login values ('gwenshap', now());
Query OK, 1 row affected (0.01 sec)
mysql> insert into login values ('tpalino', now());
Query OK, 1 row affected (0.00 sec)
As you can see, we created a database and a table and inserted a few rows as an example.
Next, we need to configure our JDBC source connector. We can find out which configuration options are available by looking at the documentation, but we can also use the REST API to find the available configuration options:
gwen$ curl -X PUT -d '{"connector.class":"JdbcSource"}' localhost:8083/connector-plugins/JdbcSourceConnector/config/validate/ --header "content-Type:application/json"
{
"configs": [
{
"definition": {
"default_value": "",
"dependents": [],
"display_name": "Timestamp Column Name",
"documentation": "The name of the timestamp column to use
to detect new or modified rows. This column may not be
nullable.",
"group": "Mode",
"importance": "MEDIUM",
"name": "timestamp.column.name",
"order": 3,
"required": false,
"type": "STRING",
"width": "MEDIUM"
},
<more stuff>
We asked the REST API to validate the configuration for a connector and sent it a configuration with just the class name (the bare minimum configuration necessary). In response, we got the JSON definition of all available configurations. With this information in mind, it's time to create and configure our JDBC connector:
echo '{"name":"mysql-login-connector", "config":{"connector.class":"JdbcSource-
Connector","connection.url":"jdbc:mysql://127.0.0.1:3306/test?
user=root","mode":"timestamp","table.whitelist":"login","vali-
date.non.null":false,"timestamp.column.name":"login_time","topic.pre-
fix":"mysql."}}' | curl -X POST -d @- http://localhost:8083/connectors --header
"content-Type:application/json"
{
"name": "mysql-login-connector",
"config": {
"connector.class": "JdbcSourceConnector",
"connection.url": "jdbc:mysql://127.0.0.1:3306/test?user=root",
"mode": "timestamp",
"table.whitelist": "login",
"validate.non.null": "false",
"timestamp.column.name": "login_time",
"topic.prefix": "mysql.",Now we need to load these connectors. Create a directory, such as /opt/connectors, and update config/connect-distributed.properties to include plugin.path=/opt/connectors.
"name": "mysql-login-connector"
},
"tasks": []
}
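The response echoes back the connector configuration. You can also confirm that the connector and its task started successfully through Connect's status endpoint:
gwen$ curl http://localhost:8083/connectors/mysql-login-connector/status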
Let’s make sure it worked by reading data from the mysql.login topic:
gwen$ bin/kafka-console-consumer.sh --bootstrap-server=localhost:9092 --topic mysql.login --from-beginning
Once the connector is running, if you insert additional rows in the login table, you
should immediately see them reflected in the mysql.login topic.
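The final step is to index the topic's contents into Elasticsearch using the Elasticsearch sink connector we installed earlier. The configuration below is a minimal sketch assuming Elasticsearch is running locally on its default port 9200; depending on your connector version and data format, you may need additional settings (for example schema.ignore):
echo '{"name":"elastic-login-connector", "config":{"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector","connection.url":"http://localhost:9200","topics":"mysql.login","key.ignore":"true"}}' | curl -X POST -d @- http://localhost:8083/connectors --header "content-Type:application/json"
Once this connector is running, the rows from the mysql.login topic should show up in an Elasticsearch index named after the topic, which you can check with a query such as curl http://localhost:9200/mysql.login/_search?pretty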
Conclusion:
In conclusion, in this blog we have seen an example of Kafka Connect in which we took a MySQL table, streamed it to a Kafka topic, and from there loaded it into Elasticsearch and indexed its content.
For more, you can refer to the Kafka documentation: https://kafka.apache.org/documentation/
For more technical blogs, you can refer to the Knoldus blog: https://blog.knoldus.com/