Assimilation of Spark Streaming With Kafka


Spark is used at a wide range of organizations to process large datasets, and it appears to be becoming mainstream. In this blog we will talk about integrating Kafka with Spark Streaming. So, let’s get started.

How can Kafka be integrated with Spark?

Kafka provides a messaging and integration platform for Spark Streaming. Kafka acts as the central hub for real-time streams of data, which are then processed using complex algorithms in Spark Streaming. Once the data is processed, Spark Streaming can publish the results to yet another Kafka topic.

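Let’s see how to configure Spark Streaming to receive data from Kafka. First, create an SBT project and add the following dependencies to build.sbt. The examples in this blog use Spark’s Structured Streaming API, which is why the spark-sql-kafka-0-10 connector is in the list.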

val sparkCore = "org.apache.spark" % "spark-core_2.11" % "2.2.0"
val sparkSqlKafka = "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0"
val sparkSql = "org.apache.spark" % "spark-sql_2.11" % "2.2.0"

libraryDependencies ++= Seq(sparkCore, sparkSql, sparkSqlKafka)
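The same dependencies can also be declared with sbt's `%%` operator, which appends the Scala binary version to the artifact name so that it follows `scalaVersion`. This is only a sketch: the project name and the exact Scala patch version (2.11.12) are assumptions, not something the setup above requires.

```scala
// build.sbt (sketch; project name and Scala patch version are assumptions)
name := "spark-kafka-streaming"
scalaVersion := "2.11.12"

// Keep all Spark modules on the same version
val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  // %% resolves to spark-core_2.11, spark-sql_2.11, ... for Scala 2.11.x
  "org.apache.spark" %% "spark-core"           % sparkVersion,
  "org.apache.spark" %% "spark-sql"            % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
```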

Now, we need to create a Spark session.

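import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// sparkAppName, sparkMaster, sparkMemory and sparkCores are String values
// supplied by your own application configuration.
val sparkConf: SparkConf = new SparkConf()
  .setAppName(sparkAppName)
  .setMaster(sparkMaster)
  .set("spark.executor.memory", sparkMemory)
  .set("spark.executor.cores", sparkCores) // the property name is plural: "cores"
  // Default checkpoint directory used by streaming queries in this session
  .set("spark.sql.streaming.checkpointLocation", "PATH_TO_CHECKPOINT_LOCATION")
val spark: SparkSession = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()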
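The session snippet refers to sparkAppName, sparkMaster, sparkMemory and sparkCores without defining them. One possible way to keep such values together is sketched below. The case class `SparkSettings`, the object `SparkSettingsExample`, and every example value are hypothetical; only the property keys are the ones Spark itself reads (note the plural `spark.executor.cores`).

```scala
// Hypothetical holder for the values the session snippet expects.
final case class SparkSettings(
    appName: String,
    master: String,
    executorMemory: String,
    executorCores: Int,
    checkpointLocation: String) {

  // setAppName and setMaster write to "spark.app.name" and "spark.master",
  // so the whole configuration can be expressed as plain key/value pairs.
  def toProperties: Map[String, String] = Map(
    "spark.app.name" -> appName,
    "spark.master" -> master,
    "spark.executor.memory" -> executorMemory,
    "spark.executor.cores" -> executorCores.toString,
    "spark.sql.streaming.checkpointLocation" -> checkpointLocation
  )
}

object SparkSettingsExample {
  // Example values for a local run; all of them are assumptions.
  val local: SparkSettings = SparkSettings(
    appName = "kafka-streaming-demo",
    master = "local[2]",
    executorMemory = "1g",
    executorCores = 2,
    checkpointLocation = "/tmp/kafka-streaming-demo/checkpoint"
  )
}
```

With Spark on the classpath, these pairs could be passed to the configuration in one call, for example `new SparkConf().setAll(SparkSettingsExample.local.toProperties)`.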

Creating Kafka as Source

We can read messages from Kafka as a streaming DataFrame by subscribing to the topics whose names match a pattern.

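import org.apache.spark.sql.DataFrame

val dataFrame: DataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "KAFKA_SERVER_URL")
  .option("subscribePattern", "TOPIC") // regex matched against topic names
  .option("startingOffsets", "latest")
  .load()
  // Kafka delivers the payload as binary, so cast it to a string
  .selectExpr("CAST(value AS STRING) AS value")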

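subscribePattern : A Java regular expression; the query subscribes to every topic whose name matches it.

kafka.bootstrap.servers : A comma-separated list of host:port pairs for the Kafka brokers.

startingOffsets : The starting point when a new query is started, either “latest” or “earliest”. A restarted query resumes from its checkpoint instead.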
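To make the effect of subscribePattern concrete, the sketch below filters a list of topic names with a Java regex. It assumes the pattern has to match the whole topic name, which is my understanding of how the Kafka consumer applies it. The object `TopicPatternDemo`, the helper `matchingTopics`, and the topic names are hypothetical and not part of Spark or Kafka.

```scala
import java.util.regex.Pattern

object TopicPatternDemo {
  // Returns the topics whose full name matches the given regex.
  def matchingTopics(pattern: String, topics: Seq[String]): Seq[String] = {
    val compiled = Pattern.compile(pattern)
    topics.filter(topic => compiled.matcher(topic).matches())
  }

  def main(args: Array[String]): Unit = {
    val topics = Seq("orders-eu", "orders-us", "payments")
    // A wildcard pattern selects both "orders-" topics
    println(matchingTopics("orders-.*", topics)) // List(orders-eu, orders-us)
    // A bare prefix matches nothing, because the whole name must match
    println(matchingTopics("orders", topics)) // List()
  }
}
```

If you only need a fixed list of topics, the Kafka source also accepts a `subscribe` option with comma-separated topic names, which avoids regex surprises.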

The processed messages can then be written to another Kafka topic, from which downstream consumers can read them.

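// The Kafka sink expects a "value" column (string or binary); "key" is optional.
// The checkpoint location comes from spark.sql.streaming.checkpointLocation.
dataFrame.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "KAFKA_SERVER_URL")
  .option("topic", "OUTPUT_TOPIC") // a different topic from the one being read
  .start()
  .awaitTermination() // block until the query stops or fails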
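To tie the source and the sink together with a processing step in between, here is a minimal end-to-end sketch. The broker address `localhost:9092`, the topic names `input-topic` and `output-topic`, the checkpoint path, and the upper-casing transformation are all assumptions chosen for illustration. A real job would replace `transform` with its own logic.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, upper}

object KafkaRoundTrip {
  // Placeholder processing step: upper-case the message payload.
  // The result keeps a single "value" column, as the Kafka sink requires.
  def transform(input: DataFrame): DataFrame =
    input.select(upper(col("value")).as("value"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("kafka-round-trip")
      .master("local[2]")
      .getOrCreate()

    // Read the raw stream and expose the payload as a string column
    val source: DataFrame = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "input-topic")
      .option("startingOffsets", "latest")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    // Write the transformed stream to a second topic
    val query = transform(source).writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("topic", "output-topic")
      .option("checkpointLocation", "/tmp/kafka-round-trip/checkpoint")
      .start()

    query.awaitTermination()
  }
}
```

Keeping the transformation in its own function makes it possible to try it on a small static DataFrame before pointing the job at a live broker.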

So, we have created a basic example of Spark Streaming with Kafka. For more information you can refer to these blogs written by Jatin Demla and Ayush Tiwari. I hope you liked the blog.

Happy Coding!! 🙂
