Welcome back, folks, to this blog series on Spark Structured Streaming. This post continues from the earlier one, “Internals of Structured Streaming”, and covers Stateful Streaming in Spark Structured Streaming. So let’s get started.
Let’s start with a basic understanding of what Stateful Stream Processing is. But to understand that, let’s first look at what Stateless Stream Processing is.
In my previous blogs of this series, I’ve discussed Stateless Stream Processing.
Just a brief on Stateless
In the ETL pipeline I discussed previously, we took every record, parsed it as JSON, and then wrote it to a Parquet file. That processing was stateless because we handled each record individually, i.e. we were not combining its information with any other record (as a count or an average would). Spark did not need to remember any data from earlier micro-batches to process the current records.
Stateful Stream Processing
Let’s say we are counting the number of records that we have parsed or filtered. In this case, we are combining information across the current and all previous micro-batches. To do this, Spark needs to keep the partial counts on the Executors between micro-batches. This retained information is known as the State. Here the count is the state, and every selected record increments it.
df.where(col("data.type") === lit("type1")).count()
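To make the difference concrete, here is a minimal sketch in plain Scala, with no Spark involved. The `Event` type, the comma-separated input format, and the function names are all hypothetical and exist only for illustration. `parse` is stateless because it looks at one record at a time, while `updateCount` is stateful because it needs the count carried over from the previous micro-batch.

```scala
// Hypothetical record type standing in for a parsed event.
final case class Event(eventType: String, clicks: Int)

object StatelessVsStateful {
  // Stateless: each record is transformed on its own; nothing is remembered.
  // Assumes a made-up "type,clicks" input format.
  def parse(raw: String): Event = {
    val parts = raw.split(",")
    Event(parts(0), parts(1).toInt)
  }

  // Stateful: the running count is carried from one micro-batch to the next.
  def updateCount(previousCount: Long, batch: Seq[Event]): Long =
    previousCount + batch.count(_.eventType == "type1")

  def main(args: Array[String]): Unit = {
    // Two simulated micro-batches of raw records.
    val microBatches = Seq(
      Seq("type1,3", "type2,1"),
      Seq("type1,5", "type1,2", "type3,4")
    )
    // The accumulator of the fold plays the role of the state.
    val finalCount = microBatches.foldLeft(0L) { (count, rawBatch) =>
      updateCount(count, rawBatch.map(parse))
    }
    println(s"running count of type1 records = $finalCount")
  }
}
```

The accumulator passed through `foldLeft` is the piece that a streaming engine has to persist somewhere between micro-batches.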
Challenges with Stateful Stream Processing
- Ensuring fault-tolerance
- Exactly once delivery guarantee
Stateful Micro-Batch Processing
By default, Structured Streaming processes data under the hood as a series of micro-batches.
The state is versioned between micro-batches while the streaming query runs. So as the series of incremental execution plans is generated (discussed in Part 2), each execution plan knows which version of the state it needs to read.
Each micro-batch reads the previous version of the state data, i.e. the previous running count, updates it, and creates a new version. Each of these versions gets checkpointed into the same checkpoint location that we provided in the query.
With this, the Spark engine can recover from a failure and still provide the exactly-once guarantee, because it knows exactly where it needs to restart from.
Note: In our case, the sink is a Parquet file!
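The versioning idea can be sketched in a few lines of plain Scala. This is only a toy model under my own assumptions: an in-memory `Map` stands in for the checkpoint location, and the names `runBatch` and `nextBatchToRun` are made up for this post, not Spark APIs.

```scala
object VersionedStateSketch {
  // version -> committed running count. A plain in-memory Map stands in
  // for the checkpoint location; version 0 is the empty initial state.
  type Checkpoint = Map[Long, Long]

  val initial: Checkpoint = Map(0L -> 0L)

  // Micro-batch `batchId` reads version batchId - 1 and commits version batchId.
  def runBatch(checkpoint: Checkpoint, batchId: Long, matchingRecords: Long): Checkpoint = {
    val previousCount = checkpoint(batchId - 1)
    checkpoint + (batchId -> (previousCount + matchingRecords))
  }

  // After a failure, resume with the batch following the latest committed version.
  def nextBatchToRun(checkpoint: Checkpoint): Long = checkpoint.keys.max + 1

  def main(args: Array[String]): Unit = {
    val afterTwo = runBatch(runBatch(initial, 1L, 4L), 2L, 3L)
    // Re-running batch 2 after a crash reads version 1 again,
    // so the committed result does not change.
    val replayed = runBatch(afterTwo, 2L, 3L)
    println(s"same result after replay: ${afterTwo == replayed}")
    println(s"next batch to run: ${nextBatchToRun(afterTwo)}")
  }
}
```

Because a batch always starts from a fixed earlier version, re-running it after a failure does not double count anything in the state.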
Distributed State Files
The state is kept in each Executor’s memory, and every change to it is backed by WAL (Write Ahead Log) files written to the checkpoint location, for example on HDFS or S3.
The simplest and most commonly used stateful operation is Streaming Aggregation.
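Here is a rough sketch of why logging every change is enough to rebuild state that was lost from memory. It is plain Scala and does not reflect Spark’s actual file format. The `Put` and `Remove` change types and the `recover` function are assumptions I made up for illustration.

```scala
object StateChangeLogSketch {
  // Hypothetical change records; not Spark's on-disk format.
  sealed trait Change
  final case class Put(key: String, value: Long) extends Change
  final case class Remove(key: String) extends Change

  // Apply one logged change to the in-memory state.
  def applyChange(state: Map[String, Long], change: Change): Map[String, Long] =
    change match {
      case Put(k, v) => state + (k -> v)
      case Remove(k) => state - k
    }

  // Rebuild lost in-memory state by replaying the durable log in order.
  def recover(log: Seq[Change]): Map[String, Long] =
    log.foldLeft(Map.empty[String, Long])(applyChange)

  def main(args: Array[String]): Unit = {
    val log = Seq(Put("type1", 1L), Put("type1", 2L), Put("type2", 5L), Remove("type2"))
    println(recover(log))
  }
}
```

As long as the log lives on durable storage, a replacement Executor can replay it and end up with the same in-memory state.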
There are a few types of aggregation:
a. Aggregation by key:
b. Aggregation by event time window:
eventsDF.groupBy(window(col("timestamp"), "5 minutes"))
c. Aggregate by both key and event time window:
eventsDF.groupBy(col("data.type"), window(col("timestamp"), "5 minutes"))
d. User-defined aggregate functions (UDAF)
As we can see, aggregating based on event time simply means applying a window as if it were an additional key. We can specify the window duration (in this example, 5 minutes) and also the window sliding duration.
When we specify only the window duration, it is called a Tumbling Window: Spark aggregates the data that falls into each fixed, non-overlapping 5-minute time frame.
When we specify both the window duration and the sliding duration, it is called a Sliding Window. For example, we may want to aggregate over a window of 10 minutes that moves forward every 5 minutes.
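To try out aggregation types a, b and c end to end, here is a small sketch that runs them on a static DataFrame in local mode (the same `groupBy` calls apply to a streaming DataFrame). It assumes Spark SQL is on the classpath. The flat `timestamp`, `type` and `clicks` columns and the sample rows are hypothetical; the events in this series nest those fields under `data`.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, col, window}

object AggregationSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("aggregation-sketch")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample events with a flat schema, for illustration only.
    val eventsDF = Seq(
      (Timestamp.valueOf("2024-01-01 10:01:00"), "type1", 3),
      (Timestamp.valueOf("2024-01-01 10:04:00"), "type2", 1),
      (Timestamp.valueOf("2024-01-01 10:07:00"), "type1", 5)
    ).toDF("timestamp", "type", "clicks")

    // a. Aggregation by key
    eventsDF.groupBy(col("type")).count().show(truncate = false)

    // b. Aggregation by event-time window (tumbling, 5 minutes)
    eventsDF.groupBy(window(col("timestamp"), "5 minutes")).count().show(truncate = false)

    // c. Aggregation by both key and event-time window
    eventsDF
      .groupBy(col("type"), window(col("timestamp"), "5 minutes"))
      .agg(avg(col("clicks")))
      .show(truncate = false)

    spark.stop()
  }
}
```

Note that in Scala the `window` function takes a `Column` for the time column, which is why the sketch wraps the name in `col(...)`.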
eventsDF
  .groupBy(col("data.type"), window(col("timestamp"), "10 minutes", "5 minutes"))
  .agg(avg(col("data.clicks")))
In the case of a Sliding Window, Spark figures out which window or windows each record falls into, and calculates and updates the aggregate result (count or average) for each of them accordingly.
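The window assignment itself is simple arithmetic. Below is a minimal sketch in plain Scala of how a record’s event time maps to one or more windows. It is my own simplified model, working in whole minutes with windows aligned to minute 0, and is not Spark’s internal implementation. The `windowsFor` function name is hypothetical.

```scala
object WindowAssignmentSketch {
  // Returns the [start, end) bounds, in minutes, of every window that
  // contains the event. Windows are assumed to be aligned to minute 0.
  def windowsFor(eventMinute: Long, windowMinutes: Long, slideMinutes: Long): Seq[(Long, Long)] = {
    // Start of the latest window that begins at or before the event.
    val lastStart = eventMinute - (eventMinute % slideMinutes)
    // Walk backwards one slide at a time while the window still covers the event.
    Iterator.iterate(lastStart)(_ - slideMinutes)
      .takeWhile(start => start + windowMinutes > eventMinute)
      .map(start => (start, start + windowMinutes))
      .toList
      .reverse
  }

  def main(args: Array[String]): Unit = {
    // Tumbling window: slide equals the window duration, so exactly one window.
    println(windowsFor(7L, 5L, 5L))
    // Sliding window: 10-minute windows every 5 minutes, so two windows.
    println(windowsFor(7L, 10L, 5L))
  }
}
```

With a tumbling window every record lands in exactly one window, while with a 10-minute window sliding every 5 minutes each record contributes to two overlapping windows, so two aggregates get updated per record.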
(Comment below if you need a dedicated blog on Tumbling Window and Sliding Window) 🙂
The next blog in this series is Handling Late Arriving Data.
That’s all for this blog. I hope you enjoyed it and that it helped you! Stay connected for more blogs. Thank you! 🙂