Last week, we saw how to build a simple Spark application in Play using Scala. In this post we will see how to add Spark’s Twitter Streaming feature to a Play Scala application.

Spark Streaming is an extension of the core Spark API and runs on top of Spark. It lets you process and analyze real-time streaming data in small batches (micro-batches) while keeping Spark’s fault-tolerance guarantees.
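To make "in batches" concrete, here is a small plain-Scala model of micro-batching, with no Spark involved: every event carries a timestamp, and events are grouped into consecutive fixed-length batches, much like the 2-second batches configured in step 2 with Seconds(2). Event, MicroBatching and toBatches are hypothetical names used only for illustration, and the sketch simplifies one detail: Spark assigns records to batches by the time its receiver gets them, which is modelled here as a timestamp field.

```scala
// Hypothetical model of a tweet that arrived at `timeMs` milliseconds.
final case class Event(timeMs: Long, text: String)

object MicroBatching {
  /** Groups events into consecutive batches of `batchMs` milliseconds.
    * Each batch is keyed by its start time, batches come back in time order,
    * and events keep their original order inside a batch. */
  def toBatches(events: Seq[Event], batchMs: Long): Seq[(Long, Seq[Event])] = {
    require(batchMs > 0, "batch interval must be positive")
    events
      .groupBy(e => (e.timeMs / batchMs) * batchMs) // start time of the batch the event falls into
      .toSeq
      .sortBy { case (start, _) => start }
  }
}
```

With a 2000 ms interval, events at 100 ms and 1900 ms end up in the batch starting at 0, one at 2100 ms in the batch starting at 2000, and so on; each batch is then processed as one unit.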

To add Spark’s Twitter Streaming feature to a Play Scala application, follow these steps:

1). Add the following dependencies to the build.sbt file:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"              % "1.0.0",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0"
)

As you can see, we have upgraded our application to Spark 1.0.0. The dependency "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0" is specific to Twitter Streaming. If you want to use Flume, Kafka, etc., add the respective dependency given here.
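For example, a sketch assuming you wanted to read from Kafka rather than Twitter: Spark 1.0.0 publishes its Kafka receiver (KafkaUtils) as the separate spark-streaming-kafka artifact (Flume's is spark-streaming-flume), so the build.sbt entry would look roughly like this:

```scala
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.0.0",
  // Kafka receiver in place of the Twitter one
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
)
```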

2). Create a file app/utils/TwitterPopularTags.scala and add the following code to it:

package utils

import org.apache.spark.streaming.{ Seconds, StreamingContext }
import StreamingContext._               // implicit conversions for pair DStreams, e.g. reduceByKeyAndWindow
import org.apache.spark.SparkContext._  // implicit conversions for pair RDDs, e.g. sortByKey
import org.apache.spark.streaming.twitter._
import play.api.Play
import org.apache.spark.SparkConf
import twitter4j.TwitterFactory
import twitter4j.auth.AccessToken

object TwitterPopularTags {

  def TwitterStreamUtil: Unit = {

    // Twitter authentication credentials, read from conf/application.conf
    val consumerKey = Play.current.configuration.getString("consumer_key").get
    val consumerSecret = Play.current.configuration.getString("consumer_secret").get
    val accessToken = Play.current.configuration.getString("access_token").get
    val accessTokenSecret = Play.current.configuration.getString("access_token_secret").get

    // Authorising with your Twitter application credentials
    val twitter = new TwitterFactory().getInstance()
    twitter.setOAuthConsumer(consumerKey, consumerSecret)
    twitter.setOAuthAccessToken(new AccessToken(accessToken, accessTokenSecret))

    // The Spark driver gets its own port, which must differ from Play's port (9000 by default)
    val driverPort = 8080
    val driverHost = "localhost"
    val conf = new SparkConf(false) // skip loading external settings
      .setMaster("local[4]") // run locally with enough threads (the Twitter receiver occupies one)
      .setAppName("firstSparkApp")
      .set("spark.logConf", "true")
      .set("spark.driver.port", s"$driverPort")
      .set("spark.driver.host", s"$driverHost")
      .set("spark.akka.logLifecycleEvents", "true")

    // Cut the tweet stream into 2-second batches, keeping only tweets that match the filters
    val ssc = new StreamingContext(conf, Seconds(2))
    val filters = Seq("fifa")
    val stream = TwitterUtils.createStream(ssc, Option(twitter.getAuthorization()), filters)

    // Words starting with '#' in each tweet's text
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Hashtag counts over a sliding 60-second window, as (count, tag) sorted by count, descending
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Same, over a sliding 10-second window
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print the five most popular hashtags of each window
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(5)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(5)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    ssc.awaitTermination() // blocks the calling thread until the streaming context is stopped
  }

}

This function gets the most popular hashtags from the stream of tweets matching the given filters (here, "fifa"), counted over the last 60 seconds and over the last 10 seconds. Note that the Spark driver needs its own port (spark.driver.port, set to 8080 above), so give it a port other than 9000 or whichever port Play is running on.
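The counting and ranking done by the streaming job can be mirrored on plain Scala collections, which is handy for checking the logic without a Twitter connection. This is a sketch, not Spark code: PopularTags, topTags and windowBatches are hypothetical names, each inner Seq stands for the tweet texts of one 2-second batch, and a 10-second window therefore covers the last 5 batches (reduceByKeyAndWindow, whose slide defaults to one batch interval, recomputes the equivalent on every batch).

```scala
object PopularTags {
  /** Same rule as the Spark job: split on spaces, keep words starting with '#'. */
  def hashTags(text: String): Seq[String] =
    text.split(" ").toSeq.filter(_.startsWith("#"))

  /** Counts hashtags in the last `windowBatches` batches and returns the `n` most
    * frequent as (count, tag) pairs, highest count first. */
  def topTags(batches: Seq[Seq[String]], windowBatches: Int, n: Int): Seq[(Int, String)] =
    batches
      .takeRight(windowBatches)                                   // the window: most recent batches only
      .flatten                                                    // all tweet texts in the window
      .flatMap(hashTags)                                          // like stream.flatMap(...)
      .groupBy(identity)                                          // like map((_, 1)) + reduce by key
      .toSeq                                                      // leave the Map so equal counts are not merged below
      .map { case (tag, occurrences) => (occurrences.size, tag) } // swap to (count, tag)
      .sortBy { case (count, tag) => (-count, tag) }              // like sortByKey(false); tag breaks ties
      .take(n)                                                    // like rdd.take(5)
}
```

Spark's sortByKey does not promise any order among tags with equal counts; sorting on the tag as a secondary key here only makes the sketch's output deterministic.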

3). Add your Twitter app credentials to conf/application.conf like this:

# Twitter Keys
consumer_key="consumer key of Twitter App"
consumer_secret="consumer secret of Twitter App"
access_token="access token of Twitter App"
access_token_secret="access token secret of Twitter App"
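If you would rather not commit real keys, note that application.conf is written in HOCON, where an optional substitution such as ${?TWITTER_CONSUMER_KEY} overrides the value above it only when that environment variable is set, and otherwise leaves the earlier value in place. A sketch, with hypothetical variable names:

```
# Twitter Keys: placeholders, overridden by environment variables when they are set
consumer_key="consumer key of Twitter App"
consumer_key=${?TWITTER_CONSUMER_KEY}
consumer_secret="consumer secret of Twitter App"
consumer_secret=${?TWITTER_CONSUMER_SECRET}
access_token="access token of Twitter App"
access_token=${?TWITTER_ACCESS_TOKEN}
access_token_secret="access token secret of Twitter App"
access_token_secret=${?TWITTER_ACCESS_TOKEN_SECRET}
```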

4). Finally, add the following code to app/controllers/Application.scala:

package controllers

import play.api._
import play.api.mvc._
import utils.TwitterPopularTags
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits._

object Application extends Controller {

  def index = Action {
    // Run the blocking streaming job on a background thread so the page is returned immediately
    Future { TwitterPopularTags.TwitterStreamUtil }
    Ok(views.html.index(""))
  }

}

5). The application is now ready to run on localhost:9000.
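Because index calls TwitterPopularTags.TwitterStreamUtil on every request, reloading the page would try to start a second StreamingContext (and SparkContext) in the same JVM, on the same fixed driver port, and Spark supports only one active SparkContext per JVM. One way to avoid that, sketched here in plain Scala without Play or Spark, is to guard the background start with an AtomicBoolean so the job is launched at most once. StartOnce and StartOnceExample are hypothetical names; the page is still served immediately, since the job itself still runs inside a Future.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

/** Hypothetical helper: runs a long-lived job in the background at most once per instance. */
final class StartOnce {
  private val started = new AtomicBoolean(false)

  /** Starts `job` in a Future on the first call and returns it; later calls return None. */
  def apply(job: => Unit)(implicit ec: ExecutionContext): Option[Future[Unit]] =
    if (started.compareAndSet(false, true)) Some(Future(job)) // only the first caller wins the flag
    else None
}

object StartOnceExample {
  def main(args: Array[String]): Unit = {
    val once = new StartOnce
    val runs = new AtomicInteger(0)
    val first = once { runs.incrementAndGet(); () }
    val second = once { runs.incrementAndGet(); () } // ignored: the job was already started
    first.foreach(f => Await.result(f, 5.seconds))   // wait for the background job to finish
    println(s"started: ${first.isDefined}, second ignored: ${second.isEmpty}, runs: ${runs.get}")
  }
}
```

In the controller, a single instance would live in the Application object (for example `private val streaming = new StartOnce`), and index would call `streaming(TwitterPopularTags.TwitterStreamUtil)` instead of creating a Future directly. One trade-off of this design: if the streaming job fails, the flag stays set, so the job is not retried until the application restarts.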

To download a demo application, click here.

2 comments

  1. Hi, thanks for the post. But if you put the Spark Streaming job inside a Future, it is going to skip over the Spark job and go directly to the views.html.index page, right?

    I wonder if there is a way to start the Spark Streaming job in the background and still serve the views.html.index page under the Play Framework.

    Thank you.
