As we know, Spark is used at a wide range of organizations to process large datasets, and it seems to be becoming mainstream. In this blog we will talk about integrating Spark Streaming with Kafka. So, let's get started.
How can Kafka be integrated with Spark?
Kafka provides a messaging and integration platform for Spark Streaming. Kafka acts as the central hub for real-time streams of data, which are then processed using complex algorithms in Spark Streaming. Once the data is processed, Spark Streaming can publish the results to yet another Kafka topic.
Let's see how to configure Spark Streaming to receive data from Kafka. First, create an SBT project and add the following dependencies to build.sbt.
val sparkCore = "org.apache.spark" % "spark-core_2.11" % "2.2.0"
val sparkSqlKafka = "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.2.0"
val sparkSql = "org.apache.spark" % "spark-sql_2.11" % "2.2.0"

libraryDependencies ++= Seq(sparkCore, sparkSql, sparkSqlKafka)
Now, we need to create a Spark session.
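import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// sparkAppName, sparkMaster, sparkMemory and sparkCores are String values
// defined elsewhere in the application (for example, loaded from configuration).
val sparkConf: SparkConf = new SparkConf()
  .setAppName(sparkAppName)
  .setMaster(sparkMaster)
  .set("spark.executor.memory", sparkMemory)
  .set("spark.executor.cores", sparkCores)
  // Default checkpoint directory for streaming queries; the Kafka sink requires one.
  .set("spark.sql.streaming.checkpointLocation", "PATH_TO_CHECKPOINT_LOCATION")

val spark: SparkSession = SparkSession
  .builder()
  .config(sparkConf)
  .getOrCreate()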
Using Kafka as a Source
We can read messages into a Spark stream by subscribing to one or more topics.
import org.apache.spark.sql.DataFrame

val dataFrame: DataFrame = spark
  .readStream
  .format("kafka")
  .option("subscribePattern", "TOPIC")
  .option("kafka.bootstrap.servers", "KAFKA_SERVER_URL")
  .option("startingOffsets", "latest")
  .load()
  // Kafka delivers the payload as binary, so cast it to a string.
  .selectExpr("CAST(value AS STRING) AS value")
subscribePattern : The pattern (a Java regex string) used to subscribe to topic(s).
kafka.bootstrap.servers : A comma-separated list of host:port pairs for the Kafka brokers.
startingOffsets : The start point when a query is started, for example “latest” or “earliest”.
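Because subscribePattern takes a regular expression rather than a single topic name, it can help to check which topic names a pattern would select. The following is only a rough sketch in plain Scala, with no Spark or Kafka involved: the object name, the helper `matchingTopics`, and the topic names are hypothetical, and it assumes the pattern must match the whole topic name.

```scala
import java.util.regex.Pattern

object TopicPatternSketch {
  // Returns the topic names that fully match the given Java regex.
  // Assumption: the whole topic name has to match the pattern.
  def matchingTopics(pattern: String, topics: Seq[String]): Seq[String] = {
    val compiled = Pattern.compile(pattern)
    topics.filter(name => compiled.matcher(name).matches())
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical topic names, used only for illustration.
    val topics = Seq("orders", "orders-eu", "payments")

    // A wildcard pattern selects every topic starting with "orders".
    println(matchingTopics("orders.*", topics))

    // A plain name behaves like a subscription to that one topic.
    println(matchingTopics("orders", topics))
  }
}
```

If only one fixed topic is needed, the Kafka source also accepts a "subscribe" option that takes a comma-separated list of topic names instead of a pattern.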
Next, we write the messages read from the previous topic to another Kafka topic.
dataFrame.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "KAFKA_SERVER_URL")
  // Use a different topic from the one the stream reads from.
  .option("topic", "TOPIC")
  .start()
  // Block the current thread until the streaming query stops.
  .awaitTermination()
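The code above forwards each message unchanged. As a minimal sketch of where a processing step could sit between the source and the sink, the example below upper-cases every message value before publishing it. The object name `ProcessingSketch`, the helper `upperCaseValues`, and the placeholders INPUT_TOPIC and OUTPUT_TOPIC are hypothetical, and the sketch assumes the Spark 2.2 dependencies from build.sbt are on the classpath.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, upper}

object ProcessingSketch {
  // Hypothetical processing step: upper-case the "value" column.
  // The Kafka sink expects a column named "value", so the name is kept.
  def upperCaseValues(input: DataFrame): DataFrame =
    input.select(upper(col("value")).as("value"))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("processing-sketch")
      .master("local[*]")
      .getOrCreate()

    // Read from one topic and cast the binary payload to a string.
    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "KAFKA_SERVER_URL")
      .option("subscribe", "INPUT_TOPIC")
      .load()
      .selectExpr("CAST(value AS STRING) AS value")

    // Apply the processing step, then publish to a second topic.
    upperCaseValues(source).writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "KAFKA_SERVER_URL")
      .option("topic", "OUTPUT_TOPIC")
      .option("checkpointLocation", "PATH_TO_CHECKPOINT_LOCATION")
      .start()
      .awaitTermination()
  }
}
```

Keeping the transformation in its own DataFrame-to-DataFrame function means it can be tried on an ordinary batch DataFrame without a running Kafka broker.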
So, we have created a basic example of Spark Streaming with Kafka. For more information you can refer to these blogs written by Jatin Demla and Ayush Tiwari. I hope you liked the blog.
Happy Coding!! 🙂