A Quick Demo: Kafka to Flink to Cassandra


Hi folks! In this blog, we will learn how to integrate Flink with Kafka and Cassandra to build a simple streaming data pipeline.

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams.
Apache Kafka is a scalable, high-performance, low-latency platform that lets you read and write streams of data like a messaging system.
Apache Cassandra is a distributed, wide-column NoSQL data store.

Minimum Requirements and Installations

To run the application, you will need Kafka and Cassandra installed locally on your machine. The minimum requirements for the application are:

Java 1.8+, Scala 2.12.2, Flink 1.9.0, sbt 1.3.12, Kafka 2.3.0, Cassandra 3.10.

Dependencies

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-connector-kafka" % "1.9.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.9.0",
  // cassandra
  "org.apache.flink" %% "flink-connector-cassandra" % "1.9.0",
  "org.json4s" %% "json4s-native" % "3.6.10"
)
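
If you are assembling the project from scratch, these dependencies sit in a build.sbt alongside the Scala version listed above. A minimal sketch (the project name here is just a placeholder) might look like:

// A minimal build.sbt sketch; the name is hypothetical, and the
// libraryDependencies block shown above goes in this same file.
name := "kafka-flink-cassandra-demo"

version := "0.1"

scalaVersion := "2.12.2"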

Connecting to Kafka and Reading Streams

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.json4s.native.JsonMethods

implicit lazy val formats = org.json4s.DefaultFormats

// Open a Kafka connection for streaming car data from the topic.
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "testKafka")

val env = StreamExecutionEnvironment.getExecutionEnvironment
val kafkaConsumer = new FlinkKafkaConsumer[String]("car.create", new SimpleStringSchema(), properties)

// Parse each JSON string into the Car case class using json4s,
// silently dropping messages that are not valid JSON.
val carDataStream = env.addSource(kafkaConsumer)
  .flatMap(raw => JsonMethods.parseOpt(raw))
  .map(_.extract[Car])
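
One aside: by default the Flink Kafka consumer resumes from the consumer group's committed offsets. If you want the demo to replay messages produced before the job started, you can configure the consumer, right after creating it, to read from the beginning of the topic:

// Optional: start reading from the earliest offset instead of the
// committed group offsets (handy when re-running the demo).
kafkaConsumer.setStartFromEarliest()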

In the above code snippet, we read JSON data from the Kafka topic "car.create", which contains information about cars. SimpleStringSchema deserializes each message as a JSON string, and json4s parses that string into a Scala case class. The Car model looks like this:

case class Car(
  Name: String,
  Miles_per_Gallon: Option[Double],
  Cylinders: Option[Long],
  Displacement: Option[Double],
  Horsepower: Option[Long],
  Weight_in_lbs: Option[Long],
  Acceleration: Option[Double],
  Year: String,
  Origin: String)

Reading the JSON data from the Kafka topic through the Flink streaming engine gives us a DataStream[Car]. You can apply transformations to this stream (a small example follows) and then sink the resulting DataStream to the Cassandra database.
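
As a quick, hypothetical example of such a transformation, you could keep only the cars with a known horsepower above some threshold:

// A sketch of a transformation on DataStream[Car]: keep only cars whose
// horsepower is present and at least 100 (an arbitrary threshold).
val powerfulCars = carDataStream.filter(_.Horsepower.exists(_ >= 100))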

import org.apache.flink.streaming.connectors.cassandra.CassandraSink

// Shape the car data into (Name, Cylinders, Horsepower) tuples to sink into Cassandra.
// Box the optional Longs so that missing values become null in the database.
val sinkCarDataStream = carDataStream.map(car =>
  (car.Name, car.Cylinders.map(Long.box).orNull, car.Horsepower.map(Long.box).orNull))

// Open a Cassandra connection and sink the car data into the example.car table.
CassandraSink.addSink(sinkCarDataStream)
  .setHost("127.0.0.1")
  .setQuery("""INSERT INTO example.car("Name", "Cylinders", "Horsepower") values (?, ?, ?);""")
  .build()

// Nothing runs until the job is executed.
env.execute("kafka-flink-cassandra-demo")

We are all set with our handy code. You can find the complete source code here.

Now let's start the Kafka and Cassandra services locally to test it.

Running Cassandra:

Go to the Cassandra bin directory and run the command below to start the Cassandra server:

./cassandra 

Then open the Cassandra shell by running:

./cqlsh

In the shell, run the commands below to create the keyspace example and the table car:

CREATE KEYSPACE IF NOT EXISTS example
   WITH REPLICATION = {
      'class' : 'SimpleStrategy', 'replication_factor' : 1 };

CREATE TABLE IF NOT EXISTS example.car("Name" text primary key, "Cylinders" bigint, "Horsepower" bigint);

Running Kafka:

Go inside your Kafka directory:

  • Start ZooKeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
  • Start the Kafka server:
bin/kafka-server-start.sh config/server.properties
  • Create the Kafka topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic car.create
  • Start a Kafka console producer:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic car.create
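
If you want to double-check that the topic was created before producing, you can list the topics against the same local ZooKeeper:

bin/kafka-topics.sh --list --zookeeper localhost:2181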

Go inside the project directory, open a terminal, and run the commands below:

sbt clean compile
sbt run

Produce some sample messages in the Kafka topic car.create:

{"Name":"saab 99e", "Miles_per_Gallon":25, "Cylinders":4, "Displacement":104, "Horsepower":95, "Weight_in_lbs":2375, "Acceleration":17.5, "Year":"1970-01-01", "Origin":"Europe"}
{"Name":"amc gremlin", "Miles_per_Gallon":21, "Cylinders":6, "Displacement":199, "Horsepower":90, "Weight_in_lbs":2648, "Acceleration":15, "Year":"1970-01-01", "Origin":"USA"}
{"Name":"chevy c20", "Miles_per_Gallon":10, "Cylinders":8, "Displacement":307, "Horsepower":200, "Weight_in_lbs":4376, "Acceleration":15, "Year":"1970-01-01", "Origin":"USA"}

Result

Go to the Cassandra shell and run the query below:

select * from example.car;

You will see the name, number of cylinders, and horsepower of each car that was streamed from Kafka into the Cassandra database.
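
With the three sample messages above, the result should look roughly like this (row order in cqlsh may differ):

 Name        | Cylinders | Horsepower
-------------+-----------+------------
 saab 99e    |         4 |         95
 amc gremlin |         6 |         90
 chevy c20   |         8 |        200

(3 rows)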

Thanks for reading. Stay connected for more blogs.
