Shared Variables in Distributed Computing

Reading Time: 4 minutes

In a general spark computation code, we define some variables and then some methods to manipulate them. Running a job on a cluster, spark ensures that every node in the cluster receives its own copy of these methods and variables. If a node updates the variable value, other nodes, including the driver, remain unaware of the changes made. Most of the time this behavior is acceptable, but there might be instances when we want a node’s updated variable to be shared with other nodes in the cluster. To cater to such use cases, Apache Spark provides a concept of Shared Variables in Distributed Computing.

Since it would be inefficient to support general, read-write shared variables across tasks, Spark provides some alternate options. It offers two types of shared variables in it’s distributed computing environment: Broadcast Variables and Accumulators.

Distributed Caching with Broadcast Variables

Broadcast Variables are read-only variables that are cached in a serialized format in all executor nodes of a cluster. Since they are already cached, they are not shipped every time a task computes on them. Hence, this reduces network I/O involved in the spark job.

Broadcast Variables

Consider a use case where we have a mapping of pincodes and their respective cities. We want to use this predefined mapping to find cities associated with an input list of pincodes.

A code without the use of broadcast variables look like this:

//Mapping of pincode to city
>>> val pincode_mapping = Map(11 -> "Delhi",
      12 -> "Haryana",
      14 -> "Punjab",
      17 -> "Himachal Pradesh",
      18 -> "Jammu",
      19 -> "Kashmir")

>>> val list_of_pincodes: List[Int] = List(14, 19, 18, 18, 12, 17, 11, 19, 17, 19, 19)

// RDD of input pincodes
>>> val rdd_of_pincodes: RDD[Int] = spark.sparkContext.parallelize(list_of_pincodes)

>>> val rdd_of_cities = rdd_of_pincodes.map(pincode => pincode_mapping(pincode))
>>> print(rdd_of_cities.collect().mkString("\n"))

while the code to implement the same using broadcast variables is:

// Convert the lookup table to broadcast
//  Cache the mapping of pincodes to cities in every executor.
>>> val pincode_mapping_broadcast: Broadcast[Map[Int, String]] = spark.sparkContext.broadcast(pincode_mapping)

// Access value from broadcast variable
>>> val rdd_of_cities: RDD[String] = rdd_of_pincodes.map(pincode => pincode_mapping_broadcast.value.get(pincode).get)

So, you can create a Broadcast variables using the code:

val broadcast_value = spark.sparkContext.broadcast(value)

and to access it’s value, use the following code:

val actual_value = broadcast_value.value

broadcast() call does not send these broadcast variables to the executors, but their first execution sends them.

Working of Broadcast variables

When we run a spark job containing Broadcast variables, spark does the following processes:

  • It breaks the job into stages that have distributed shuffling. Spark executes the Actions within the stage.
  • Later Stages are also broken into tasks
  • Spark broadcasts the common data (reusable) needed by tasks within each stage.
  • The broadcasted data is cached in serialized format and deserialized before executing each task.

While running spark in local mode and with a small dataset, the run time improvements with Broadcast Variables are not visible. But as the code breaks down into multiple tasks, all accessing the same large dataset, the use case for broadcast variables come into play.

Accumulate Results using Accumulators

Another set of Shared Variables in Distributed Computing is called Accumulators. While Broadcast Variables are read-only, Spark Accumulators are read-write values that implement shared variables that can be operated on (added to), from various tasks running as a part of the job.

Accumulators can help with a number of potential use cases including for example counting the number of failed records across a complete cluster, the total number of records associated with a product ID, or the number of basket check-outs in a window.

Accumulators

Spark’s out of the box concept of Accumulators allow multiple workers to write to a shared variable, but does not allow them to read it. Only the Driver node can read the accumulator’s value using .value with the name of the accumulator.

To initialise a numeric accumulator with an initial value, use the following code:

// Initialise an accumulator with name "Failed records accumulator"
>>> val failed_records_acc = sc.longAccumulator("Failed records accumulator")

Accumulators are “added” to through an associative and commutative operation only and thus, they can be efficiently supported in parallel. So, to increment the value of an accumulator, use the following code:

....
>>> records.map{
//update the value of the accumulator
if(map.failed) failed_records_acc += 1 
}
.....

Spark not only support numeric accumulator, but also allow the programmers to create their own types by subclassing AccumulatorV2 abstract class and implementing its various methods.

/**
 * A custom accumulator for string concatenation
 */
class StringAccumulator(private var _value: String) extends AccumulatorV2[String, String] {
 
    def this() {
        this("")
    }
    //Accumulates the input to the current value
    override def add(newValue: String): Unit = {
        _value = value + " " + newValue.trim
    }
    //Merge another same type accumulator into the current one
    override def merge(other: AccumulatorV2[String, String]): Unit = {
        add(other.value)
    }
    //Resets the accumulator
    override def reset(): Unit = {
        _value = ""
    }
}

Caveats of Accumulators

When using accumulators there are some caveats that we as programmers need to be aware of,

  1. Spark evaluates Computations inside transformations lazily. As a result, unless some action happens, spark does not execute accumulators inside functions like map() or filter().
  2. Spark guarantees to update accumulators inside actions only once. So, restarting a task and recomputing the lineage updates the accumulators only once.
  3. Spark does not guarantee this for transformations. So, restarting a task and recomputing the lineage might update the accumulator value more than once.

A good rule of thumb to follow is to use accumulators only for data you would consider to be a side effect of your main data processing application.

Knoldus-blog-footer-image