Optimization plays an important role in the success of Spark SQL, and a lot of work has been done in this direction. Before Spark 3.0, the optimizer relied heavily on cost-based optimization, in which alternative plans are compared by estimated cost (time efficiency plus estimated CPU and I/O usage) and the cheapest one is executed. Because the statistics feeding those estimates are often outdated or missing, the chosen plan can be sub-optimal. Spark 3.0 therefore introduced Adaptive Query Execution (AQE), which re-optimizes and adjusts query plans based on runtime statistics collected during query execution. Re-optimization happens at stage boundaries, since the end of a stage is the natural point where accurate statistics become available.
Enabling Adaptive Query Execution
By default, adaptive query execution is disabled in Spark 3.0 and 3.1 (it became enabled by default in Spark 3.2). To enable it, set the spark.sql.adaptive.enabled configuration property to true. Also note that AQE is not supported for streaming datasets and is disabled during their execution.
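A minimal PySpark sketch of turning AQE on, assuming a local Spark 3.x installation (the application name is arbitrary):

```python
from pyspark.sql import SparkSession

# Enable AQE when building the session. It is off by default in
# Spark 3.0/3.1 and on by default from Spark 3.2 onwards.
spark = (
    SparkSession.builder
    .appName("aqe-demo")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# It can also be toggled at runtime on an existing session:
spark.conf.set("spark.sql.adaptive.enabled", "true")
```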
Main Characteristics of AQE
1. Reducing Post-Shuffle Partitions
Before Spark 3.0, developers had to optimize manually by increasing or decreasing the number of shuffle partitions based on the number of records. Choosing the wrong number of partitions leads to excessive data transfer across the network, or to data spilling to disk. With Spark 3.0, developers no longer need to know the record counts in advance: Spark automatically selects the optimal number of partitions by looking at the metrics of the completed map stage.
As you can see in the image below, without AQE enabled the shuffle operation uses the default of 200 partitions.
After setting both configuration properties, “spark.sql.adaptive.enabled” and “spark.sql.adaptive.coalescePartitions.enabled”, to true, you can easily spot the difference in the number of partitions: the same operation now needs only 6.
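The idea behind coalescing can be illustrated in plain Python (this is a sketch of the behavior, not Spark's internal implementation): pack adjacent small map-output partitions together until each group approaches a target size. In Spark that target corresponds to spark.sql.adaptive.advisoryPartitionSizeInBytes, which defaults to 64 MB.

```python
def coalesce_partitions(sizes, target):
    """Greedily merge adjacent partitions into groups of roughly `target` size.

    `sizes` are partition sizes (here in MB for readability); returns the
    indices of the original partitions that end up merged together.
    """
    groups, current, current_size = [], [], 0
    for i, size in enumerate(sizes):
        # Close the current group once adding this partition would overshoot.
        if current and current_size + size > target:
            groups.append(current)
            current, current_size = [], 0
        current.append(i)
        current_size += size
    if current:
        groups.append(current)
    return groups

# Eight small map-output partitions coalesced against a 64 MB target:
sizes = [10, 20, 30, 50, 5, 5, 60, 70]
print(coalesce_partitions(sizes, 64))  # → [[0, 1, 2], [3, 4, 5], [6], [7]]
```

Eight tiny partitions collapse into four of more even size, which is exactly the effect visible in the query plan once coalescing is enabled.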
2. Dynamic Switching of Join Strategies
Spark supports several join strategies, and among them broadcast hash join is usually the most effective one performance-wise, but it works only when one side of the join is small enough to fit into each executor's memory. Spark distributes the broadcast table with a BitTorrent-like peer-to-peer protocol in which blocks are shared among the executors themselves, so they do not all have to fetch the data from a single node.
In Spark 2.x, the join strategy is selected before execution, when the data sizes of intermediate operations are unknown, so a strategy chosen from estimates may not be the optimal one. In Spark 3.x with AQE enabled, the shuffle-heavy sort-merge join can be replaced with a broadcast join at runtime if either side turns out to meet the size condition. In other words, AQE replans the join strategy at run time based on the accurate, measured size of the join relations.
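The runtime decision can be sketched in plain Python (the function name is hypothetical; in Spark the size condition is governed by spark.sql.autoBroadcastJoinThreshold, which defaults to 10 MB):

```python
BROADCAST_THRESHOLD_BYTES = 10 * 1024 * 1024  # Spark's default threshold, 10 MB

def pick_join_strategy(left_bytes, right_bytes,
                       threshold=BROADCAST_THRESHOLD_BYTES):
    """Replan the join: broadcast the smaller side if its *measured* size fits."""
    smaller = min(left_bytes, right_bytes)
    if smaller <= threshold:
        return "broadcast_hash_join"
    return "sort_merge_join"

# Planning-time estimate said ~50 MB, so sort-merge join was chosen up front;
# runtime shuffle statistics show the right side is really only 2 MB,
# so AQE switches the plan:
print(pick_join_strategy(500 * 1024 * 1024, 2 * 1024 * 1024))
# → broadcast_hash_join
```

The key difference from Spark 2.x is only *when* this check runs: AQE evaluates it against sizes measured from the completed shuffle stage, not against planning-time estimates.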
3. Skew Join Optimization
Data skew means a non-uniform distribution of data among the partitions in a cluster. Tasks operating on the oversized partitions slow down the whole stage, which in turn significantly degrades query performance, especially for joins. Traditionally this problem was addressed by increasing the parallelism or by restructuring the join key to increase its cardinality (e.g., salting).
AQE automatically detects such skew from shuffle file statistics and handles it by splitting the skewed partitions into smaller sub-partitions, each of which is joined with the corresponding partition from the other side. It takes effect when both “spark.sql.adaptive.enabled” and “spark.sql.adaptive.skewJoin.enabled” configurations are enabled.
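A plain-Python sketch of the splitting step (not Spark internals): one oversized partition is cut into roughly target-sized chunks. In Spark, a partition counts as skewed when it exceeds both spark.sql.adaptive.skewJoin.skewedPartitionFactor (default 5) times the median partition size and spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (default 256 MB).

```python
def split_skewed_partition(size, target):
    """Split one oversized partition into roughly `target`-sized sub-partitions.

    Sizes are in MB here for readability; returns the sub-partition sizes.
    """
    n_splits = max(1, -(-size // target))  # ceiling division
    base, rem = divmod(size, n_splits)
    # Distribute any remainder one unit at a time so sizes stay balanced.
    return [base + (1 if i < rem else 0) for i in range(n_splits)]

# A 300 MB skewed partition split against a 64 MB target becomes five
# even sub-partitions; each is then joined with a copy of the matching
# partition from the other side of the join.
print(split_skewed_partition(300, 64))  # → [60, 60, 60, 60, 60]
```

The straggler task that used to process all 300 MB alone becomes five parallel tasks, which is why skew join optimization removes the long tail from the stage timeline.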
Hope you enjoyed the blog. Thanks for reading.