Play with Spark: Building Apache Spark with Play Framework – (Part – 2)


Last week, we saw how to build a simple Spark application in Play using Scala. In this post, we will see how to add Spark's Twitter Streaming feature to a Play Scala application.

Spark Streaming is an extension of Spark that runs on top of the Spark core. It processes and analyzes real-time streaming data in small batches while keeping Spark's fault-tolerant characteristics.
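To make "in batches" concrete, here is a plain-Scala sketch with no Spark involved: events are assigned to fixed-length batches by arrival time, and a sliding window is simply the union of the most recent batches. The `Event` type and both helper functions are hypothetical illustrations, not Spark's implementation (Spark does this incrementally and in a distributed way). With the 2-second batches used later in this post, the 60-second and 10-second windows correspond to the last 30 and 5 batches respectively.

```scala
// Plain-Scala illustration (not Spark's code) of micro-batching and windowing.
object MicroBatchSketch {
  // Hypothetical event: arrival time in milliseconds plus a payload.
  final case class Event(timeMs: Long, text: String)

  // Events arriving in [k * batchMs, (k + 1) * batchMs) belong to batch k.
  def toBatches(events: Seq[Event], batchMs: Long): Map[Long, Seq[Event]] =
    events.groupBy(_.timeMs / batchMs)

  // All events in the window that ends with batch `lastBatch` (inclusive)
  // and covers `windowMs` of stream time, i.e. windowMs / batchMs batches.
  def window(batches: Map[Long, Seq[Event]], lastBatch: Long,
             batchMs: Long, windowMs: Long): Seq[Event] = {
    val batchesPerWindow = windowMs / batchMs
    val firstBatch = lastBatch - batchesPerWindow + 1
    (firstBatch to lastBatch).flatMap(i => batches.getOrElse(i, Seq.empty))
  }
}
```

For example, with 2-second batches, a 6-second window ending at batch 3 contains batches 1, 2 and 3.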

To add Spark's Twitter Streaming feature to a Play Scala application, follow these steps:

1). Add the following dependencies to the build.sbt file:

libraryDependencies ++= Seq(
"org.apache.spark"  %% "spark-core"              % "1.0.0",
"org.apache.spark"  %% "spark-streaming-twitter" % "1.0.0"
)

As you can see, we have upgraded our application to Spark 1.0.0. The dependency "org.apache.spark" %% "spark-streaming-twitter" % "1.0.0" is specific to Twitter Streaming. If you want to use Flume, Kafka, etc., add the corresponding spark-streaming-* dependency for that source instead.
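For example, a minimal build.sbt for consuming from Kafka instead of Twitter might look like the sketch below. spark-streaming-kafka is the Kafka counterpart of spark-streaming-twitter for Spark 1.0.0; the receiver code itself (KafkaUtils rather than TwitterUtils) is not covered in this post.

```scala
// build.sbt: same Spark 1.0.0 setup, with the Kafka source instead of Twitter
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.0.0",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.0.0"
)
```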

2). Create a file app/utils/TwitterPopularTags.scala and add the following code to it:

package utils

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._ // implicits for sortByKey on pair RDDs
import org.apache.spark.streaming.{ Seconds, StreamingContext }
import org.apache.spark.streaming.StreamingContext._ // implicits for reduceByKeyAndWindow on pair DStreams
import org.apache.spark.streaming.twitter._
import play.api.Play
import twitter4j.TwitterFactory
import twitter4j.auth.AccessToken

object TwitterPopularTags {

  def TwitterStreamUtil: Unit = {

    // Twitter authentication credentials from conf/application.conf
    // (.get throws NoSuchElementException if a key is missing)
    val consumerKey = Play.current.configuration.getString("consumer_key").get
    val consumerSecret = Play.current.configuration.getString("consumer_secret").get
    val accessToken = Play.current.configuration.getString("access_token").get
    val accessTokenSecret = Play.current.configuration.getString("access_token_secret").get

    // Authorising with your Twitter Application credentials
    val twitter = new TwitterFactory().getInstance()
    twitter.setOAuthConsumer(consumerKey, consumerSecret)
    twitter.setOAuthAccessToken(new AccessToken(accessToken, accessTokenSecret))

    // The Spark driver needs a port of its own, different from Play's (9000 by default)
    val driverPort = 8080
    val driverHost = "localhost"
    val conf = new SparkConf(false) // skip loading external settings
      .setMaster("local[4]") // run locally with several threads; the Twitter receiver occupies one
      .setAppName("firstSparkApp")
      .set("spark.logConf", "true")
      .set("spark.driver.port", s"$driverPort")
      .set("spark.driver.host", s"$driverHost")
      .set("spark.akka.logLifecycleEvents", "true")

    // Incoming tweets are processed in 2-second batches
    val ssc = new StreamingContext(conf, Seconds(2))
    // Only tweets matching these keywords are received
    val filters = Seq("fifa")
    val stream = TwitterUtils.createStream(ssc, Option(twitter.getAuthorization()), filters)

    // Keep the words of each tweet that start with '#'
    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    // Count each hashtag over the last 60 seconds, then sort by count, descending
    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // The same over the last 10 seconds
    val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
      .map { case (topic, count) => (count, topic) }
      .transform(_.sortByKey(false))

    // Print the five most popular hashtags of each window;
    // rdd.count() is the number of distinct hashtags in the window
    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(5)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    topCounts10.foreachRDD(rdd => {
      val topList = rdd.take(5)
      println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
      topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
    })

    ssc.start()
    // Blocks the calling thread until the streaming context is stopped
    ssc.awaitTermination()
  }

}

This function finds the most popular hashtags in the stream of tweets matching "filters" and prints the top five for the last 60 seconds and for the last 10 seconds. Also, Spark needs a port of its own for the driver (set here through spark.driver.port), so give it a port other than 9000 or whichever port Play is running on.
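The per-window logic of TwitterStreamUtil can be sketched on a plain Scala collection, with no Spark or Twitter involved. `HashTagRankingSketch` is a hypothetical helper: `groupBy` plus `size` plays the role of `map((_, 1))` followed by `reduceByKeyAndWindow(_ + _, ...)`, and `sortBy` plays the role of `sortByKey(false)`. Unlike Spark's sort, ties are broken alphabetically here so the result is deterministic.

```scala
// Plain-Scala sketch of the hashtag counting and ranking done per window.
object HashTagRankingSketch {
  // Split on spaces and keep words starting with '#', as the streaming code does.
  def hashTags(tweets: Seq[String]): Seq[String] =
    tweets.flatMap(_.split(" ").toSeq.filter(_.startsWith("#")))

  // Top n hashtags as (count, tag) pairs, highest count first.
  def topTags(tweets: Seq[String], n: Int): Seq[(Int, String)] =
    hashTags(tweets)
      .groupBy(identity)                                          // tag -> its occurrences
      .toSeq                                                      // avoid Map collapsing equal counts
      .map { case (tag, occurrences) => (occurrences.size, tag) } // swap to (count, tag)
      .sortBy { case (count, tag) => (-count, tag) }              // descending count, then tag
      .take(n)
}
```

As in the original code, "#fifa" and "#fifa!" count as different tags, since only spaces are used to split words.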

3). Add your Twitter app credentials to conf/application.conf like this:

# Twitter Keys
consumer_key="consumer key of Twitter App"
consumer_secret="consumer secret of Twitter App"
access_token="access token of Twitter App"
access_token_secret="access token secret of Twitter App"
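Because the code reads each key with `.get`, a missing key fails with a bare NoSuchElementException. A minimal sketch of checking all four keys at once and reporting every missing one is shown below. `TwitterCredentialsSketch` and its `lookup` parameter are hypothetical; in the Play application the lookup could be `key => Play.current.configuration.getString(key)`, the same call the post already uses.

```scala
// Sketch: validate all Twitter settings up front and name every missing key.
object TwitterCredentialsSketch {
  final case class Credentials(consumerKey: String, consumerSecret: String,
                               accessToken: String, accessTokenSecret: String)

  val RequiredKeys: Seq[String] =
    Seq("consumer_key", "consumer_secret", "access_token", "access_token_secret")

  // Left lists every missing (or blank) key; Right carries all four values.
  def load(lookup: String => Option[String]): Either[String, Credentials] = {
    val found: Map[String, String] =
      RequiredKeys.flatMap(k => lookup(k).filter(_.trim.nonEmpty).map(v => k -> v)).toMap
    val missing = RequiredKeys.filterNot(found.contains)
    if (missing.nonEmpty) Left("Missing Twitter settings: " + missing.mkString(", "))
    else Right(Credentials(found("consumer_key"), found("consumer_secret"),
                           found("access_token"), found("access_token_secret")))
  }
}
```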

4). Finally, add the following code to app/controllers/Application.scala:

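package controllers

import play.api._
import play.api.mvc._
import utils.TwitterPopularTags
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits._

object Application extends Controller {

  // TwitterStreamUtil blocks in ssc.awaitTermination(), so it runs in a Future.
  // The lazy val starts it on the first request only; starting a second
  // StreamingContext on every page load would try to bind the same driver port again.
  lazy val twitterStream = Future { TwitterPopularTags.TwitterStreamUtil }

  def index = Action {
    twitterStream
    Ok(views.html.index(""))
  }

}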
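The "start a blocking job at most once" idea behind a lazily initialised Future can be checked in isolation. In this sketch, `startJob` is a hypothetical stand-in for TwitterPopularTags.TwitterStreamUtil that returns immediately so the effect can be observed; the real method never returns, so it should not be awaited, and it permanently occupies one thread of the global ExecutionContext.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object StartOnceSketch {
  // Counts how many times the job body has actually run.
  val launches = new AtomicInteger(0)

  // Hypothetical stand-in for TwitterPopularTags.TwitterStreamUtil.
  def startJob(): Unit = { launches.incrementAndGet(); () }

  // A lazy val is initialised at most once, even when several request threads
  // touch it concurrently, so only one Future (and one job) is ever created.
  lazy val streaming: Future[Unit] = Future { startJob() }

  // What the controller action does on every request.
  def handleRequest(): String = {
    streaming // forces the lazy val on the first call only
    "ok"
  }

  def main(args: Array[String]): Unit = {
    (1 to 3).foreach(_ => handleRequest())
    // Safe only because the stand-in finishes; never await the real stream.
    Await.result(streaming, 5.seconds)
    println(s"job launched ${launches.get} time(s)")
  }
}
```

Running `main` prints "job launched 1 time(s)", even though three requests were handled.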

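5). The application is now ready. Run it and open localhost:9000 in a browser: loading the index page starts the stream, and the popular hashtags are printed to the Play console every 2 seconds.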

To download a Demo Application click here.
