This blog covers Apache Spark 3.x and looks at how Spark’s new feature, Adaptive Query Execution (AQE), works internally. So let’s get started.
One of the major features introduced in Apache Spark 3.0 is Adaptive Query Execution (AQE), which is built on top of the Spark SQL engine.
With this feature, the Spark SQL engine can keep updating the execution plan at runtime, based on the properties of the data it observes at each stage of the computation. For example, it can automatically tune the number of shuffle partitions during an aggregation and handle data skew in a join operation.
This makes Spark much easier to run, because as developers we no longer need to configure these things in advance; that is now AQE’s responsibility. 🙂 AQE adapts and optimises the plan based on our input data, which also leads to better performance in many cases.
With AQE the new SQL plan looks like this:
Fig: Adaptive Query Plan
If you haven’t read my blog on Understanding Logical and Physical Plan, I would highly recommend reading it so that you can understand each phase of this plan.
1. Catalyst Optimizer and Tungsten Execution Engine were introduced in Spark 1.x
2. Cost-Based Optimizer was introduced in Spark 2.x
3. Adaptive Query Execution has now been introduced in Spark 3.x
Let’s look at some examples which Databricks recently published on their official blog and YouTube channel. (Links are in the reference section.)
Setting the number of shuffle partitions:
By default, the number of shuffle partitions in Spark is 200 (spark.sql.shuffle.partitions), which causes a lot of pain during development because the right value depends on the data. If the number is too small, each partition becomes large and tasks may spill to disk; if it is too large, we end up with many tiny partitions, which adds task-scheduling overhead and inefficient I/O.
How can AQE help us with this? Let’s see.
With AQE, when Spark runs the initial phase of an aggregation, it observes the actual size of the result. Even if we started with some ‘x’ number of shuffle partitions, it can then set a different number of partitions for the reduce operation.
It coalesces small partitions into fewer, larger ones, so the reduce side of the query is sized for the data that actually came out of the aggregation rather than for a number fixed in advance.
Fig: Reducing Shuffle Partitions
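To make the idea of coalescing concrete, here is a minimal pure-Python sketch. It is not Spark's actual implementation; it only illustrates greedily merging adjacent shuffle partitions until a target size is reached. The function name `coalesce_partitions`, the partition sizes and the target size are all hypothetical values chosen for illustration.

```python
from typing import List


def coalesce_partitions(sizes: List[int], target_size: int) -> List[List[int]]:
    """Greedily merge adjacent partitions without exceeding target_size.

    Returns groups of original partition indices; in this sketch each
    group stands for the input of a single reduce task.
    """
    groups: List[List[int]] = []
    current: List[int] = []
    current_size = 0
    for index, size in enumerate(sizes):
        # Close the current group if adding this partition would overshoot.
        if current and current_size + size > target_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups


# Hypothetical shuffle output sizes (in MB) observed after the map stage.
observed_sizes = [70, 30, 10, 20, 50]
print(coalesce_partitions(observed_sizes, target_size=70))
# prints [[0], [1, 2, 3], [4]]
```

In this toy run, five shuffle partitions become three reduce tasks: the three small partitions in the middle are read together, while the two larger ones are left alone.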
With AQE, when we join two tables, Spark can observe how many records will actually take part in the join once the initial stages have run, and then choose the join algorithm that gives the best performance.
The broadcast hash join is usually the most performant if one side of the join can fit well in memory. It gives us the benefit of performing a map-side join which eliminates the need for shuffling the data between executors.
This can be done manually without AQE too, but it needs a lot of configuration, such as setting the spark.sql.autoBroadcastJoinThreshold limit and more. It may also result in OOM errors if not configured properly. With AQE this becomes much easier, as AQE takes responsibility for switching to a broadcast hash join when the sizes seen at runtime allow it.
Thus, if the right side of the join turns out to be much smaller than the other and also small enough to be broadcast, the sort-merge join is converted to a broadcast hash join.
Fig: Changing Join Type
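The decision described above can be sketched in a few lines of pure Python. This is a deliberate simplification and not Spark's planner, which also takes join type, hints and other statistics into account. The function `choose_join_strategy` and the table sizes are hypothetical; the 10 MB value matches the documented default of spark.sql.autoBroadcastJoinThreshold.

```python
MB = 1024 * 1024

# Documented default of spark.sql.autoBroadcastJoinThreshold (10 MB).
DEFAULT_BROADCAST_THRESHOLD = 10 * MB


def choose_join_strategy(left_bytes: int, right_bytes: int,
                         threshold: int = DEFAULT_BROADCAST_THRESHOLD) -> str:
    """Pick a join strategy from the sizes of the two join inputs."""
    # Broadcast only if the smaller side fits under the threshold.
    if min(left_bytes, right_bytes) <= threshold:
        return "broadcast_hash_join"
    return "sort_merge_join"


# Hypothetical numbers: the planner estimates 25 MB for the right side,
# but after filtering, only 8 MB is actually observed at runtime.
planned = choose_join_strategy(100 * MB, 25 * MB)
replanned = choose_join_strategy(100 * MB, 8 * MB)
print(planned, "->", replanned)
# prints sort_merge_join -> broadcast_hash_join
```

The point of the sketch is that the same rule gives a different answer once it is fed runtime sizes instead of planning-time estimates, which is what lets AQE change the join type mid-query.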
AQE can adapt both to the size of the data and to skew on different keys. With AQE we no longer need to handle skewed keys in a special way, for example with salting techniques. (I will discuss how AQE handles data skew in another blog.)
This results in huge speed-ups in SQL workloads.
The only thing we need to do is to set the SparkSession configuration so that Spark can do these magical things automatically.
To use the shuffle partitions optimisation, both spark.sql.adaptive.enabled and spark.sql.adaptive.coalescePartitions.enabled must be set to true.
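As a minimal sketch of that configuration in Python: the two keys below are real Spark SQL settings (spark.sql.adaptive.enabled is off by default in Spark 3.0 and 3.1), while the `AQE_SETTINGS` dictionary and the `apply_aqe_settings` helper are hypothetical names introduced here for illustration. The helper only assumes an object with a `conf.set(key, value)` method, which a PySpark SparkSession provides.

```python
# Real Spark SQL configuration keys; the dictionary name is an assumption.
AQE_SETTINGS = {
    # Master switch for Adaptive Query Execution.
    "spark.sql.adaptive.enabled": "true",
    # Lets AQE coalesce small shuffle partitions after a stage finishes.
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}


def apply_aqe_settings(spark, settings=AQE_SETTINGS):
    """Apply each setting through the session's runtime configuration."""
    for key, value in settings.items():
        spark.conf.set(key, value)
    return spark


# Usage with a real session (requires PySpark to be installed):
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName("aqe-demo").getOrCreate()
# apply_aqe_settings(spark)
```

Keeping the settings in one dictionary makes it easy to add further options later without touching the code that applies them.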
For all configuration options, check the official Spark documentation.
Here is the benchmark on TPC-DS queries by Databricks.
Fig: TPC-DS Query Benchmark
I hope this blog helped you understand Adaptive Query Execution. Thank you 🙂