Stateful transformation on DStream in Apache Spark with a wordcount example

Sometimes we have a use case in which we need to maintain the state of a paired DStream so it can be used in the next DStream. Here we take the example of a stateful wordcount over socketTextStream. In a plain wordcount, if the word "xyz" appears twice in the first DStream (or window), it is reduced to the value 2, but that state is lost in the next DStream: if "xyz" appears again, its value starts over at 1, which is inconsistent. To maintain the state of a key across DStreams, so that "xyz" in the next DStream gets the value 3, we use the stateful transformation updateStateByKey(func).
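To make the difference concrete, here is a minimal sketch contrasting the two approaches on the same paired DStream (pairs and updateFunc are the values defined later in this post; the names perBatchCounts and runningCounts are just for illustration):

val perBatchCounts = pairs.reduceByKey(_ + _)           // stateless: counts reset on every batch
val runningCounts = pairs.updateStateByKey(updateFunc)   // stateful: counts carried across batches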

First we create a SparkContext, which we will use to build the streaming context, as follows

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("myStream").setMaster("local[*]")
val sc = new SparkContext(conf)

Now, with the help of the SparkContext, we create a StreamingContext as follows

import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(10))

Here Seconds(10) is the batch interval.

Now we use the streaming context to open a socket text stream as follows

val lines = ssc.socketTextStream("localhost", 9999)

where localhost is the host and 9999 is the port on which it listens.

We should enable checkpointing to make the stream fault tolerant; a checkpoint directory is also required when using updateStateByKey:

ssc.checkpoint("./checkpoints/")

Here ./checkpoints/ is the directory where all checkpoints are stored.

val words = lines.flatMap(_.split(" "))

We split each line received from the socket text stream on spaces (" ").

val pairs = words.map(word => (word, 1))

We map each word to a tuple with the value 1, giving us a paired DStream.

Now we use updateStateByKey(func) to keep the count of every word stateful across multiple DStreams

val windowedWordCounts = pairs.updateStateByKey(updateFunc)

The main part of the stateful transformation is updateFunc, the argument of updateStateByKey, which we define as follows

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.foldLeft(0)(_ + _)
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

This function always takes two arguments: the sequence of new values for that key in the current DStream, and the old state of the key. As you can see above, we simply fold all the values arriving in the current DStream and add the result to the old state of the key.
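For a quick illustration of how the state evolves, assume the word "xyz" appears twice in the first batch and once in the second; calling updateFunc by hand would give:

updateFunc(Seq(1, 1), None)   // Some(2): no previous state, two new occurrences
updateFunc(Seq(1), Some(2))   // Some(3): one new occurrence added to the stored count of 2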

Now we simply save the result as text files as follows

windowedWordCounts.saveAsTextFiles("result/result")

At last, simply start the stream with ssc.start() and keep the application running with ssc.awaitTermination().

Now just start a socket server for the stream to connect to, with the help of the following command in a terminal

$ nc -lk 9999

Now you will see the cursor blinking in the terminal; whatever you type here is picked up by your Spark Streaming application.
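For example, if you were to type the lines below into the nc terminal (one per 10-second batch), the files written by saveAsTextFiles would contain counts roughly like this (the actual directory names include a timestamp):

Batch 1 input: hello world hello   ->  (hello,2) (world,1)
Batch 2 input: hello               ->  (hello,3) (world,1)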

And now the value of every word will be stateful, even when the word arrives in different DStreams.

You can see the output in the result directory; for each batch it saves one RDD.
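For completeness, here is a minimal end-to-end sketch assembling the snippets above in compile order (note that updateFunc must be defined before it is passed to updateStateByKey); the object name StatefulWordCount is just a placeholder, and the repository linked below contains the author's full version:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StatefulWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("myStream").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    ssc.checkpoint("./checkpoints/")

    // State update function: fold the new counts and add them to the previous state
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      val currentCount = values.foldLeft(0)(_ + _)
      val previousCount = state.getOrElse(0)
      Some(currentCount + previousCount)
    }

    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val windowedWordCounts = pairs.updateStateByKey(updateFunc)
    windowedWordCounts.saveAsTextFiles("result/result")

    ssc.start()
    ssc.awaitTermination()
  }
}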

For the source code, you can go through this link: https://github.com/phalodi/stateful-wordcount-spark
