One of the most frequently used transformations in Apache Spark is the join. Joins let the developer combine two or more data frames based on certain (sortable) keys. The syntax for writing a join is simple, but what goes on behind the curtain is sometimes lost. Internally, Apache Spark considers several join algorithms and then chooses one of them. Not knowing what these algorithms are, or which one Spark will choose, can make a simple join expensive.
When choosing a join algorithm, Spark looks at the size of the data frames involved. It also considers the join type, the join condition, and any hint provided before deciding which algorithm to use. In most cases, Sort Merge Join and Shuffle Hash Join are the two workhorses that drive Spark SQL joins. But if the size of one of the data frames is below a certain threshold, Spark makes Broadcast Join its top contender.
Broadcast Hash Join
Looking at the physical plan of a join operation, a Broadcast Hash Join in Spark looks like this:
The above plan shows that the data frame from one of the branches is broadcast to every node containing the other data frame. On each node, Spark then performs the final join. This is Spark’s per-node communication strategy.
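The idea can be illustrated without a cluster. The following is a minimal pure-Python sketch, not Spark's implementation: lists of dictionaries stand in for partitions, and the function and sample names (`broadcast_hash_join`, `orders`, `customers`) are hypothetical. It assumes an inner join on a single key.

```python
from collections import defaultdict

def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join each partition of the large side against one shared copy of the small side."""
    # Build the hash table once from the small side; this copy plays the role
    # of the broadcast data frame that every node receives.
    broadcast_table = defaultdict(list)
    for row in small_rows:
        broadcast_table[row[key]].append(row)

    joined_partitions = []
    for partition in large_partitions:
        # Each partition of the large side is joined where it already lives,
        # so the large side is never moved.
        joined = []
        for row in partition:
            for match in broadcast_table.get(row[key], []):
                joined.append({**row, **match})
        joined_partitions.append(joined)
    return joined_partitions

# Hypothetical sample data: two partitions of a large side, one small side.
orders = [
    [{"cust_id": 1, "amount": 30}, {"cust_id": 2, "amount": 15}],
    [{"cust_id": 1, "amount": 99}, {"cust_id": 3, "amount": 5}],
]
customers = [{"cust_id": 1, "name": "Ann"}, {"cust_id": 2, "name": "Bob"}]

result = broadcast_hash_join(orders, customers, "cust_id")
```

The large side keeps its original two partitions in the output, which mirrors the fact that only the small side travels across the network.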
Spark uses the Broadcast Hash Join when the size of one of the data frames is less than the threshold set in spark.sql.autoBroadcastJoinThreshold. Its default value is 10 MB, but it can be changed using the following code:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 100 * 1024 * 1024)
This algorithm has the advantage that the other side of the join does not need to be shuffled. If that side is very large, skipping the shuffle brings a notable speed-up over algorithms that have to shuffle it.
Broadcasting large datasets can also lead to timeout errors. The spark.sql.broadcastTimeout configuration sets the maximum time a broadcast operation may take, after which the operation fails. The default timeout is 5 minutes (300 seconds); it can be changed with spark.conf.set in the same way as the threshold above.
Sort Merge Join
If neither of the data frames can be broadcast, Spark resorts to Sort Merge Join. This algorithm uses the node-node communication strategy, where Spark shuffles the data across the cluster.
Sort Merge Join requires both sides of the join to have correct partitioning and order. Generally, this is ensured by a shuffle and a sort in both branches of the join, as depicted below:
This algorithm consists of two steps. The first step exchanges and sorts the datasets. The second step merges the sorted data within each partition by iterating over the elements and joining the rows that have the same join key value.
The shuffle and sort are very expensive operations. In principle, they can be avoided by creating data frames from correctly bucketed tables, which makes join execution more efficient.
Since Spark 2.3, Sort Merge Join has been the default join algorithm in Spark. However, this preference can be turned off through the internal parameter spark.sql.join.preferSortMergeJoin, which is true by default.
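The merge step is easier to follow in code. Below is a minimal pure-Python sketch of what happens inside a single partition after the exchange. It is an illustration rather than Spark's implementation, it assumes an inner join on one key, and the names `sort_merge_join`, `left_side` and `right_side` are hypothetical.

```python
def sort_merge_join(left_rows, right_rows, key):
    """Inner-join two lists of dicts by sorting both sides and merging them."""
    # Step 1: sort both sides on the join key (Spark does this after the shuffle).
    left = sorted(left_rows, key=lambda r: r[key])
    right = sorted(right_rows, key=lambda r: r[key])

    # Step 2: walk both sorted lists with two cursors.
    joined = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][key], right[j][key]
        if left_key < right_key:
            i += 1          # left key has no partner yet, advance left
        elif left_key > right_key:
            j += 1          # right key has no partner yet, advance right
        else:
            # Find the run of right rows that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][key] == left_key:
                j_end += 1
            # Pair every left row with this key against the whole right run.
            while i < len(left) and left[i][key] == left_key:
                for right_row in right[j:j_end]:
                    joined.append({**left[i], **right_row})
                i += 1
            j = j_end
    return joined

# Hypothetical sample data with duplicate keys on both sides.
left_side = [{"k": 2, "a": "x"}, {"k": 1, "a": "y"}, {"k": 2, "a": "z"}]
right_side = [{"k": 2, "b": 10}, {"k": 3, "b": 20}, {"k": 2, "b": 30}]

merged = sort_merge_join(left_side, right_side, "k")
```

Because both inputs are sorted, each cursor only ever moves forward, which is why the merge itself is cheap once the expensive shuffle and sort are done.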
Shuffled Hash Join
Shuffle Hash Join works on the map-reduce concept. It maps through the data frames and uses the values of the join column as the output key. It then shuffles the data frames based on those keys, so rows from the different data frames that share a key end up on the same machine. In the reduce phase, Spark joins the data frames.
Since preferSortMergeJoin is true by default, Spark chooses Sort Merge Join over Shuffled Hash Join. Even if preferSortMergeJoin is set to false, Spark will choose the Shuffled Hash Join only if:
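The map, shuffle and reduce phases described above can be sketched in plain Python. This is a simplified illustration, not Spark's implementation: the partition count, the helper names (`hash_partition`, `shuffled_hash_join`) and the sample data are assumptions, and the join is an inner join that builds its hash table on the right side.

```python
from collections import defaultdict

def hash_partition(rows, key, num_partitions):
    """'Shuffle': route each row to a partition chosen by the hash of its join key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        partitions[hash(row[key]) % num_partitions].append(row)
    return partitions

def shuffled_hash_join(left_rows, right_rows, key, num_partitions=4):
    """Inner-join two lists of dicts by co-partitioning them, then hash-joining per partition."""
    # Both sides use the same partitioning function, so equal keys land
    # in the partition with the same index.
    left_parts = hash_partition(left_rows, key, num_partitions)
    right_parts = hash_partition(right_rows, key, num_partitions)

    joined = []
    for left_part, right_part in zip(left_parts, right_parts):
        # 'Reduce': build a hash table from one side of this partition...
        table = defaultdict(list)
        for row in right_part:
            table[row[key]].append(row)
        # ...and probe it with the rows of the other side. No sorting is needed.
        for row in left_part:
            for match in table.get(row[key], []):
                joined.append({**row, **match})
    return joined

# Hypothetical sample data: ids 1, 3 and 5 appear on both sides.
left_table = [{"id": i, "l": i * 10} for i in range(6)]
right_table = [{"id": i, "r": -i} for i in (1, 3, 5, 7)]

shuffled = shuffled_hash_join(left_table, right_table, "id")
```

The order of the output rows depends on how keys fall into partitions, so callers should not rely on it.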
1. one side of the join is at least three times smaller than the other side
2. the average size of each partition is smaller than the autoBroadcastJoinThreshold.
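Taken together, the rules in this article can be written as a small decision function. The sketch below is a simplified model of the selection described here, not Spark's planner: it ignores hints, join types and join conditions, the function name and parameters are hypothetical, and it assumes the average partition size is the smaller side's size divided by the number of shuffle partitions.

```python
MB = 1024 * 1024

def choose_join_strategy(left_bytes, right_bytes, num_shuffle_partitions,
                         broadcast_threshold=10 * MB,
                         prefer_sort_merge_join=True):
    """Return the join algorithm suggested by the rules described in the text."""
    smaller, larger = sorted((left_bytes, right_bytes))

    # Rule 1: a side below the threshold can be broadcast.
    if smaller < broadcast_threshold:
        return "BroadcastHashJoin"

    # Rule 2: Shuffled Hash Join needs the preference switched off,
    # one side at least three times smaller than the other,
    # and an average partition small enough to hash in memory.
    much_smaller = smaller * 3 <= larger
    avg_partition_bytes = smaller / num_shuffle_partitions
    if (not prefer_sort_merge_join
            and much_smaller
            and avg_partition_bytes < broadcast_threshold):
        return "ShuffledHashJoin"

    # Rule 3: everything else falls back to Sort Merge Join.
    return "SortMergeJoin"

small_vs_large = choose_join_strategy(5 * MB, 500 * MB, 200)
default_choice = choose_join_strategy(50 * MB, 500 * MB, 200)
hash_choice = choose_join_strategy(50 * MB, 500 * MB, 200,
                                   prefer_sort_merge_join=False)
similar_sizes = choose_join_strategy(400 * MB, 500 * MB, 200,
                                     prefer_sort_merge_join=False)
```

In this model, turning off the Sort Merge preference is not enough on its own: the last call still falls back to Sort Merge Join because the two sides are of similar size.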
Like Sort Merge Join, Shuffled Hash Join requires the data to be partitioned correctly, so in general it introduces a shuffle in both branches of the join. Unlike Sort Merge Join, however, it does not require the data to be sorted, which gives it the potential to be faster.
Although Spark internally chooses the best join algorithm, a developer can influence that decision using hints. By specifying a hint in the join syntax, the developer asks Spark to do something it otherwise would not do, so extra caution is needed. Specifying a hint without understanding the nature of the underlying data may force Spark to build a hash map for a large partition and lead to OOM errors. On the other hand, a developer who knows the underlying data well but does not specify a hint might miss an opportunity to optimize the join.