Introduction to Accumulators : Apache Spark


What's the problem: Functions like map() and filter() can use variables defined outside them in the driver program, but each task running on the cluster gets a new copy of each variable, and updates made to these copies are not propagated back to the driver.

The solution: Spark provides two types of shared variables.

1.    Accumulators
2.    Broadcast variables

Here we are only interested in Accumulators. If you want to read about Broadcast variables, you can refer to this blog.

Accumulators provide a simple syntax for aggregating values from worker nodes back to the driver program. One of the most common uses of an Accumulator is to count events, which can help in the debugging process.
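As a minimal sketch of the pattern (assuming an existing SparkContext named sparkContext and a text-file RDD named dataRDD, both hypothetical here): the accumulator is created on the driver, tasks on the workers can only add to it, and only the driver reads the aggregated value.

// Create an accumulator with an initial value of 0 (Spark 1.x API)
val blankLines = sparkContext.accumulator(0)

// Tasks running on the workers can only add to it
dataRDD.foreach(line => if (line.isEmpty) blankLines += 1)

// Only the driver can read the aggregated value back
println("Blank lines: " + blankLines.value)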

Example: To understand Accumulators better, let's take an example based on football data.

Div,Date,HomeTeam,AwayTeam,FTHG,FTAG,FTR
D1,14/08/15,Bayern Munich,Hamburg,5,0,H
D1,15/08/15,Augsburg,Hertha,0,1,A
D1,15/08/15,Darmstadt,Hannover,2,2,D
D1,15/08/15,Dortmund,M'gladbach,4,0,H
Football.csv

Description:

Div = League Division
Date = Match Date (dd/mm/yy)
HomeTeam = Home Team
AwayTeam = Away Team
FTHG = Full Time Home Team Goals
FTAG = Full Time Away Team Goals
FTR = Full Time Result (H=Home Win, D=Draw, A=Away Win)

Here we have the football data. The file is in CSV format, so the fields in each line are separated by a ',' (comma).
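For example, splitting one line of the file on the comma gives an array in which HomeTeam sits at index 2 and AwayTeam at index 3:

val line = "D1,14/08/15,Bayern Munich,Hamburg,5,0,H"
val columns = line.split(",")
columns(2) // "Bayern Munich" (HomeTeam)
columns(3) // "Hamburg" (AwayTeam)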

Problem Statement: Find the total number of matches played by a given team.

That is, the user enters the name of a team, and the output is the total number of matches played by that team.

Solution: Here we use an Accumulator to get the result.

import org.apache.spark.{SparkConf, SparkContext}

def getTotalMatchCount(teamName: String): Int = {
  val sparkConf = new SparkConf().setAppName("Football").setMaster("local")
  val sparkContext = new SparkContext(sparkConf)
  val dataRDD = sparkContext.textFile("/home/akash/German Football.csv")

  // Accumulator, initialised to zero on the driver
  val countAccumulator = sparkContext.accumulator[Int](0)

  // Transformation: keep only the lines where the team appears
  // as the home team (column 2) or the away team (column 3)
  val filterData = dataRDD.filter { line =>
    val columns = line.split(",")
    columns(2) == teamName || columns(3) == teamName
  }

  // Action: increment the accumulator once for every matching line
  filterData.foreach(_ => countAccumulator += 1)

  countAccumulator.value
}
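The function can then be called with a team name from the data set; a hypothetical usage looks like this:

val matches = getTotalMatchCount("Bayern Munich")
println(s"Total matches played: $matches")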

In this function, I first declare the Accumulator with a starting value of zero.

Then I perform a transformation on the data. In the transformation I pick the values of the 3rd and 4th columns of the file; since the file's fields are separated by ',' (comma), I perform a split operation on each line.

Then, in the action, I increment the value of the accumulator for every matching line.

This could be done in a better way: calling the action count() on the filtered data gives the same result. I followed this approach just to show the usage of an Accumulator.
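For comparison, the same count without an accumulator would simply be (reusing the filterData RDD from the function above):

// Equivalent result using the built-in count() action
val totalMatches = filterData.count().toInt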

Spark does not support only Int accumulators. Out of the box, Spark supports accumulators of type Double, Long, and Float.
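For instance, an accumulator of type Double can be created in the same way (a small sketch, reusing the sparkContext from above):

// Accumulator[Double] with an initial value of 0.0
val goalSumAccumulator = sparkContext.accumulator(0.0)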

Spark also includes an API to define custom accumulator types and custom aggregation operations. Custom accumulators need to extend AccumulatorParam, which is covered in the Spark API documentation.
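As a rough sketch of that API (the object name and the max-value use case here are made up for illustration), a custom AccumulatorParam only has to provide a zero value and an addInPlace operation:

import org.apache.spark.AccumulatorParam

// A custom accumulator that tracks the maximum Int seen across tasks
object MaxAccumulatorParam extends AccumulatorParam[Int] {
  def zero(initialValue: Int): Int = Int.MinValue
  def addInPlace(v1: Int, v2: Int): Int = math.max(v1, v2)
}

// Pass the param explicitly so it is used instead of the default Int param
val maxGoalsAccumulator = sparkContext.accumulator(Int.MinValue)(MaxAccumulatorParam)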

References: Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell & Matei Zaharia
