Kafka And Spark Streams: The happily ever after !!

Hi everyone! Today we are going to look at using Spark Streaming to transform and transport data between Kafka topics.

The demand for stream processing is increasing every day, because processing big volumes of data is often not enough. We also need to process data in real time, especially when its volume keeps growing and it has to be both processed and maintained.

Recently, we faced such a requirement in one of our projects. Being a newbie to Apache Spark, I had only a little idea about what to do, so I decided the best place to start was the Apache Spark documentation. It helped me understand the basic concepts of Spark, streaming, and how to transport data using streams.

To give you a heads up, Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
It provides a high-level abstraction called discretized stream or DStream, which represents a continuous stream of data.
DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations on other DStreams. Internally, a DStream is represented as a sequence of RDDs (Resilient Distributed Datasets).
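
To make the "sequence of RDDs" idea concrete, here is a minimal sketch that feeds a DStream from a queue of hand-built RDDs instead of Kafka. The object name DStreamAsRdds, the sample values, local[2] and the 5-second timeout are all assumptions made for illustration. By default, queueStream hands one queued RDD to each batch interval.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object DStreamAsRdds {
  // Hypothetical sample data: each inner Seq becomes one RDD, i.e. one batch.
  val sampleBatches: Seq[Seq[String]] = Seq(Seq("a", "b"), Seq("c"))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("DStreamAsRdds")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Build one RDD per sample batch and queue them up as the stream's input.
    val queue = mutable.Queue[RDD[String]]()
    queue ++= sampleBatches.map(batch => ssc.sparkContext.parallelize(batch))
    val stream = ssc.queueStream(queue)

    // foreachRDD exposes the RDD that backs each batch of the DStream.
    stream.foreachRDD { (rdd, time) =>
      println(s"Batch at $time holds ${rdd.count()} records")
    }

    ssc.start()
    ssc.awaitTerminationOrTimeout(5000) // run for about five seconds, then stop
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}
```

Once the queue is empty, the remaining batches are simply empty RDDs, which is also what a Kafka-backed DStream produces when no new records arrive in an interval.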

Although Structured Streaming was introduced in Spark 2.0, a lot of big companies still use DStreams (Discretized Streams), or what we call the original Spark Streaming.

We were also a bit unsure which of the two to use, but after a lot of discussion we decided to go with DStreams, as they seemed to be the more logical choice for us.

So our next step was integrating Kafka with Spark Streaming.

This is where our challenge began. With the help of the documentation, I started the coding journey.

  • The very first thing we require is a SparkConf object which loads defaults from system properties and the classpath.
    val conf = new SparkConf().setMaster("local[*]")
    .setAppName("SimpleDStreamExample")

    Note: local[*] runs Spark locally with as many worker threads as there are logical cores on the machine. It can be changed as per requirement.

  • The next important thing is the kafkaParams, which is nothing but a Map[String, Object]:
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      // String deserializers are used because the input Kafka topic is written with a String serializer
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-demo",
      "kafka.consumer.id" -> "kafka-consumer-01"
    )

    These are required by the input stream to know the whereabouts of the input data and how to fetch it.

  • Next, we require a StreamingContext, which takes the SparkConf and the batch interval (one second here) as arguments:
    val ssc = new StreamingContext(conf, Seconds(1))
  • Once we are through with the StreamingContext, we require an input stream that will actually stream the data (this is where the kafkaParams are needed):
    val inputStream = KafkaUtils.createDirectStream(ssc,
      PreferConsistent, Subscribe[String, String](Array(inputTopic), kafkaParams))

    This stream now contains the consumer records, i.e. the records from the input topic.

    Here I am not maintaining state or checkpointing, because that is a separate topic that is out of scope here; you can integrate it separately.

  • Once we have the inputStream, we can perform the required operations on it (flatMap, map, filter, etc.) to get a processed stream, i.e. a stream that contains the modified/transformed data.
    val processedStream = inputStream.map(record => record.value) // any operation can be performed here

    // print the first few records of each batch to check that data is flowing
    processedStream.print()
  • Once you are done with the operations, you have a processedStream, which now needs to be stored in an output Kafka topic. This is where I was not sure. I went through a lot of blogs but couldn't find a decent solution. Finally, after a lot of digging, I found a simple one:
    processedStream.foreachRDD(rdd =>
      rdd.foreach {
        case data: String => {
          val message = new ProducerRecord[String, String](outputTopic, data)
          producer.send(message).get() // block until the broker acknowledges the record
        }
      })

    // producer here is a Kafka producer (the new producer API) and outputTopic is the Kafka topic where we need to store the processed data
  • Finally, the messages in the processed stream are written to the output Kafka topic by the producer used above.
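
Putting the steps together, the whole flow can be sketched roughly as below. This is not the code from our repository, just a minimal sketch under some assumptions: the object name KafkaToKafka, the topic names and the transform function are hypothetical placeholders, and the producer is created once per partition inside foreachPartition (a common pattern, swapped in here so that the producer is built on the executor and never has to be serialized).

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

object KafkaToKafka {
  // Hypothetical values; replace them with your own broker and topics.
  val brokers = "localhost:9092"
  val inputTopic = "input-topic"
  val outputTopic = "output-topic"

  // Placeholder transformation, kept pure so it can be tested without Spark.
  def transform(value: String): String = value.trim.toUpperCase

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("SimpleDStreamExample")
    val ssc = new StreamingContext(conf, Seconds(1))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-demo"
    )

    val inputStream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array(inputTopic), kafkaParams))

    // Keep only the record value and apply the transformation.
    val processedStream = inputStream.map(record => transform(record.value))

    processedStream.foreachRDD { rdd =>
      rdd.foreachPartition { partition =>
        // One producer per partition, created on the executor.
        val props = new Properties()
        props.setProperty("bootstrap.servers", brokers)
        props.setProperty("key.serializer", classOf[StringSerializer].getName)
        props.setProperty("value.serializer", classOf[StringSerializer].getName)
        val producer = new KafkaProducer[String, String](props)
        try {
          partition.foreach { value =>
            producer.send(new ProducerRecord[String, String](outputTopic, value))
          }
          producer.flush() // push out buffered records before closing
        } finally {
          producer.close()
        }
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Creating a producer for every partition of every batch is simple but not free; if that becomes a bottleneck, a lazily initialised producer shared per executor JVM is a reasonable next step.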

A piece of cake, right?

We have also added our code for reference, which you can find here: StreamingKafkaDataWithSpark

To know more about Spark Streaming and its integration with Kafka, you can refer to the Apache Spark Streaming Guide for Kafka integration.

In this blog, we tried to understand how we can use Spark Streaming to stream data between Kafka topics.
I hope you enjoyed reading this article. Stay tuned for more.  🙂

 


 

Written by 

Anmol Sarna is a software consultant having more than 1.5 years of experience. He likes to explore new technologies and trends in the IT world. His hobbies include playing football, watching Hollywood movies and he also loves to travel and explore new places. Anmol is familiar with programming languages such as Java, Scala, C, C++, HTML and he is currently working on reactive technologies like Scala, Cassandra, Lagom, Akka, Spark and Kafka.
