Is using Accumulators really worth it? Apache Spark


Before jumping right into the topic, you must know what Accumulators are.

For that, you can refer to this blog.

Now that we know the what and why of Accumulators, let's jump to the main point.

Description :- Spark automatically deals with failed or slow machines by re-executing failed or slow tasks.

Example :- If the node running a partition of a map() operation crashes,
Spark will rerun it on another node; and even if the node does not crash but is simply
much slower than the other nodes, Spark can preemptively launch a “speculative” copy
of the task on another node and take its result if that copy finishes first.

Even if no nodes fail, Spark may have to rerun a task to rebuild a cached value that falls out of memory. The net result is therefore that the same function may run multiple times on the same data, depending on what happens on the cluster.
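Speculative execution is off by default; you can turn it on through the Spark configuration. Here is a minimal sketch (the app name is a placeholder; spark.speculation and spark.speculation.multiplier are real Spark settings):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("AccumulatorDemo")              // placeholder app name
  .set("spark.speculation", "true")           // launch speculative copies of slow tasks
  .set("spark.speculation.multiplier", "1.5") // how much slower than the median a task must be

val sc = new SparkContext(conf)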

For accumulators used in actions, Spark applies each task’s update to each accumulator only once. Thus, if we want a reliable absolute value counter, regardless of failures or multiple evaluations, we must put it inside an action like foreach().
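For instance, here is a minimal sketch of such a reliable counter; the setup, file path, and names are assumptions for illustration:

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("BlankLineCounter").setMaster("local[*]")) // placeholder setup

val lines = sc.textFile("data.txt") // hypothetical input file
val blankLines = sc.accumulator(0)  // starts at zero

// foreach is an action, so Spark applies each task's update exactly once
lines.foreach { line =>
  if (line.isEmpty) blankLines += 1
}

println(s"Blank lines: ${blankLines.value}")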

An accumulator update within a transformation, on the other hand, can occur more than once. Future versions of Spark may change this behavior to count each update only once, but for now you cannot rely on it.

Now let's take an example for better understanding.

Example :- 

Div,Date,HomeTeam,AwayTeam,FTHG,FTAG,FTR
D1,14/08/15,Bayern Munich,Hamburg,5,0,H
D1,15/08/15,Augsburg,Hertha,0,1,A
D1,15/08/15,Darmstadt,Hannover,2,2,D
D1,15/08/15,Dortmund,M’gladbach,4,0,H

Football.csv

Description :-

Div = League Division
Date = Match Date (dd/mm/yy)
HomeTeam = Home Team
AwayTeam = Away Team
FTHG = Full Time Home Team Goals
FTAG = Full Time Away Team Goals
FTR = Full Time Result (H=Home Win, D=Draw, A=Away Win)

Here we have football match data. The file is in CSV format, so the fields on each line are separated by a ‘,’ (comma).

Problem Statement :- We have to get the total number of matches played by a single team.

That is, the user enters the name of a team, and the output is the total number of matches played by that team.

Solution :- Here we use an Accumulator to get the result.
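Before the function below can run, we need a SparkContext and the file loaded as an RDD. A minimal setup sketch; the app name, master URL, and file path are assumptions:

import org.apache.spark.{SparkConf, SparkContext}

// Placeholder setup: app name, master URL, and file path are assumptions
val conf = new SparkConf().setAppName("FootballStats").setMaster("local[*]")
val sparkContext = new SparkContext(conf)

// Each element of dataRDD is one line of the CSV file
val dataRDD = sparkContext.textFile("Football.csv")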

def getTotalMatchCount(teamName: String): Int = {

  // Accumulator with a starting value of zero (Spark 1.x API;
  // on Spark 2.x you would use sparkContext.longAccumulator instead)
  val countAccumulator = sparkContext.accumulator(0)

  // Transformation: split each CSV line once and increment the accumulator
  // when the team appears as the home (column 2) or away (column 3) side
  val mapData = dataRDD.map { line =>
    val columns = line.split(",")
    if (columns(2) == teamName || columns(3) == teamName) {
      countAccumulator += 1
    }
    line
  }

  // Action: forces the transformation above to actually run;
  // the count itself is thrown away
  mapData.count()

  countAccumulator.value
}

In this function, I first declare the Accumulator with a starting value of zero.

Then I perform a transformation on the data. In the transformation I pick the values of the 3rd and 4th columns of each line; since the file's fields are separated by a ‘,’ (comma), I perform a split operation on each line. For each line that matches the condition, I increment the value of the Accumulator.

What can go wrong here is that if Spark re-runs this work, whether due to a failure or slow processing, the accumulator is incremented again for the same data and the final count comes out too high. That is why we need to be careful with Accumulators used inside transformations.

I designed this method so that if you remove the accumulator part, it still works; the count() call exists only to provide an action that forces the transformation to run.
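As noted earlier, if you need a count that stays correct regardless of failures and re-execution, move the accumulator update into an action such as foreach(). A sketch of that variant, reusing the sparkContext and dataRDD assumed above:

def getTotalMatchCountReliable(teamName: String): Int = {
  val countAccumulator = sparkContext.accumulator(0)

  // foreach is an action, so Spark applies each task's update exactly once,
  // even if a task is re-run or speculatively duplicated
  dataRDD.foreach { line =>
    val columns = line.split(",")
    if (columns(2) == teamName || columns(3) == teamName) {
      countAccumulator += 1
    }
  }

  countAccumulator.value
}

Of course, for this particular problem a plain dataRDD.filter(...).count() would avoid accumulators entirely; the accumulator version is here to illustrate the pitfall.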

References :- Learning Spark by Holden Karau, Andy Konwinski, Patrick Wendell & Matei Zaharia (O’Reilly)

