Sometimes we need to maintain the state of a paired DStream across batches so that it can be used in the next batch. As an example, we take a stateful word count over socketTextStream. In the plain word count example, if the word "xyz" appears twice in the first batch, it is reduced to a count of 2, but that state is lost afterwards: if "xyz" appears once in the next batch, its count will be 1, which is inconsistent. To maintain the state of each key, so that "xyz" arriving in the next batch gets a count of 3, we use updateStateByKey(func), which is a stateful transformation.
First we create a Spark context, from which we will build the streaming context:
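import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
val conf = new SparkConf().setAppName("myStream").setMaster("local[*]")
val sc = new SparkContext(conf)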
Now, using the Spark context, we create a streaming context:
val ssc = new StreamingContext(sc, Seconds(10))
Here Seconds(10) is the batch interval, so a new batch is produced every 10 seconds.
Now we use the streaming context ssc to open a socketTextStream:
val lines = ssc.socketTextStream("localhost", 9999)
where localhost is the host and 9999 is the port that Spark connects to in order to receive text.
We should enable checkpointing to make the stream fault tolerant (updateStateByKey also requires a checkpoint directory to be set):
ssc.checkpoint("./checkpoints/")
Here ./checkpoints/ is the directory where all checkpoints are stored.
val words = lines.flatMap(_.split(" "))
We split each line received from the socketTextStream on spaces (" ").
val pairs = words.map(word => (word, 1))
We map each word to a tuple (word, 1), which gives us a pair DStream.
Now we use updateStateByKey(func) to keep the count of every word stateful across batches:
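The split and pair steps can be tried out without Spark. The following is a minimal sketch on plain Scala collections, assuming a hypothetical object name SplitAndPairSketch and made-up sample lines; it only mirrors the flatMap and map calls above and is not part of the streaming job.

```scala
// Plain-Scala sketch (no Spark needed) of the split and pair steps.
// SplitAndPairSketch and the sample lines are assumptions for illustration.
object SplitAndPairSketch {
  // Split every line on single spaces, like lines.flatMap(_.split(" "))
  def toWords(lines: Seq[String]): Seq[String] =
    lines.flatMap(_.split(" "))

  // Pair every word with an initial count of 1, like words.map(word => (word, 1))
  def toPairs(words: Seq[String]): Seq[(String, Int)] =
    words.map(word => (word, 1))

  def main(args: Array[String]): Unit = {
    val sampleLines = Seq("xyz abc", "xyz") // hypothetical input
    println(toPairs(toWords(sampleLines)))
    // prints List((xyz,1), (abc,1), (xyz,1))
  }
}
```

Note that the pairs are not yet reduced at this point: "xyz" appears twice with the value 1, and the summing happens later in the update function.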
val windowedWordCounts = pairs.updateStateByKey(updateFunc)
The main part of the stateful transformation is updateFunc, the argument of updateStateByKey. In the actual program it has to be defined before the call above. We define it as follows:
val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  // Sum of the values that arrived for this key in the current batch
  val currentCount = values.foldLeft(0)(_ + _)
  // Count carried over from earlier batches, or 0 for a new key
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}
This function always takes two arguments: the values for that key in the current batch, and the old state of that key. As you can see above, we simply fold all the values of the current batch and add the result to the old state of the key.
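To see how the state carries over, the update function can be exercised by hand on two batches. This is only a sketch in plain Scala, without Spark: the object StatefulCountSketch and the helper applyBatch are hypothetical names that imitate, in a simplified way, what updateStateByKey does per key.

```scala
// Plain-Scala sketch (no Spark needed) of state carried across batches.
// StatefulCountSketch and applyBatch are hypothetical, for illustration only.
object StatefulCountSketch {
  // Same update function as in the article
  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  // Apply updateFunc to one batch of (word, 1) pairs, given the previous state
  def applyBatch(state: Map[String, Int], batch: Seq[(String, Int)]): Map[String, Int] = {
    // Collect the values of the current batch per key
    val grouped: Map[String, Seq[Int]] =
      batch.groupBy(_._1).map { case (word, pairs) => word -> pairs.map(_._2) }
    // Keys that already have state are updated too, with an empty value list
    val keys = state.keySet ++ grouped.keySet
    keys.flatMap { word =>
      updateFunc(grouped.getOrElse(word, Seq.empty), state.get(word)).map(word -> _)
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val batch1 = Seq(("xyz", 1), ("xyz", 1)) // "xyz" twice in the first batch
    val batch2 = Seq(("xyz", 1))             // "xyz" once in the next batch
    val after1 = applyBatch(Map.empty, batch1)
    val after2 = applyBatch(after1, batch2)
    println(after1) // prints Map(xyz -> 2)
    println(after2) // prints Map(xyz -> 3)
  }
}
```

Without the carried state, the second batch alone would give a count of 1 for "xyz"; with it, the count becomes 3, which is the behaviour described at the start.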
Now we save the result as text files:
windowedWordCounts.saveAsTextFiles("result/result")
Finally, start the stream with ssc.start(), followed by ssc.awaitTermination() so that the application keeps running.
Now start a socket server for the socketTextStream with the following command in a terminal:
$ nc -lk 9999
You will now see the cursor blinking in the terminal; whatever you type there is received by your Spark Streaming application.
The counts of all your words are now stateful, even when a word arrives in different batches.
You can see the output in the result directory; one RDD is saved for each batch.
For the source code, see: https://github.com/phalodi/stateful-wordcount-spark