Map reduce with Akka and Scala: The famous word count


After working for around a decade with Java and its family, I recently tried my hand at Scala and Akka. Yes, changing taste buds is not easy at all, but working with Scala is fun! Until now, whenever I started building an API, the first thing that came to mind was the number of Java classes or beans I would need. Honestly, a POJO is rarely more than getters and setters. (Do I even need them?)

Scala just made it easy. Let’s see how!

Java

public class Person
{
    private int age;

    public int getAge()
    {
        return this.age;
    }

    public void setAge(int age)
    {
        this.age = age;
    }
}

Honestly, in my previous Java blogs I used to skip these setters and getters by simply writing

// getters and setters!

With Scala it’s really easy.

class Person {
 var _age: Int = _
 def age = _age
 def age_=(value: Int) = _age = value
}

By the way, you can also let Scala generate the accessors for you (for example with a var constructor parameter) and access age as a property.
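
One way to read the remark about accessing age as a property is a var constructor parameter, for which the compiler generates both the getter and the setter. This is only a minimal sketch of that reading, and the value 30 is made up for illustration.

```scala
// A var constructor parameter: Scala generates age and age_= for us.
class Person(var age: Int)

object PersonDemo {
  // Reads and writes look like plain field access but go through the generated accessors.
  def birthday(p: Person): Int = {
    p.age = p.age + 1
    p.age
  }
}
```

Calling PersonDemo.birthday(new Person(30)) returns 31 and leaves the person's age at 31.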
There are a number of things that give Scala an added advantage over Java (e.g. immutability, utility functions, etc.), but I will skip these for now and come back to the topic, which we will walk through together.

Map Reduce
MapReduce was originally published as a Google paper (Dean and Ghemawat, 2004). Since Hadoop came into existence in the mid-2000s, MapReduce processing has been widely used for large-scale data processing. More detail can be found in the Hadoop MapReduce documentation. It is a model for processing data in parallel when that data is distributed across multiple data nodes.
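
Before bringing actors into the picture, the model itself fits in a few lines of plain Scala collections. The sketch below is not the Akka implementation from this post. The names WordCountSketch, mapPhase and reducePhase are made up for illustration, and everything runs in memory on a single thread.

```scala
object WordCountSketch {
  // Map phase: emit a (word, 1) pair for every word in a line.
  def mapPhase(line: String): Seq[(String, Int)] =
    line.split("\\s+").filter(_.nonEmpty).map(word => (word, 1)).toSeq

  // Reduce phase: group the pairs by word and sum the ones.
  def reducePhase(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (word, ones) => word -> ones.map(_._2).sum }

  // Full pipeline over a set of lines.
  def wordCount(lines: Seq[String]): Map[String, Int] =
    reducePhase(lines.flatMap(mapPhase))
}
```

For example, WordCountSketch.wordCount(Seq("to be or not to be")) gives a count of 2 for "to" and "be", and 1 for "or" and "not". The actors later in the post do the same work, only with the map and reduce steps spread across several workers.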

Here we will take on the famous word count example, which reads words from a file and counts them in a map reduce manner.

I assume the reader is familiar with Akka. In short, it is all about asynchronous, distributed message processing, and it can process millions of messages per second on a local box, which lets an application utilize the CPU and other resources to the fullest.

Let’s discuss the few components (Scala classes, really) needed to define and process a file on a local box.

FileReceiver: An Akka actor that receives a file name as its input message, initiates the word count, and finally broadcasts a message on completion.

import java.io.File
import akka.actor.{Actor, Props}
import akka.routing.{Broadcast, RoundRobinRouter}

class FileReceiver extends Actor {

  def receive =
    {
      case fileName: String =>
        // reuse the system this actor already runs in instead of starting a second one
        val system = context.system
        val global = system.actorOf(Props(new CountAggregator(8)))
        // Props takes its creator by name, so each of the 8 routees gets its own LocalAggregator
        val router = system.actorOf(Props(new LineCollector(system.actorOf(Props(new LocalAggregator(global))))).withRouter(RoundRobinRouter(nrOfInstances = 8)))
        print(s"Started at ==>")
        println(System.currentTimeMillis())

        // determine line boundaries for number of chunks
        val file = new File(fileName)
        val chunkSize = 300000
        val count  = file.length()/chunkSize

        // the last iteration (i == count) covers the remaining tail of the file;
        // LineCollector clamps the end to the file length, so no extra message is needed
        for (i <- 0 to count.intValue()) {
          val start = i * chunkSize //0, 10,20
          val end = chunkSize + start // 10,20,30
          router ! ((fileName, start, end))  //send chunks
        }

        router ! Broadcast (true) //broadcast end of day message!

    }
}
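
The chunking arithmetic in FileReceiver is easier to check when it is pulled out into a pure function. The following is only a sketch under my own assumptions. ChunkPlanner and chunks are hypothetical names that are not part of the actor code, and the ranges are half-open (start inclusive, end exclusive), which differs slightly from how LineCollector treats its end position.

```scala
object ChunkPlanner {
  // Split [0, fileLength) into consecutive ranges of at most chunkSize bytes.
  // The last range is simply shorter when the length is not an exact multiple.
  def chunks(fileLength: Long, chunkSize: Int): Seq[(Long, Long)] = {
    require(chunkSize > 0, "chunkSize must be positive")
    (0L until fileLength by chunkSize.toLong).map { start =>
      (start, math.min(start + chunkSize, fileLength))
    }
  }
}
```

With a 10 byte file and a chunk size of 4, ChunkPlanner.chunks(10L, 4) yields (0,4), (4,8) and (8,10), so the tail falls out of the same loop and needs no separate message.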

LineCollector: An Akka actor that receives the file information, opens a file channel to read its chunk, and passes the words on as a chunk message. (Local mapper)

import java.io.{File, RandomAccessFile}
import java.nio.channels.FileChannel
import akka.actor.{Actor, ActorRef}

/**
 *  Chunk processor
*/
class LineCollector(localAgg: ActorRef) extends Actor {
  def receive =
    {

      case (fileName: String, chunkStart: Int, chunkEnd: Int) =>

        val file = new File(fileName)
        val channel = new RandomAccessFile(file, "r").getChannel()
        val mappedBuff = channel.map(FileChannel.MapMode.READ_ONLY, 0, file.length()) //map complete file
        channel.close() // the mapping stays valid after the channel is closed

        // clamp the chunk end to the last byte of the file
        var endP = chunkEnd
        if (endP >= file.length()) {
          endP = file.length().intValue - 1
        }

        if (chunkStart < file.length()) {
          val start = mappedBuff.get(chunkStart) // start character
          val startPosition = trail(chunkStart, mappedBuff, start, endP)

          val end = mappedBuff.get(endP) // end character

          val endPosition = if ((endP != file.length() - 1)) trail(endP, mappedBuff, end, endP) else endP // valid end character

          // prepare and send buffer to local combiner
          if (endPosition > startPosition) {
            val stringBuilder = new StringBuilder(endPosition - startPosition + 1)
            for (i <- startPosition to endPosition) {
              val character = mappedBuff.get(i).toChar
              if (character == '\n') {
                stringBuilder.append(' ')
              } else {
                stringBuilder.append(character)
              }
            }
            // drop the empty tokens produced by consecutive spaces before grouping
            localAgg ! stringBuilder.toString.split(" ").filter(_.nonEmpty).groupBy(x => x)  //sending chunks
          }

        }

      case (done: Boolean) =>
        localAgg ! done // end of day message
    }

  // walk backwards from startP to the nearest space so that a chunk never starts or ends mid-word
  private def trail(startP: Int, charBuff: java.nio.MappedByteBuffer, start: Byte, length: Int): Int = {

    var s = start.toChar
    var next = startP

    if (startP <= length) {
      // bound the loop on next; testing the unchanging start position could run past index 0
      while (s != ' ' && next >= 0) {
        s = charBuff.get(next).toChar
        next = next - 1
      }
    }

    if (startP != next) next + 1 else startP
  }

}
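
The backtracking idea behind trail can be tried out on a plain byte array, without a memory-mapped file. This is a simplified sketch and not the method above. BoundarySketch and alignToWordStart are hypothetical names, and unlike trail the function returns the index of the first character of the word, not the index of the space in front of it.

```scala
object BoundarySketch {
  // Move pos backwards until it sits at the start of a word,
  // that is, at index 0 or right after a space.
  def alignToWordStart(data: Array[Byte], pos: Int): Int = {
    var i = math.max(0, math.min(pos, data.length))
    while (i > 0 && data(i - 1) != ' '.toByte) i -= 1
    i
  }
}
```

For the bytes of "hello world foo", position 8 (the 'r' in "world") is moved back to 6, the start of "world", while position 3 falls back to 0 because there is no space in front of "hello".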

LocalAggregator: An Akka actor that acts as the local chunk collector and aggregates word counts. (i.e. local reducer)


import akka.actor.{Actor, ActorRef}

/**
 * Local(to line chunk collector) akka combiner
 */
class LocalAggregator(globalAgg: ActorRef) extends Actor {
  val wordCountMap = scala.collection.mutable.Map[String, Int]()
  def receive =
    {
      // type arguments are erased at runtime, so this matches any immutable Map
      case countMap: Map[String, Array[String]] =>
        // foreach, since we only update the running totals and need no result
        countMap foreach { case (k, v) => wordCountMap += ((k, wordCountMap.getOrElse(k, 0) + v.length)) }
      case complete: Boolean =>
        globalAgg ! wordCountMap
    }
}

CountAggregator: An Akka actor that acts as the global aggregator and publishes the final word count once the whole file has been read.

import akka.actor.Actor

/**
 * Global combiner to combine and print final output after aggregating results from local akka based combiners.
 */
class CountAggregator(threadCount: Int) extends Actor {
  val wordCountMap = scala.collection.mutable.Map[String, Int]()
  var count: Int = 0
  def receive =
    {
      case localCount: scala.collection.mutable.Map[String, Int] =>
        // merge one local result into the global totals
        localCount foreach (x => wordCountMap += ((x._1, wordCountMap.getOrElse(x._1, 0) + x._2)))
        count = count + 1
        // done once every local aggregator has reported
        if (count == threadCount) {
          println("Got the completion message ... khallas!")
          onCompletion()
        }
    }

  // print final word count on output
  def onCompletion(): Unit = {
    for ((word, total) <- wordCountMap) {
      print(word + "=>")
      println(total)
    }
    print(s"Completed at ==>")
    println(System.currentTimeMillis())
  }

}
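
What the two aggregators do between them boils down to merging partial count maps. Here is a small sketch of just that step with immutable maps. MergeSketch and merge are made-up names, and the actors above use mutable maps, so treat this as an illustration of the reduce logic only.

```scala
object MergeSketch {
  // Fold every partial result into one map, adding counts for words seen more than once.
  def merge(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials.foldLeft(Map.empty[String, Int]) { (acc, part) =>
      part.foldLeft(acc) { case (m, (word, n)) =>
        m.updated(word, m.getOrElse(word, 0) + n)
      }
    }
}
```

Merging Map("a" -> 1, "b" -> 2) with Map("a" -> 3) gives a total of 4 for "a" and 2 for "b". Because addition is associative and commutative, the order in which the local results arrive does not matter.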

Finally, what we need is an App to run this. Here it is!

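import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object WordCountRunner extends App
{

  // start the actor system with the settings from application_remote.conf
  val receiverSystem = ActorSystem("receiversystem", ConfigFactory.load("application_remote"))

  val c = receiverSystem.actorOf(Props[FileReceiver], "receiver")
  c ! "/home/vivek/w2"  // 1 GB file
}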

That’s it! We can write all of this in a single Scala file. What about Java?
Also, try this out at your end. It is incredibly fast on my local box (8 GB dual-core Dell laptop)!

Oh yes! I forgot to mention

ConfigFactory.load("application_remote")

This loads your Akka configuration file and gets the ActorSystem up and running. The configuration looks very simple!

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 5553
    }
  }
}

You just need to configure the remote actor provider, host and port, and that’s it.

Have fun and happy programming.

References: http://dustinmartin.net/getters-and-setters-in-scala/


3 Responses to Map reduce with Akka and Scala: The famous word count

  1. Scott Gardner says:

    IMHO it’d be better not to simply copy-paste stuff from places and compile into one.
    And your first piece of Scala code doesn’t compile:
    class Person
    {
    var age:Int = _
    def age = _age
    def age_=(age: Int) = _age = age
    }

    At least add some references instead of just plagiarizing.

  2. apanimesh061 says:

    Hi,
    I was able to set up the project and build it without any errors, but when I run the program it hangs after it has read all the chunks. The program control does not go into CountAggregator! How should I make this work?
