Map reduce with Akka and Scala: The famous word count


After working with Java and its family for around a decade, I recently tried my hand at Scala and Akka. Yes, changing taste buds is not at all easy, but working with Scala is fun! Until now, every time I started building an API, the first thing that came to mind was:

The number of Java classes or beans. Honestly, a POJO is rarely more than a set of getters and setters. (Do I need them?)

Scala just made it easy. Let’s see how!

Java

public class Person
{
    private int age;

    public int getAge()
    {
        return this.age;
    }

    public void setAge(int age)
    {
        this.age = age;
    }
}

Honestly, in previous Java blogs I used to skip these setters and getters by simply writing

// getters and setters!

With Scala it’s really easy.

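class Person
{
  // backing field; the accessor and mutator below expose it as the property `age`
  private var _age: Int = _
  def age = _age
  def age_=(value: Int): Unit = { _age = value }
}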

By the way, you can also declare age as a plain var (for example as a constructor parameter) and access it as a property; Scala then generates the accessor and mutator for you.
A number of things give Scala an advantage over Java (e.g. immutability, utility functions, etc.), but I will skip these for now and come back to the topic at hand, which we will walk through together.
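To make the property and immutability remarks concrete, here is a minimal sketch. The names PropertyDemo, PersonVar and PersonCase are hypothetical and chosen only to avoid clashing with the Person class above.

```scala
// Hypothetical names, for illustration only.
object PropertyDemo {
  // A constructor `var` gets a generated getter and setter.
  class PersonVar(var age: Int)

  // A case class is immutable by default; `copy` returns a modified copy.
  case class PersonCase(age: Int)
}
```

With these definitions, `p.age = 31` on a PersonVar goes through the generated setter, while `c.copy(age = 31)` on a PersonCase leaves the original value untouched.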

Map Reduce
MapReduce was originally described in a Google paper (Dean and Ghemawat, 2004). Since Hadoop came into existence in the mid-2000s, MapReduce has been widely used for processing large data sets. More detail can be found in the Hadoop MapReduce documentation. It is a model for processing, in parallel, data that is distributed across multiple data nodes.

Here we will take on the famous word count example, which reads words from a file and counts them in a map reduce manner.
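Before bringing in actors, the idea can be sketched with plain Scala collections. This is only an illustration of the map and reduce phases on in-memory lines; the names WordCountSketch, mapLine and reduce are hypothetical and not part of the Akka code that follows.

```scala
object WordCountSketch {
  // Map phase: turn one line into (word, 1) pairs.
  def mapLine(line: String): Seq[(String, Int)] =
    line.split("\\s+").toSeq.filter(_.nonEmpty).map(word => (word, 1))

  // Reduce phase: sum the counts for each word.
  def reduce(pairs: Seq[(String, Int)]): Map[String, Int] =
    pairs.groupBy(_._1).map { case (word, ones) => (word, ones.map(_._2).sum) }

  // Run both phases over a collection of lines.
  def count(lines: Seq[String]): Map[String, Int] =
    reduce(lines.flatMap(mapLine))
}
```

The actor version below does the same thing, except that the map phase runs on several file chunks in parallel and the reduce phase is split into a local and a global step.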

I assume the reader is familiar with Akka. In short, it is all about asynchronous, distributed message processing, and it can process millions of messages per second on a local box, which lets an application use the CPU and other resources to the fullest.

Let’s discuss the few components (Akka actors, really) needed to define and process a file on a local box.

FileReceiver : An Akka actor to receive file name as an input message to initiate word count and finally broadcast message on completion.

import java.io.File
import akka.actor.{Actor, Props}
import akka.routing.{Broadcast, RoundRobinRouter}

class FileReceiver extends Actor {

  def receive = {
    case fileName: String =>
      val system = context.system // reuse the running ActorSystem instead of creating a second one
      val global = system.actorOf(Props(new CountAggregator(8)))
      // eight LineCollector routees, each created with its own LocalAggregator
      val router = system.actorOf(Props(new LineCollector(system.actorOf(Props(new LocalAggregator(global))))).withRouter(RoundRobinRouter(nrOfInstances = 8)))
      println(s"Started at ==> ${System.currentTimeMillis()}")

      // determine chunk boundaries
      val file = new File(fileName)
      val chunkSize = 300000
      val count = (file.length() / chunkSize).toInt

      // i == count yields the final partial chunk; LineCollector clamps its end to the file length,
      // so no separate "remaining" message is needed
      for (i <- 0 to count) {
        val start = i * chunkSize   // 0, 300000, 600000, ...
        val end = chunkSize + start // 300000, 600000, 900000, ...
        router ! (fileName, start, end) // send chunks
      }

      router ! Broadcast(true) // broadcast end of day message!
  }
}

LineCollector: An Akka actor that receives the file information, opens a file channel to read its chunk, and passes the words of that chunk on as a message. (Local mapper)

import java.io.{File, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import akka.actor.{Actor, ActorRef}

/**
 * Chunk processor
 */
class LineCollector(localAgg: ActorRef) extends Actor {
  def receive = {
    // chunkStart and chunkEnd are byte offsets into the file
    case (fileName: String, chunkStart: Int, chunkEnd: Int) =>
      val file = new File(fileName)
      val channel = new RandomAccessFile(file, "r").getChannel
      try {
        val mappedBuff = channel.map(FileChannel.MapMode.READ_ONLY, 0, file.length()) // map complete file
        val lastIndex = file.length().toInt - 1
        // clamp the end offset to the last byte of the file
        val endP = math.min(chunkEnd, lastIndex)

        if (chunkStart < file.length()) {
          // move both boundaries back to the nearest space so that no word is cut in half
          val startPosition = trail(chunkStart, mappedBuff, mappedBuff.get(chunkStart), endP)
          val endPosition =
            if (endP != lastIndex) trail(endP, mappedBuff, mappedBuff.get(endP), endP) else endP

          // prepare and send buffer to local combiner
          if (endPosition > startPosition) {
            val stringBuilder = new StringBuilder(endPosition - startPosition + 1)
            for (i <- startPosition to endPosition) {
              val character = mappedBuff.get(i).toChar
              stringBuilder.append(if (character == '\n') ' ' else character)
            }
            // word -> occurrences; empty tokens produced by repeated spaces are dropped
            localAgg ! stringBuilder.toString.split(" ").filter(_.nonEmpty).groupBy(x => x)
          }
        }
      } finally channel.close()

    case done: Boolean =>
      localAgg ! done // end of day message
  }

  // Returns the index of the nearest space at or before startP (0 if there is none).
  private def trail(startP: Int, charBuff: MappedByteBuffer, start: Byte, length: Int): Int = {
    var s = start.toChar
    var next = startP

    // if the character is not a space, keep backtracking to the start of the word
    if (startP <= length) {
      while (s != ' ' && next > 0) {
        next = next - 1
        s = charBuff.get(next).toChar
      }
    }
    next
  }

}
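The boundary handling is the subtle part of the mapper: a chunk offset may land in the middle of a word, so it is moved back to the nearest space. The following is a minimal in-memory sketch of that idea on a String rather than a mapped file; ChunkSketch, snapToSpace and chunks are hypothetical names, not part of the actor code.

```scala
object ChunkSketch {
  // Move an offset back to the nearest space at or before it (0 if there is none).
  def snapToSpace(text: String, offset: Int): Int = {
    var i = math.max(0, math.min(offset, text.length - 1))
    while (i > 0 && text.charAt(i) != ' ') i -= 1
    i
  }

  // Split text into pieces of roughly chunkSize characters, cutting only at spaces.
  def chunks(text: String, chunkSize: Int): Seq[String] = {
    val cuts = (0 to text.length by chunkSize).map(snapToSpace(text, _)).distinct :+ text.length
    cuts.zip(cuts.tail).map { case (from, to) => text.substring(from, to) }
  }
}
```

Because every cut sits on a space, concatenating the pieces gives back the original text and no word is split across two pieces.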

LocalAggregator : Akka actor to act as local chunk collector and perform word count aggregation. (e.g. Local reducer)


import akka.actor.{Actor, ActorRef}
import scala.collection.mutable

/**
 * Local (to line chunk collector) Akka combiner
 */
class LocalAggregator(globalAgg: ActorRef) extends Actor {
  val wordCountMap = mutable.Map[String, Int]()
  def receive = {
    // word -> all occurrences of that word in one chunk
    case countMap: Map[String, Array[String]] =>
      countMap foreach { case (k, v) => wordCountMap += ((k, wordCountMap.getOrElse(k, 0) + v.length)) }
    // end of day: publish the local totals to the global aggregator
    case complete: Boolean =>
      globalAgg ! wordCountMap
  }
}

CountAggregator: Akka actor to act as global line aggregator to publish final word count after successful reading of file.

import akka.actor.Actor
import scala.collection.mutable

/**
 * Global combiner to combine and print final output after aggregating results from local Akka based combiners.
 */
class CountAggregator(threadCount: Int) extends Actor {
  val wordCountMap = mutable.Map[String, Int]()
  var count = 0 // number of local aggregators that have reported so far
  def receive = {
    case localCount: mutable.Map[String, Int] =>
      localCount foreach { case (word, n) => wordCountMap += ((word, wordCountMap.getOrElse(word, 0) + n)) }
      count = count + 1
      if (count == threadCount) {
        println("Got the completion message ... khallas!")
        onCompletion()
      }
  }

  // print final word count on output
  def onCompletion(): Unit = {
    for ((word, n) <- wordCountMap) {
      println(word + "=>" + n)
    }
    println(s"Completed at ==> ${System.currentTimeMillis()}")
  }

}
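Stripped of the actor plumbing, the global step is just a merge of several word count maps. A minimal sketch with immutable maps follows; MergeSketch, merge and mergeAll are hypothetical names used only for this illustration.

```scala
object MergeSketch {
  // Combine two word-count maps by summing the counts of shared keys.
  def merge(a: Map[String, Int], b: Map[String, Int]): Map[String, Int] =
    b.foldLeft(a) { case (acc, (word, n)) => acc.updated(word, acc.getOrElse(word, 0) + n) }

  // Fold any number of local results into one global result.
  def mergeAll(locals: Seq[Map[String, Int]]): Map[String, Int] =
    locals.foldLeft(Map.empty[String, Int])(merge)
}
```

The actors above keep a mutable map instead, which is safe there because each actor processes one message at a time.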

Finally, what we need is an App to run this. Here it is!

import akka.actor.{ActorSystem, Props}
import com.typesafe.config.ConfigFactory

object WordCountRunner extends App
{
  val receiverSystem = ActorSystem("receiversystem", ConfigFactory.load("application_remote"))

  val receiver = receiverSystem.actorOf(Props[FileReceiver], "receiver")
  receiver ! "/home/vivek/w2" // 1 GB file
}

That’s it! We can write all of this in a single Scala file. What about Java?
Also, try this out at your end; it is incredibly fast on my local box (a dual-core Dell laptop with 8 GB of RAM)!

Oh yes! I forgot to mention

ConfigFactory.load("application_remote")

This loads your Akka configuration file and gets the ActorSystem up and running. The configuration is very simple!

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    netty.tcp {
      hostname = "127.0.0.1"
      port = 5553
    }
  }
}

You just need to configure the remote actor provider, the host and the port, and that’s it.

Have fun and happy programming.

Posted in Scala

Knolx Session : Basics of Backbone.js


Basics of Backbone.js

It is a lightweight JavaScript library that adds structure to your client-side code (MVC framework).

Posted in Scala

Scala in Business | Knoldus Newsletter – September 2014


Hello Folks

We are back again with the September 2014 newsletter. Here it is: Scala in Business | Knoldus Newsletter – September 2014

In this newsletter you will get to know how organizations are benefiting from the Typesafe Reactive Platform, how Scala can be beneficial for you, and which trends are emerging in the industry.

So, if you haven’t subscribed to the newsletter yet, hurry up and click on Subscribe Monthly Scala News Letter


Posted in Agile, Akka, Amazon EC2, Cassandra, Cloud, Future, Java, LiftWeb, MongoDB, News, Non-Blocking, Play Framework, Reactive, Scala, Spark, Web

Weather Information System using Scala.Js and HTML


Scala.js is a Scala-to-JavaScript compiler. If you are developing your web application in Scala, you don’t need to sacrifice JavaScript interoperability. You can write your web application entirely in Scala.

We have created a simple application, “Weather Information System: Get the mood of your city in one click”, using Scala.js and HTML.

To use this application, pull the code from https://github.com/knoldus/ScalaJs_Weather_Report.


Posted in Agile, CSS, JavaScript, Scala, Web

Knolx Session: Brief Introduction to MongoDB


In this presentation I explained a few points about MongoDB and the ground rules to keep in mind while implementing embedded and reference document structures.

Posted in Scala

And You Thought Option is Just Another Way to Handle null


Ok, to start with, the recommendation is to use None instead of null. Ideally, if we end up getting an NPE in Scala code, then that is a sin! Well, at least at Knoldus :)

null in Java is used as a placeholder to denote no value or a non-fatal error situation. The null reference is widely known as the “billion dollar mistake”, a phrase coined by its inventor, Tony Hoare. The way to work around it is the Null Object pattern, which is available in a variety of languages and which people seldom use.

Scala is smart. It has Option, which is a container. The container can either hold something or be empty. Most of the time, developers working in Scala use it in one of two situations.

First, when they are working with a Java API which possibly returns null. Since you would not want to work with null in Scala, you would wrap the Java API call in an Option. Something like this

def someCrappyJavaService = null   //> someCrappyJavaService: => Null
val result = Option(someCrappyJavaService)  //> result  : Option[Null] = None
if (result==None) println("bla")   //> bla

As you may have noticed, most developers coming from the Java world write code like this.

However, Option provides us with a lot of cool ways to implement our logic. For starters, instead of checking with == you could do

def someCrappyJavaService = null     //> someCrappyJavaService: => Null
val result = Option(someCrappyJavaService)//> result  : Option[Null] = None
if (!result.isDefined) println("bla") //> bla

Ok, I see you throwing a stone at me. What is the big deal here? You could also do a getOrElse

def someCrappyJavaService = null
val result = Option(someCrappyJavaService).getOrElse("bla")
println(result)

Still, what is the big deal? Well, we have just started!

One of the cool features of Option is that it can be treated like a collection. What? Yes, you can call methods like map, flatMap and foreach on it and use it in for comprehensions. Let us see how that makes things easier for us.

for

We want to execute a portion of the logic only if we have got a value in the Option container. There is “a” way and then a better way to do it. See the code below

val result: Option[String] = None // Some("Hello")
def someProcessingLogic(x: String) =
  println(s"I would execute when there is some value like $x")
// Standard way
if (result != None) someProcessingLogic(result.get)
// Better way!
for (res <- result) someProcessingLogic(res)

Now the for comprehension body is evaluated only if result is not an empty box, i.e. it has some value. But what if there are multiple Options that we want to compose together?
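As a hedged illustration of composing several Options (this is a sketch of the general technique, not the example from the rest of the original post), a for comprehension with yield chains them and short-circuits on the first empty one. The object OptionSketch, the lookups findUser and findCity, and their data are all hypothetical.

```scala
object OptionSketch {
  // Hypothetical lookups that may or may not yield a value.
  def findUser(id: Int): Option[String] = if (id == 1) Some("alice") else None
  def findCity(user: String): Option[String] = if (user == "alice") Some("Paris") else None

  // The yield runs only when every Option in the chain is defined;
  // otherwise the whole expression evaluates to None.
  def greeting(id: Int): Option[String] =
    for {
      user <- findUser(id)
      city <- findCity(user)
    } yield s"$user lives in $city"
}
```

The compiler rewrites this into nested flatMap and map calls, which is why no explicit emptiness checks are needed.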

Posted in Scala

Adding Scalastyle in a Multi-Module SBT Scala Project


Nowadays, building a Multi-Module SBT project is becoming very common. It helps us to keep multiple related projects in a single build. Moreover, each sub-project in the build has its own SBT default directory. Also, it generates its own build and works like any other project.

But, adding Scalastyle in a Multi-Module SBT Scala project is not easy because Scalastyle does not support Multi-Module SBT projects yet. So, in this post we will see how to add Scalastyle in a Multi-Module SBT project in an easy way.

In the following example we have a main project called parent and two sub-projects, child1 and child2. Now, let’s get started with building the parent project.

1. To create multiple projects in a single build, we must first declare each project and how they relate to each other in the Build.scala file. So, here’s an example of a Build.scala file which defines a root project parent, where the parent project aggregates two sub-projects, child1 and child2.

import sbt._
import Keys._
import org.scalastyle.sbt.ScalastylePlugin

object BuildSettings {

 lazy val buildSettings =
   Defaults.defaultSettings ++
     Seq(
       version := "1.0",
       scalaVersion in ThisBuild := "2.11.2",
       parallelExecution in ThisBuild := false,
       scalacOptions := Seq(
         "-feature",
         "-language:implicitConversions",
         "-language:postfixOps",
         "-deprecation",
         "-encoding", "utf8",
         "-Ywarn-adapted-args"))

}

object SBTMultiBuild extends Build {

  import BuildSettings._

  // Common settings for all build modules.
  val commonSettings = buildSettings ++ Seq(ScalastylePlugin.Settings: _*)

  lazy val parent = Project(
    id = "parent",
    base = file("."),
    settings = buildSettings) aggregate(child1, child2)

  lazy val child1 = Project(
    id = "child1",
    base = file("child1"),
    settings = commonSettings)

  lazy val child2 = Project(
    id = "child2",
    base = file("child2"),
    settings = commonSettings)
}

This code resides under parent/project/Build.scala. Having common buildSettings helps in managing the settings shared by all projects under one object. Besides, we also have commonSettings, which contains the Scalastyle settings, so that we don’t have to add them separately to every project.

2. Next, we add commonSettings to all sub-projects

lazy val child1 = Project(
  id = "child1",
  base = file("child1"),
  settings = commonSettings)

lazy val child2 = Project(
  id = "child2",
  base = file("child2"),
  settings = commonSettings)

By adding commonSettings in all sub-projects we get Scalastyle settings added in all of them. So, we don’t have to add Scalastyle settings in sub-projects separately.

3. Now we have to add the following line of code to the parent/project/plugins.sbt file

// SBT plugin for ScalaStyle
addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.5.0")

This line tells SBT to add Scalastyle as a plugin to the project.

4. Finally, we need to add a scalastyle-config.xml file to the parent folder.

5. Now, if we run the sbt scalastyle command on the parent project, we can see the warnings for all sub-projects on the terminal in a single go.

knoldus@knoldus-desktop:~/parent$ sbt scalastyle
[info] Loading project definition from /home/knoldus/parent
[info] Compiling 2 Scala sources to /home/knoldus/parent/target/scala-2.11/sbt-0.13/classes...
[info] Set current project to parent (in build file:/home/knoldus/parent)
[info] Processed 1 file(s)
[info] Found 0 errors
[info] Found 0 warnings
[info] Found 0 infos
[info] Finished in 10 ms
[success] created: sbt.SettingKey$$anon$4@54449f9d
[info] Processed 2 file(s)
[info] Found 0 errors
[info] Found 0 warnings
[info] Found 0 infos
[info] Finished in 1 ms
[success] created: sbt.SettingKey$$anon$4@54449f9d
[warn] /home/knoldus/parent/child1/src/main/scala/sample.scala:16:55: magic.number.message
[info] Processed 1 file(s)
[info] Found 0 errors
[info] Found 1 warnings
[info] Found 0 infos
[info] Finished in 4 ms
[success] created: sbt.SettingKey$$anon$4@54449f9d
[warn] /home/knoldus/parent/child2/src/main/scala/sample.scala:10:8: method.name.message
[info] Processed 3 file(s)
[info] Found 0 errors
[info] Found 1 warnings
[info] Found 0 infos
[info] Finished in 1 ms
[success] Total time: 1 s, completed 14 Sep, 2014 1:02:50 PM

So, in this post we saw how easy it is to add Scalastyle to a Multi-Module SBT Scala project.

Posted in Agile, Architecture, Scala

SCALA : Finding Patterns in Strings


To determine whether a String contains a regular expression pattern, create a “regex object” by calling the “.r” method on a string.

Then use the pattern with findFirstIn when you’re looking for one match, and with findAllIn when you’re looking for all matches.

First create a Regex for the pattern you want to search for, in this case, a sequence of one or more numeric characters:

scala> val numPattern = "[0-9]+".r

numPattern: scala.util.matching.Regex = [0-9]+

Next, create a sample String you can search:

scala> val address = "123 Main Street Suite 101"

address: java.lang.String = 123 Main Street Suite 101

The findFirstIn method finds the first match:

scala> val match1 = numPattern.findFirstIn(address)

match1: Option[String] = Some(123)

When looking for multiple matches, use the findAllIn method:

scala> val matches = numPattern.findAllIn(address)

matches: scala.util.matching.Regex.MatchIterator = non-empty iterator

As you can see, findAllIn returns an iterator, which lets you loop over the results:

scala> matches.foreach(println)

123

101

If findAllIn doesn’t find any results, an empty iterator is returned, so you can still write your code just like that; you don’t need to check whether the result is null. If you’d rather have the results as an Array, add the toArray method after the findAllIn call:

scala> val matches = numPattern.findAllIn(address).toArray

matches: Array[String] = Array(123, 101)

If there are no matches, this approach yields an empty Array. Other methods like toList, toSeq, and toVector are also available. You can also create a Regex object by importing the Regex class:

scala> import scala.util.matching.Regex

import scala.util.matching.Regex

scala> val numPattern = new Regex("[0-9]+")

numPattern: scala.util.matching.Regex = [0-9]+
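Outside the REPL, the same calls can be wrapped in small helpers. This is a minimal sketch; RegexSketch, first and all are hypothetical names, while findFirstIn and findAllIn are the Regex methods shown above.

```scala
object RegexSketch {
  // One or more numeric characters, as in the REPL session above.
  private val numPattern = "[0-9]+".r

  // First match, or None when the string contains no digits.
  def first(s: String): Option[String] = numPattern.findFirstIn(s)

  // All matches as a List; empty when nothing matches.
  def all(s: String): List[String] = numPattern.findAllIn(s).toList
}
```

Returning Option and List keeps the "no match" case explicit, so callers never have to deal with null.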

Posted in Scala

Finagle By Twitter Engineer @ Knoldus


This week, Puneet Khanduri, a member of Twitter’s core engineering team, introduced us to Finagle at the MeetUp organized by Knoldus. It was a great session, quite inspiring and appreciated by all attendees. We are thankful to Puneet.


Please find the slide deck of this meetup.

Posted in Agile, Scala

Easiest Way to Implement Google Map in Scala.js


In this blog, I will explain how to implement Google Map in Scala.js. If you are developing your web application in Scala and you want to write its JavaScript functionality in Scala as well, then you should go with Scala.js.


To implement Google Map in Scala.js, please follow the steps below:

1) Add the code below to your HTML file

<script src="https://maps.googleapis.com/maps/api/js"></script>


<div id="map_canvas" style="height: 430px; width: 512px;"></div>

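2) Add the function below to your Scala class file

// imports assumed from the g / lit / jsnew aliases and the document reference used below
import scala.scalajs.js.Dynamic.{global => g, literal => lit, newInstance => jsnew}
import scala.scalajs.js.annotation.JSExport
import org.scalajs.dom.document

@JSExport
def initialize(lat: Double, long: Double) = {
  val map_canvas = document.getElementById("map_canvas")
  val map_options = lit(center = (jsnew(g.google.maps.LatLng))(lat, long), zoom = 3, mapTypeId = g.google.maps.MapTypeId.ROADMAP)
  val googleMap = (jsnew(g.google.maps.Map))(map_canvas, map_options)
  // drop a marker at the same coordinates
  val marker = (jsnew(g.google.maps.Marker))(lit(map = googleMap, position = (jsnew(g.google.maps.LatLng)(lat, long))))
}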

3) Call the initialize function in your HTML file.

<script type="text/javascript">


com.knoldus.weather.Weather().initialize(42.2781410217,-74.9159927368)

</script>

For complete source code, go to https://github.com/knoldus/ScalaJs_Weather_Report

Posted in Agile, AJAX, CSS, JavaScript, jQuery, Scala, Web