Introduction to Accumulators : Apache Spark


What's the Problem : Functions like map() and filter() can use variables defined outside them in the driver program, but each task running on the cluster gets a new copy of each variable, and updates to these copies are not propagated back to the driver.
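A minimal sketch of this problem, assuming a SparkContext is passed in by the caller (the object and method names here are hypothetical, not from any library). A plain variable on the driver is captured by the closure, and each task then works on its own copy:

```scala
import org.apache.spark.SparkContext

object ClosureCopyDemo {
  // Hypothetical helper: tries to count elements with a plain driver-side variable.
  def naiveCount(sc: SparkContext): Int = {
    var counter = 0
    val numbers = sc.parallelize(1 to 100)

    // The closure is serialized and shipped to the tasks, so each task
    // increments its own copy of `counter`, not the driver's variable.
    numbers.foreach(_ => counter += 1)

    // On a cluster the driver's copy is not updated by the tasks.
    counter
  }
}
```

The Spark programming guide describes the result of code like this as undefined, so it should not be relied on in any mode; it is shown here only to illustrate why shared variables are needed.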

The Solution : Spark provides two types of shared variables.

1.    Accumulators
2.    Broadcast variables

Here we are only interested in Accumulators. If you want to read about Broadcast variables, you can refer to this blog.

Accumulators provide a simple syntax for aggregating values from worker nodes back to the driver program. One of the most common uses of an Accumulator is to count events that occur while a job runs, which helps when debugging.
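As a sketch of the "count events for debugging" use case, the snippet below counts lines that do not have the expected number of comma-separated fields. It assumes the Spark 1.x accumulator API used in this post; the object name, the method name, and the choice of 7 as the expected field count are illustrative assumptions.

```scala
import org.apache.spark.SparkContext

object MalformedLineCounter {
  // Hypothetical helper: returns how many lines do not have exactly 7 fields.
  def countMalformed(sc: SparkContext, lines: Seq[String]): Int = {
    // Created on the driver with an initial value of zero
    val malformed = sc.accumulator(0)

    // foreach is an action, so the job runs here; tasks only add to the accumulator
    sc.parallelize(lines).foreach { line =>
      if (line.split(",").length != 7) malformed += 1
    }

    // Only the driver reads the aggregated value
    malformed.value
  }
}
```

The update is done inside an action (foreach) on purpose: updates made inside transformations can be applied more than once if a task is re-executed.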

Example :- To understand Accumulators better, let's take an example from football.

Div  Date      HomeTeam       AwayTeam    FTHG  FTAG  FTR
D1   14/08/15  Bayern Munich  Hamburg     5     0     H
D1   15/08/15  Augsburg       Hertha      0     1     A
D1   15/08/15  Darmstadt      Hannover    2     2     D
D1   15/08/15  Dortmund       M’gladbach  4     0     H

Football.csv

Description :-

Div = League Division
Date = Match Date (dd/mm/yy)
HomeTeam = Home Team
AwayTeam = Away Team
FTHG = Full Time Home Team Goals
FTAG = Full Time Away Team Goals
FTR = Full Time Result (H=Home Win, D=Draw, A=Away Win)

Here we have football data. The file is in CSV format, so the fields on each line are separated by the ‘,’ (comma) symbol.
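To make the format concrete, here is a small sketch in plain Scala (no Spark needed) that splits one line on commas and maps the fields to the columns described above. The MatchResult case class and the parse helper are hypothetical names introduced only for illustration.

```scala
import scala.util.Try

object FootballLine {
  // Hypothetical record type; fields follow the column description above
  final case class MatchResult(
    div: String, date: String, homeTeam: String, awayTeam: String,
    homeGoals: Int, awayGoals: Int, result: String)

  // Returns None for lines that do not have 7 fields or whose goal
  // columns are not numbers (for example, a header row).
  def parse(line: String): Option[MatchResult] = {
    val fields = line.split(",").map(_.trim)
    if (fields.length != 7) None
    else Try(MatchResult(fields(0), fields(1), fields(2), fields(3),
      fields(4).toInt, fields(5).toInt, fields(6))).toOption
  }
}
```

Note that the team name "Bayern Munich" contains a space, which is why the split has to be on the comma and not on whitespace.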

Problem Statement :- We have to get the total number of matches played by a single team.

For example, the user enters the name of a team and the output is the total number of matches played by that team.

Solution :- Here we use an Accumulator to get the result.

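import org.apache.spark.{SparkConf, SparkContext}

def getTotalMatchCount(teamName: String): Int = {
  val sparkConf = new SparkConf().setAppName("Football").setMaster("local")
  val sparkContext = new SparkContext(sparkConf)
  try {
    val dataRDD = sparkContext.textFile("/home/akash/German Football.csv")

    // Accumulator: starts at zero on the driver; tasks can only add to it.
    // This is the Spark 1.x API (deprecated since Spark 2.0).
    val countAccumulator = sparkContext.accumulator[Int](0)

    // Transformation: keep lines where the team is the home team (3rd column)
    // or the away team (4th column); split each line only once
    val filterData = dataRDD.filter { line =>
      val fields = line.split(",")
      fields.length > 3 && (fields(2) == teamName || fields(3) == teamName)
    }

    // Action: runs the job and adds 1 for every line that passed the filter
    filterData.foreach(_ => countAccumulator += 1)

    // Read the aggregated value back on the driver
    countAccumulator.value
  } finally {
    // Stop the context so the function can be called again
    sparkContext.stop()
  }
}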

In this function, I first declare the Accumulator with a starting value of zero.

Then I perform a transformation on my data. In the transformation I pick the values of the 3rd and 4th columns of the file (the home team and the away team). As the file’s contents are separated by ‘,’ (comma), I perform a split operation on each line.

Then, in the action, I increment the value of the accumulator once for every line that passed the filter.

This can be done with a better approach: I can call the count() action on the filtered data and it still gives me the same result. I follow the Accumulator approach here just to show how Accumulators are used.
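The count() alternative can be sketched as follows, assuming the data has already been loaded into an RDD of lines (the object and method names are hypothetical):

```scala
import org.apache.spark.rdd.RDD

object MatchCountWithCount {
  // Same filter as before, but the count() action does the counting
  def totalMatches(data: RDD[String], teamName: String): Long =
    data.filter { line =>
      val fields = line.split(",")
      // Guard against short lines before reading the 3rd and 4th columns
      fields.length > 3 && (fields(2) == teamName || fields(3) == teamName)
    }.count()
}
```

One small difference is that count() returns a Long, while the Accumulator version returns an Int.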

Spark does not support only Int Accumulators. Out of the box, Spark also supports accumulators of type Double, Long, and Float.

Spark also includes an API to define custom accumulator types and custom aggregation operations. Custom accumulators need to extend AccumulatorParam, which is covered in the Spark API documentation.
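As a rough sketch of a custom accumulator type with the Spark 1.x API, the example below accumulates a pair of totals (home goals, away goals). GoalPairParam and totalGoals are hypothetical names, and the input is assumed to contain only data rows (no header) with numeric goal columns.

```scala
import org.apache.spark.{AccumulatorParam, SparkContext}

object GoalTotals {
  // Hypothetical custom accumulator type: a pair (home goals, away goals)
  object GoalPairParam extends AccumulatorParam[(Int, Int)] {
    // The identity value for the aggregation
    def zero(initialValue: (Int, Int)): (Int, Int) = (0, 0)

    // How two partial results are merged
    def addInPlace(a: (Int, Int), b: (Int, Int)): (Int, Int) =
      (a._1 + b._1, a._2 + b._2)
  }

  def totalGoals(sc: SparkContext, lines: Seq[String]): (Int, Int) = {
    // Pass the custom param explicitly when creating the accumulator
    val goals = sc.accumulator((0, 0))(GoalPairParam)

    sc.parallelize(lines).foreach { line =>
      val fields = line.split(",")
      // FTHG is the 5th column and FTAG is the 6th column
      goals += ((fields(4).toInt, fields(5).toInt))
    }

    goals.value
  }
}
```

The aggregation should be commutative and associative, as it is here, because Spark does not guarantee the order in which partial results are merged.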

References :- Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia

About Akash Sethi

I am Akash Sethi. I am currently working at Knoldus Software LLP.