
In this blog we will learn how to read data from Cassandra using the Cassandra source connector and write it to a Kafka topic. The Cassandra source connector reads data from Cassandra tables and publishes it to a Kafka topic in JSON format.
PREREQUISITES
- Installation of Apache Cassandra
- Installation of Kafka and Kafka connect
CONFIGURING CASSANDRA CONNECTOR
The connectors are configured using Kafka Connect Query Language (KCQL). KCQL and other basic details will be stored in a JSON formatted property file.
Let’s create a JSON file named cassandra-source-connect.json:
{
  "name": "data",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
    "connect.cassandra.contact.points": "localhost",
    "connect.cassandra.port": 9042,
    "connect.cassandra.username": "cassandra",
    "connect.cassandra.password": "cassandra",
    "connect.cassandra.consistency.level": "LOCAL_ONE",
    "connect.cassandra.key.space": "blog",
    "connect.cassandra.import.mode": "incremental",
    "connect.cassandra.kcql": "INSERT INTO test_topic SELECT <column_name>, <column_name> FROM <table_name> IGNORE <cluster_key> PK <time_column> WITHUNWRAP INCREMENTALMODE=TIMESTAMP"
  }
}
connect.cassandra.kcql : tells the connector which table in the Cassandra cluster to use, how to use the table's columns, and where to publish the data.
The first part of the query tells the connector which Kafka topic the data should be published to:
"INSERT INTO test_topic"
The next part of the query, "SELECT <column_name>, <column_name> FROM <table_name>", tells the connector which columns to read and from which table.
The next part of the statement, the PK clause, tells the connector which column is used to manage the date/time for incremental mode.
And finally, the WITHUNWRAP option tells the connector to publish the data to the topic as a String rather than as a JSON object. Without WITHUNWRAP, each record would be wrapped in a schema/payload envelope like this:
{
"schema": {
"type": "struct",
"fields": [{
"type": "string",
"optional": true,
"field": "event_data"
}],
"optional": false,
"name": "blog.pack_events"
},
"payload": {
"event_data": "{sample data}"
}
}
STARTING KAFKA
To run Kafka we first need to start ZooKeeper, followed by the Kafka broker, using the following commands:
bin/zookeeper-server-start.sh config/zookeeper.properties
and
bin/kafka-server-start.sh config/server.properties
Once Kafka is up and running, we need to create a topic:
bin/kafka-topics.sh --create --topic test_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 3
To verify that the topics are created run the following command:
bin/kafka-topics.sh --list --zookeeper localhost:2181
INSTALL THE CASSANDRA SOURCE CONNECTOR
We can download the connector using the link. Extract the tar file and copy its contents to the libs folder of the Kafka installation directory.
CONFIGURE KAFKA CONNECT
We need to tell Kafka Connect where the Kafka cluster is. In the config folder of the Kafka installation, find the file connect-distributed.properties and look for the bootstrap.servers key. Update it to point to the cluster:
bootstrap.servers=localhost:9092
Now start Kafka Connect.
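Kafka Connect ships with a startup script for distributed mode; pointing it at the properties file we just edited is enough to bring it up (the REST API listens on port 8083 by default):

```shell
# Start Kafka Connect in distributed mode using the edited properties file
bin/connect-distributed.sh config/connect-distributed.properties
```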
ADDING THE CASSANDRA SOURCE CONNECTOR
Kafka Connect exposes a REST API for interacting with connectors. We can add the connector by sending the property file (cassandra-source-connect.json) to Kafka Connect through this REST API.
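For example, the JSON file can be POSTed to the /connectors endpoint, assuming Kafka Connect is listening on its default port, 8083:

```shell
# Register the Cassandra source connector with Kafka Connect via its REST API
curl -X POST \
  -H "Content-Type: application/json" \
  -d @cassandra-source-connect.json \
  http://localhost:8083/connectors
```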
After successful loading of the connector, we can check the installed connectors using:
curl localhost:8083/connectors
That should return a list of the connectors by their configured names:
["data"]
TESTING THE CASSANDRA SOURCE CONNECTOR
In order to test everything out, we will need to insert some data into our table.
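As a sketch, assuming the keyspace blog contains the pack_events table from the schema shown earlier, with an id primary key, an event_data text column, and an event_ts timestamp column backing the incremental mode (the exact table layout is an assumption here), a row could be inserted via cqlsh:

```shell
# Insert a sample row; table and column names are assumptions for illustration
cqlsh -e "INSERT INTO blog.pack_events (id, event_data, event_ts) \
          VALUES (uuid(), '{sample data}', toTimestamp(now()));"
```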
We can check the Kafka topic by running the following command:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic
CONCLUSION
In this blog we have learnt:
- Configuring the Cassandra source connector
- Configuring Kafka Connect
- Connecting a Cassandra source to Kafka through Kafka Connect
- Testing the source connector