Welcome back, folks, to this blog series on Spark Structured Streaming. This blog continues the earlier blog “Understanding Stateful Streaming” and covers Handling Late Arriving Data in Spark Structured Streaming. So let’s get started.
Handling Late Data
With window aggregates (discussed in the previous blog), Spark automatically takes care of late data. Every aggregate window is like a bucket: as soon as we receive data for a new time window, a bucket is opened and Spark starts counting the records that fall into it. These buckets stay open indefinitely, so even data that arrives 5 hours late will still update its old bucket and increment that bucket's count.
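To make the bucket idea concrete, here is a minimal pure-Python model (not Spark code) of a 5-minute tumbling-window count with no watermark. The names `window_start` and `count_without_watermark` and the sample timestamps are hypothetical, and a plain dict stands in for Spark's State.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)   # tumbling window size (bucket width)
EPOCH = datetime(1970, 1, 1)

def window_start(ts: datetime) -> datetime:
    """Floor an event time to the start of its 5-minute window (its bucket)."""
    return ts - ((ts - EPOCH) % WINDOW)

def count_without_watermark(events):
    """Count events per bucket, in arrival order, with no watermark.

    A late event simply increments an older bucket, and no bucket is
    ever removed, so this dict (a stand-in for Spark's State) only grows.
    """
    buckets = defaultdict(int)
    for ts in events:
        buckets[window_start(ts)] += 1
    return dict(buckets)
```

Feeding it events stamped 07:01, 12:00, 12:07 and then 07:03 (roughly 5 hours late) yields a count of 2 for the 07:00 bucket and 1 each for the 12:00 and 12:05 buckets: the late event just updates the old bucket, and nothing is ever removed.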
As we can see from the diagram, late data simply updates an older bucket.
The problem with this approach (without watermarking)
The size of the State (discussed in the previous blog) will keep growing indefinitely, because Spark has no way of knowing when a bucket can be closed.
In practice, though, a query is not going to receive data a week late, and even if it did, such late-arriving data would usually be of no use to us.
So, to tell the engine when it can stop considering older buckets for a streaming query, we use a Watermark.
A watermark is a moving threshold for how late data is expected to arrive; based on it, the engine can drop old state.
Design of Watermark
Let’s imagine that, at some point, the maximum event time we have seen in the data is 12:30 pm, and the watermark specifies a trailing gap of 10 minutes. The watermark is thus a moving threshold that trails the max event time.
So, at this point, when the max event time is 12:30 pm, the watermark is 12:20 pm, and the engine tracks this automatically. Data older than this watermark is treated as too late and can be dropped automatically.
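The watermark calculation above boils down to one subtraction. The sketch below uses hypothetical helpers (`watermark`, `older_than_watermark`) that are not part of any Spark API; Spark tracks this value internally.

```python
from datetime import datetime, timedelta

def watermark(max_event_time: datetime, delay: timedelta) -> datetime:
    """Watermark = max event time seen so far minus the allowed delay."""
    return max_event_time - delay

def older_than_watermark(event_time: datetime, max_event_time: datetime,
                         delay: timedelta) -> bool:
    """True if the event is behind the watermark, i.e. eligible to be dropped."""
    return event_time < watermark(max_event_time, delay)
```

With a max event time of 12:30 and a delay of 10 minutes, `watermark` returns 12:20, so an event stamped 12:15 is older than the watermark while one stamped 12:25 is not.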
Data that is late but still newer than the watermark is aggregated as usual, so buckets are kept open for that 10-minute grace period. Once the watermark moves past the end of a bucket, that bucket is closed, any data arriving for it is dropped, and its State is cleaned up. So, in aggregations, State clean-up happens automatically.
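The sketch below models this behaviour for a 5-minute window and a 10-minute delay. It is a simplified model under stated assumptions, not how Spark is implemented: it advances the watermark after every event (Spark advances it per micro-batch), and it treats a bucket as closed once the watermark has reached the bucket's end time. The class `WatermarkedCounter` and its attributes are hypothetical.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)

def window_start(ts: datetime, window: timedelta) -> datetime:
    """Floor an event time to the start of its tumbling window."""
    return ts - ((ts - EPOCH) % window)

class WatermarkedCounter:
    """Toy windowed count with a watermark (simplified; not Spark's code)."""

    def __init__(self, window=timedelta(minutes=5), delay=timedelta(minutes=10)):
        self.window = window          # bucket size, like window(..., "5 minutes")
        self.delay = delay            # like withWatermark(..., "10 minutes")
        self.max_event_time = None    # largest event time seen so far
        self.state = {}               # open buckets: window start -> count
        self.finalized = {}           # closed buckets and their final counts
        self.dropped = []             # events that arrived for a closed bucket

    def watermark(self):
        """Max event time minus the delay (None until the first event)."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - self.delay

    def process(self, ts: datetime) -> None:
        start = window_start(ts, self.window)
        wm = self.watermark()
        if wm is not None and start + self.window <= wm:
            self.dropped.append(ts)   # its bucket is already closed
            return
        self.state[start] = self.state.get(start, 0) + 1
        # The watermark only ever moves forward.
        if self.max_event_time is None or ts > self.max_event_time:
            self.max_event_time = ts
        wm = self.watermark()
        # Close every bucket whose end is at or behind the new watermark.
        for s in [s for s in self.state if s + self.window <= wm]:
            self.finalized[s] = self.state.pop(s)
```

Processing events stamped 12:01, 12:07, 12:21, 12:03 and 12:12 in that order: the 12:21 event pushes the watermark to 12:11, which closes the 12:00 and 12:05 buckets and removes them from `state`. The 12:03 event is 18 minutes behind the max event time, so it is dropped, whereas the 12:12 event is only 9 minutes behind and is still counted in a new 12:10 bucket.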
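To enable watermarking, all we have to do is call withWatermark() on our streaming DataFrame, passing the name of the event-time column and the watermark delay.
```python
from pyspark.sql.functions import col, window

# df: a streaming DataFrame with an event-time column named "timestamp"
counts = (df.withWatermark("timestamp", "10 minutes")  # column name (a string), delay
            .groupBy(window(col("timestamp"), "5 minutes"))
            .count())
```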
Note: For streaming queries that do not have any stateful operation (see Part 3 for more details), and for batch queries, the watermark is ignored.
Trade-off in Watermarking
The watermark delay is a knob that trades off how much late data (say, up to 1 day) we want to consider against how large a State we are willing to keep. If we want our application to handle longer delays, we have to pay for a larger State, because more buckets are kept open.
If we cannot afford to keep a large State in memory, then we cannot accept very late data.
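The trade-off can be measured with a toy replay. The hypothetical helper `peak_open_buckets` below uses a simplified model of windowed counting (integer minutes, the watermark updated after every event, a bucket closed once the watermark reaches its end) and reports the largest number of buckets held in State at once, along with how many events were dropped. It is an illustration under those assumptions, not a measurement of Spark itself.

```python
def peak_open_buckets(event_minutes, window=5, delay=10):
    """Replay event times (in minutes, in arrival order) and return
    (largest number of open buckets at any moment, number of dropped events)."""
    state, max_t, peak, dropped = set(), None, 0, 0
    for t in event_minutes:
        start = t - t % window                      # bucket this event belongs to
        wm = None if max_t is None else max_t - delay
        if wm is not None and start + window <= wm:
            dropped += 1                            # bucket already closed
            continue
        state.add(start)
        max_t = t if max_t is None else max(max_t, t)
        wm = max_t - delay
        state = {s for s in state if s + window > wm}   # evict closed buckets
        peak = max(peak, len(state))
    return peak, dropped
```

Replaying one event per minute for two hours (minutes 0 to 119) followed by one event stamped minute 100: a 10-minute delay keeps at most 3 buckets open but drops the late event, while a 60-minute delay accepts it at the cost of keeping up to 13 buckets open.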
That's all for this blog. I hope you enjoyed it and found it helpful!! Stay connected for more blogs. Thank you!! 🙂