Spark Structured Streaming

Reading Time: 3 minutes

Overview

Structured Streaming was added in Spark 2.0 for building continuous applications. It lets you apply processing logic to streaming data in much the same way you work with batch data, and it provides scalable, fault-tolerant processing through checkpointing and write-ahead logs. The engine is built on top of Spark SQL: it reads data in near real-time from sources, processes it, and writes the results to external sinks. The Spark SQL engine performs the computation incrementally and continuously updates the result as streaming data arrives.

Internally, it uses a micro-batch processing engine by default: data streams are processed as a series of small batch jobs, which can achieve end-to-end latencies as low as 100 milliseconds. Spark 2.3 introduced a new low-latency processing mode called Continuous Processing, which can bring end-to-end latency down to as little as 1 millisecond.
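To make the two modes concrete, here is a minimal sketch (using Spark's built-in rate source purely for illustration; the object name is an assumption) of how each mode is selected through the trigger on the writer:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerModesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Trigger Modes Sketch").master("local[*]").getOrCreate()

    // The rate source simply generates rows, which is enough to show the trigger API.
    val events = spark.readStream.format("rate").option("rowsPerSecond", "10").load()

    // Micro-batch execution (the default engine) with an explicit 100 ms interval.
    events.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("100 milliseconds"))
      .start()

    // Continuous Processing (Spark 2.3+): rows are processed as they arrive,
    // with a checkpoint written every second.
    // events.writeStream
    //   .format("console")
    //   .trigger(Trigger.Continuous("1 second"))
    //   .start()

    spark.streams.awaitAnyTermination()
  }
}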

Programming Model

The input data stream can be thought of as an input table: every data item that arrives is a new row appended to it. The developer then defines a query that Spark runs incrementally over the input table, along with a trigger (say, every 1 second) that controls how often the results are updated.
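As a rough sketch of this model (the socket source, host, and port below are assumptions chosen only for illustration), the query is written exactly as it would be for a batch DataFrame, and the trigger is attached to the writer:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ProgrammingModelSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Programming Model Sketch").master("local[*]").getOrCreate()
    import spark.implicits._

    // The socket source acts as the unbounded input table:
    // every line that arrives is a new row appended to it.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // The query looks just like a batch query; Spark runs it incrementally.
    val wordCounts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // The trigger controls how often the result is updated (here, every 1 second).
    wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(Trigger.ProcessingTime("1 second"))
      .start()
      .awaitTermination()
  }
}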

The last part of this model is the output mode, which tells Spark what to write to persistent storage after each trigger. There are three output modes available (see the sketch after this list):

  1. Append: only the rows newly added to the result table since the last trigger are written.
  2. Complete: the entire result table is written after every trigger.
  3. Update: only the rows updated in the result table since the last trigger are written. If no aggregation is used (so previous rows are never updated), it behaves the same as append mode.
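Here is a minimal sketch of how the output mode is passed to the writer (the object name and the built-in rate source are assumptions, used only so the example is runnable):

import org.apache.spark.sql.SparkSession

object OutputModesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Output Modes Sketch").master("local[*]").getOrCreate()

    // A built-in rate source, used here only to have some streaming rows to work with.
    val rawEvents = spark.readStream.format("rate").option("rowsPerSecond", "5").load()
    val counts = rawEvents.groupBy("value").count()

    // Complete: the entire result table is emitted on every trigger.
    counts.writeStream.outputMode("complete").format("console").start()

    // Update: only rows whose aggregate changed since the last trigger would be emitted.
    // counts.writeStream.outputMode("update").format("console").start()

    // Append: only new rows are emitted; with no aggregation, previous rows never change.
    rawEvents.writeStream.outputMode("append").format("console").start()

    spark.streams.awaitAnyTermination()
  }
}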

Streaming Example

The example below uses Kafka as a source with a trigger interval of 10 seconds: every 10 seconds Spark polls Kafka and fetches only the incremental data that has arrived since the last trigger.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object SparkStreamingExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("Spark Structured Streaming Example")
      .getOrCreate()

    // Set Spark logging level to ERROR to avoid various other logs on console.
    spark.sparkContext.setLogLevel("ERROR")

    // Read from Kafka; Spark tracks the consumed offsets itself,
    // so every trigger fetches only the incremental data.
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092,anotherhost:9092")
      .option("subscribe", "test_topic")
      .option("startingOffsets", "earliest") // from where to start reading the offsets
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    // Print the stream to the console in append mode, triggering every 10 seconds.
    stream.writeStream
      .outputMode("append")
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()
      .awaitTermination()
  }
}

We have successfully written our first Structured Streaming application using Kafka as a source and printed the results to the console. Alternatively, we can use a sink such as HDFS, Elasticsearch, or any other supported store to persist the stream data in the output mode of our choice.
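For instance, a rough sketch of persisting the same stream to HDFS as Parquet files (the paths below are placeholders, and stream refers to the streaming DataFrame from the example above) could look like this; the file sink supports only append mode and requires a checkpoint location:

// Assumes the Kafka-backed streaming DataFrame `stream` from the example above;
// both paths are placeholders.
stream.writeStream
  .format("parquet")
  .option("path", "hdfs://namenode:8020/streaming/output")
  .option("checkpointLocation", "hdfs://namenode:8020/streaming/checkpoints")
  .outputMode("append")
  .start()
  .awaitTermination()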

Hope you enjoyed the blog. Thanks for reading it.

Written by 

Amarjeet is a Software Consultant at Knoldus Software LLP. He has an overall experience of 2 years and 10 months. He has completed his Bachelor of Technology in Computer Science from National Institute of Technology, Hamirpur, Himachal Pradesh. He likes reading books, travelling and trekking.