Spark 3.0 – Adaptive Query Execution With Example


Introduction

Adaptive Query Execution (AQE) is one of the greatest features of Spark 3.0: it re-optimizes and adjusts query plans based on runtime statistics collected during the execution of the query.

Need for AQE

With each major release, Spark has introduced new optimization features to execute queries more efficiently and achieve greater performance. Before Spark 3.0, cost-based optimization used table statistics to determine the most efficient execution plan for a structured query. But because those statistics can be outdated, this technique can produce sub-optimal plans. Therefore, Spark 3.0 introduced Adaptive Query Execution.

Enabling Adaptive Query Execution

Adaptive Query Execution is disabled by default. We can enable it by setting the following property:

spark.conf.set("spark.sql.adaptive.enabled", true)
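
The property can also be set while building the SparkSession. A minimal sketch in Scala (the app name and local master below are placeholders, not from the original post):

import org.apache.spark.sql.SparkSession

// Enable AQE at session start-up; "aqe-demo" and local[*] are illustrative.
val spark = SparkSession.builder()
  .appName("aqe-demo")
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()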

How AQE Works

  • AQE improves the Catalyst Optimizer workflow by adjusting query plans based on runtime statistics collected during query execution.
  • At the end of each query stage, Spark collects statistics about the real size of the data in the shuffle files.
  • For the next stage, it re-optimizes the logical plan to dynamically switch join strategies, coalesce shuffle partitions, and so on.
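
A quick way to see this in action is to inspect the physical plan: in Spark 3.0 with AQE enabled, the plan is wrapped in an AdaptiveSparkPlan node that is finalized only once the query runs. A minimal sketch, assuming the spark session from above:

// The printed physical plan starts with "AdaptiveSparkPlan isFinalPlan=false";
// it becomes the final, re-optimized plan only after the query executes.
val df = spark.range(0, 1000000).groupBy("id").count()
df.explain()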

Features of AQE

1. Switch Join Strategies

Spark supports many join strategies, but among them Broadcast Hash Join (BHJ) is one of the most performant. This strategy can be used only when one of the join tables is small enough to fit in memory, within the broadcast threshold. The default broadcast-size threshold (spark.sql.autoBroadcastJoinThreshold) is 10 MB.

In Spark 2.x, the join strategy is selected on the basis of size estimates derived from input file sizes. This estimate is not always accurate, because filters and complex operators along the way can change the size of the data. When AQE is enabled, Spark re-plans the join strategy at runtime based on the most accurate join-relation size.
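
A minimal sketch of this scenario (the tables and the filter are made up for illustration): the filter shrinks one side of the join to a handful of rows, but the static planner only sees the pre-filter size estimate. With AQE enabled, the real post-shuffle size of the small side can let Spark replace a planned sort-merge join with a broadcast hash join at runtime.

import org.apache.spark.sql.functions.col

// A large fact-like table and a side whose filter keeps only a few rows.
val large = spark.range(0, 10000000).withColumnRenamed("id", "key")
val small = spark.range(0, 1000000)
  .withColumnRenamed("id", "key")
  .filter(col("key") < 100)  // true size is tiny, but only known at runtime

// Static planning may pick a sort-merge join; with AQE the join strategy
// can be switched once runtime statistics show `small` fits the threshold.
val joined = large.join(small, "key")
joined.explain()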

2. Reducing Post-Shuffle Partitions

Prior to Spark 3.0, the developer needed to know the data well, because Spark does not pick the optimal number of partitions on its own. After each shuffle operation, the developer had to repartition to increase or decrease the partition count based on the total number of records. If someone chooses the wrong number of partitions, it leads to a lot of data transfer over the network, which increases overhead and hampers performance badly.

With Spark 3.0, after every stage of the job, Spark dynamically determines the optimal number of partitions by looking at the metrics of the completed stage.

Before Enabling AQE:

Without enabling AQE, the shuffle operation requires the default 200 partitions (visible as 200 tasks in the Spark UI).

After Enabling AQE:

spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", true)

With AQE enabled, the same shuffle operation requires only 4 partitions.
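
A minimal sketch that reproduces this behaviour (the data and grouping key are illustrative; the final partition count depends on the advisory partition size, so it may differ from the 4 shown above):

import org.apache.spark.sql.functions.col

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

// Tiny aggregation: the shuffle would normally create 200 partitions,
// most of them empty; AQE coalesces them into a handful at runtime.
val counts = spark.range(0, 1000).groupBy(col("id") % 10).count()
println(counts.rdd.getNumPartitions)  // typically far fewer than 200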

3. Optimizing Skew Joins

Data skew is a problem of data distribution. Whenever we deal with a large amount of data on a cluster, there is a possibility that the data is unevenly distributed across partitions, which in turn will significantly degrade the performance of queries, especially joins.

AQE automatically detects this skew from shuffle file statistics and handles it by splitting the skewed partitions into smaller sub-partitions, which are then joined to the corresponding partition from the other side.

Let’s understand this better with an example:

  • Consider table A joined with table B, where table A has a partition A0 significantly bigger than its other partitions.
  • The skew join optimization splits partition A0 into two sub-partitions
  • and joins each of them to the corresponding partition B0 of table B.
  • Without this optimization, there would be four tasks running the sort-merge join, with one task taking a much longer time.
  • After this optimization, there will be five tasks running the join,
  • and each task takes roughly the same amount of time, resulting in better overall performance.
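
This behaviour is controlled by a few configuration knobs in Spark 3.0. A minimal sketch (the numeric values below are illustrative, not recommendations):

spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
// A partition is treated as skewed if it is larger than this factor times
// the median partition size...
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
// ...and also larger than this absolute size threshold.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")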

TPC-DS performance gains from AQE

Below is a chart of the 10 TPC-DS queries with the largest performance improvements from AQE. Most of these gains come from dynamic partition coalescing and dynamic join strategy switching.

To know more about the TPC-DS schema, refer to this link.

Conclusion

AQE is a great addition to the Apache Spark optimizer, and it can dramatically speed up your queries. Please check the documentation for details on the configuration options. If you have not yet read about the Spark basics, do check my blog on Spark; it will really help you kick-start your journey with Apache Spark.

Hope you enjoyed learning. Stay tuned for upcoming blogs!

Written by 

Chitra Sapkal is a software consultant at Knoldus Inc. with 2 years of experience. Knoldus does Big Data product development on Scala, Spark, and Functional Java. She is a self-motivated, passionate person who is recognized as a good team player. Her hobbies include playing badminton and travelling.