The Problem: Functions such as map() and filter() can use variables defined outside them in the driver program, but each task running on the cluster gets its own copy of each variable, and updates to these copies are not propagated back to the driver.
The Solution: Spark provides two types of shared variables:
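A minimal sketch of this problem, assuming the Spark 1.x Scala API used later in this post; the object name ClosureCopyDemo is hypothetical. The counter is captured by the closure passed to foreach(), so each task increments its own copy, and the value read on the driver afterwards should not be relied on.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object, for illustration only.
object ClosureCopyDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ClosureCopyDemo").setMaster("local")
    val sc = new SparkContext(conf)

    var counter = 0
    val data = sc.parallelize(1 to 100)

    // Each task works on its own deserialized copy of `counter`;
    // the increments are not sent back to the driver.
    data.foreach(_ => counter += 1)

    // Do not rely on this value: Spark does not define what the driver sees.
    println(s"counter on the driver: $counter")

    sc.stop()
  }
}
```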
1. Accumulators
2. Broadcast variables
Here we are only interested in accumulators. If you want to read about broadcast variables, you can refer to this blog.
Accumulators provide a simple syntax for aggregating values from worker nodes back to the driver program. One of their most common uses is counting events that occur while a job runs, which helps with debugging.
Example: To understand accumulators better, let's take an example from football.
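As a rough illustration of counting events for debugging, the sketch below counts lines that do not look like valid records while filtering them out. It assumes the Spark 1.x accumulator API; the object BadLineCounter, the function countLines, and the "at least 7 comma-separated fields" rule are hypothetical choices made for this example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper, for illustration only.
object BadLineCounter {
  // Returns (number of well-formed lines, number of lines with fewer than 7 fields).
  def countLines(sc: SparkContext, lines: Seq[String]): (Long, Int) = {
    val badLines = sc.accumulator(0)

    val goodLines = sc.parallelize(lines).filter { line =>
      val ok = line.split(",").length >= 7
      if (!ok) badLines += 1 // record the event on the worker
      ok
    }

    // The accumulator is only updated once an action runs.
    val goodCount = goodLines.count()
    (goodCount, badLines.value)
  }
}
```

Note that Spark only guarantees exactly-once accumulator updates inside actions. Updates made inside a transformation, as here, may be applied more than once if a task or stage is re-executed, so counts like this are best treated as a debugging aid.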
Div | Date | HomeTeam | AwayTeam | FTHG | FTAG | FTR |
D1 | 14/08/15 | Bayern Munich | Hamburg | 5 | 0 | H |
D1 | 15/08/15 | Augsburg | Hertha | 0 | 1 | A |
D1 | 15/08/15 | Darmstadt | Hannover | 2 | 2 | D |
D1 | 15/08/15 | Dortmund | M’gladbach | 4 | 0 | H |
Football.csv
Description :-
Div = League Division
Date = Match Date (dd/mm/yy)
HomeTeam = Home Team
AwayTeam = Away Team
FTHG = Full Time Home Team Goals
FTAG = Full Time Away Team Goals
FTR = Full Time Result (H=Home Win, D=Draw, A=Away Win)
Here we have football data in CSV format, so the fields on each line are separated by a comma (',').
Problem Statement: Find the total number of matches played by a given team.
For example, the user enters the name of a team and the output is the total number of matches played by that team.
Solution: Here we use an accumulator to get the result.
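To make the column layout concrete, here is a small plain-Scala sketch that splits one line into the fields described above. The names MatchRow and MatchParser are hypothetical, and the sketch assumes the first seven columns are exactly those shown in the table.

```scala
// Hypothetical types for illustration; they are not part of Spark.
final case class MatchRow(
  homeTeam: String,
  awayTeam: String,
  homeGoals: Int,
  awayGoals: Int,
  result: String
)

object MatchParser {
  // Returns None for the header row, or for any line that has fewer than
  // seven fields or non-numeric goal columns.
  def parse(line: String): Option[MatchRow] = {
    val fields = line.split(",").map(_.trim)
    if (fields.length < 7) None
    else scala.util.Try(
      MatchRow(fields(2), fields(3), fields(4).toInt, fields(5).toInt, fields(6))
    ).toOption
  }
}
```

Column indices are zero-based, so HomeTeam is fields(2) and AwayTeam is fields(3).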
import org.apache.spark.{SparkConf, SparkContext}

def getTotalMatchCount(teamName: String): Int = {
  val sparkConf = new SparkConf().setAppName("Football").setMaster("local")
  val sparkContext = new SparkContext(sparkConf)
  val dataRDD = sparkContext.textFile("/home/akash/German Football.csv")

  // Accumulator, starting at zero
  val countAccumulator = sparkContext.accumulator[Int](0)

  // Transformation: keep lines where the team is the home team (3rd column)
  // or the away team (4th column)
  val filterData = dataRDD.filter { line =>
    val fields = line.split(",")
    fields(2) == teamName || fields(3) == teamName
  }

  // Action: increment the accumulator once per matching line
  filterData.foreach(line => countAccumulator += 1)

  countAccumulator.value
}
In this function, I first declare the accumulator with a starting value of zero.
Then I perform a transformation on the data. Because the file's fields are separated by commas, I split each line and compare the 3rd and 4th columns (HomeTeam and AwayTeam) with the team name.
Then, in the action, I increment the accumulator once for each matching line.
A better approach would be to call count() on the filtered data, which gives the same result. I follow the accumulator approach here only to show how an accumulator is used.
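The count() alternative mentioned above could look roughly like this. The object MatchCount and the function totalMatches are hypothetical names, and the length check is an added guard against short lines that the original function does not have.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

// Hypothetical helper, for illustration only.
object MatchCount {
  // Counts the lines in which teamName appears as the home or away team.
  def totalMatches(lines: RDD[String], teamName: String): Long =
    lines.filter { line =>
      val fields = line.split(",")
      fields.length > 3 && (fields(2) == teamName || fields(3) == teamName)
    }.count()
}
```

It can be called with the RDD returned by sparkContext.textFile(...). Unlike the accumulator version, it returns a Long rather than an Int.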
Spark does not support only Int accumulators. Out of the box, it also supports accumulators of type Double, Long, and Float.
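Spark also includes an API to define custom accumulator types and custom aggregation operations. Custom accumulators need to extend AccumulatorParam, which is covered in the Spark API documentation. Note that this is the Spark 1.x API; since Spark 2.0 it has been deprecated in favor of AccumulatorV2.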
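A minimal sketch of a custom accumulator, assuming the Spark 1.x AccumulatorParam API. It sums home and away goals (the FTHG and FTAG columns) as a pair. The names GoalsParam, GoalTotals, and totalGoals are hypothetical, and the input is assumed to contain data rows only, without the header line.

```scala
import org.apache.spark.{AccumulatorParam, SparkConf, SparkContext}

// Hypothetical accumulator type: a (home goals, away goals) pair added element-wise.
object GoalsParam extends AccumulatorParam[(Int, Int)] {
  // The identity value for the addition below.
  def zero(initialValue: (Int, Int)): (Int, Int) = (0, 0)

  // Merges two partial results.
  def addInPlace(a: (Int, Int), b: (Int, Int)): (Int, Int) =
    (a._1 + b._1, a._2 + b._2)
}

object GoalTotals {
  // Assumes every line is a data row whose 5th and 6th columns are integers.
  def totalGoals(sc: SparkContext, lines: Seq[String]): (Int, Int) = {
    val goals = sc.accumulator((0, 0))(GoalsParam)

    sc.parallelize(lines).foreach { line =>
      val fields = line.split(",")
      goals += ((fields(4).toInt, fields(5).toInt))
    }

    goals.value
  }
}
```

The update happens inside foreach(), which is an action, so each row contributes to the totals exactly once.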
References: Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell, and Matei Zaharia