Spark Unconstructed | Deep dive into DAG

kafka with spark

Apache Spark is all the rage these days. People who work with Big Data, Spark is a household name for them. We have been using it for quite some time now. So we already know that Spark is lightning-fast cluster computing technology, it is faster than Hadoop MapReduce.
If you ask any of these Spark techies, how Spark is fast, they would give you a vague answer by saying Spark uses DAG to carry out the in-memory computations.
So, how far is this answer satisfiable?
Well to a Spark expert, this answer is just equivalent to a poison.

Let’s try to understand how exactly spark is handling our computations through DAG.

DAG: Directed Acyclic Graph

DAG is a much-recognized term in Spark. It describes all the steps through which our data is being operated. In one line “DAG is a graph denoting the sequence of operations that are being performed on the target RDD”. DAG in Apache Spark is a set of Vertices and Edges, where vertices represent the RDDs and the edges represent the Operation to be applied on RDD.
When an action is called on Spark RDD at a high level, DAG is created and is submitted to the DAG scheduler.

DAGScheduler

DAG is created by the DAGScheduler. Now, what is it?

DAGScheduler is the high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster.

It transforms a logical execution plan (i.e. RDD lineage of dependencies built using RDD transformations) to a physical execution plan (using stages).

But before that let’s understand DAGScheduler’s two fundamental concepts: Jobs and Stages.

Jobs

A job is a top-level work item (computation). When an action is called the processing gets started and a Job is created which is then submitted to DAGScheduler to be computed.

Stages

A stage is a physical unit of execution. It is a step in a physical execution plan.
A stage is a set of parallel tasks — one task per partition (of an RDD that computes partial results of a function executed as part of a Spark job).
In other words, a Spark job is a computation with that computation sliced into stages.

There are two types of Stages:

  1. ResultStage
    A ResultStage is the final stage in a job that applies a function to one or many partitions of the target RDD to compute the result of an action.
  2. ShuffleMapStage
    ShuffleMapStage is an intermediate stage in the physical execution DAG that corresponds to a ShuffleDependency. A ShuffleMapStage may contain multiple pipelined operations, e.g. map and filter, before shuffle operation.

Workflow

Whenever an action is called over an RDD, it is submitted as an event of type DAGSchedulerEvent by the Spark Context to DAGScheduler. It is submitted as a JobSubmitted case.

The first thing done by DAGScheduler is to create a ResultStage which will provide the result of the spark job which is submitted.
Now to execute the submitted job, we need to find out on which operation our RDD is based on. So backtracking begins.

In backtracking, when we find that the current operation is dependent of a shuffle operation (this is called a shuffle dependency) a new stage is created (shuffleMapStage) which is placed before the current stage (Result stage).

This new stage’s output will be the input to our ResultStage. And the backtracking continues.
If we find another shuffle operation happening then again a new shuffleMapStage will be created and will be placed before the current stage (also a ShuffleMapStage) and the newly created shuffleMapStage will provide an input to the current shuffleMapStage.

Hence all the intermediate stages will be ShuffleMapStages and the last one will always be a ResultStage.

In other words, there will only be one ResultStage and can be any number of ShuffleMapStages in between (from 0 to n depending on the number of shuffle operation in an RDD)

Screenshot from 2018-07-31 10-23-46

Why a new stage is formed when there is shuffling of data?

DAGScheduler splits up a job into a collection of stages.

Each stage contains a sequence of narrow transformations (that can be completed without shuffling the entire data set) separated at shuffle boundaries, i.e. where shuffle occurs.

Stages are thus a result of breaking the RDD graph at shuffle boundaries.

Shuffle boundaries introduce a barrier where stages/tasks must wait for the previous stage to finish before they fetch map outputs.

There are two advantages of breaking tasks into stages:

  • After every shuffle operation, a new stage is created so that whenever data is lost due to shuffle(network I/O) only the previous stage will be calculated for fault tolerance.
  • For executing operations in one go Spark groups the operation which doesn’t need to share data between executors (when one partition requires the data from another partition to complete some operation like groupBy).

So, after DAGScheduler has done its work of converting this job into stages, it hands over the stage to TaskScheduler for its execution which will do the rest of the computation.

I hope it helped you understand DAG in depth and if you have any questions you can ask in the comments box below. 🙂

References:

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-dagscheduler.html

 


knoldus-advt-sticker

 

Written by 

Ramandeep Kaur is a Software Consultant, having experience of more than 1.5 years. She is a Java enthusiast and has knowledge of languages like C, C++, C#, and Scala. She is familiar with Object Oriented Programming Paradigms and also has a great interest in relational database technologies. Her hobbies include reading novels and listening music.

Leave a Reply

%d bloggers like this: