The Async library in Scala


OVERVIEW

In this blog we will explore the ‘Async’ library and see how it is actually implemented. With some understanding of macros and reflection from the previous blog, our voyage will be smooth and prosperous.

The Async Library ->

Async is an object with only two methods, namely ‘async()’ and ‘await()’. Its main use case is to write asynchronous code conveniently and efficiently. It can be considered an alternative to low-level callbacks and to higher-order functions like map and flatMap. In complex scenarios where multiple Futures (don't know Futures? Refer to the blog on Future) are combined to compute a final result, the Async library can be a savior: it improves the readability and simplicity of the code, even though internally it is implemented using callbacks and our beloved macros. One important thing to note is that, like Future, the Async library requires an execution context, which is associated with a thread pool. Now let us explore the methods of the Async library and understand how they are implemented.
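The comparison with map and flatMap can be made concrete with a small sketch that uses only the standard library. The ‘price’ and ‘tax’ helpers and the object name are hypothetical stand-ins for two slow lookups; they are not part of the application discussed in this blog.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object CombinatorSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hypothetical stand-ins for two time-consuming lookups.
  def price(): Future[Int] = Future(40)
  def tax(): Future[Int] = Future(2)

  // Combining the two results with flatMap and map.
  def totalWithCombinators(): Future[Int] = {
    val p = price()
    val t = tax()
    p.flatMap(pv => t.map(tv => pv + tv))
  }

  // With the Async library the last line would instead read:
  //   async { await(p) + await(t) }

  // Blocks the caller only for the purpose of this demonstration.
  def total(): Int = Await.result(totalWithCombinators(), 5.seconds)
}
```

With two Futures the nesting is still manageable; each additional dependent Future adds another level of flatMap, which is where the flat style of ‘async’ and ‘await’ starts to pay off.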

a). async() method – This method marks the beginning of an asynchronous code block. When the async construct is encountered, the enclosed code is handed to the execution context, which typically runs it on a thread from its thread pool. Inside the async scope we generally call time-consuming I/O operations such as a call to a web service, reading the content of a file, or fetching data from a datastore like Elasticsearch. The statements inside the ‘async’ scope execute in sequential order, just as they would in synchronous code. By drilling down into ‘async’ we get :-

def async[T](body: => T)(implicit execContext: ExecutionContext): Future[T] = macro internal.ScalaConcurrentAsync.asyncImpl[T]

From the above expression it can be concluded that ‘async’ is nothing but a macro that returns a Future and is implemented by the ‘asyncImpl’ method of the ‘ScalaConcurrentAsync’ object in the ‘internal’ package.
Here,
‘body’ is the code enclosed in the scope of the ‘async’ construct.
‘execContext’ is the implicit execution context; the global one imported in the example further down is backed by a ‘ForkJoin’ thread pool.
b). await() method – As the name suggests, this method waits (silly). More precisely, ‘await’ suspends the enclosing ‘async’ block until a Future completes, without blocking the underlying thread. It can only be used inside an ‘async’ block, and with some limitations (discussed below). Calls to ‘await’ are translated by the async macro into calls to ‘onComplete’ on the associated Future. By drilling down into ‘await’ we get :-

def await[T](awaitable: Future[T]): T = ???

‘await’ takes a Future as its parameter and returns the result of that Future. The ‘???’ body is only a placeholder: as described above, the macro rewrites every ‘await’ call in terms of the ‘onComplete’ method of the associated Future. First we will see an example of ‘async’ and ‘await’, and then we will see how things behave at compile time and at run time.
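To get a feel for what "translated into a call to onComplete" means, here is a hand-written sketch of roughly what a block like async { await(f) + 1 } has to achieve. It uses only the standard library; the object and method names are illustrative assumptions, and this is not the code the macro actually generates.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object AwaitSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hand-written counterpart of `async { await(f) + 1 }`.
  def plusOne(f: Future[Int]): Future[Int] = {
    val result = Promise[Int]()
    // Instead of blocking, register a callback that runs the code after the await.
    f.onComplete {
      case Success(v) => result.success(v + 1) // the rest of the block
      case Failure(e) => result.failure(e)     // a failed Future fails the whole block
    }
    result.future
  }

  // Blocks the caller only for the purpose of this demonstration.
  def demo(): Int = Await.result(plusOne(Future(41)), 5.seconds)

  def failurePropagates(): Boolean = {
    val failed = plusOne(Future.failed[Int](new RuntimeException("boom")))
    Await.ready(failed, 5.seconds).value.exists(_.isFailure)
  }
}
```

No thread sits idle while the Future is pending; the continuation simply runs when the result arrives.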

Example ->

Now let us consider a simple application in which the first and last names of an employee are read from two files (assuming the I/O operation is a bit time consuming), with ‘async’ and ‘await’ used to perform the operation asynchronously.

Gears for the application :-
1). Build Tool – Simple Build Tool (SBT)
2). IDE – IntelliJ IDEA
3). Language – Scala

build.sbt dependencies :-

1). libraryDependencies += "org.scala-lang.modules" %% "scala-async" % "0.9.5"

It adds the scala-async module to the project, which makes the Async library available.

2). libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.5"

It is added for the unit tests of the application.

3). libraryDependencies += "org.apache.commons" % "commons-io" % "1.3.2"

It is added to simplify the file I/O of reading the first and last names from the files.

Now considering the code to return the full name of the employee :-

import java.io.File
import org.apache.commons.io.FileUtils
import scala.async.Async.{async, await}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Reads the first name from the given file; fetchLastName (not shown) is analogous.
private def fetchFirstName(folder: String, file: String): Future[String] = {
  async {
    val first: String = FileUtils.readFileToString(new File(folder + "/" + file), "utf-8").trim
    longRunningTask() // Assume that fetching the content from the file is a long-running task
    first
  }
}

def fetchEmployeeName(folder: String): Future[String] = {
  val fullName: Future[String] = async {
    // Both futures are started here, before either is awaited, so the two reads can run concurrently
    val firstName: Future[String] = fetchFirstName(folder, "firstNameFile")
    val lastName: Future[String] = fetchLastName(folder, "lastNameFile")
    val result: String = await(firstName) + " " + await(lastName)
    result
  }
  fullName
}
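In ‘fetchEmployeeName’ both Futures are created before either of them is awaited, and that ordering is what allows the two reads to overlap. The sketch below checks this with the standard library only. The latch, the two-thread pool and the sample names are assumptions made for the demonstration; plain Futures are used in place of the file reads.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object ConcurrentStartSketch {
  def run(): String = {
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val bothStarted = new CountDownLatch(2)

      // Each task can finish only once the other one has started as well.
      def task(value: String): Future[String] = Future {
        bothStarted.countDown()
        if (!bothStarted.await(5, TimeUnit.SECONDS)) sys.error("tasks did not overlap")
        value
      }

      // Both Futures are created first, as in fetchEmployeeName ...
      val first = task("John")
      val last = task("Doe")
      // ... and only then are their results combined.
      val full = for (f <- first; l <- last) yield f + " " + l
      Await.result(full, 10.seconds)
    } finally pool.shutdown()
  }
}
```

If the second Future were created only after the first one had completed, the first task would time out on the latch and the combined Future would fail.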

In the ‘fetchEmployeeName’ method, the calls to ‘fetchFirstName’ and ‘fetchLastName’ are time-consuming operations, so they are enclosed inside the ‘async’ scope. Each ‘await’ waits for its Future to complete and yields the corresponding first name or last name of the employee. The two values are then concatenated. Finally, ‘async’ returns a Future of the full name, which may complete as a Success or a Failure. The test case (using FunSuite) for the application is :

import java.io.File
import org.apache.commons.io.FileUtils
import org.scalatest.FunSuite
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

test("Name fetched successfully") {
  val folderName = "./src/test/resources"
  val futureResult: Future[String] = asyncAwaitSimplified.fetchEmployeeName(folderName)
  val first: String = FileUtils.readFileToString(new File(folderName + "/" + "firstNameFile"), "utf-8").trim
  val last: String = FileUtils.readFileToString(new File(folderName + "/" + "lastNameFile"), "utf-8").trim
  // Block the test thread until the Future completes (at most 10 seconds)
  val actualResult: String = Await.result(futureResult, 10.seconds)
  val expectedResult: String = first + " " + last
  assert(actualResult === expectedResult)
}

The code above is easily readable and not at all complex; this is the advantage of using the Async library. It does, however, have some limitations on its use, which are listed below :

1). ‘await’ requires a directly enclosing ‘async’ block.

2). ‘await’ cannot be used inside a nested object, trait, or class inside ‘async’.

3). ‘await’ must not be used inside a Boolean short-circuit argument (&& is a Boolean short-circuit operator).

4). Return expressions are illegal inside an ‘async’ block.

5). ‘await’ must not be used inside an expression passed as an argument to a by-name parameter.

All the above uses of ‘await’ are illegal and are reported as errors at compile time. In the following section we will discuss how the Async library is actually implemented, what transformations the code undergoes, and what actually happens at compile time and at run time.

The code inside the async scope undergoes 2 levels of transformation :-

a). ‘A-Normal Form’ (ANF) transformation ->

The code enclosed in the ‘async’ scope is transformed into a state machine by the ‘async’ macro implementation. To make that possible, the code must initially be normalized into a form that can then be transformed into a state machine. This intermediate normalized form is called ANF. The rules governing the formation of ANF are as follows :

I). Control flow constructs like ‘if’ and ‘match’ are used only as statements, not as expressions; i.e. ‘if’ and ‘match’ do not return a value, and a compiler-fabricated variable is used instead to store the result of the original ‘if’ or ‘match’ expression.

II). Calls to ‘await’ are not allowed in compound expressions. Every call to ‘await’ is lifted out of its enclosing expression, and a compiler-fabricated variable is used to store the result it returns.

Example :-

Actual code

val result = async {
  val num1 = async { 2 }
  val num2 = async { 3 }
  if (await(num1) + await(num2) == 5) { 5 }
  else { -1 }
}

ANF normalization of the actual code :-


{
  val num1: scala.concurrent.Future[Int] = {
    scala.concurrent.Future.apply[Int](2)(scala.concurrent.ExecutionContext.Implicits.global)
  };

  val num2: scala.concurrent.Future[Int] = {
    scala.concurrent.Future.apply[Int](3)(scala.concurrent.ExecutionContext.Implicits.global)
  };

  val await$1: Int = scala.async.Async.await[Int](num1);

  val await$2: Int = scala.async.Async.await[Int](num2);

  var ifres$1: Int = 0; // await$1, await$2 and ifres$1 are the compiler-generated variables

  if (await$1 + await$2 == 5) {
    ifres$1 = 5
  }
  else {
    ifres$1 = -1
  }

  ifres$1
}

b). The full async transform ->

The code after ANF normalization is now ready to be transformed into a class representing its state machine. The following things are taken care of while performing the final async transform :-

I). A class is synthesized that represents the state machine (by extending StateMachine); its apply() method starts the evaluation of the async block’s body.

II). An ‘apply(value: Try[Any]): Unit’ method is generated, which is called on completion of each awaited future.

III). A ‘resume(): Unit’ method is generated, which switches on the current state and runs the user's code.

The compiler-synthesized code associated with the full async transformation is not discussed in detail here (a little bit of abstraction is always welcome :p). Finally, the corresponding bytecode for the state machine code is generated at the end of the compilation process.
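Without reproducing the synthesized code, the three points above can still be illustrated with a hand-written state machine for the num1/num2 example from the ANF section. This is a simplified sketch built on the standard library only; the class name, its fields and the ‘start()’ method are illustrative assumptions and do not match the code the macro really emits.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object StateMachineSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Hand-written stand-in for the synthesized class.
  final class SumMachine(num1: Future[Int], num2: Future[Int]) extends (Try[Any] => Unit) {
    private var state = 0   // which await the block is currently suspended at
    private var await1 = 0  // plays the role of await$1
    private var await2 = 0  // plays the role of await$2
    private val result = Promise[Int]()

    def future: Future[Int] = result.future

    // Starts the evaluation of the block's body.
    def start(): Unit = resume()

    // Called on completion of the future that is currently being awaited.
    def apply(value: Try[Any]): Unit = value match {
      case Failure(e) => result.failure(e)
      case Success(v) =>
        state match {
          case 0 => await1 = v.asInstanceOf[Int]; state = 1; resume()
          case 1 => await2 = v.asInstanceOf[Int]; state = 2; resume()
          case _ => ()
        }
    }

    // Switches on the current state and runs the next piece of user code.
    def resume(): Unit = state match {
      case 0 => num1.onComplete(this)
      case 1 => num2.onComplete(this)
      case _ => result.success(if (await1 + await2 == 5) 5 else -1)
    }
  }

  // Blocks the caller only for the purpose of this demonstration.
  def run(a: Int, b: Int): Int = {
    val machine = new SumMachine(Future(a), Future(b))
    machine.start()
    Await.result(machine.future, 5.seconds)
  }
}
```

Each ‘await’ in the original block becomes one state: the machine registers itself as the callback, returns, and picks up again in ‘apply’ once the value is available.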

Let us now recapitulate the entire voyage by summing up the events that happen at compile time and at run time in the life of ‘async’.

At COMPILE time ->

The code enclosed in the async scope is transformed into its corresponding compiler-synthesized state machine code by ‘asyncImpl’, the implementation method of the ‘async’ macro. The bytecode is then generated from this code at the end of the compilation process.

At RUN time ->

When the bytecode corresponding to the async block (the state machine code) is reached during execution, a thread from the thread pool is assigned to it and the execution takes place asynchronously.
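The hand-off to a pool thread can be observed with a plain Future, which relies on an execution context in the same way as ‘async’. The sketch below is only an illustration under that assumption; the object and method names are made up for it, and the exact thread names depend on the Scala version.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object PoolThreadSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Returns (name of the calling thread, name of the thread that ran the body).
  def threadNames(): (String, String) = {
    val caller = Thread.currentThread().getName
    val worker = Await.result(Future(Thread.currentThread().getName), 5.seconds)
    (caller, worker)
  }
}
```

Calling ‘PoolThreadSketch.threadNames()’ from the main thread yields two different names, the second one belonging to a worker of the global pool.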

It can be concluded that the Async library provides the user with an efficient and simple way to program asynchronously.

GitLink of the code –  https://github.com/knoldus/async-await-example

References ->

1). http://docs.scala-lang.org/sips/pending/async.html

2). http://docs.scala-lang.org/overviews/reflection/overview.html

3). http://docs.scala-lang.org/overviews/macros/overview.html

Happy coding …….
