Scala in Business | Knoldus Newsletter – July 2014


We are back again with the July 2014 edition of the newsletter. Here is Scala in Business | Knoldus Newsletter – July 2014.

In this newsletter, you will find how industries are adopting the Typesafe Reactive Platform to scale their applications and the benefits they are getting from it.

So, if you haven’t subscribed to the newsletter yet, hurry up and click on Subscribe Monthly Scala Newsletter.


Posted in Scala

Knolx Session: Gatling – Stress Test Tool


Posted in Scala

Play with Spark: Building Spark MLLib in a Play Spark Application


In our last post of the Play with Spark! series, we saw how to integrate Spark SQL in a Play Scala application. Now in this blog we will see how to add the Spark MLLib feature to a Play Scala application.

Spark MLLib is a new component under active development. It was first released with Spark 0.8.0. It contains some common machine learning algorithms and utilities, including classification, regression, clustering, collaborative filtering and dimensionality reduction, as well as some underlying optimization primitives. For a detailed list of available algorithms, click here.

To add the Spark MLLib feature to a Play Scala application, follow these steps:

1). Add the following dependencies to the build.sbt file

libraryDependencies ++= Seq(
"org.apache.spark"  %% "spark-core"              % "1.0.1",
"org.apache.spark"  %% "spark-mllib"             % "1.0.1"
)

The dependency "org.apache.spark" %% "spark-mllib" % "1.0.1" is specific to Spark MLLib.

As you can see, we have upgraded to Spark 1.0.1 (the latest release of Apache Spark at the time of writing).

2). Create a file app/utils/SparkMLLibUtility.scala & add the following code to it

package utils

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.classification.NaiveBayes

object SparkMLLibUtility {

  def SparkMLLibExample {

    val conf = new SparkConf(false) // skip loading external settings
      .setMaster("local[4]") // run locally with enough threads
      .setAppName("firstSparkApp")
      .set("spark.logConf", "true")
      .set("spark.driver.host", "localhost")
    val sc = new SparkContext(conf)

    val data = sc.textFile("public/data/sample_naive_bayes_data.txt") // Sample dataset
    val parsedData = data.map { line =>
      val parts = line.split(',')
      LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
    }

    // Split data into training (60%) and test (40%).
    val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
    val training = splits(0)
    val test = splits(1)

    val model = NaiveBayes.train(training, lambda = 1.0)
    val prediction = model.predict(test.map(_.features))

    val predictionAndLabel = prediction.zip(test.map(_.label))
    val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
    println("Accuracy = " + accuracy * 100 + "%")
  }
}

In the above code we have used the Naive Bayes algorithm as an example.

3). In the above code you can notice that we have parsed the data into Spark's Vectors object.

val parsedData = data.map { line =>
  val parts = line.split(',')
  LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
}

The reason for using Spark's Vectors object instead of Scala's Vector class is that Spark's Vectors provides both dense & sparse factory methods, so it can parse both dense & sparse data. This allows us to represent the data according to its properties.
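For example, Spark's Vectors object can express the same data point in either form. A minimal sketch, assuming a 4-dimensional feature space (the values are illustrative only):

import org.apache.spark.mllib.linalg.Vectors

// Dense representation: every component is stored explicitly
val denseVector = Vectors.dense(1.0, 0.0, 0.0, 3.0)

// Sparse representation: sparse(size, indices, values) stores only the non-zero entries
val sparseVector = Vectors.sparse(4, Array(0, 3), Array(1.0, 3.0))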

4). Next we observe that we have split the data into 2 parts – 60% for training & 40% for testing.

// Split data into training (60%) and test (40%).
val splits = parsedData.randomSplit(Array(0.6, 0.4), seed = 11L)
val training = splits(0)
val test = splits(1)

5). Then we trained our model using the Naive Bayes algorithm & the training data (here lambda is the additive-smoothing parameter).

val model = NaiveBayes.train(training, lambda = 1.0)

6). At last we used our model to predict the labels/classes of the test data.

 val prediction = model.predict(test.map(_.features))
 val predictionAndLabel = prediction.zip(test.map(_.label))
 val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
 println("Accuracy = " + accuracy * 100 + "%")

Then, to find out how good our model is, we calculated the accuracy of the predicted labels.
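Once trained, the same model can also score a single new observation directly. A minimal sketch, re-using the Vectors import and the model trained above (the feature values below are hypothetical and must match the dimensionality of the training data):

// Predict the class label of one hypothetical, unseen data point
val newSample = Vectors.dense(1.0, 0.0, 0.0)
val predictedLabel = model.predict(newSample) // returns the most likely label as a Double
println("Predicted label: " + predictedLabel)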

So, we see how easy it is to use any algorithm available in Spark MLLib to perform predictive analytics on data. For more information on Spark MLLib, click here.
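For instance, swapping in a different MLlib classifier is only a small change. A sketch only, assuming the same training & test splits built above:

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

// Train a logistic regression model on the same training split instead of Naive Bayes
val lrModel = LogisticRegressionWithSGD.train(training, numIterations = 100)
val lrPrediction = lrModel.predict(test.map(_.features))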

To download a Demo Application click here.

Posted in Agile, Play Framework, Scala, Spark

Play with Spark: Building Spark SQL in a Play Spark Application


In our last post of the Play with Spark! series, we saw how to integrate Spark Streaming in a Play Scala application. Now in this blog we will see how to add the Spark SQL feature to a Play Scala application.

Spark SQL is a powerful tool of Apache Spark. It allows relational queries, expressed in SQL, HiveQL, or Scala, to be executed using Spark. Apache Spark has a new type of RDD to support queries expressed in SQL format: the SchemaRDD. A SchemaRDD is similar to a table in a traditional relational database.

To add the Spark SQL feature to a Play Scala application, follow these steps:

1). Add the following dependencies to the build.sbt file

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.0.0",
"org.apache.spark" %% "spark-sql"  % "1.0.0"
)

The dependency "org.apache.spark" %% "spark-sql" % "1.0.0" is specific to Spark SQL.

2). Create a file app/utils/SparkSQL.scala & add the following code to it

package utils

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import org.apache.spark.sql.SQLContext

case class WordCount(word: String, count: Int)

object SparkSQL {

  def simpleSparkSQLApp {
    val logFile = "public/README.md" // Should be some file on your system
    val driverHost = "localhost"
    val conf = new SparkConf(false) // skip loading external settings
      .setMaster("local[4]") // run locally with enough threads
      .setAppName("firstSparkApp")
      .set("spark.logConf", "true")
      .set("spark.driver.host", s"$driverHost")
    val sc = new SparkContext(conf)
    val logData = sc.textFile(logFile, 4).cache()
    val words = logData.flatMap(_.split(" "))

    val sqlContext = new SQLContext(sc)

    import sqlContext._

    val wordCount = words.map(word => (word, 1)).reduceByKey(_ + _).map(wc => WordCount(wc._1, wc._2))
    wordCount.registerAsTable("wordCount")

    val moreThanTenCount = wordCount.where('count > 10).select('word)

    println("Words occurring more than 10 times are : ")
    moreThanTenCount.map(mttc => "Word : " + mttc(0)).collect().foreach(println)
  }
}

Like any other Spark component, Spark SQL runs on its own context: the SQLContext, which runs on top of the SparkContext. So, we first built a sqlContext so that we can use Spark SQL.

3). In the above code you can notice that we have defined a case class, WordCount.

case class WordCount(word: String, count: Int)

This case class defines the schema of the table in which we are going to store the data in SQL format.

4). Next we observe that we have mapped the variable wordCount to the case class WordCount.

val wordCount = words.map(word => (word,1)).reduceByKey(_+_).map(wc => WordCount(wc._1, wc._2))
wordCount.registerAsTable("wordCount")

Here we are converting wordCount from an RDD to a SchemaRDD. Then we are registering it as a table so that we can construct SQL queries to fetch data from it.

5). At last, we notice that we have constructed a SQL query in Scala:

val moreThanTenCount = wordCount.where('count > 10).select('word)

Here we are fetching the words which occur more than 10 times in our text file. We have used the Language-Integrated Relational Queries of Spark SQL, which are available only in Scala. To know about the other types of SQL queries supported by Spark SQL, click here.
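For comparison, the same result can also be expressed as a plain SQL string against the registered table. A minimal sketch, assuming the sqlContext and the wordCount table registered above:

// Equivalent query written as a SQL string
val moreThanTenCountSql = sqlContext.sql("SELECT word FROM wordCount WHERE count > 10")
moreThanTenCountSql.map(row => "Word : " + row(0)).collect().foreach(println)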

To download a Demo Application click here.

Posted in Agile, Play Framework, Scala, Spark

Knolx Session: Understanding NoSQL


Big Data!

The term is not exactly new any more, but it is still evolving. Changing requirements and the need to scale applications beyond their current limitations have been the driving forces behind big data development. Here I am sharing a presentation that covers the transition from the traditional RDBMS world to the NoSQL world, the NoSQL ecosystem, and the current era of hybrid design.

Happy programming! Keep innovating.


Dependency Injection @ Scala


Scala is all about scalability, functional style, immutability and, most importantly, less code!
While building a framework using a shared-nothing architecture, I came across the requirement of linking/referencing various modules (rather, services) together. Dependency injection was an obvious choice. Let's take this discussion ahead with a further exploration of Dependency Injection (DI) and the various possible ways to accomplish it.

Dependency Injection
It is basically providing the objects that an object needs (its dependencies) instead of having it construct them itself. It is a very useful technique for testing, since it allows dependencies to be mocked or stubbed out. In other words, dependency injection is a design pattern whose core principle is to separate behavior from dependency resolution.
There is an article by Martin Fowler that can provide more insights about DI.
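As a minimal, hypothetical sketch of the idea (the ReportService and Database names below are illustrations only, not part of the example that follows):

class Database

// Without DI: the service constructs its own dependency, so it cannot be tested in isolation
class HardWiredReportService {
  private val database = new Database // hard-wired inside the class
}

// With DI: the dependency is supplied from outside, so a stub or mock can be passed in tests
class ReportService(database: Database)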

Basically, the needs for DI are:

Unit testing
 - Mock Data Access Layer
 - Mock File System
 - Mock Email

Modular design
 - Multiple different implementations
 - Upgradeable infrastructure

Separation of concerns
 - Single Responsibility
 - Increased code re-use
 - Cleaner Modeling
 - Concurrent or independent development

To summarize, dependency injection is a software design pattern in which an object's dependencies are provided to it instead of being created internally within the object. This is, in fact, an implementation of the Dependency Inversion Principle. Scala is a very rich and deep language that gives you several ways of doing DI solely based on language constructs, but nothing prevents you from using existing Java DI frameworks, if that is preferred.

Let’s talk about developer business, i.e. implementation.

Talking about Scala and DI, multiple alternatives are available; a few popular ones are:

1. Constructor injection
2. Cake pattern
3. Google Guice

Constructor injection
As the name itself says: declare all the required dependencies for an object as constructor arguments. Great! But how does it work? Well, let's take this on with an example.

Consider a UserService class that takes user information from a database and performs some operations on it. First we need to get data from the database; for that, we define a class UserDAL which has methods to get, create & delete database rows.

class UserDAL {
  /* dummy data access layer */
  def getUser(id: String): User = {
    val user = User("12334", "testUser", "test@knoldus.com")
    println("UserDAL: Getting user " + user)
    user
  }

  def create(user: User) = {
    println("UserDAL: creating user: " + user)
  }

  def delete(user: User) = {
    println("UserDAL: deleting user: " + user)
  }
}

// A case class is a simple class with some extra factory methods.
// The User class holds information about a user.
case class User(id: String, name: String, email: String)

Now define a UserService class. UserService performs some operations on the data provided by the UserDAL class. UserService depends on UserDAL, so it declares UserDAL as a constructor parameter.

class UserService(userDAL: UserDAL) {

  def getUserInfo(id: String): User = {
    val user = userDAL.getUser(id)
    println("UserService: Getting user " + user)
    user
  }

  def createUser(user: User) = {
    userDAL.create(user)
    println("UserService: creating user: " + user)
  }

  def deleteUser(user: User) = {
    userDAL.delete(user)
    println("UserService: deleting user: " + user)
  }
}

Something is wrong in this design. Any guesses? Take a closer look:
class UserService(userDAL: UserDAL) {
Your answer: it violates the Dependency Inversion Principle.
The principle states:
A. High-level modules should not depend on low-level modules. Both should depend on abstractions.
B. Abstractions should not depend on details. Details should depend on abstractions.

So we define a trait:

trait UserDALComponent {
  def getUser(id: String): User
  def create(user: User)
  def delete(user: User)
}

class UserDAL extends UserDALComponent {
  // a dummy data access layer that is not persisting anything
  def getUser(id: String): User = {
    val user = User("12334", "testUser", "test@knoldus.com")
    println("UserDAL: Getting user " + user)
    user
  }

  def create(user: User) = {
    println("UserDAL: creating user: " + user)
  }

  def delete(user: User) = {
    println("UserDAL: deleting user: " + user)
  }
}
class UserService(userDAL: UserDALComponent) {

  def getUserInfo(id: String): User = {
    val user = userDAL.getUser(id)
    println("UserService: Getting user " + user)
    user
  }

  def createUser(user: User) = {
    userDAL.create(user)
    println("UserService: creating user: " + user)
  }

  def deleteUser(user: User) = {
    userDAL.delete(user)
    println("UserService: deleting user: " + user)
  }
}

Now UserService depends on UserDALComponent (an abstraction), not on a concrete implementation.
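A minimal wiring sketch (the MockUserDAL below is a hypothetical stub, added here only to show what the abstraction buys us in tests):

// Production wiring: inject the real data access layer
val userService = new UserService(new UserDAL)
userService.getUserInfo("12334")

// Test wiring: inject a stub that never touches a database
class MockUserDAL extends UserDALComponent {
  def getUser(id: String): User = User(id, "mockUser", "mock@knoldus.com")
  def create(user: User) = ()
  def delete(user: User) = ()
}
val testService = new UserService(new MockUserDAL)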

Continue reading

Posted in Scala

Knolx Session: Introduction to Highcharts


Posted in Scala

Scaldi: A Dependency Injection Library in Scala


Scaldi is a lightweight dependency injection library written in Scala. It provides a Scala DSL for binding dependencies and injecting them.

It depends on three concepts:

Injector – the container for bindings.

Module – where we define bindings; it extends Injector.

Injectable – the trait which provides the DSL for injection.

In this blog we will see how we can use it with Akka.
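Before wiring Scaldi into Akka, here is a minimal sketch of these three concepts on their own (the Greeter example is hypothetical):

import scaldi.{ Injectable, Injector, Module }

class Greeter(val message: String) {
  def greet() = println(message)
}

// Module: the place where bindings are defined (a Module is itself an Injector)
class GreeterModule extends Module {
  bind[Greeter] to new Greeter("Hello from Scaldi")
}

object GreeterApp extends App with Injectable {
  // Injector: the container holding the bindings
  implicit val injector: Injector = new GreeterModule
  // Injectable: brings the inject DSL into scope
  val greeter = inject[Greeter]
  greeter.greet()
}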

1. First, add the Scaldi library dependency to your project's build.sbt


"org.scaldi" %% "scaldi-akka" % "0.4"

2. Suppose we have an actor, as below, which computes the sum of two integers and sends the result back to its sender.


import akka.actor.Actor

class SumActor extends Actor {
  def receive = {
    case Sum(a, b) =>
      sender ! SumCalculated(a + b)
  }
}

Below are the messages used for communication between actors.


case class Sum(a: Int, b: Int)
case class SumCalculated(sum: Int)

3. Create an actor which delegates the job to this SumActor and uses the Scaldi library API to inject the Props of SumActor.


import akka.actor.Actor
import akka.actor.ActorRef
import scaldi.Injector
import scaldi.akka.AkkaInjectable

class SumCalculater(implicit inj: Injector) extends Actor with AkkaInjectable {

  val sumActorProps = injectActorProps[SumActor]

  def receive = work

  val work: Receive = {
    case sum: Sum =>
      val sumActor = context.actorOf(sumActorProps)
      sumActor ! sum
      context become doSumAndSendResult(sum, sender)
  }

  def doSumAndSendResult(sum: Sum, reportTo: ActorRef): Receive = {
    case SumCalculated(result) =>
      println("calculated sum.....")
      reportTo ! result
  }
}

4. Now we will create a simple application using Scaldi, as below.

The AkkaInjectable trait provides various methods for dependency injection.

injectActorRef – creates a new actor using the ActorRef factory.

injectActorProps – injects the Props for the actor.


import scaldi.Module
import scaldi.akka.AkkaInjectable
import akka.actor.ActorSystem
import akka.actor.Inbox
import scala.concurrent.duration._

object SumApplication extends App with AkkaInjectable {

  implicit val applicationModule = new SumModule :: new AkkaModule

  implicit val system = inject[ActorSystem]
  val sumCalculater = injectActorRef[SumCalculater]
  val inbox = Inbox.create(system)

  inbox send (sumCalculater, Sum(3, 4))
  val sum = inbox.receive(5 seconds)
  println(sum)
}

class SumModule extends Module {
  binding toProvider new SumCalculater
  binding toProvider new SumActor
}

class AkkaModule extends Module {
  bind[ActorSystem] to ActorSystem("SumActorSystem") destroyWith (_.shutdown())
}

In the SumModule class we provide bindings for all the actors which we want to inject as dependencies in the application. In the AkkaModule class we provide the binding for the ActorSystem.

We can combine these two modules using the :: operator.

The related code can be found in the GitHub repo. Learn more about Scaldi.

Posted in Scala

Play with Spark: Building Apache Spark with Play Framework – (Part – 2)


Last week, we saw how to build a simple Spark application in Play using Scala. Now in this blog we will see how to add Spark’s Twitter Streaming feature to a Play Scala application.

Spark Streaming is a powerful tool of Apache Spark that runs on top of Spark. It gives the ability to process and analyze real-time streaming data (in micro-batches) along with the fault-tolerant characteristics of Spark.

To add Spark’s Twitter Streaming feature to a Play Scala application, follow these steps:

1). Add the following dependencies to the build.sbt file

libraryDependencies ++= Seq(
"org.apache.spark"  %% "spark-core"              % "1.0.0",
"org.apache.spark"  %% "spark-streaming-twitter" % "1.0.0"
)

As you can see, we have upgraded our application to Spark 1.0.0. The dependency "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0" is specific to Twitter Streaming. If you want to use Flume, Kafka, etc., then import the respective dependency given here (for example, see the Kafka sketch below).
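A Kafka source would look roughly like the sketch below. This is an assumption-laden sketch only: the ZooKeeper quorum, consumer group and topic map are placeholders, and the spark-streaming-kafka dependency must be added in place of the Twitter one.

import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.KafkaUtils

// Rough sketch: swap the Twitter source for Kafka, given an existing StreamingContext
def kafkaStream(ssc: StreamingContext) =
  KafkaUtils.createStream(
    ssc,
    "localhost:2181",      // placeholder ZooKeeper quorum
    "play-spark-consumer", // placeholder consumer group id
    Map("someTopic" -> 1)) // placeholder topic -> number of receiver threads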

2). Create a file app/utils/TwitterPopularTags.scala & add the following code to it

package utils

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import StreamingContext._
import org.apache.spark.SparkContext._
import org.apache.spark.streaming.twitter._
import play.api.Play
import org.apache.spark.SparkConf
import twitter4j.TwitterFactory
import twitter4j.auth.AccessToken

object TwitterPopularTags {

  def TwitterStreamUtil {

    // Twitter Authentication credentials
    val consumerKey = Play.current.configuration.getString("consumer_key").get
    val consumerSecret = Play.current.configuration.getString("consumer_secret").get
    val accessToken = Play.current.configuration.getString("access_token").get
    val accessTokenSecret = Play.current.configuration.getString("access_token_secret").get

    // Authorising with your Twitter Application credentials
    val twitter = new TwitterFactory().getInstance()
    twitter.setOAuthConsumer(consumerKey, consumerSecret)
    twitter.setOAuthAccessToken(new AccessToken(accessToken, accessTokenSecret))

    val driverPort = 8080
    val driverHost = "localhost"
    val conf = new SparkConf(false) // skip loading external settings
      .setMaster("local[4]") // run locally with enough threads
      .setAppName("firstSparkApp")
      .set("spark.logConf", "true")
      .set("spark.driver.port", s"$driverPort")
      .set("spark.driver.host", s"$driverHost")
      .set("spark.akka.logLifecycleEvents", "true")

    val ssc = new StreamingContext(conf, Seconds(2))
    val filters = Seq("fifa")
    val stream = TwitterUtils.createStream(ssc, Option(twitter.getAuthorization()), filters)

    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print popular hashtags
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(5)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(5)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

This function will get the most popular tags matching the “filters” from the streaming tweets, using Spark Streaming. Also, Spark Streaming needs its own port to stream in data, so provide it a separate port other than 9000 or whichever port Play is running on (in the code above we used 8080).

3). Add the Twitter App credentials to conf/application.conf like this

# Twitter Keys
consumer_key="consumer key of Twitter App"
consumer_secret="consumer secret of Twitter App"
access_token="access token of Twitter App"
access_token_secret="access token secret of Twitter App"

4). At last, add the following code to app/controllers/Application.scala

package controllers

import play.api._
import play.api.mvc._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import utils.TwitterPopularTags
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits._

object Application extends Controller {

  def index = Action {
    Future { TwitterPopularTags.TwitterStreamUtil }
    Ok(views.html.index(""))
  }

}

5). Now the application is ready to be run on localhost:9000.

To download a Demo Application click here.

Posted in Agile, Akka, Future, Non-Blocking, Play Framework, Reactive, Scala, Spark, Web

Knolx Session: Dependency Injection @ Scala


In this Knolx session, I am going to discuss how to implement Dependency Injection (DI) in Scala.

Posted in Scala