In this series of blogs on Spark Structured Streaming, we will take a deep look at what Structured Streaming is, in plain, layman’s language. So let’s get started.
Structured Streaming is a stream processing engine built on top of the Spark SQL engine, and it uses the Spark SQL APIs. It is fast, scalable and fault-tolerant. It provides rich, unified, high-level APIs in the form of DataFrames and Datasets that allow us to deal with complex data and a wide variety of workloads. As with Spark’s batch processing, it also has a rich ecosystem of data sources and sinks that it can read from and write to.
The philosophy behind the development of Structured Streaming is:
“We as end user should not have to reason about streaming”.
What this means is that we, as end users, should only have to write batch-like queries, and it is Spark’s job to figure out how to run them on a continuous stream of data and continuously update the result as new data flows in.
The key realization that the developers of Structured Streaming had, and which led to its development, is:
“We can always treat a Stream of Data as an Unbounded Table”.
This means that every record in the data stream is like a row that is appended to the table.
Thus, we can represent both batches (static, bounded data) and streams (unbounded data) as tables. This allows us to express our computations in terms of tables, while Spark figures out how to actually run them on either static data or streaming data.
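To make the “unbounded table” idea concrete, here is a minimal plain-Scala sketch (no Spark involved). It assumes a hypothetical Row type and a hypothetical batch-style query (total clicks per user). The stream is modeled as micro-batches whose rows are appended to an ever-growing table, and the same query is run on that table after every arrival as well as on an equivalent static table.

```scala
// A toy model of "a stream is an unbounded table" in plain Scala (no Spark).
// Row, query and the sample data are hypothetical illustrations.
case class Row(user: String, clicks: Int)

// A batch-style query, written once against "a table": total clicks per user.
def query(table: Seq[Row]): Map[String, Int] =
  table.groupBy(_.user).map { case (user, rows) => user -> rows.map(_.clicks).sum }

// Static (bounded) data: the whole table is available up front.
val staticTable = Seq(Row("a", 1), Row("b", 2), Row("a", 3))

// Streaming (unbounded) data: the same rows arrive in micro-batches.
val microBatches = Seq(Seq(Row("a", 1)), Seq(Row("b", 2), Row("a", 3)))

// After each micro-batch, the "unbounded table" is everything appended so far,
// and running the same query on it gives the up-to-date result.
val resultsOverTime: Seq[Map[String, Int]] =
  microBatches.scanLeft(Seq.empty[Row])(_ ++ _).tail.map(query)

resultsOverTime.foreach(println)
```

The last streaming result matches the result on the static table. Spark does not literally re-run the query over the whole table on every trigger (it updates the result incrementally), but the result it aims to produce is the same as in this model.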
Structure of a Streaming Query
To understand the structure of a streaming query, let’s look at a simple example.
(The example is taken from Databricks.)
Let’s say we have set up an ETL pipeline in which we receive JSON data from Kafka, parse it into a structured form and finally write it out as Parquet files. We also want end-to-end fault-tolerance guarantees, because we don’t want any failure to drop data or create duplicates.
Reading the data (Defining Source)
The first step is to create a DataFrame from Kafka, i.e., to specify where to read the data from. In this case, we need to specify the following:
- The format of the source as “kafka”
- The addresses (host:port) of the Kafka brokers (bootstrap servers)
- The name of the topic from which the data is to be consumed
- The offsets from which we want to start consuming data. This can be earliest, latest, or a specific offset per partition.
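The three kinds of starting offsets can be illustrated with a toy plain-Scala model of how a starting position could be chosen for a single Kafka partition. This is a sketch, not Spark’s implementation: StartingOffset, resolve and the offset numbers are hypothetical. The one real convention it borrows comes from Spark’s Kafka integration, where, in the JSON form of startingOffsets, -2 refers to earliest and -1 to latest.

```scala
// A toy model (not Spark code) of how a starting offset is chosen for one
// Kafka partition. Offsets are positions in the partition's log.
sealed trait StartingOffset
case object Earliest extends StartingOffset                    // oldest retained record
case object Latest extends StartingOffset                      // only records arriving from now on
final case class Specific(offset: Long) extends StartingOffset // a custom offset

// earliestAvailable: first offset still retained by the broker.
// latestAvailable: offset that the next new record will get (end of the log).
def resolve(spec: StartingOffset, earliestAvailable: Long, latestAvailable: Long): Long =
  spec match {
    case Earliest      => earliestAvailable
    case Latest        => latestAvailable
    // Mirrors the JSON convention of Spark's Kafka source: -2 = earliest, -1 = latest.
    case Specific(-2L) => earliestAvailable
    case Specific(-1L) => latestAvailable
    case Specific(o)   => o
  }

println(resolve(Earliest, 100L, 250L))       // 100
println(resolve(Latest, 100L, 250L))         // 250
println(resolve(Specific(180L), 100L, 250L)) // 180
```

Note that startingOffsets only matters when a query starts for the first time; a query restarted from its checkpoint resumes from the offsets recorded there.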
Spark has multiple built-in sources, such as files and Kafka (a Kinesis source is also available on Databricks). We can also read multiple input streams and join or union them together. (We will discuss this in an upcoming blog.)
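```scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import org.apache.spark.sql.types.StringType
import scala.concurrent.duration._

// Assumes `spark` is an existing SparkSession (e.g. the one in spark-shell) and
// `schema` is a StructType describing the JSON payload.
import spark.implicits._ // enables the $"column" syntax

spark.readStream
  // Defining Source
  .format("kafka")
  .option("kafka.bootstrap.servers", "...")
  .option("subscribe", "topicName")
  .option("startingOffsets", "latest")
  .load()
  // Transformation
  .select($"value".cast(StringType))
  .select(from_json($"value", schema).as("data"))
  // Defining Sink
  .writeStream
  .format("parquet")
  .option("path", "...")
  // Processing Trigger
  .trigger(Trigger.ProcessingTime(1.minutes))
  // Output Mode
  .outputMode(OutputMode.Append())
  // Checkpoint Location
  .option("checkpointLocation", "...")
  .start()
```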
The source definition (spark.readStream ... load()) returns a DataFrame, which is a single, unified API for manipulating both batch and streaming data in Spark.
Transformation (Business Logic)
Now that we have the data in a DataFrame, we can apply most of the usual DataFrame operations to it.
In this case, the value received from Kafka is in binary format, so the next step is to cast it from binary to a string and then parse that JSON string into structured columns by applying an appropriate schema to it.
Note: In streaming, Spark does not infer the schema for us, so we have to provide it up front. Each micro-batch contains only a limited amount of data, so a schema inferred from such a small sample would be unreliable and could even differ from one micro-batch to the next.
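To see why inferring a schema per micro-batch would be fragile, here is a plain-Scala sketch of a naive type inferrer applied to two micro-batches of the same field. The inferType helper and the sample values are hypothetical, and the logic is far simpler than Spark’s real JSON inference; it only shows that the inferred type can flip between batches, which a fixed, user-supplied schema avoids.

```scala
import scala.util.Try

// Naive, hypothetical type inference for one field, based only on the sample it sees.
// (Spark's real JSON schema inference is more involved; this only shows the idea.)
def inferType(values: Seq[String]): String =
  if (values.forall(v => Try(v.toLong).isSuccess)) "long"
  else if (values.forall(v => Try(v.toDouble).isSuccess)) "double"
  else "string"

// Two micro-batches carrying values of the same JSON field, e.g. "price".
val batch1 = Seq("10", "25")   // happens to contain only whole numbers
val batch2 = Seq("9.99", "12") // contains a decimal value

println(inferType(batch1)) // long
println(inferType(batch2)) // double

// With a schema declared up front (price: double), every micro-batch is parsed the same way.
val parsedWithDeclaredSchema: Seq[Double] = (batch1 ++ batch2).map(_.toDouble)
```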
Writing the data (Defining Sink)
Now that we have parsed the JSON data into a structured form, we want to write it out as Parquet files. For this, we need to specify the following:
- The format of the sink as “parquet”
- The path to which we write the data
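Next, we specify the processing trigger, i.e., the time interval at which we want the query to be triggered.
E.g., if we set the processing-time trigger interval to 1 minute, Spark starts a micro-batch every minute, and each micro-batch processes all the data that has arrived since the previous one. If a micro-batch takes longer than the interval to finish, the next one starts as soon as the previous one completes instead of waiting for the next interval boundary.
Note: If no trigger is specified, Spark starts the next micro-batch as soon as the previous one has finished, processing whatever new data has arrived (equivalent to a processing-time interval of 0).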
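The trigger behaviour described above can be sketched as a small scheduling model in plain Scala. It is a simplification under stated assumptions, not Spark’s scheduler: batchStartTimes is hypothetical, time starts at 0 and is measured in seconds, and cases such as skipping a trigger when no new data has arrived are ignored.

```scala
// Simplified model (not Spark's scheduler) of fixed-interval micro-batch triggers.
// Given the trigger interval and how long each micro-batch takes, it returns the
// time at which each micro-batch starts. All times are in seconds.
def batchStartTimes(intervalSec: Long, durationsSec: Seq[Long]): Seq[Long] = {
  // The fold carries (start time of the current batch, start times recorded so far).
  val (_, starts) = durationsSec.foldLeft((0L, Vector.empty[Long])) {
    case ((start, recorded), duration) =>
      val finish = start + duration
      // Finished within the interval: wait for the interval to elapse.
      // Overran the interval: start the next batch as soon as this one finishes.
      val nextStart = if (finish <= start + intervalSec) start + intervalSec else finish
      (nextStart, recorded :+ start)
  }
  starts
}

println(batchStartTimes(60L, Seq(10L, 75L, 20L))) // Vector(0, 60, 135)
println(batchStartTimes(0L, Seq(10L, 75L, 20L)))  // Vector(0, 10, 85), like the default trigger
```

With a 60-second interval, the second batch overruns (it takes 75 seconds), so the third starts at 135 seconds instead of waiting for the 180-second boundary.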
Next, we choose an output mode, which decides what is written to the sink after each trigger. There are three output modes: append, complete and update. Each mode is applicable only to certain types of queries.
i. Append: “This is the default mode, where only the new rows added to the Result Table since the last trigger are written to the sink. This is applicable only to those queries where rows added to the Result Table are not expected to change.”
This is most applicable when the query does not perform any aggregation, so each micro-batch only appends new rows (after transformation) to the Result Table.
ii. Complete: “The entire updated Result Table will be written to the sink after every trigger. This is supported for aggregate queries. It is up to the storage connector to decide how to handle the writing of the entire table.”
This is most applicable when we aggregate on a column: every time new data arrives, the aggregation has to be updated over all the data processed so far.
Note: Spark does not keep the entire streaming data in memory for this. Instead, it keeps the intermediate result of the previous aggregation (the state) and updates it with each new micro-batch. (We will discuss stateful streaming in an upcoming blog.)
There are a few DataFrame operations that are not supported on streaming DataFrames, such as distinct and sorting (sorting is supported only after an aggregation, in complete mode). For these transformations Spark would need to keep the entire stream, i.e., the state of a previous aggregation would not be enough.
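The idea of keeping state instead of all the data can be sketched with a running word count in plain Scala. This is a toy model, not Spark’s state store: updateState and the sample micro-batches are hypothetical. Only the per-word counts survive from one micro-batch to the next, yet the final result equals a recomputation over all the data.

```scala
// Toy model (not Spark's state store) of a stateful running count.
// The state is just the count per word; raw records are not kept.
def updateState(state: Map[String, Long], batch: Seq[String]): Map[String, Long] =
  batch.foldLeft(state) { (s, word) => s.updated(word, s.getOrElse(word, 0L) + 1L) }

val microBatches = Seq(Seq("spark", "kafka"), Seq("spark"), Seq("parquet", "spark"))

// State after each micro-batch (dropping the initial empty state).
val statesOverTime: Seq[Map[String, Long]] =
  microBatches.scanLeft(Map.empty[String, Long])(updateState).tail

// For comparison: recomputing the counts from all records at once.
val recomputed: Map[String, Long] =
  microBatches.flatten.groupBy(identity).map { case (w, ws) => w -> ws.size.toLong }

statesOverTime.foreach(println)
```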
iii. Update: Only the rows that were updated in the Result Table since the last trigger will be written to the sink. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
So, the output mode gives the user control over what data is pushed to the sink (the external system) as the results get updated with new data.
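The difference between the three output modes can be sketched for a running word count in plain Scala. This is a hypothetical model of what each mode would hand to the sink, not Spark code; the counts helper and the sample words are assumptions.

```scala
// Toy model (not Spark code) of what each output mode would send to the sink.
// The Result Table of the aggregation query is a running count per word.
def counts(words: Seq[String]): Map[String, Long] =
  words.groupBy(identity).map { case (w, ws) => w -> ws.size.toLong }

val tableBefore = counts(Seq("spark", "kafka"))                    // after trigger 1
val tableAfter  = counts(Seq("spark", "kafka", "spark", "parquet")) // after trigger 2

// Complete: the entire updated Result Table after every trigger.
val completeOutput = tableAfter

// Update: only rows that are new or whose values changed since the last trigger.
val updateOutput = tableAfter.filter { case (w, c) => !tableBefore.get(w).contains(c) }

// Append suits a query without aggregation (e.g. a simple map), where emitted rows
// never change: only the rows produced from the new micro-batch are written.
// (For an aggregation like the one above, Spark rejects Append unless a watermark is used.)
val newMicroBatch = Seq("spark", "parquet")
val appendOutput = newMicroBatch.map(_.toUpperCase)

println(s"complete: $completeOutput")
println(s"update:   $updateOutput")
println(s"append:   $appendOutput")
```

Complete mode re-emits the kafka row even though its count did not change, while update mode leaves it out.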
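Next, we need to specify a checkpoint location, where Spark saves progress information (such as the offsets processed in each micro-batch) and the state, so that after a failure the engine can restart from where it left off. Together with a replayable source such as Kafka and an idempotent sink such as the file sink, this gives end-to-end exactly-once semantics. The checkpoint location should be on fault-tolerant storage such as HDFS, AWS S3 or Azure Blob Storage.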
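The role of the checkpoint can be sketched with a toy model in plain Scala. It is loosely based on how Spark records the offset range of each micro-batch before processing it and marks the batch as committed afterwards, but every name here (offsetLog, commitLog, nextBatch, runBatch) is hypothetical and the model is heavily simplified. A crash after the sink write but before the commit makes the next run replay the same batch with the same offsets, and because the sink is keyed by batch id, the replay overwrites rather than duplicates.

```scala
import scala.collection.mutable

// Toy model (not Spark code) of checkpoint-based recovery. All names are hypothetical.
val source = Vector("r0", "r1", "r2", "r3", "r4") // a replayable source, like Kafka

// "Checkpoint": an offset log (the offset range of each batch, written BEFORE the
// batch runs) and a commit log (ids of batches whose output was fully written).
val offsetLog = mutable.LinkedHashMap.empty[Long, (Int, Int)]
val commitLog = mutable.Set.empty[Long]

// Idempotent sink: output is keyed by batch id, so re-running a batch overwrites it.
val sink = mutable.LinkedHashMap.empty[Long, Vector[String]]

// Decide the next batch: re-run the last planned batch if it never committed,
// otherwise plan a new batch starting where the last one ended.
def nextBatch(maxSize: Int): (Long, (Int, Int)) =
  offsetLog.lastOption match {
    case Some((id, range)) if !commitLog.contains(id) => (id, range)
    case Some((id, (_, lastEnd))) => (id + 1, (lastEnd, math.min(lastEnd + maxSize, source.size)))
    case None => (0L, (0, math.min(maxSize, source.size)))
  }

def runBatch(batchId: Long, range: (Int, Int), crashBeforeCommit: Boolean): Unit = {
  offsetLog(batchId) = range                       // 1. record what this batch covers
  sink(batchId) = source.slice(range._1, range._2) // 2. write the output
  if (!crashBeforeCommit) commitLog += batchId     // 3. mark the batch as committed
}

// The second run "crashes" after writing to the sink but before committing;
// the third run therefore replays batch 1 with exactly the same offsets.
for (crash <- Seq(false, true, false, false)) {
  val (id, range) = nextBatch(maxSize = 2)
  runBatch(id, range, crashBeforeCommit = crash)
}

val delivered = sink.values.flatten.toVector
println(delivered) // Vector(r0, r1, r2, r3, r4)
```

Every record reaches the sink exactly once despite the simulated crash. Without the offset log the replay could cover a different range, and without the idempotent sink the replayed batch would be written twice.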
The final step is to call the start() API, which starts running the query in the background and returns a StreamingQuery handle.
That’s all for this blog. What happens under the hood when we call the start() API is discussed in the next blog.
The next part of this series is: The Internals of Structured Streaming
Hope you enjoyed this blog and found it helpful! Stay connected for more blogs. Thank you! 🙂