Having Issue How To Order Streamed Dataframe ?


A few days ago, i have to perform aggregation on streaming dataframe. And the moment, i apply groupBy for aggregation, data gets shuffled. Now the situation arises how to maintain order?

Yes, i can use orderBy with streaming dataframe using Spark Structured Streaming, but only in complete mode. There is no way of doing ordering of streaming data in append mode and update mode.

I have tried different ways to solve this issue. Like, if i go with spark structured streaming. I might sort the streamed data in batches but not across batches.

I started finding solutions with different technologies like Apache Flink, Apache storm etc. What i faced at the end is disappointment. 😦

A bit of light at the end of the tunnel

Luckily there is Apache Kafka Stream which provides the facility of accessing its StateStore.  Kafka Stream provides Processor API.

The low-level Processor API provides a client to access stream data and to perform our business logic on the incoming data stream and send the result as the downstream data. It is done via extending abstract class AbstractProcessor and overriding the init, punctuate,close and  process method which contains our logicThis process method is called once for every key-value pair.

Where the High-Level DSL provides ready to use methods with functional style, the low-level processor API provides you the flexibility to implement processing logic according to your need. The trade-off is just the lines of code you need to write for specific scenarios. For more information, refer the references.

So, the abstract idea is after aggregating the dataframe,write it to kafka. Read it as a KStream and apply the business logic using low-level processor API to sort the data and write it back to kafka.

val builder = new KStreamBuilder

//add the source processor node that takes Kafka topic "input-topic" as input
builder
.addSource("source", "input-topic")
// add the MyProcessor node which takes the source processor as its upstream processor
.addProcessor("p", new ProcessorSupplier[String, String] {
override def get(): Processor[String, String] = new MyProcessor
}, "source")
.addStateStore(Stores.create("state")
.withStringKeys()
.withStringValues()
.persistent().build(), "p")
// add the sink processor node that takes Kafka topic "output-topic" as output
.addSink("sink", "output-topic", "p")

Here the main idea is to keep on adding record in listbuffer until it reaches to certain size, let’s say 20. As buffer size reaches 20, we move to else part where we will iterate the listbuffer and parse every record to extract that specific column which will sort the record. We are going to make listbuffer of tuple2, one element of tuple2 is that specific column and element2 is consumed value from kafka. After that, we will sort the listbuffer of tuple2 on the basis of extracted column and send only second element to kafka. After that, we will drop the all element of listbuffer. This process will run continuously. We can also handle  late data and system shutdown by saving listbuffer in KeyValueStore according to requirement.

class MyProcessor extends AbstractProcessor[String, String] {
  var processorContext: ProcessorContext = _
  var keyValueStore: KeyValueStore[String, String] = _
  val listBuffer = new scala.collection.mutable.ListBuffer[String]
  val localStateStoreName = "state"
  val localStateStoreKey = "key"
  val pattern = "yyyy-MM-dd HH:mm:ss.SSS"
  val dateTimeFormatter = DateTimeFormat.forPattern(pattern)
 override def init(context: ProcessorContext): Unit = {
    processorContext = context
    keyValueStore = processorContext.getStateStore(localStateStoreName).asInstanceOf[KeyValueStore[String, String]]

 }
  override def process(key: String, value: String): Unit = {
           if(listBuffer.size < 20)
               listBuffer += value
           else{
             val tempList = listBuffer.map { str =>
             val jValue = JsonMethods.parse(str)
             val dateLongstr = (jValue \ "time").extract[String]
             val dateLong = LocalDateTime.parse(dateLongstr, dateTimeFormatter).toDateTime.getMillis
           (dateLong, str)
         }
          val sortedList = tempList.sortBy(_.1)
          sortedList.foreach { case (_, record) =>
          processorContext.forward(localStateStoreKey, record) // Sending sorted data to output-topic
          processorContext.commit()
         }
       listBuffer = listBuffer.drop(listBuffer.size)
       listBuffer += value
     }
 }
  override def punctuate(timestamp: Long): Unit = {
  }
  override def close(): Unit = {
    keyValueStore.close()
  }
}

So, here i have implemented the idea in MyProcessor. In my case, i am having three columns in value i.e time,col1,col2. I have extracted time column so that i can sort the record on the basis of time. After sorting, each record is being sent to kafka topic. Now I can consume it as a dataframe again. 😀

Conclusion:

Ordering of Streaming Data is always a hard problem. But with Kafka Streams we can now sort the streamed data using its Lower Level Processor APIs. The main aim of this blog is not to talk how to use low-level processor API but to make you familiar with the idea of how to sort the streamed data.

Hope, this blog will help you 🙂

References:

  1. http://docs.confluent.io/current/streams/developer-guide.html#processor-api
  2. http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html

knoldus-advt-sticker


Advertisements

About Mahesh Chand Kandpal

Explorer + Technology Enthusiast + Foodie + Movie Buff
This entry was posted in Apache Kafka, apache spark, big data, Scala, Spark, Streaming and tagged , , , , , , , , , , . Bookmark the permalink.

One Response to Having Issue How To Order Streamed Dataframe ?

  1. Reblogged this on Mahesh's Programming Blog and commented:

    How To Order Streaming Dataframe?

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s