Assimilation of Spark Streaming With Kafka

As we know, Apache Spark is used by a wide range of organizations to process large datasets, and it is becoming mainstream. In this blog, we will talk about integrating Spark Streaming with Kafka. So, let's get started.

How can Kafka be integrated with Spark?

Kafka provides a messaging and integration platform for Spark Streaming. Kafka acts as the central hub for real-time streams of data, which are then processed using complex algorithms in Spark Streaming. Once the data is processed, Spark Streaming can publish the results to yet another Kafka topic.
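To make that read, process, publish flow concrete without a cluster, here is a toy in-memory model of it. It uses no Spark or Kafka APIs at all: the `Record` type, the topic names and the upper-casing "processing" step are hypothetical stand-ins chosen only for illustration.

```scala
// Toy in-memory model of the Kafka -> Spark -> Kafka flow (no real Spark or Kafka involved).
// Record, the topic names and the processing step are hypothetical stand-ins.
final case class Record(topic: String, value: String)

object ToyPipeline {
  // "Processing" step: here it just trims and upper-cases each message.
  def process(value: String): String = value.trim.toUpperCase

  // Takes the records of inputTopic, processes them and re-publishes them to outputTopic.
  // Requiring a different output topic avoids feeding results back into the input.
  def run(log: Seq[Record], inputTopic: String, outputTopic: String): Seq[Record] = {
    require(inputTopic != outputTopic, "output topic must differ from input topic")
    log.collect { case Record(`inputTopic`, v) => Record(outputTopic, process(v)) }
  }
}
```

Records from other topics are simply ignored, which loosely mirrors a subscription that only covers the input topic.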

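Let's see how to configure Spark Streaming to receive data from Kafka. The examples use Spark's Structured Streaming API, and the spark-sql-kafka-0-10 artifact provides its Kafka source and sink. First, create an SBT project and add the following dependencies to build.sbt.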

// The Scala version must match the _2.11 suffix of the Spark artifacts below
scalaVersion := "2.11.12"

val sparkCore = "org.apache.spark" % "spark-core_2.11" % "2.2.0"
val sparkSqlKafka = "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0"
val sparkSql = "org.apache.spark" % "spark-sql_2.11" % "2.2.0"

libraryDependencies ++= Seq(sparkCore, sparkSql, sparkSqlKafka)
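As an alternative, sbt's `%%` operator appends the project's Scala binary version to each artifact name, which keeps the `_2.11` suffixes in sync with `scalaVersion` automatically. A sketch of the same dependencies written that way (the `sparkVersion` name is our own choice):

```scala
// build.sbt (sketch): %% appends the Scala binary version (_2.11 here) to each artifact name
scalaVersion := "2.11.12"

val sparkVersion = "2.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion,
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
)
```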

Now, we need to create a Spark session.

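import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Placeholder settings (illustrative values); adjust them for your environment
val sparkAppName = "spark-kafka-example"
val sparkMaster = "local[*]"
val sparkMemory = "2g"
val sparkCores = "2"

val sparkConf: SparkConf = new SparkConf()
  .setAppName(sparkAppName)
  .setMaster(sparkMaster)
  .set("spark.executor.memory", sparkMemory)
  .set("spark.executor.cores", sparkCores)
  // Default checkpoint directory used by streaming queries such as the Kafka sink
  .set("spark.sql.streaming.checkpointLocation", "PATH_TO_CHECKPOINT_LOCATION")
val spark: SparkSession = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()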
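In a real application, values such as the application name, master URL, executor memory and executor cores are usually read from configuration rather than hard-coded. A minimal sketch, assuming hypothetical environment variable names (`APP_NAME` and so on) and defaults of our own choosing. Since `SparkConf.set` takes string values, the sketch keeps every setting as a `String`.

```scala
// Hypothetical holder for the settings passed to SparkConf.
final case class SparkSettings(appName: String, master: String, executorMemory: String, executorCores: String)

object SparkSettings {
  // Reads each setting from the given environment map, falling back to a default.
  // The variable names and defaults are illustrative assumptions, not Spark conventions.
  def fromEnv(env: Map[String, String]): SparkSettings = SparkSettings(
    appName = env.getOrElse("APP_NAME", "spark-kafka-example"),
    master = env.getOrElse("APP_MASTER", "local[*]"),
    executorMemory = env.getOrElse("APP_EXECUTOR_MEMORY", "1g"),
    executorCores = env.getOrElse("APP_EXECUTOR_CORES", "1")
  )
}
```

Calling `SparkSettings.fromEnv(sys.env)` would then supply the values for `setAppName`, `setMaster` and the two executor `set` calls, while a test can pass a plain `Map`.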

Using Kafka as a Source

We can read messages into a streaming DataFrame by subscribing to one or more Kafka topics.

import org.apache.spark.sql.DataFrame

// Subscribe to Kafka and keep only the message payload, decoded as a string
val dataFrame: DataFrame = spark
  .readStream
  .format("kafka")
  .option("subscribePattern", "TOPIC")
  .option("kafka.bootstrap.servers", "KAFKA_SERVER_URL")
  .option("startingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")
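Because `subscribePattern` takes a Java regular expression, the placeholder `"TOPIC"` would select only a topic named exactly `TOPIC`. The Kafka consumer matches a subscription pattern against whole topic names, so the sketch below mimics that selection with `java.util.regex.Pattern` and `matches()`; `TopicPattern` and the topic names are made up for illustration.

```scala
import java.util.regex.Pattern

object TopicPattern {
  // Returns the topics whose whole name matches the regex, mirroring how a
  // pattern subscription selects topics (full match, not substring search).
  def matchingTopics(pattern: String, topics: Seq[String]): Seq[String] = {
    val compiled = Pattern.compile(pattern)
    topics.filter(t => compiled.matcher(t).matches())
  }
}
```

For example, with topics `orders`, `orders-eu` and `payments`, the pattern `orders.*` would pick up both `orders` and `orders-eu`, while `ord` alone would match nothing.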

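subscribePattern : A Java regular expression; the query subscribes to every topic whose name matches it. To subscribe to a fixed list of topics instead, use the subscribe option with a comma-separated list of topic names.

kafka.bootstrap.servers : A comma-separated list of host:port pairs for the Kafka brokers.

startingOffsets : Where a new query starts reading: “latest” (the default for streaming queries), “earliest”, or a JSON string with an offset for each topic partition. It only applies when a query is started for the first time; a restarted query resumes from its checkpoint.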
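In the JSON form of `startingOffsets`, -2 stands for earliest and -1 for latest. A small sketch that builds such a string from a Scala map; `OffsetsJson` is a hypothetical helper, it assumes topic names need no JSON escaping, and it sorts topics and partitions only to make the output deterministic.

```scala
object OffsetsJson {
  val Earliest: Long = -2L // JSON convention for "earliest"
  val Latest: Long = -1L   // JSON convention for "latest"

  // Builds e.g. {"topicA":{"0":23,"1":-1}} from Map("topicA" -> Map(0 -> 23L, 1 -> -1L)).
  def build(offsets: Map[String, Map[Int, Long]]): String =
    offsets.toSeq.sortBy(_._1).map { case (topic, parts) =>
      val inner = parts.toSeq.sortBy(_._1)
        .map { case (partition, offset) => "\"" + partition + "\":" + offset }
        .mkString(",")
      "\"" + topic + "\":{" + inner + "}"
    }.mkString("{", ",", "}")
}
```

For instance, `OffsetsJson.build(Map("topicA" -> Map(0 -> 23L, 1 -> OffsetsJson.Latest), "topicB" -> Map(0 -> OffsetsJson.Earliest)))` yields `{"topicA":{"0":23,"1":-1},"topicB":{"0":-2}}`, which could then be passed as `.option("startingOffsets", ...)`.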

We can then publish the messages to another Kafka topic by writing the DataFrame with the Kafka sink.

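// Publish each row's "value" column to a different Kafka topic
dataFrame.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "KAFKA_SERVER_URL")
  .option("topic", "OUTPUT_TOPIC") // use a topic other than the one being read
  .start() // relies on the checkpoint location configured on the SparkSession
  .awaitTermination()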
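The Kafka sink expects a `value` column (string or binary), an optional `key` column, and an optional `topic` column, which becomes required when the `topic` option is not set. The reading code already aliases the cast payload as `value`, which satisfies the first rule. A hypothetical pre-flight check of those rules, working on plain column names rather than a real Spark schema (so it does not check column types):

```scala
object KafkaSinkCheck {
  // Returns a list of problems; an empty list means the columns look acceptable.
  // Mirrors the documented rules: "value" is required, and the destination topic
  // must come from either the sink's "topic" option or a "topic" column.
  def problems(columns: Set[String], topicOption: Option[String]): List[String] = {
    val missingValue =
      if (columns.contains("value")) Nil else List("missing required column: value")
    val missingTopic =
      if (topicOption.isDefined || columns.contains("topic")) Nil
      else List("no topic: set the \"topic\" option or add a topic column")
    missingValue ++ missingTopic
  }
}
```

Running such a check on `dataFrame.columns.toSet` before calling `writeStream` would surface a missing column before the query starts.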

So, we have created a basic example of Spark Streaming with Kafka. For more information, you can refer to the blogs written by Jatin Demla and Ayush Tiwari. I hope you liked this blog.

Happy Coding!! 🙂

Written by 

Charmy is a Software Consultant with more than 1.5 years of experience. She is familiar with object-oriented programming paradigms and with technologies such as Scala, Lagom, Java, Apache Solr, Apache Spark, Apache Kafka and Apigee. She is always eager to learn new concepts in order to expand her horizons. Her hobbies include playing the guitar and sketching.

Discover more from Knoldus Blogs