Spark Streaming: Unit Testing DStreams

Frankly, I don’t think there’s any need of telling us, “The Developers”, the need for proper testing or Unit testing to be correct(QAs, Don’t be flattered :P). The unit test cases are the quickest way to know there’s something wrong with our code.

Unit testing is important because it is one of the earliest testing efforts performed on the code and the earlier defects are detected, the easier they are to fix.”

Spark Streaming

Spark streaming is the API provided by spark alongside Spark-Core API. It is used for scalable, high-throughput, fault-tolerant stream processing of live data streams and supports many data sources like Kafka, HDFS etc.

It takes a continuous stream of data for ingestion and through Spark Streaming API this stream is converted to batches of input data. These batches are then processed by spark engine. To convert this stream of data to small batches and then process them, the DStream API is used as an abstraction.


DStream or Discretized Stream is an abstraction provided by the Spark Streaming. DStream denotes a series of RDDs (i.e small immutable batches of data) which makes DStream resilient. We can easily apply any transformation on input data stream using DStream (using map, flatMap ….). We only have to provide our business logic to the DStream and we can easily achieve the required result.

For example, the following method removes duplicate messages from the stream of (key, value) pairs using mapWithState.

def distinct(dStream: DStream[(Int, String)]): DStream[(Int, String)] = {
    .flatMap {
      case Some(value) => Seq(value)
      case _ => Seq()

val dedup = (key: Int, value: Option[String], state: State[List[Int]]) => {
  (value, state.getOption()) match {
    case (Some(data), Some(keys)) if !keys.contains(key) =>
      state.update(key :: keys)
      Some(key, data)
    case (Some(data), None) =>
      Some(key, data)
    case _ =>


And another method ‘update’ which modifies the values of our DStreams

def update(dStream: DStream[(Int, String)]): DStream[(Int, String)] = {{
case (key, value) => (key, s"""{"value":[$value]}""")


So till here, we all know the basic way how spark streaming works. All is well. But how do we actually get to know if our business logic is working well with spark streaming and transforming the input stream in the way we want. So we need to test the DStream with input data and check the output of our Streaming code.


So what’s the problem? How to execute streaming logic in a test environment.

We can write Integration test cases and provide the actual environment in the integration test. But for unit testing, we need a testing environment which should not depend on any external application.


StreamingSuiteBase provides the testing environment for DStream. It sends the inputs as batches and performs the provided operation on these batches and provides as output. And these outputs can be matched to the expected result.

Import the following dependency into your build.sbt

"com.holdenkarau" %% "spark-testing-base" % "2.1.0_0.8.0" % Test

And mixin the trait ‘StreamingSuiteBase‘ into your spec

class StreamingOperationsSpec extends WordSpec with StreamingSuiteBase {

The trait StreamingSuiteBase provides the method ‘testOperation‘ which takes the input values, our business logic, expected output values as parameters and provides the result. Testing our method distinct and update as follows:

"StreamingOperations" should{

"remove duplicates" in{
val inputPair = List(List((1, "value"), (1, "value")))
val outputPair = List(List((1, "value")))

testOperation(inputPair, distinct _ , outputPair, ordered = false)

"update stream" in {
val inputPair = List(List((1, """{"name": "Steve"}, {"name": "Tony"}""")))
val outputPair = List(List((1, """{"value":[{"name": "Steve"}, {"name": "Tony"}]}""")))

testOperation(inputPair, update _, outputPair, ordered = false)

The method ‘testOperation‘ takes the output of the operation performed on the ‘inputPair’ and check whether it is equal to the ‘outputPair’ and just like this, we can test our business logic.

This short snippet lets you test your business logic without forcing you to create even a spark session. You can mock whole streaming environment and test your business logic easily.

This was a simple example of unary operations on DStreams. Similarly, we can test binary operations and window operations on DStreams.

You can find the code here.


This entry was posted in apache spark, Scala, Spark and tagged , , . Bookmark the permalink.

2 Responses to Spark Streaming: Unit Testing DStreams

  1. Pingback: Unit Testing Spark Streaming DStreams – Curated SQL

Leave a Reply

Fill in your details below or click an icon to log in: Logo

You are commenting using your account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s