Kafka connect with Cassandra Source Connector

Reading Time: 2 minutes

In this blog, we will learn how to read data from Cassandra using the Cassandra source connector and write it to a Kafka topic. The Cassandra source connector reads data from Cassandra tables and publishes it to a Kafka topic in JSON format.

PREREQUISITES

  1. Installation of Apache Cassandra
  2. Installation of Kafka and Kafka Connect

CONFIGURING CASSANDRA CONNECTOR

The connectors are configured using Kafka Connect Query Language (KCQL). The KCQL statement and other basic connection details are stored in a JSON-formatted property file.

Let’s create a JSON file named cassandra-source-connect.json:

{
  "name": "data",
  "config": {
    "tasks.max": "1",
    "connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
    "connect.cassandra.contact.points": "localhost",
    "connect.cassandra.port": 9042,
    "connect.cassandra.username": "cassandra",
    "connect.cassandra.password": "cassandra",
    "connect.cassandra.consistency.level": "LOCAL_ONE",
    "connect.cassandra.key.space": "blog",
    "connect.cassandra.import.mode": "incremental",
    "connect.cassandra.kcql": "INSERT INTO test_topic SELECT <column name>, <column name> FROM <table name> IGNORE <cluster key> PK <cluster key> WITHUNWRAP INCREMENTALMODE=TIMESTAMP"
  }
}

connect.cassandra.kcql: tells the connector which table in the Cassandra cluster to use, how to use that table's columns, and where to publish the data.

The first part of the query tells the connector which Kafka topic the data should be published to:

"INSERT INTO test_topic"

The next part of the query, the SELECT ... FROM clause, tells the connector which columns to read and which table to read them from.

The next part of the statement, the PK clause, tells the connector which column is used to track the date/time of each row, which is how the connector manages incremental reads.

And finally, the WITHUNWRAP option tells the connector to publish the data to the topic as a plain String rather than as a JSON object. Without WITHUNWRAP, each record is wrapped in a schema/payload envelope like this:

{
  "schema": {
    "type": "struct",
    "fields": [{
      "type": "string",
      "optional": true,
      "field": "event_data"
    }],
    "optional": false,
    "name": "blog.pack_events"
  },
  "payload": {
    "event_data": "{sample data}"
  }
}
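As a rough sketch of what a downstream consumer would do with such a record, the envelope above can be parsed like this (the record string here is a sample for illustration, not actual connector output):

```python
import json

# A record as published without WITHUNWRAP: a schema/payload envelope.
# (Sample string for illustration; real records arrive via the Kafka topic.)
record = '''
{
  "schema": {
    "type": "struct",
    "fields": [{"type": "string", "optional": true, "field": "event_data"}],
    "optional": false,
    "name": "blog.pack_events"
  },
  "payload": {"event_data": "{sample data}"}
}
'''

envelope = json.loads(record)

# The schema section identifies the Cassandra table the row came from...
table = envelope["schema"]["name"]
# ...and the payload section carries the actual column values.
event_data = envelope["payload"]["event_data"]

print(table)       # blog.pack_events
print(event_data)  # {sample data}
```

With WITHUNWRAP enabled, only the value of the selected column is published, so none of this envelope parsing is needed.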

STARTING KAFKA

To run Kafka, we first need to start ZooKeeper, followed by the Kafka broker, using the following commands:

bin/zookeeper-server-start.sh config/zookeeper.properties

and

bin/kafka-server-start.sh config/server.properties

Once Kafka is up and running, we need to create a topic:

bin/kafka-topics.sh --create --topic test_topic --zookeeper localhost:2181 --replication-factor 1 --partitions 3

To verify that the topics are created run the following command:

bin/kafka-topics.sh --list --zookeeper localhost:2181

INSTALL THE CASSANDRA SOURCE CONNECTOR

We can download the connector using the link, unzip the tar file, and copy it to the libs folder of the Kafka installation directory.

CONFIGURE KAFKA CONNECT

We need to tell Kafka Connect where the Kafka cluster is. In the config folder of the Kafka installation, find the file connect-distributed.properties and look for the bootstrap.servers key. Update it to point to the cluster:

bootstrap.servers=localhost:9092

Now start Kafka Connect in distributed mode:

bin/connect-distributed.sh config/connect-distributed.properties

ADDING THE CASSANDRA SOURCE CONNECTOR

Kafka Connect exposes a REST API for interacting with connectors. We can load our connector by posting the property file (cassandra-source-connect.json) to that API:

curl -X POST -H "Content-Type: application/json" -d @cassandra-source-connect.json localhost:8083/connectors

After the connector loads successfully, we can list the installed connectors using:

curl localhost:8083/connectors

That should return a list of the connectors by their configured names:

["data"]

TESTING THE CASSANDRA SOURCE CONNECTOR

In order to test everything out, we need to insert some data into our table.
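For example, assuming the blog keyspace from the connector config and a pack_events table matching the schema shown earlier, the data could be inserted with CQL like this (the exact table definition and the time-based event_ts column are assumptions for illustration, since INCREMENTALMODE=TIMESTAMP needs a time-based column to track):

```sql
-- Hypothetical table in the "blog" keyspace matching the
-- "blog.pack_events" schema shown in the envelope example above.
CREATE TABLE IF NOT EXISTS blog.pack_events (
    event_id text,
    event_ts timeuuid,   -- assumed time-based column for INCREMENTALMODE=TIMESTAMP
    event_data text,     -- the column published to the Kafka topic
    PRIMARY KEY (event_id, event_ts)
);

INSERT INTO blog.pack_events (event_id, event_ts, event_data)
VALUES ('p0001', now(), '{"sample": "data"}');
```

Each new row inserted after the connector's last poll should then appear on the test_topic topic.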

We can check the Kafka topic by running the following command:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test_topic

CONCLUSION

In this blog, we have learnt:

  • Configuring the Cassandra source connector
  • Configuring Kafka Connect
  • Implementing a Cassandra source with Kafka Connect
  • Testing the source connector