Advanced Spark SQL Joins: an Optimization Technique


Welcome back to another important Apache Spark topic. Today we will learn about one of the optimization techniques used in Spark: choosing the right join strategy.

Apache Spark supports many types of joins. A few are regular join types, and others are more advanced. For details on the regular ones, please refer to the link.

Let’s start with what optimization means in Spark, and then look at the join types that improve Spark query performance.

What is Optimization

Spark optimization techniques adjust Spark's settings and properties so that cluster resources are used properly and jobs run quickly.

All of this ultimately helps process data efficiently. The join types covered in this blog are:

  • Broadcast Join
  • Shuffle Hash Join
  • Sort Merge Join

Broadcast Join Working

  • Spark broadcast joins are perfect for joining a large DataFrame with a small DataFrame.
  • Broadcast joins are easier to run on a cluster because the large DataFrame does not have to be shuffled.
  • Spark can “broadcast” a small DataFrame by sending all the data in that small DataFrame to all nodes in the cluster.
  • After the small DataFrame has been broadcast, Spark can perform the join without shuffling any of the data in the large DataFrame.
  • Let’s look at the example below.

We are using a simple data set that models the employee-to-department relationship. Each employee of an imaginary company is assigned to one department, which is referenced by an ID (depId). As we have just a few departments but many employees, we will pick the department table for broadcasting.

import org.apache.spark.sql.functions.broadcast
// In spark-shell, sc and spark.implicits._ (needed for toDF and $"...") are already in scope.

case class Employee(name: String, age: Int, depId: String)
case class Department(id: String, name: String)

val employeesRDD = sc.parallelize(Seq(
    Employee("Mary", 33, "IT"),
    Employee("Paul", 45, "IT"),
    Employee("Peter", 26, "MKT"),
    Employee("Jon", 34, "MKT"),
    Employee("Sarah", 29, "IT"),
    Employee("Steve", 21, "Intern")
))
val departmentsRDD = sc.parallelize(Seq(
    Department("IT", "IT  Department"),
    Department("MKT", "Marketing Department"),
    Department("FIN", "Finance & Controlling")
))

val employeesDF = employeesRDD.toDF
val departmentsDF = departmentsRDD.toDF

// mark the small department DataFrame for broadcasting
val tmpDepartments = broadcast(departmentsDF.as("departments"))

// inner join: employees whose depId has no matching department are dropped
employeesDF.join(tmpDepartments,
   $"depId" === $"id",  // join by employees.depId == departments.id
   "inner").show()

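// Below is the output of our broadcast join:

+-----+---+-----+---+--------------------+
| name|age|depId| id|                name|
+-----+---+-----+---+--------------------+
| Mary| 33|   IT| IT|      IT  Department|
| Paul| 45|   IT| IT|      IT  Department|
|Peter| 26|  MKT|MKT|Marketing Department|
|  Jon| 34|  MKT|MKT|Marketing Department|
|Sarah| 29|   IT| IT|      IT  Department|
+-----+---+-----+---+--------------------+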
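
To build some intuition for why no shuffle of the large side is needed, here is a minimal plain-Scala sketch of the idea (it does not use Spark, and it is not how Spark implements the join internally). The object name BroadcastJoinSketch, the way the employees are split into partitions, and the assumption that department ids are unique are all illustrative choices made for this sketch.

```scala
// Plain-Scala sketch of a broadcast (map-side) join; no Spark required.
object BroadcastJoinSketch {
  case class Employee(name: String, age: Int, depId: String)
  case class Department(id: String, name: String)

  // The large side, already split into partitions (one inner Seq per partition).
  // The split itself is arbitrary and only for illustration.
  val employeePartitions: Seq[Seq[Employee]] = Seq(
    Seq(Employee("Mary", 33, "IT"), Employee("Paul", 45, "IT")),
    Seq(Employee("Peter", 26, "MKT"), Employee("Jon", 34, "MKT")),
    Seq(Employee("Sarah", 29, "IT"), Employee("Steve", 21, "Intern"))
  )

  // The small side: every partition gets a full copy of this lookup table.
  // Assumption: department ids are unique, so a Map is enough.
  val departmentsById: Map[String, Department] = Seq(
    Department("IT", "IT  Department"),
    Department("MKT", "Marketing Department"),
    Department("FIN", "Finance & Controlling")
  ).map(d => d.id -> d).toMap

  // Each partition joins locally against its copy; employee rows never move.
  def joinPartition(rows: Seq[Employee]): Seq[(Employee, Department)] =
    rows.flatMap(e => departmentsById.get(e.depId).map(d => (e, d)))

  // Inner join result: employees without a matching department are dropped.
  val joined: Seq[(Employee, Department)] =
    employeePartitions.flatMap(joinPartition)
}
```

Because every partition holds the whole department table, each employee row can be matched where it already lives, which is the property the broadcast join relies on.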

Shuffle Hash Join

When the smaller table is still relatively large, broadcasting it may cause memory issues on the driver as well as on the executors. In that case, Shuffle Hash Join is the right choice.

It is an expensive join because it involves both shuffling and hashing. It also requires memory and computation to build and maintain a hash table.

Shuffle Hash Join is performed in two steps:

Step 1:
Shuffling: The data from both join tables is partitioned on the join key. The shuffle moves rows across partitions so that records with the same join key end up in the same partition.

Step 2:
Hash Join: A classic single-node hash join algorithm is performed on the data in each partition.
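
The two steps can be sketched in plain Scala as follows. This is only an illustration of the technique on in-memory collections, not Spark's implementation. The object name ShuffleHashJoinSketch, the (key, payload) row shape, and the choice of the right side as the build side are assumptions made for the sketch.

```scala
// Plain-Scala sketch of a shuffle hash join; no Spark required.
object ShuffleHashJoinSketch {
  type Row = (String, String) // (join key, payload)

  // Step 1: "shuffle" - route every row to a partition chosen by its key hash,
  // so rows with the same key always land in the same partition.
  def shuffle(rows: Seq[Row], numPartitions: Int): Map[Int, Seq[Row]] =
    rows.groupBy { case (key, _) => Math.floorMod(key.hashCode, numPartitions) }

  // Step 2: single-node hash join - build a hash table on one side,
  // then probe it with every row of the other side.
  def hashJoin(build: Seq[Row], probe: Seq[Row]): Seq[(String, String, String)] = {
    val table: Map[String, Seq[String]] =
      build.groupBy(_._1).map { case (k, rs) => k -> rs.map(_._2) }
    for {
      (key, probeValue) <- probe
      buildValue <- table.getOrElse(key, Seq.empty)
    } yield (key, probeValue, buildValue)
  }

  // Shuffle both sides with the same partitioner, then join partition by partition.
  def join(left: Seq[Row], right: Seq[Row], numPartitions: Int): Seq[(String, String, String)] = {
    val l = shuffle(left, numPartitions)
    val r = shuffle(right, numPartitions)
    (0 until numPartitions).flatMap { p =>
      hashJoin(build = r.getOrElse(p, Seq.empty), probe = l.getOrElse(p, Seq.empty))
    }
  }
}
```

Note that the hash table has to fit in memory for each partition, which is where the memory cost mentioned above comes from.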

NOTE: To use the Shuffle Hash Join, spark.sql.join.preferSortMergeJoin needs to be set to false.
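
As a rough configuration sketch, the setting can be changed on a SparkSession as shown below. This assumes Spark 3.x with spark-sql on the classpath. The app name, the local master, and the two tiny DataFrames are made up for illustration. Disabling automatic broadcasting and adding the shuffle_hash join hint are extra assumptions of this sketch, used so that the small demo tables are not simply broadcast; the planner still makes the final decision, so it is worth checking the plan with explain().

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-hash-join-sketch") // hypothetical app name
  .master("local[*]")                  // assumption: local run for experimenting
  .getOrCreate()
import spark.implicits._

// Tell the planner not to prefer sort-merge join.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
// Assumption for this demo: turn off automatic broadcasting of small tables.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// Tiny illustrative tables (hypothetical data).
val employees = Seq(("Mary", "IT"), ("Peter", "MKT")).toDF("name", "depId")
val departments = Seq(("IT", "IT Department"), ("MKT", "Marketing Department")).toDF("id", "depName")

// Since Spark 3.0, a join hint can also request the strategy directly.
val joined = employees.join(departments.hint("shuffle_hash"), $"depId" === $"id")

// Inspect the physical plan to see which join strategy was actually chosen.
joined.explain()
```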

When to use:

Shuffle hash join works well:
1. when the DataFrames are distributed evenly on the keys you are joining on, and
2. when the DataFrames have enough distinct keys for parallelism.
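
One rough way to reason about both conditions is to look at how many rows each partition would receive if rows were routed by the hash of their join key. The plain-Scala sketch below does this for a list of keys. The object name KeySkewSketch, the helper names, and the "largest partition divided by average partition" ratio are assumptions of this sketch, not a Spark API.

```scala
// Plain-Scala sketch for checking how evenly join keys spread over partitions.
object KeySkewSketch {
  // Number of rows each partition would receive if rows were routed by key hash.
  def partitionSizes(keys: Seq[String], numPartitions: Int): Vector[Int] = {
    val counts = keys
      .groupBy(k => Math.floorMod(k.hashCode, numPartitions))
      .map { case (p, ks) => p -> ks.size }
    Vector.tabulate(numPartitions)(p => counts.getOrElse(p, 0))
  }

  // Largest partition divided by the average partition size.
  // Values near 1.0 mean an even spread; large values mean skew.
  def skewRatio(sizes: Seq[Int]): Double =
    if (sizes.isEmpty || sizes.sum == 0) 0.0
    else sizes.max.toDouble / (sizes.sum.toDouble / sizes.size)
}
```

A column with only a handful of distinct keys, or with one dominant key, gives a high ratio: a few partitions do most of the work while the others stay idle.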

Sort Merge Join

Shuffle Sort-Merge Join (SMJ) shuffles the data so that rows with the same join key land on the same worker, and then performs the sort-merge join at the partition level on the worker nodes.

NOTE: This is Spark’s default join strategy for joins that are not broadcast, because spark.sql.join.preferSortMergeJoin defaults to true in current Spark releases.

Partitions are sorted on the Join key before the Join operation.

It has 3 phases:

  • Shuffle Phase: Both large tables are repartitioned on the join keys across the partitions in the cluster.
  • Sort Phase: The data within each partition is sorted in parallel.
  • Merge Phase: The sorted, partitioned data is joined by iterating over the elements of both sides and joining the rows that have the same value for the join keys.
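
The sort and merge phases inside a single partition can be sketched in plain Scala as follows. The sketch assumes the shuffle phase has already brought matching keys into the same partition, and it is an illustration of the technique rather than Spark's implementation. The object name SortMergeJoinSketch and the (key, payload) row shape are assumptions made for the sketch.

```scala
// Plain-Scala sketch of the sort and merge phases within one partition.
object SortMergeJoinSketch {
  type Row = (Int, String) // (join key, payload)

  def sortMergeJoin(left: Seq[Row], right: Seq[Row]): Seq[(Int, String, String)] = {
    // Sort phase: order both sides by the join key.
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector

    val out = Vector.newBuilder[(Int, String, String)]
    var i = 0
    var j = 0

    // Merge phase: advance two cursors over the sorted sides.
    while (i < l.length && j < r.length) {
      val lk = l(i)._1
      val rk = r(j)._1
      if (lk < rk) i += 1        // left key too small, move the left cursor
      else if (lk > rk) j += 1   // right key too small, move the right cursor
      else {
        // Equal keys: find the run of rows sharing this key on each side
        // and emit every combination of the two runs.
        val iEnd = l.indexWhere(_._1 != lk, i) match { case -1 => l.length; case n => n }
        val jEnd = r.indexWhere(_._1 != rk, j) match { case -1 => r.length; case n => n }
        for (a <- i until iEnd; b <- j until jEnd) out += ((lk, l(a)._2, r(b)._2))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }
}
```

Unlike the hash join, this approach does not need a hash table in memory, since both sides are only read in key order during the merge.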

[Table: join strategies supported by each join type]


In this blog, we learned about the advanced join types supported in Apache Spark, which help optimize query performance. With these optimization techniques, Apache Spark can process and analyze large datasets very efficiently. The one thing to take care of is deciding which type of join fits which scenario.

I hope you learned the basics of joins and how they work.

Stay tuned for upcoming blogs

Written by 

Chitra Sapkal is a software consultant at Knoldus Inc. with 2 years of experience. Knoldus does Big Data product development on Scala, Spark, and Functional Java. She is a self-motivated, passionate person who is recognized as a good team player. Her hobbies include playing badminton and travelling.