Power of Parallel Processing in Akka


If you have been following the Inphina blog and our work with Scala related technologies, you would have noticed a spurt in our blog feeds. The reason being that we have recently come out of a very successful massively scalable framework created with Akka. Inphina holds an active stake in the product and the details of the framework would follow once the embargo is over. Meanwhile, in this post I would like to share some wins that we achieved with Akka.

For the matter of benchmarks look at the graph below. As of now we are able to process over 2 million messages coming on the RabbitMQ server in a matter of less than 100 seconds. You might argue that this number does not tell you anything unless we know what we are processing. Fair point, but in this post we are not debating that. What we are trying to show is that once you have a lot of Akka actors running in parallel under a master worker kind of relationship, you can do wonders on the number of cores that you have.

The bisecting line that you see in the graph is the one that we got when we had our Akka actors listening to the queue with the help of AMQP client. See the earlier post. On the flick of a switch you can go parallel in the framework or hybrid as the case may be depending on your messages. When we switched to full parallel, we had actors consuming messages from the queue which would in-turn hand off the message to a Master Actor which would spawn Child Actors. This is based on the standard master-worker relationship. We are launching the number of workers either based on a configuration depending on the number of cores on which the framework is deployed or on the basis of the number of plugins which need to be launched in parallel.

Let us look at some code now, following is the process message of the Master Actor

class ParallelProcessor extends Actor {
…
...
 def receive = {
 case MessageFromQueue(message) => process(message)

 case ChildMessage(plugin, message) =>
 val transformedMessage = processMessage(message, plugin)
      self reply Result(message, plugin.target)

    case Result(message, target) =>
 parallelCounter -= 1
 messageList ++= List(new Result(message, target))

 if (parallelCounter == 0) {
        logger.info("**** REDUCER CALLED ---- ")
 val reducedMessage = reduceMessages(messageList)
 postPluginProcessing(reducedMessage)
 self ! PoisonPill
      }

  }
…
...

def process(message: VajraPMMessage) = {
 val numberOfWorkers = VajraPM.numberOfWorkers
 parallelCounter = currentStep.pluginsInStep.length
 val workers = Vector.fill(fetchWorkerCount(numberOfWorkers, parallelCounter))(actorOf[ParallelProcessor].start)
 val router = Routing.loadBalancerActor(CyclicIterator(workers)).start
    currentStep.pluginsInStep foreach { plugin =>
 router ! new ChildMessage(plugin, transformer.createVajraMessage(message.toXml().toString))
    }
 router ! Broadcast(PoisonPill)
 router ! PoisonPill
  }

...

As you would notice, the master actor (ParallelProcessor) receives a message. The case MessageFromQueue gets called and we call the process(message). As you would notice in the process(message), we spawn a number of child workers which is the same actor ParallelProcessor again.

Let us zoom into the process method, we create a router to handle the even delivery of ChildMessage to the workers. Right now, the router uses a CyclicIterator to distribute messages.

val workers = Vector.fill(fetchWorkerCount(numberOfWorkers))(actorOf[ParallelProcessor].start)
 val router = Routing.loadBalancerActor(CyclicIterator(workers)).start
    currentStep.pluginsInStep foreach { plugin =>
 router ! new ChildMessage(plugin, transformer.createVajraMessage(message.toXml().toString))

An interesting thing that we do here is that once the router has passed all the messages to the child workers, we issue a poisonpill to the child workers so that they can die after processing. We also pass a poison pill to the router.
Caution: If you are using a work stealing dispatcher, please ensure that your poison pills are not being eaten by actors which are yet to receive messages thus causing unintended behavior.

Let us see how the child receives the message, again we have the same receive method

 def receive = {
 case MessageFromQueue(message) => process(message)

 case ChildMessage(plugin, message) =>
 val transformedMessage = processMessage(message, plugin)
      self reply Result(message, plugin.target)

    case Result(message, target) =>
 parallelCounter -= 1
 messageList ++= List(new Result(message, target))

 if (parallelCounter == 0) {
        logger.info("**** REDUCER CALLED ---- ")
 val reducedMessage = reduceMessages(messageList)
 postPluginProcessing(reducedMessage)
 self ! PoisonPill
      }

  }

This time, we fall into the block of case ChildMessage(plugin, message) =>
Here, every child does some processing and passes back the results to the master with the self reply Result(message, plugin.target)

Finally, the results come back to the master, where it keeps a count of the number of results coming back. If you recall, the counter (parallelCounter) was set in the process method where we spawned child workers either on the basis of the number of parallel steps or depending on the number of cores that we have on the machine on which the framework is running.

 def receive = {
 case MessageFromQueue(message) => process(message)

 case ChildMessage(plugin, message) =>
 val transformedMessage = processMessage(message, plugin)
      self reply Result(message, plugin.target)

    case Result(message, target) =>
 parallelCounter -= 1
 messageList ++= List(new Result(message, target))

 if (parallelCounter == 0) {
        logger.info("**** REDUCER CALLED ---- ")
 val reducedMessage = reduceMessages(messageList)
 postPluginProcessing(reducedMessage)
 self ! PoisonPill
      }
  }

The case Result(message, target) => does a reduction logic on all the results coming back from the child workers.

Thus, with the master worker logic we were able to reduce the processing time considerably.

One of the things that we fell into was that we ended up creating a lot of new native threads on our OS as a result of which we were getting limited by the number of native threads which could be used. For this, we used a thread-pool instead of spawning a new thread. See Akka Dispatchers and our blog related on dispatchers. I would cover the threading issue in detail in another post.

Advertisements

About Vikas Hazrati

Vikas is the Founding Partner @ Knoldus which is a group of software industry veterans who have joined hands to add value to the art of software development. Knoldus does niche Reactive and Big Data product development on Scala, Spark and Functional Java. Knoldus has a strong focus on software craftsmanship which ensures high-quality software development. It partners with the best in the industry like Lightbend (Scala Ecosystem), Databricks (Spark Ecosystem), Confluent (Kafka) and Datastax (Cassandra). To know more, send a mail to hello@knoldus.com or visit www.knoldus.com
This entry was posted in Scala and tagged , , , , . Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s