Broadcast Join in Spark


Hello,
I am assuming that the reader is familiar with joining two tables in SQL and joining two DataFrames in Spark. If not, please see this introduction to joining two data frames: https://www.datacamp.com/community/tutorials/joining-dataframes-pandas

Why do we need Broadcast Join?

A broadcast join in Spark is preferred when we want to join a small DataFrame with a large one. The requirement is that the small DataFrame fits comfortably in memory, so that it can be joined with the large DataFrame in a way that boosts the performance of the join.

What happens internally

When we call broadcast on the smaller DataFrame, Spark sends a copy of its data to every executor node in the cluster. Once the DataFrame has been broadcast, Spark can perform the join without shuffling any of the data in the large DataFrame. The following sample code shows this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object BroadcastJoin extends App{

  val spark = SparkSession.builder()
    .appName("blog.knoldus.com")
    .master("local")
    .getOrCreate()

  import spark.implicits._

  // Create the employee directory, which is the larger DataFrame
  val employeeDF = Seq(
    ("Amit", "Bangalore"),
    ("Ankit", "California"),
    ("Abdul", "Pune"),
    ("Sumit", "California"),
    ("Riya", "Pune")
  ).toDF("first_name", "city")

  // Create citiesDF, the small DataFrame that will be broadcast
  val citiesDF = Seq(
    ("California", "USA"),
    ("Bangalore", "India"),
    ("Pune", "India")
  ).toDF("city", "country")

  // Join employeeDF with the broadcast citiesDF on the city column
  val joinedDf = employeeDF
    .join(broadcast(citiesDF), employeeDF.col("city") === citiesDF.col("city"))
    // Drop the city column that came from citiesDF so it is not duplicated
    .drop(citiesDF.col("city"))

  // Finally, display the joined DataFrame
  joinedDf.show()

}
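
To make the "What happens internally" description more concrete, here is a minimal sketch in plain Scala, without any Spark APIs. It is only a mental model, not how Spark is actually implemented: the names BroadcastJoinSketch, employeePartitions, cityToCountry and joinPartition are made up for this illustration, and ordinary collections stand in for partitions and for the broadcast copy of the small table.

```scala
// Conceptual sketch only: plain Scala collections stand in for Spark's
// partitions and for the broadcast copy of the small table.
object BroadcastJoinSketch {

  // The large table, already split into partitions (one per hypothetical executor)
  val employeePartitions: Seq[Seq[(String, String)]] = Seq(
    Seq(("Amit", "Bangalore"), ("Ankit", "California")),
    Seq(("Abdul", "Pune"), ("Sumit", "California"), ("Riya", "Pune"))
  )

  // The small table as a hash map; every partition gets to see the whole map
  val cityToCountry: Map[String, String] = Map(
    "California" -> "USA",
    "Bangalore"  -> "India",
    "Pune"       -> "India"
  )

  // Inner join of a single partition against the broadcast map.
  // Rows of the large table never leave their partition, so nothing is shuffled.
  def joinPartition(
      rows: Seq[(String, String)],
      lookup: Map[String, String]
  ): Seq[(String, String, String)] =
    rows.flatMap { case (name, city) =>
      lookup.get(city).map(country => (name, city, country))
    }

  // Join each partition independently and concatenate the results
  def broadcastJoin(): Seq[(String, String, String)] =
    employeePartitions.flatMap(partition => joinPartition(partition, cityToCountry))

  def main(args: Array[String]): Unit =
    broadcastJoin().foreach(println)
}
```

The point of the sketch is that only the small lookup table is copied around, while each partition of the large table is joined where it already lives.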

Limitations:

We should only broadcast a DataFrame that is small enough to fit in the memory of the driver and of every executor node.

We should avoid a broadcast join when both DataFrames are large.
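
Related to these limits, Spark also decides on its own whether to broadcast a table, based on the spark.sql.autoBroadcastJoinThreshold setting (10 MB by default). The sketch below is one possible way to switch that automatic behaviour off and compare the query plans with and without the explicit broadcast hint. The object name, app name and tiny DataFrames are made up for the illustration, and the exact plan text depends on the Spark version.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object BroadcastThresholdSketch extends App {

  val spark = SparkSession.builder()
    .appName("broadcast-threshold-sketch") // hypothetical app name
    .master("local")
    .getOrCreate()

  import spark.implicits._

  // Disable automatic broadcasting, so only an explicit hint can trigger it
  spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

  // Tiny illustrative DataFrames, assumed for this sketch
  val employeeDF = Seq(("Amit", "Bangalore"), ("Riya", "Pune")).toDF("first_name", "city")
  val citiesDF   = Seq(("Bangalore", "India"), ("Pune", "India")).toDF("city", "country")

  // No hint: with the threshold disabled, the plan typically shows a shuffle-based join
  employeeDF.join(citiesDF, Seq("city")).explain()

  // Explicit hint: the plan is expected to mention a broadcast hash join
  employeeDF.join(broadcast(citiesDF), Seq("city")).explain()

  spark.stop()
}
```

Reading the output of explain() is a simple way to check whether a join was really planned as a broadcast join before relying on it for a large job.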
