Data Science & Spark: Data visualization for picking out the key factors using Zeppelin

What does a data scientist yearn for most? For me, it’s data visualization. We know that under massive layers of junk there is always something important hidden that visualization can bring to notice. More importantly, we need analytics to establish relationships among the values, which would be quite a hassle to do manually given how large and unstructured a dataset may be.

While working on Spark, I stumbled upon a very basic and well-structured dataset from the Kaggle competition “Titanic”. The very first thing I wanted was to visualize this dataset in graphical form to better understand the relationships among its attributes.

One way to do that is via Zeppelin. Zeppelin provides a web-based notebook that allows users to build reports for data analytics. In this blog I am going to explain Zeppelin installation, configuration and graphical representation of the Titanic dataset.

A few important things to know about Zeppelin before we begin:

1. It’s an open source tool.

2. It does not require learning any other specific scripting language. One can use Scala for data upload and manipulation (just like Spark) and SQL queries for graphical representation.

3. It can be easily set up with a Spark cluster.

I set up Zeppelin from the incubator source downloaded from the GitHub repository. Since I have Spark 1.4.0 installed on my system, I executed the command “sudo mvn -X clean package -Pspark-1.4 -Dhadoop.version=2.2.0 -Phadoop-2.2 -DskipTests” (here -X enables debug output, so that full error details are displayed if anything fails). Once the build completes, it looks like this.

set up

One can configure the setup if required, for example to change the port number or to add features such as an external cluster set up as master. One can start Zeppelin with the following command

incubator-zeppelin/bin$./ start

1. In the browser, localhost:8080 shows:

zeppelin home page

2. From the “Notebook” tab we can create a new notebook. These notebooks act like an interface in which one can type and execute text-based commands.

2.1 In the first step we created a Spark context that reads data from the CSV file.

2.2 Then we created a case class and mapped the CSV content to it.

2.3 In the last step we simply converted it to a DataFrame and registered it as a temporary table called “titanic”.

data mapped into df for SQL query processing
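Steps 2.1 to 2.3 might look roughly like the following. This is a minimal sketch, not the notebook's actual code: the Passenger fields are a hypothetical subset of the Titanic columns, train.csv is an assumed file name, and the Spark calls are shown as comments because sc and sqlContext only exist inside a Zeppelin paragraph. The helper splits on commas that sit outside double quotes, since the Name column itself contains commas.

```scala
// Hypothetical subset of the Titanic columns; the real file has more fields.
case class Passenger(id: Int, survived: Int, pclass: Int, sex: String, age: Option[Double])

object TitanicParsing {
  // Split one CSV line on commas that are outside double quotes,
  // because names look like "Braund, Mr. Owen Harris".
  def splitCsv(line: String): Array[String] =
    line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)

  // Column order assumed: PassengerId, Survived, Pclass, Name, Sex, Age, ...
  def parse(line: String): Passenger = {
    val f = splitCsv(line)
    val age = if (f(5).isEmpty) None else Some(f(5).toDouble)
    Passenger(f(0).toInt, f(1).toInt, f(2).toInt, f(4), age)
  }
}

// Inside a Zeppelin paragraph (Spark 1.4), the three steps would then be:
//   import sqlContext.implicits._
//   val passengers = sc.textFile("train.csv")        // assumed path
//     .filter(!_.startsWith("PassengerId"))           // drop the header row
//     .map(TitanicParsing.parse)
//   passengers.toDF().registerTempTable("titanic")
```

Keeping age as an Option avoids failing on the rows where the Age column is empty.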

3. Now we can write a SQL query to get the dataset onto a chart:

The queries should be prefixed with %sql to invoke the SQL interpreter.

basic query

4. Right next to the chart options we can see the “Settings” tab. It is used to choose the variables for the graphs: for the following diagram, for example, one selects which elements go on the y-axis, which go on the x-axis, their grouping attribute, and so on.

plotting of data

Plotting the data this way gives insight into the dataset, which helps in selecting the predictive model best suited to its requirements.


Spark Streaming Gnip: An Apache Spark utility to pull Tweets from Gnip in real time

We are all familiar with Gnip, Inc., which provides data from dozens of social media websites via a single API. It is also known as the Grand Central Station of the social media web. One of its popular APIs is PowerTrack, which provides Tweets from Twitter in real time along with the ability to filter Twitter’s full firehose, giving its customers only what they are interested in.

This ability of Gnip’s PowerTrack has a number of applications and can be applied in many places, such as with Spark Streaming. Yes, we can integrate Gnip’s PowerTrack with Apache Spark’s Streaming library and build a powerful utility that provides Tweets from Twitter in real time. We can also apply all the available features of Apache Spark to Gnip’s PowerTrack data for real-time analysis.

In this blog, we will see a utility that helps us pull Tweets from Gnip using Spark Streaming and handle Gnip’s PowerTrack data better.

Following are the steps for implementation:

1. Setup build.sbt file

name := "spark-streaming-gnip"

version := "1.0"

scalaVersion := "2.11.6"

organization := "com.knoldus"

libraryDependencies ++= Seq(
 "org.apache.spark" %% "spark-core" % "1.4.0",
 "org.apache.spark" %% "spark-streaming" % "1.4.0"
)

2. Create a ReceiverInputDStream for Gnip:

Now we need to create a ReceiverInputDStream for Gnip, which opens a connection to Gnip using the supplied authentication credentials and returns the set of all tweets received during each interval.

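import java.io.{BufferedReader, InputStreamReader}
import java.net.HttpURLConnection
import java.util.zip.GZIPInputStream

import org.apache.spark.Logging
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class GnipInputDStream(
 ssc_ : StreamingContext,
 url: String,
 username: String,
 password: String,
 storageLevel: StorageLevel) extends ReceiverInputDStream[String](ssc_) {

 override def getReceiver(): Receiver[String] = {
   new GnipReceiver(url, username, password, storageLevel)
 }
}

class GnipReceiver(
 url: String,
 username: String,
 password: String,
 storageLevel: StorageLevel) extends Receiver[String](storageLevel) with Logging {

 private var gnipConnection: HttpURLConnection = _
 private var stopped = false

 // getConnection(url, username, password) belongs to the full source and is not shown in this listing.
 def onStart() {
   try {
     val newGnipConnection = getConnection(url, username, password)
     val inputStream = newGnipConnection.getInputStream()
     val responseCode = newGnipConnection.getResponseCode()
     if (responseCode >= 200 && responseCode <= 299) {
       val reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8"))
       // The code that reads lines from `reader` and stores them is elided in this listing.
     } else {
       logError("Error receiving tweets")
     }
     logInfo("Gnip receiver started")
     stopped = false
   } catch {
     case e: Exception => restart("Error starting Gnip stream", e)
   }
 }

 def onStop() {
   stopped = true
   logInfo("Gnip receiver stopped")
 }
}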
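The receiver calls getConnection(url, username, password), which the listing does not show. A minimal sketch of what such a helper could look like is given below, assuming the endpoint accepts HTTP Basic authentication and gzip-compressed responses. The object name, the basicAuthHeader helper and the timeout values are all hypothetical, and java.util.Base64 needs Java 8 or later.

```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets
import java.util.Base64

object GnipConnectionSketch {
  // Build the value of an HTTP Basic "Authorization" header.
  def basicAuthHeader(username: String, password: String): String = {
    val credentials = s"$username:$password".getBytes(StandardCharsets.UTF_8)
    "Basic " + Base64.getEncoder.encodeToString(credentials)
  }

  // Prepare (but do not yet open) a streaming connection.
  // The timeouts are illustrative values, not taken from the original source.
  def getConnection(url: String, username: String, password: String): HttpURLConnection = {
    val connection = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    connection.setConnectTimeout(10 * 1000)
    connection.setReadTimeout(60 * 60 * 1000)
    connection.setRequestProperty("Authorization", basicAuthHeader(username, password))
    connection.setRequestProperty("Accept-Encoding", "gzip")
    connection
  }
}
```

The network request is only sent when the caller asks for the input stream or the response code, which is what onStart does.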

3. Utility for Gnip Input Stream:

Finally, we need a utility for the Gnip input stream that creates an input stream returning the tweets received from Gnip.

object GnipUtils {

 /**
  * Create an input stream that returns tweets received from Gnip.
  * @param ssc          StreamingContext object
  * @param url          Gnip Powertrack Url
  * @param username     Gnip account username
  * @param password     Gnip account password
  * @param storageLevel Storage level to use for storing the received objects
  */
 def createStream(
   ssc: StreamingContext,
   url: String,
   username: String,
   password: String,
   storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] =
   new GnipInputDStream(ssc, url, username, password, storageLevel)
}


4. Example:

Now we are ready to use our Gnip input stream receiver with the Apache Spark Streaming API. Here is a short example of how to use the above utility:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import com.knoldus.spark.streaming.gnip.GnipUtils

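object GnipStreaming extends App {

 val sparkConf: SparkConf = new SparkConf().setAppName("spark-streaming-gnip").setMaster("local[*]")
 val sc: SparkContext = new SparkContext(sparkConf)
 val ssc: StreamingContext = new StreamingContext(sc, Seconds(10))
 // The PowerTrack URL and username are blank in this listing; supply your own.
 val gnipDstream = GnipUtils.createStream(ssc, "", "", "knoldus")

 // Assumed completion, since the rest of the listing is missing: print each batch and run the stream.
 gnipDstream.print()
 ssc.start()
 ssc.awaitTermination()
}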




As the example above shows, this utility makes it easy to integrate Spark Streaming with Gnip’s PowerTrack: we only need to provide the Gnip authentication credentials and the resource (PowerTrack) URL.

The full source code for the utility package can be downloaded from here.


Configuring SonarQube with Scoverage plug-in : The Complete Guide

This blog will guide you through successfully configuring the Scoverage plug-in with SonarQube for statement coverage analysis of Scala source code.

How Does it Work?

The Scoverage plug-in for SonarQube reads the report generated by the sbt scoverage plug-in and generates several reports, such as:

  • Statement Coverage % Analysis
  • Lines covered by tests
  • Drill-down report to the file level

The greatest advantage of SonarQube is its support for code quality rules across several programming languages. Unfortunately, it doesn’t support Scala. That is why the Scoverage plug-in exists for SonarQube. After you add the Scoverage plug-in, SonarQube gains Scala-specific rules for statement coverage. It can also support multi-language projects, such as Scala with Java.


Start/Deploy Apache Spark application programmatically using spark launcher

Sometimes we need to start our Spark application from another Scala/Java application. For that we can use SparkLauncher. In this example we build a Spark application and run it from another Scala application.

Let us look at our Spark application code.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

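object SparkApp extends App {
  val conf = new SparkConf().setMaster("local[*]").setAppName("spark-app")
  val sc = new SparkContext(conf)
  val rdd = sc.parallelize(Array(2, 3, 2, 1))
  // The listing is cut off here; the full application saves rdd as a text file (output path not shown).
}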

This is our simple Spark application. Make a jar of it using sbt assembly. Now we write a Scala application that starts this Spark application, as follows:

import org.apache.spark.launcher.SparkLauncher

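object Launcher extends App {
  val spark = new SparkLauncher()
    .setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6")
    .setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar")
    .setMainClass("SparkApp")
    .setMaster("local[*]")
    .launch()
}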


In the above code we create a SparkLauncher object and set the following values on it:

.setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6") sets the Spark home, which is used internally to call spark-submit.

.setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar") specifies the jar of our Spark application.

.setMainClass("SparkApp") sets the entry point of the Spark program, i.e. the driver program.

.setMaster("local[*]") sets the address of the master where the application starts; here we run it on the local machine.

.launch() simply starts our Spark application.

This is the minimal requirement. You can also set many other options, such as passing arguments, adding jars and setting configurations.
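As an illustration of those extra options, here is a hedged sketch that reuses the paths from the post and adds a configuration entry, an extra jar and application arguments. The jar path and the argument values are hypothetical placeholders, and the object name LauncherWithOptions is made up for this example.

```scala
import org.apache.spark.launcher.SparkLauncher

object LauncherWithOptions extends App {
  val process = new SparkLauncher()
    .setSparkHome("/home/knoldus/spark-1.4.0-bin-hadoop2.6")
    .setAppResource("/home/knoldus/spark_launcher-assembly-1.0.jar")
    .setMainClass("SparkApp")
    .setMaster("local[*]")
    .setConf(SparkLauncher.DRIVER_MEMORY, "1g") // a Spark configuration entry
    .addJar("/path/to/extra-library.jar")       // hypothetical extra jar
    .addAppArgs("first-arg", "second-arg")      // hypothetical arguments for SparkApp
    .setVerbose(true)                           // verbose spark-submit output
    .launch()

  // launch() returns a java.lang.Process, so we can wait for the application to finish.
  val exitCode = process.waitFor()
  println(s"Spark application finished with exit code $exitCode")
}
```

Because launch() hands back a plain Process, the calling application is responsible for waiting on it and, for long-running jobs, for reading its output streams.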

For the source code you can check out the following git repo:

Spark_laucher is our Spark application

launcher_app is our Scala application, which starts the Spark application

Change the paths to match your setup, make a jar of Spark_laucher, and run launcher_app. You will see the resulting RDD in this directory as the output of the Spark application, because we simply save it as a text file.


Stateful transformation on DStream in Apache Spark with a word count example

Sometimes we have a use case in which we need to maintain the state of a paired DStream so that it can be used in the next DStream. We take the example of a stateful word count over socketTextStream. In a plain word count, if the word “xyz” comes twice in the first DStream or window, it is reduced to a value of 2, but that state is lost in the next DStream: if “xyz” comes once more there, its value will be 1, which is inconsistent. To maintain the state of the key, so that when “xyz” comes in the next DStream its value becomes 3, we use updateStateByKey(func) for a stateful transformation.

First we make a Spark context, from which we will make the streaming context, as follows

val conf = new SparkConf().setAppName("myStream").setMaster("local[*]")
val sc = new SparkContext(conf)

Now with the help of spark context we make a streaming context as follows

val ssc = new StreamingContext(sc, Seconds(10))

Here Seconds(10) is the streaming interval.

Now we use the ssc context to open a socketTextStream as follows

val lines = ssc.socketTextStream("localhost", 9999)

where localhost is the host and 9999 is the port on which the data server is listening.

We should use checkpointing to make it fault tolerant, as follows

ssc.checkpoint("./checkpoints/")

Here ./checkpoints/ is the directory where all checkpoints are stored.

val words = lines.flatMap(_.split(" "))

We split each line received by the socketTextStream on spaces (" ").

val pairs = words.map(word => (word, 1))

We map each word to a tuple with value 1 to make a paired DStream.

Now we use updateStateByKey(func) to make every word stateful across multiple DStreams

val windowedWordCounts = pairs.updateStateByKey(updateFunc)

The main part of the stateful transformation is updateFunc, the argument of updateStateByKey. We define it as follows

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
  val currentCount = values.foldLeft(0)(_ + _)
  val previousCount = state.getOrElse(0)
  Some(currentCount + previousCount)
}

This function always takes two arguments: the values of the current DStream for that key, and the old state of that key. As you see above, we simply fold all the current values in the current DStream and add the result to the old state of the key.
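To see why the state matters, the behaviour of the update function can be replayed without a cluster. The sketch below is plain Scala, not Spark: applyBatch is a hypothetical helper that mimics what updateStateByKey does for every key, feeding it the 1s from the current batch together with the previous count.

```scala
object StatefulCountSketch {
  // Same logic as the update function in the post, with an explicit type.
  val updateFunc: (Seq[Int], Option[Int]) => Option[Int] = (values, state) => {
    val currentCount = values.foldLeft(0)(_ + _)
    val previousCount = state.getOrElse(0)
    Some(currentCount + previousCount)
  }

  // Hypothetical stand-in for updateStateByKey: apply updateFunc to every key
  // that is either already in the state or present in the new batch.
  def applyBatch(state: Map[String, Int], batch: Seq[String]): Map[String, Int] = {
    val newValues: Map[String, Seq[Int]] =
      batch.groupBy(identity).map { case (word, occurrences) => word -> occurrences.map(_ => 1) }
    val keys = (state.keySet ++ newValues.keySet).toSeq
    keys.flatMap { key =>
      updateFunc(newValues.getOrElse(key, Seq.empty), state.get(key)).map(count => key -> count).toList
    }.toMap
  }
}
```

Feeding Seq("xyz", "xyz") into an empty state gives a count of 2 for “xyz”, and feeding Seq("xyz") into that result gives 3, which is the consistency the post asks for.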

Now we simply save the result as text files, using the DStream's saveAsTextFiles method with a prefix inside the result directory.


Finally, start the stream with ssc.start()

Now just start the socket server with the help of the following command in a terminal

$ nc -lk 9999

Now you can see the cursor blink in the terminal; whatever you write there is received by your Spark Streaming application.

Screenshot from 2015-06-24 19:29:56

Now the value of every word will be stateful, even when the word comes in different DStreams.

You can see the output in the result directory; for each stream it will save one RDD.

For source code you can go through link:


Apache Spark + Cassandra: Basic steps to install and configure Cassandra and use it with Apache Spark, with an example


To build an application using Apache Spark and Cassandra you can use the DataStax spark-cassandra-connector, which lets Spark communicate with Cassandra. Before we connect the two using the connector, we should know how to configure Cassandra. The following are the prerequisites for running the example smoothly.

Steps to install and configure Cassandra

If you are new to Cassandra, we first need to install it on our local machine.


The next step: if you want to configure the Cassandra instance, you can make changes in the /conf/cassandra.yaml file. Some of the important configurations are below:

cluster_name: 'test_1node'

Nodes in the same cluster use the same cluster name; it shows which cluster the node is connected to.

listen_address: localhost

Right now we are using a local instance; otherwise we would use the IP address, so that other nodes in the cluster can communicate with this node using listen_address.

rpc_address: localhost

rpc_address specifies the IP or host name through which clients communicate.

rpc_port: 9160

rpc_port is the port on which the Thrift protocol listens for clients.

Now extract the file using the following command

$ tar xzvf dsc-cassandra-2.1.6-bin.tar.gz

Now run the Cassandra instance on your local machine with the help of the following command


When your Cassandra instance is running correctly, it shows the following info log on the console

Node localhost/ state jump to normal

Now your Cassandra instance is running smoothly. The next step is to make a keyspace and a table. For that we use cqlsh, so run cqlsh with the following command


Now we make a keyspace and a table. For this example the table shows which connector version supports which Spark version; if you use an incompatible version, it throws an exception.

First we make keyspace as below

CREATE KEYSPACE spark WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

Now we make a table in this keyspace using following command

cqlsh> CREATE TABLE spark.try (spark float PRIMARY KEY ,connector float) ;

Now your table schema is defined. Next we insert a row with a Spark version and its compatible connector version

cqlsh> INSERT INTO spark.try (spark, connector ) VALUES ( 1.4,1.4);

Insert the rest of the data with statements like the one above.

Now, to see the data in the table, simply use the command

cqlsh> SELECT * FROM spark.try ;

Now we move on to connecting Apache Spark with our Cassandra instance.

First we make an sbt project and include the dependencies for Spark and the Cassandra connector

name := "spark_cassandra"

version := "1.0"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % "1.2.0",
"com.datastax.spark" %% "spark-cassandra-connector" % "1.1.0"
)

Now we make a main object for our application as follows

object SparkCassandra extends App {

Now we make the Spark configuration for our application as follows

val conf = new SparkConf(true)
.set("","") .setAppName("cassandra").setMaster("local[*]")
.set("spark.cassandra.connection.native.port", "9042")
.set("spark.cassandra.connection.rpc.port", "9160")

.set("","") sets the connection point to the Cassandra cluster; by default it is a master node

.setAppName("cassandra") is the name of the application, which is used in the Spark logs and shown on the master UI of the cluster

.setMaster("local[*]") specifies the master of the application

.set("spark.cassandra.connection.native.port", "9042") is the port on which Cassandra listens for CQL native transport from clients

.set("spark.cassandra.connection.rpc.port", "9160") is the port used by the Thrift protocol

These are the minimal requirements for connectivity to Cassandra. You can also set more advanced configuration. The addresses and ports above can be changed on the Cassandra side through the cassandra.yaml file.

Now make a Spark context with this configuration as follows

val sc = new SparkContext(conf)

Now you can directly make an RDD from the Cassandra table as follows (cassandraTable becomes available on the Spark context once com.datastax.spark.connector._ is imported)

val rdd = sc.cassandraTable("spark", "try")

Now you can perform any transformations and actions on this RDD. Here we simply collect it and print it on the console

val file_collect = rdd.collect()
file_collect.foreach(println)

It will give you the following result on the console

Screenshot from 2015-06-23 16:32:11
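Put together, the pieces above could form one small application. The following is a sketch under assumptions rather than the post's exact code: it assumes Cassandra runs on 127.0.0.1 and uses the connector's spark.cassandra.connection.host property for the connection point, and the object name CompatibleConnectors is made up. It reads the spark.try table and prints each Spark version with its connector version.

```scala
import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

object CompatibleConnectors extends App {
  val conf = new SparkConf(true)
    .set("spark.cassandra.connection.host", "127.0.0.1") // assumption: local Cassandra node
    .setAppName("cassandra")
    .setMaster("local[*]")
  val sc = new SparkContext(conf)

  // Each element is a CassandraRow; pull out the two float columns by name.
  val versions = sc.cassandraTable("spark", "try")
    .map(row => (row.getFloat("spark"), row.getFloat("connector")))
    .collect()

  versions.foreach { case (spark, connector) =>
    println(s"Spark $spark is compatible with connector $connector")
  }

  sc.stop()
}
```

Extracting plain tuples before collect() keeps the driver-side code independent of the connector's row type.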

For the source code you can go to this repository

Before running the source code, remember to create the Cassandra instance and start it as mentioned above.


Easiest Way to connect with Couchbase using Scala

In this blog, we explain through an example how easily we can connect with Couchbase and store and fetch data using Scala.

Tools and technologies used :

SBT 0.13.8
Scala 2.11.6
Couchbase Server 4.0 Beta
JDK 1.7

Please follow the instructions below to use this example:

1) Install and configure Couchbase

a) Download the Couchbase server from here.

b) If you are using Ubuntu, install the package using the dpkg command as a privileged user under sudo. For example:

sudo dpkg -i couchbase-server-enterprise_4.0.0-beta-ubuntu14.04_amd64.deb

c) Get Couchbase Server up and running quickly.

d) You can configure your server using the instructions given here.

2) Set up build.sbt

3) Couchbase Connection

Connect with Couchbase using CouchbaseCluster

4) Create Person case-class

5) Write store and fetch methods to put data into and get data from Couchbase

6) Go to http://localhost:8091/ and create a bucket “person”

7) Now run

sbt test

8) Now go back to http://localhost:8091/; you will be able to see the person data in JSON format.
You can find the complete source code here
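Steps 3 to 5 can be sketched as follows, assuming the Couchbase Java SDK 2.x is on the classpath and a bucket named “person” exists on localhost. The fields of Person, the document id and the object name CouchbaseSketch are hypothetical, since the post's own listings are not shown.

```scala
import com.couchbase.client.java.CouchbaseCluster
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject

// Hypothetical shape of the Person case class.
case class Person(id: String, name: String, age: Int)

object CouchbaseSketch extends App {
  // Step 3: connect to the local cluster and open the "person" bucket.
  val cluster = CouchbaseCluster.create("localhost")
  val bucket = cluster.openBucket("person")

  // Step 5a: store a person as a JSON document keyed by its id.
  def store(person: Person): JsonDocument = {
    val content = JsonObject.create().put("name", person.name).put("age", person.age)
    bucket.upsert(JsonDocument.create(person.id, content))
  }

  // Step 5b: fetch a person by id; bucket.get returns null when the id is unknown.
  def fetch(id: String): Option[Person] =
    Option(bucket.get(id)).map { doc =>
      Person(doc.id(), doc.content().getString("name"), doc.content().getInt("age"))
    }

  store(Person("person::1", "Alice", 30))
  println(fetch("person::1"))

  cluster.disconnect()
}
```

Wrapping the result of bucket.get in Option keeps the null handling in one place.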


Shuffling and repartitioning of RDDs in Apache Spark

To write an optimized Spark application you should use transformations and actions carefully; the wrong transformation or action will make your application slow. So when you are writing an application, keep the following points in mind to make it more optimized.

1. Number of partitions when creating RDD

By default Spark creates one partition for each block of the file in HDFS; the block size is 64MB by default. You can also pass a second argument giving the (minimum) number of partitions when creating an RDD. Let us see an example of creating an RDD from a text file

val rdd = sc.textFile("file.txt", 5)

The above statement makes an RDD of the text file with 5 partitions. Now suppose we have a cluster with 4 cores and each partition needs 5 minutes to process. Four partitions are processed in parallel, and the fifth is processed afterwards, whenever a core becomes free. The final result is therefore completed in 10 minutes, and the other cores sit idle while that one last partition is processed.

To overcome this problem we should make the RDD with the number of partitions equal to the number of cores in the cluster; then all partitions are processed in parallel and resources are used equally.
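The arithmetic behind that example is small enough to write down. This is a simplified model, assuming every partition takes the same time and each core handles one partition at a time; PartitionWaves and its method names are made up for illustration.

```scala
object PartitionWaves {
  // Number of sequential "waves" needed to run all partitions:
  // the partition count divided by the core count, rounded up.
  def waves(partitions: Int, cores: Int): Int =
    (partitions + cores - 1) / cores

  // Total wall-clock minutes when every partition takes the same time.
  def totalMinutes(partitions: Int, cores: Int, minutesPerPartition: Int): Int =
    waves(partitions, cores) * minutesPerPartition
}
```

With 5 partitions on 4 cores at 5 minutes each, totalMinutes gives 10; with 4 partitions it gives 5, which is why matching the partition count to the cores pays off here.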

2. reduceByKey vs. groupByKey

Let us take the example of word count. You can process the RDD and find the frequency of each word using either transformation, groupByKey or reduceByKey.

Word count using reduceByKey

val wordPairsRDD = rdd.map(word => (word, 1))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)

See in the diagram how the RDDs are processed and shuffled over the network

As you see in the above diagram, every worker node first processes its own partition and counts the words on its own machine, and only then shuffles the partial counts for the final result.

On the other hand, if we use groupByKey for word count as follows

// rdd must be a pair RDD here, for example wordPairsRDD
val wordCountsWithGroup = rdd
  .groupByKey()
  .map(t => (t._1, t._2.sum))

Let us see in the diagram how the RDDs are processed and shuffled over the network using groupByKey


As you see above, every worker node shuffles all of its data, and the words are counted only at the final node, so with groupByKey a lot of unnecessary data is transferred over the network.

So avoid using groupByKey as much as possible.
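The difference in network traffic can be estimated with a small simulation. The sketch below is plain Scala rather than Spark: it models each partition as a list of (word, 1) pairs and counts how many records would leave each partition. The example data and all names are invented for illustration.

```scala
object ShuffleVolumeSketch {
  type Partition = Seq[(String, Int)]

  // groupByKey sends every (word, 1) pair across the network unchanged.
  def recordsShuffledByGroup(partitions: Seq[Partition]): Int =
    partitions.map(_.size).sum

  // reduceByKey first sums the counts per key inside each partition,
  // so each partition sends at most one record per distinct key.
  def recordsShuffledByReduce(partitions: Seq[Partition]): Int =
    partitions.map(_.map(_._1).distinct.size).sum

  // Three invented partitions holding 9 pairs in total.
  val example: Seq[Partition] = Seq(
    Seq("a" -> 1, "a" -> 1, "b" -> 1),
    Seq("a" -> 1, "b" -> 1, "b" -> 1, "b" -> 1),
    Seq("c" -> 1, "c" -> 1))
}
```

For the example data, groupByKey would move 9 records while reduceByKey would move 5, and the gap widens as words repeat more often within a partition.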

3. Hash-partition before transformation over pair RDD

Before performing any transformation we should bring the data with the same key to the same worker. For that we use hash partitioning, which shuffles the data and makes the partitions using the key of the pair RDD. Let us see an example of hash-partitioned data

val wordPairsRDD = rdd.map(word => (word, 1))
                   .partitionBy(new HashPartitioner(4))

val wordCountsWithReduce = wordPairsRDD
  .reduceByKey(_ + _)

When we use hash partitioning, the data is shuffled so that all the data for the same key ends up at the same worker. Let us see it in a diagram


In the above diagram you can see that all the data for the “c” key is shuffled to the same worker node. So if we use transformations over a pair RDD, we should use hash partitioning.
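How a key ends up on a particular partition can be shown in a few lines. The sketch below mirrors the rule Spark's HashPartitioner applies to non-null keys, the key's hashCode modulo the number of partitions, kept non-negative; the object and method names are hypothetical.

```scala
object HashPartitionSketch {
  // key.hashCode modulo numPartitions, shifted into the range [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val mod = key.hashCode % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }
}
```

With 4 partitions, every occurrence of the key "c" maps to the same partition index because the result depends only on the key, so a later reduceByKey on that key needs no further data movement.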

4. Do not use collect() over a big dataset

The collect() action collects all the elements of the RDD and sends them to the driver, so if we use it on a big dataset it may fail with an out-of-memory error because the dataset does not fit into memory. Filter the data before using collect(), or use the take or takeSample action instead.

5. Use coalesce to decrease the number of partitions

Use coalesce instead of repartition when you decrease the number of partitions of an RDD. coalesce is useful because by default it does not shuffle data over the network.



Play 2.4.X : Microservice Architecture using Play and Scala


This blog describes a basic Microservice architecture design using Play 2.4.X and Scala. The big idea behind microservices is to architect large, complex and long-lived applications as a set of cohesive services that evolve over time. The term microservices strongly suggests that the services should be small.

In short, the microservice architectural style is an approach to developing a single application as a suite of small services, each running in its own process and communicating with lightweight mechanisms, often an HTTP resource API.

Single node microservice architecture


Multiple node microservice architecture with load balancer


Dive into code level activities

Scalastyle : Check the code quality

To check code quality of all the modules

$ ./activator clean compile scalastyle

Scoverage : Check code coverage of test cases

To check code coverage of test cases for all modules

$ ./activator clean coverage test

By default, scoverage will generate reports for each project separately. You can merge them into an aggregated report by invoking

$ ./activator coverageAggregate

Deployment : microservices

$ ./activator "project <service-name>" "run <PORT>"



Data Science & Spark: Logistic Regression implementation for spam dataset

We are all a bit familiar with the term Data Science, as it is turning out to be a field with potential for new discoveries. The challenges of Data Science seem to be evolutionary, given that the amount of data in the world is only going to increase. However, the development of tools and libraries to deal with these challenges can be called somewhat revolutionary. One such product of this revolution is Spark. Over time we will be discussing its implementations and experiments in detail.

Data Science, a very broad term in itself, is about retrieving all the information from the trails of data people leave behind in the virtual or physical world. For example, it could be your product browsing history or the list of items you have bought from a grocery store. As Alpaydin wrote, “What we lack in knowledge, we make up for in data”, and data is considered to be the cheapest raw material ever found ;).

Now the question arises: what to do with this data? How does analysing it help multinational corporations make a fortune out of it? The main purpose of this field, in general opinion, is to understand the nature of data in order to form better visualizations, structures and models that achieve highly accurate results or predictions.

Spark, for its part, provides MLlib, a library of machine learning functions that allows one to invoke various algorithms on distributed datasets, since data in Spark is represented in the form of RDDs.

In general, the machine learning pipeline can be pictured like this


From the diagram above, it can be seen that the most fundamental task is to understand the data and the relationships among its elements in order to extract features from it. Here is a simple example of the logistic regression algorithm used to differentiate between spam and non-spam messages. The datasets provided here are two text files containing simple spam and non-spam texts. The choice of an algorithm for modelling a dataset depends on different factors such as the problem statement (classification, clustering, regression etc.), dataset structure, feature weight analysis and many more. In our case, whatever the structure of the dataset, we want a binary output from the input variables.

Logistic regression is a statistical model that calculates the probability of an object belonging to a particular class. In our case, a text message could belong to either the spam or the normal message category.
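The probability mentioned here comes from the logistic (sigmoid) function applied to a weighted sum of the features. The following plain-Scala sketch shows that calculation for a single message; the weights and features used with it are made-up numbers, the names are hypothetical, and the 0.5 threshold is the usual default rather than something stated in this post.

```scala
object LogisticSketch {
  // The logistic function squashes any real number into the range (0, 1).
  def sigmoid(z: Double): Double = 1.0 / (1.0 + math.exp(-z))

  // Probability of the positive class (spam) for one feature vector.
  def probability(weights: Array[Double], intercept: Double, features: Array[Double]): Double = {
    require(weights.length == features.length, "weights and features must have the same length")
    val z = weights.zip(features).map { case (w, x) => w * x }.sum + intercept
    sigmoid(z)
  }

  // Turn the probability into a 1 (spam) or 0 (normal) label.
  def classify(p: Double, threshold: Double = 0.5): Int = if (p > threshold) 1 else 0
}
```

Training, which MLlib does for us later in the post, is the search for weights that make these probabilities agree with the labeled examples.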

Following are the steps for implementation:

1. Introduce the MLlib dependency in the build.sbt file

name := "spark-examples"

version := "1.0"

scalaVersion := "2.11.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.2.1",
  "org.apache.spark" %% "spark-mllib" % "1.2.1")

2. Load the data:

The dataset can be loaded with the command

val data = sc.textFile("Path to the file")

3. Feature extraction:

Feature extraction is the process of finding the key elements of the data that play a major role in the outcome. We can use HashingTF to map text data to vectors, because labeled points take their features as a Vector.

val hashingTF = new HashingTF()

Map and transform the data

// The RDD that map is called on is missing from the listing; data from step 2 is assumed.
val features = data.map(line => hashingTF.transform(line.split(" ")))
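The idea behind HashingTF, often called the hashing trick, can be illustrated without Spark. The sketch below is a simplified stand-in, not MLlib's implementation: it maps each term to an index by taking its hash code modulo the vector size and counts how often each index occurs. The names and the tiny vector size used with it are illustrative only.

```scala
object HashingSketch {
  // Map each term to a bucket index and count occurrences per bucket,
  // giving a sparse term-frequency vector as index -> frequency.
  def termFrequencies(terms: Seq[String], numFeatures: Int): Map[Int, Double] = {
    def indexOf(term: String): Int = {
      val mod = term.hashCode % numFeatures
      if (mod < 0) mod + numFeatures else mod
    }
    terms.groupBy(indexOf).map { case (index, ts) => index -> ts.size.toDouble }
  }
}
```

Because the index depends only on the term, the same word always lands in the same position for training and test messages; different words can collide in one bucket, which a larger vector size makes less likely.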

4. Labeled points:

Labeled points are local vectors paired with a label that denotes the target value. In binary classification the label should be either 1 (positive) or 0 (negative).

// spamFeatures and normalFeatures are assumed names; the listing lost the RDDs that map is called on.
val positive = spamFeatures.map(features => LabeledPoint(1, features))

val negative = normalFeatures.map(features => LabeledPoint(0, features))

val trainingData = positive.union(negative)

5. Model:

The model we are implementing is a basic logistic regression classifier. Here we use a binary classifier; for a multi-class classifier the labels would start from 0 and continue with 1, 2, 3 and so on.

val model = new LogisticRegressionWithSGD().run(trainingData)

6. Test data:

Now we just need to feed in some test data

val message = hashingTF.transform("You have virus please reset your password".split(" "))
println(model.predict(message)) // assumed step: the listing ends before the prediction call


7. Result:

Simply go to the project directory in a terminal and run sbt run


The source code can be downloaded from here
