Demystifying Asynchronous Actions in Spark


What if we want to execute two actions concurrently on different RDDs? Spark actions are synchronous by default: if we perform two actions one after the other, they always execute sequentially.

Let's see an example:

val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
// collect() blocks until the whole job has finished
rdd.collect().foreach(x => println("Items in the list: " + x))
val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
// count() is only submitted after collect() has returned
println("Number of items in the list: " + rddCount.count())

In the above example, two actions are performed one after the other: collect and count. Both execute synchronously, so count always starts only after collect has finished. The output of the above code is as follows:

[Screenshot: console output of the synchronous example]

Now the question is: what if we want to run Spark jobs concurrently, in an asynchronous fashion?

The answer is simple: Apache Spark also provides asynchronous actions for concurrent execution of jobs. A few of the asynchronous actions Spark provides are listed below (a usage sketch follows the list):

collectAsync() -> Returns a future for retrieving all elements of this RDD.
countAsync() -> Returns a future for counting the number of elements in the RDD.
foreachAsync(scala.Function1<T,scala.runtime.BoxedUnit> f) -> Applies a function f to all elements of this RDD.
foreachPartitionAsync(scala.Function1<scala.collection.Iterator<T>,scala.runtime.BoxedUnit> f) -> Applies a function f to each partition of this RDD.
takeAsync(int num) -> Returns a future for retrieving the first num elements of the RDD.
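
All of these return a FutureAction, which extends scala.concurrent.Future, so the standard Future API (map, foreach, onComplete, Await) works on the result. Here is a minimal sketch of takeAsync and foreachAsync, assuming a SparkContext sc is already in scope (the RDD here is made up for illustration):

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

val nums = sc.parallelize(1 to 100, 4)

// takeAsync: fetch the first 5 elements without blocking the driver
nums.takeAsync(5).onComplete {
  case Success(xs) => println("First five: " + xs.mkString(", "))
  case Failure(e)  => println("take job failed: " + e)
}

// foreachAsync: apply a side-effecting function to every element
nums.foreachAsync(x => println(x)).onComplete {
  case Success(_) => println("foreach job finished")
  case Failure(e) => println("foreach job failed: " + e)
}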

Now let us see what happens when we use async actions.

import scala.concurrent.ExecutionContext.Implicits.global // Future callbacks need an ExecutionContext

val rdd = sc.parallelize(List(32, 34, 2, 3, 4, 54, 3), 4)
// collectAsync() submits the job and returns a FutureAction immediately
rdd.collectAsync().foreach { items => items.foreach(x => println("Items in the list: " + x)) }
val rddCount = sc.parallelize(List(434, 3, 2, 43, 45, 3, 2), 4)
// countAsync() is submitted without waiting for the collect job to finish
rddCount.countAsync().foreach { count => println("Number of items in the list: " + count) }

The output of the above code is as follows:

[Screenshot: console output of the async example]

You can see in the output above that the result of the second job comes first: the first action returns a future immediately, so the second action gets submitted without waiting for it. But notice that the jobs themselves still execute one after the other. That is because, by default, a job can use all the resources of the cluster, so the next job is delayed until the first finishes.
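
One caveat (a general Scala Futures point, not something specific to this example): because the async actions return as soon as the job is submitted, the driver can reach sc.stop() or exit before the jobs complete, which fails them with "Cannot call methods on a stopped SparkContext" (exactly the error reported in the comments below). A minimal sketch of blocking until both jobs finish, reusing the rdd and rddCount values from above:

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Submit both jobs up front so they can run concurrently ...
val collectFuture = rdd.collectAsync()
val countFuture   = rddCount.countAsync()

// ... then combine the futures and block once, before calling sc.stop()
val done: Future[Unit] = for {
  items <- collectFuture
  count <- countFuture
} yield {
  items.foreach(x => println("Items in the list: " + x))
  println("Number of items in the list: " + count)
}
Await.result(done, 2.minutes)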

So to take full advantage of asynchronous actions, we need to configure the job scheduler.

Job Scheduling

By default, the Spark scheduler runs jobs in FIFO (First In, First Out) fashion: priority is given to the first job, then the second, and so on. If the first job does not need the whole cluster, the second job can run in parallel; but if the first job is large, later jobs wait a long time, even if they would take very little time to execute. The solution is Spark's FAIR scheduler, under which jobs are executed in a round-robin fashion, so all jobs get a roughly equal share of cluster resources.

To configure the scheduler, we set the spark.scheduler.mode property when building the SparkConf:

val conf = new SparkConf()
  .setAppName("spark_auth")
  .setMaster("local[*]")
  .set("spark.scheduler.mode", "FAIR")

After configuring FAIR scheduling, both jobs run concurrently and share the resources of the Spark cluster.

With this setting, the output of the above code is as follows:

[Screenshot: console output with FAIR scheduling, both jobs interleaved]

You can see in the result above that both jobs run concurrently; neither action waits for the other.
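
If you need finer-grained control than a single FAIR queue, Spark's FAIR scheduler also supports named pools with their own weights and minimum shares (this goes beyond the example above; see the Spark job-scheduling documentation). A sketch, assuming a pool named pool1 is defined in a fairscheduler.xml file referenced by spark.scheduler.allocation.file:

// Jobs submitted from this thread now go to pool1
sc.setLocalProperty("spark.scheduler.pool", "pool1")
rdd.collectAsync()

// Reset this thread to the default pool
sc.setLocalProperty("spark.scheduler.pool", null)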

The full code for the examples above is available at: https://github.com/knoldus/spark-scala-async

About sandeep

I am working as a software consultant at Knoldus Software LLP. I work on Scala, Play, Spark, Hive, HDFS, Hadoop and many other big data technologies.

Responses to Demystifying Asynchronous Actions in Spark

  1. Nice, but remember to transform before collect or use foreach directly.

  2. sandeep says:

    Yes, if we need a transformation we can do one, but here I wanted to show async actions, so I did not include transformations.

  3. Nice post. Thanks for sharing.

  4. Hello Sandeep, I am trying to run your code, but it throws an error, saying:

    17/06/19 11:15:35 INFO TaskSchedulerImpl: Cancelling stage 0
    17/06/19 11:15:35 INFO DAGScheduler: ResultStage 0 (collectAsync at test.scala:20) failed in Unknown s due to Job aborted due to stage failure: Task serialization failed: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
    This stopped SparkContext was created at:

    org.apache.spark.SparkContext.<init>(SparkContext.scala:76)
    test.test$.main(test.scala:11)
    test.test.main(test.scala)

    The currently active SparkContext was created at:

    org.apache.spark.SparkContext.<init>(SparkContext.scala:76)
    test.test$.main(test.scala:11)
    test.test.main(test.scala)

    java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
    This stopped SparkContext was created at:

    org.apache.spark.SparkContext.<init>(SparkContext.scala:76)
    test.test$.main(test.scala:11)
    test.test.main(test.scala)

    The currently active SparkContext was created at:

    org.apache.spark.SparkContext.<init>(SparkContext.scala:76)
    test.test$.main(test.scala:11)
    test.test.main(test.scala)

    at org.apache.spark.SparkContext.assertNotStopped(SparkContext.scala:100)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1407)
    at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:996)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:918)
    at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:862)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1613)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

    but if I use
    println("synchronous count" + rdd.count())
    it runs without errors. Can you guide me on this?
    Thank you.
