Is using Accumulators really worth ? Apache Spark

Before jumping right into the topic you must know what Accumulators are ?

for that you can refer this blog.

Now we know what and why of Accumulators lets jump to the main point.

Description :- Spark automatically deals with failed or slow machines by re-executing failed or slow tasks.

Example :- if the node running a partition of a map() operation crashes,
Spark will rerun it on another node; and even if the node does not crash but is simply
much slower than other nodes, Spark can preemptively launch a “speculative” copy
of the task on another node, and take its result if that finishes.

Even if no nodes fail,Spark may have to rerun a task to rebuild a cached value that falls out of memory.The net result is therefore that the same function may run multiple times on the same data depending on what happens on the cluster.

Accumulators used in actions, Spark applies each task’s update to each accumulator only once. Thus, if we want a reliable absolute value counter, regardless of failures or multiple evaluations, we must put it inside an action like foreach().

An accumulator update within a transformation can occur more than once.While future versions of Spark may change this behavior to count the update only once.

now Lets take an Example for better  understanding

Example :- 

Div Date HomeTeam AwayTeam FTHG FTAG FTR
D1 14/08/15 Bayern Munich Hamburg 5 0 H
D1 15/08/15 Augsburg Hertha 0 1 A
D1 15/08/15 Darmstadt Hannover 2 2 D
D1 15/08/15 Dortmund M’gladbach 4 0 H

Football.csv

Description :-

Div = League Division
Date = Match Date (dd/mm/yy)
HomeTeam = Home Team
AwayTeam = Away Team
FTHG = Full Time Home Team Goals
FTAG = Full Time Away Team Goals
FTR = Full Time Result (H=Home Win, D=Draw, A=Away Win)

Here we have football data with us. This file is in csv format so all the characters are separated by ‘,’ (comma) symbol.

Problem Statement:- Here we have to get the total match played by a single Team.

Like :- user enters the name of team and the output will be a total number of match played by the that team.

Solution :-  Here we use Accumulator to get the result.

def getTotalMatchCount(teamName: String): Int = {

  val countAccumulator = sparkContext.accumulator[Int](0)

  //Transformation
  val mapData = dataRDD.map { line => if (line.split(",")(2) == teamName
    || line.split(",")(3) == teamName) {
    countAccumulator += 1
  }
    line
  }
  //Action
  mapData.count().toInt

  countAccumulator.value
}

In this function firstly i declare the Accumulator with starting value as zero.

then i perform transformation on my data. In transformation  i picked the value of 3rd anf 4th column of the file as this file’s content are separated by  ‘,’ (comma) so i perform split operation on each line. for each line that match the condition i incremented the value of Accumulator.

What went wrong here is if Spark  re-run the working again weather due to failure or slow processing thats why we need to care about the Accumulators.

Here i design this method like if you remove the accumulator part then the method still work. i does this to perform some Action.

References :- Learning Spark By Holden Karau, Andy Konwinski, Patrick Wendell & Matei Zaharia

Written by 

Rachel Jones is a Solutions Lead at Knoldus Inc. having more than 22 years of experience. Rachel likes to delve deeper into the field of AI(Artificial Intelligence) and deep learning. She loves challenges and motivating people, also loves to read novels by Dan Brown. Rachel has problem solving, management and leadership skills moreover, she is familiar with programming languages such as Java, Scala, C++ & Html.

Leave a Reply

%d bloggers like this: