Welcome back to another important Apache Spark topic. Today we will learn about one of the optimization techniques used in Spark: joins.
Apache Spark supports many types of joins. A few are regular join types and others are more advanced. For details about the regular ones, please refer to the link.
Let’s start with what optimization means in Spark, and then look at the join strategies that improve Spark query performance.
What is Optimization
Spark optimization techniques are used to modify the settings and properties of Spark to ensure that the resources are utilized properly and the jobs are executed quickly.
All this ultimately helps in processing data efficiently. The join types we are going to cover in this blog are:
- Broadcast Join
- Shuffle Hash Join
- Sort Merge Join
Broadcast Join Working
- Spark broadcast joins are perfect for joining a large DataFrame with a small DataFrame.
- Broadcast joins are cheaper to run on a cluster than shuffle-based joins.
- Spark can “broadcast” a small DataFrame by sending all the data in that small DataFrame to all nodes in the cluster.
- After the small DataFrame has been broadcast, Spark can perform the join without shuffling any of the data in the large DataFrame.
- Let’s check the example below.
We are using a simple data set of the employee-to-department relationship. Employees of an imaginary company are assigned to the one department they work for, which is referenced by an ID (depId). As we have just a few departments but many employees, we will pick the department table for broadcasting.
    import org.apache.spark.sql.functions.broadcast
    import spark.implicits._  // enables .toDF and $"col" (already imported in spark-shell)

    case class Employee(name: String, age: Int, depId: String)
    case class Department(id: String, name: String)

    val employeesRDD = sc.parallelize(Seq(
      Employee("Mary", 33, "IT"),
      Employee("Paul", 45, "IT"),
      Employee("Peter", 26, "MKT"),
      Employee("Jon", 34, "MKT"),
      Employee("Sarah", 29, "IT"),
      Employee("Steve", 21, "Intern")
    ))
    val departmentsRDD = sc.parallelize(Seq(
      Department("IT", "IT Department"),
      Department("MKT", "Marketing Department"),
      Department("FIN", "Finance & Controlling")
    ))

    val employeesDF = employeesRDD.toDF
    val departmentsDF = departmentsRDD.toDF

    // mark the small department table for broadcasting (a hint; nothing is computed yet)
    val tmpDepartments = broadcast(departmentsDF.as("departments"))

    // join by employees.depId == departments.id
    employeesDF.join(tmpDepartments, $"depId" === $"id", "inner").show()

Below is the output of our broadcast join. Steve does not appear because his depId ("Intern") has no matching department and this is an inner join.

    +-----+---+-----+---+--------------------+
    | name|age|depId| id|                name|
    +-----+---+-----+---+--------------------+
    | Mary| 33|   IT| IT|       IT Department|
    | Paul| 45|   IT| IT|       IT Department|
    |Peter| 26|  MKT|MKT|Marketing Department|
    |  Jon| 34|  MKT|MKT|Marketing Department|
    |Sarah| 29|   IT| IT|       IT Department|
    +-----+---+-----+---+--------------------+
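To confirm that Spark really plans the join as a broadcast join, you can inspect the physical plan. The snippet below is a small sketch that assumes it runs in the same spark-shell session as the example above, so `spark`, `employeesDF` and `departmentsDF` already exist. The name `joined` is only used for illustration.

```scala
import org.apache.spark.sql.functions.broadcast
import spark.implicits._

// Size limit below which Spark broadcasts a table on its own.
// The default is 10 MB; setting it to -1 turns automatic broadcasting off.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// Same join as above, kept in a value so the plan can be inspected.
val joined = employeesDF.join(broadcast(departmentsDF), $"depId" === $"id", "inner")

// Print the physical plan; for a broadcast join it should mention BroadcastHashJoin.
joined.explain()
```

Tables smaller than the threshold are usually broadcast automatically, so the explicit `broadcast(...)` call matters most when Spark cannot estimate the size of the small table well.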
Shuffle Hash Join
When the table is relatively large, broadcasting it may cause driver-side as well as executor-side memory issues. In that case, Shuffle Hash Join can be the right choice.
It is an expensive join as it involves both shuffling and hashing. Also, it requires memory and computation for maintaining a hash table.
Shuffle Hash Join is performed in two steps :
Step 1 - Shuffling: The data from both join tables is partitioned based on the join key. The shuffle moves records across partitions so that records with the same join key end up in the same partition.
Step 2 - Hash Join: A classic single-node hash join algorithm is performed on the data in each partition.
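The two steps can be sketched in plain Scala without Spark. This is only an illustration of the idea, not Spark's actual implementation: the object `ShuffleHashJoinSketch` and its methods `shuffle`, `hashJoin` and `join` are hypothetical names, and each "partition" is just an in-memory sequence.

```scala
object ShuffleHashJoinSketch {
  // Step 1: assign every (key, value) record to a partition by hashing its key,
  // so equal keys from both tables land in the same partition number.
  def shuffle[V](rows: Seq[(String, V)], numPartitions: Int): Map[Int, Seq[(String, V)]] =
    rows.groupBy { case (key, _) => Math.floorMod(key.hashCode, numPartitions) }

  // Step 2: single-node hash join inside one partition.
  // Build a hash table from the right side, then probe it with each left row.
  def hashJoin[L, R](left: Seq[(String, L)], right: Seq[(String, R)]): Seq[(String, L, R)] = {
    val table: Map[String, Seq[R]] =
      right.groupBy(_._1).map { case (k, rs) => k -> rs.map(_._2) }
    for {
      (key, l) <- left
      r <- table.getOrElse(key, Seq.empty)
    } yield (key, l, r)
  }

  // Inner join: shuffle both sides, then hash join partition by partition.
  def join[L, R](left: Seq[(String, L)], right: Seq[(String, R)],
                 numPartitions: Int): Seq[(String, L, R)] = {
    val leftParts = shuffle(left, numPartitions)
    val rightParts = shuffle(right, numPartitions)
    (0 until numPartitions).flatMap { p =>
      hashJoin(leftParts.getOrElse(p, Seq.empty), rightParts.getOrElse(p, Seq.empty))
    }
  }
}
```

For example, `ShuffleHashJoinSketch.join(Seq("IT" -> "Mary"), Seq("IT" -> "IT Department"), 4)` pairs Mary with the IT Department. The hash table built in step 2 is the reason this join needs extra memory on each partition.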
NOTE: For Spark to choose Shuffle Hash Join, spark.sql.join.preferSortMergeJoin needs to be set to false.
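A minimal sketch of how this could look in a spark-shell session is shown below. It assumes `spark`, `employeesDF` and `departmentsDF` from the broadcast example are available, and that you are on Spark 3.0 or later for the `SHUFFLE_HASH` join hint. The value name `hinted` is only illustrative. The planner still considers table sizes, so the plan you get may differ.

```scala
import spark.implicits._

// Session-level setting: tell the planner not to prefer sort-merge join.
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

// Spark 3.0+ alternative: request a shuffle hash join for one specific join
// by hinting the side that should be used to build the hash table.
val hinted = employeesDF.join(departmentsDF.hint("SHUFFLE_HASH"), $"depId" === $"id")

// Check the physical plan; a shuffle hash join shows up as ShuffledHashJoin.
hinted.explain()
```

The hint affects only the join it is attached to, which is often easier to reason about than changing a session-wide setting.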
When to use:
Shuffle hash join works well:
1. when the DataFrames are evenly distributed across the keys you join on, and
2. when the DataFrames have enough distinct keys for parallelism.
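One rough way to judge both conditions is to look at how many rows each join key has. The sketch below is plain Scala and purely illustrative: `KeySkewSketch`, `distinctKeys` and `skewRatio` are hypothetical helpers, not Spark APIs. A ratio close to 1 means the keys are evenly distributed, while a large ratio means a few keys dominate.

```scala
object KeySkewSketch {
  // Number of distinct join keys: an upper bound on useful parallelism.
  def distinctKeys(keys: Seq[String]): Int = keys.distinct.size

  // Size of the largest key group divided by the average key group size.
  def skewRatio(keys: Seq[String]): Double = {
    require(keys.nonEmpty, "need at least one key")
    val counts = keys.groupBy(identity).values.map(_.size).toSeq
    counts.max.toDouble / (counts.sum.toDouble / counts.size)
  }
}
```

On a real DataFrame the same numbers could be obtained by grouping on the join column and counting, which is cheap compared with a join that later stalls on one oversized partition.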
Sort Merge Join
Shuffle Sort-merge Join (SMJ) involves shuffling the data so that rows with the same join key land on the same worker, and then performing the sort-merge join operation at the partition level on the worker nodes.
NOTE: This is Spark’s default join strategy when neither table is small enough to broadcast, because spark.sql.join.preferSortMergeJoin defaults to true.
Partitions are sorted on the Join key before the Join operation.
It has 3 phases:
- Shuffle Phase: Both large tables will be repartitioned as per the Join keys across the partitions in the cluster.
- Sort Phase: Sort the data within each partition in parallel.
- Merge Phase: Join the sorted and partitioned data. Spark merges the two datasets by iterating over the elements and joining the rows that have the same value for the join key.
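The sort and merge phases for a single partition can be sketched in plain Scala. This is a simplified illustration, not Spark's implementation: `SortMergeJoinSketch` and `sortMergeJoin` are hypothetical names, the shuffle phase is assumed to have happened already, and the whole partition is assumed to fit in memory.

```scala
object SortMergeJoinSketch {
  // Inner join of two (key, value) sequences that belong to the same partition.
  def sortMergeJoin[L, R](left: Seq[(String, L)], right: Seq[(String, R)]): Seq[(String, L, R)] = {
    // Sort phase: order both sides by the join key.
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector

    // Merge phase: walk both sorted sides with two cursors.
    val out = Vector.newBuilder[(String, L, R)]
    var i = 0
    var j = 0
    while (i < l.length && j < r.length) {
      val cmp = l(i)._1.compareTo(r(j)._1)
      if (cmp < 0) i += 1          // left key is smaller: advance left
      else if (cmp > 0) j += 1     // right key is smaller: advance right
      else {
        // Keys match: find the run of equal keys on each side and pair them up.
        val key = l(i)._1
        val iEnd = l.indexWhere(_._1 != key, i) match { case -1 => l.length; case n => n }
        val jEnd = r.indexWhere(_._1 != key, j) match { case -1 => r.length; case n => n }
        for (a <- i until iEnd; b <- j until jEnd) out += ((key, l(a)._2, r(b)._2))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }
}
```

Because both sides are sorted, no hash table is needed, which is part of why this strategy copes better with large tables than a hash-based join.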
Table for the Join strategies Supported by the Join types
In this blog, we learned about the advanced types of joins supported in Apache Spark, which help in optimizing query performance. With these optimization techniques, Apache Spark can process and analyze large datasets very efficiently. The only thing to take care of is deciding which type of join fits which scenario.
Hope you learned the basics of joins and how they work.
Stay tuned for upcoming blogs