Spark Streaming Gnip :- An Apache Spark Utility to pull Tweets from Gnip in realtime


We are all familiar with Gnip, Inc., which provides data from dozens of social media websites via a single API and is often called the Grand Central Station of the social media web. One of its most popular APIs is PowerTrack, which delivers Tweets from Twitter in real time and lets customers filter Twitter’s full firehose so they receive only what they are interested in.

This capability of Gnip’s PowerTrack has many applications, one of which is Spark Streaming. We can integrate Gnip’s PowerTrack with Apache Spark’s streaming library and build a powerful utility that delivers Tweets from Twitter in real time. We can then apply all of Apache Spark’s features to Gnip’s PowerTrack data for real-time analysis.

In this blog, we will build a utility that pulls Tweets from Gnip using Spark Streaming and gives us better handling of Gnip’s PowerTrack data.

Following are the steps for implementation :-

1. Setup build.sbt file

name := "spark-streaming-gnip"

version := "1.0"

scalaVersion := "2.11.6"

organization := "com.knoldus"

libraryDependencies ++= Seq(
 "org.apache.spark" %% "spark-core" % "1.4.0",
 "org.apache.spark" %% "spark-streaming" % "1.4.0"
 )

2. Create a ReceiverInputDStream for Gnip :-

First we need to create a ReceiverInputDStream for Gnip which will open a connection to Gnip using the supplied authentication credentials and return the set of all Tweets received during each batch interval.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class GnipInputDStream(
  ssc_ : StreamingContext,
  url: String,
  username: String,
  password: String,
  storageLevel: StorageLevel) extends ReceiverInputDStream[String](ssc_) {

  override def getReceiver(): Receiver[String] = {
    new GnipReceiver(url, username, password, storageLevel)
  }
}

import java.io.{ BufferedReader, InputStreamReader }
import java.net.HttpURLConnection
import java.util.zip.GZIPInputStream

import org.apache.spark.Logging

class GnipReceiver(
  url: String,
  username: String,
  password: String,
  storageLevel: StorageLevel) extends Receiver[String](storageLevel) with Logging {

  private var gnipConnection: HttpURLConnection = _
  private var stopped = false

  def onStart() {
    try {
      val newGnipConnection = getConnection(url, username, password)
      val responseCode = newGnipConnection.getResponseCode()
      if (responseCode >= 200 && responseCode <= 299) {
        setGnipConnection(newGnipConnection)
        stopped = false
        // Read the stream on a separate thread so that onStart() does not block.
        new Thread("Gnip Receiver") {
          override def run() {
            val inputStream = newGnipConnection.getInputStream()
            val reader = new BufferedReader(
              new InputStreamReader(new GZIPInputStream(inputStream), "UTF-8"))
            storeData(reader)
          }
        }.start()
        logInfo("Gnip receiver started")
      } else {
        logError("Error receiving tweets, response code: " + responseCode)
      }
    } catch {
      case e: Exception => restart("Error starting Gnip stream", e)
    }
  }

  def onStop() {
    stopped = true
    setGnipConnection(null)
    logInfo("Gnip receiver stopped")
  }
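The helper methods getConnection, storeData, and setGnipConnection used above are not shown in the snippet. A minimal sketch of what they could look like inside GnipReceiver — assuming HTTP Basic authentication and Gnip's line-delimited, gzip-compressed JSON stream — is:

```scala
// Hypothetical implementations of the helpers referenced by GnipReceiver.
// These methods live inside the GnipReceiver class body.
import java.io.BufferedReader
import java.net.{ HttpURLConnection, URL }
import org.apache.commons.codec.binary.Base64

  // Opens an HTTP connection to Gnip with Basic auth credentials.
  private def getConnection(url: String, username: String, password: String): HttpURLConnection = {
    val connection = new URL(url).openConnection().asInstanceOf[HttpURLConnection]
    val credentials = username + ":" + password
    val encoded = Base64.encodeBase64String(credentials.getBytes("UTF-8"))
    connection.setRequestProperty("Authorization", "Basic " + encoded)
    // Request a gzip-compressed stream, matching the GZIPInputStream used in onStart().
    connection.setRequestProperty("Accept-Encoding", "gzip")
    connection
  }

  // Reads the stream line by line and hands each Tweet (one JSON object per line) to Spark.
  private def storeData(reader: BufferedReader): Unit = {
    var line = reader.readLine()
    while (!stopped && line != null) {
      if (line.trim.nonEmpty) store(line)
      line = reader.readLine()
    }
  }

  // Closes the previous connection (if any) and remembers the new one.
  private def setGnipConnection(connection: HttpURLConnection): Unit = synchronized {
    if (gnipConnection != null) gnipConnection.disconnect()
    gnipConnection = connection
  }
```

The Base64 encoder here comes from Apache Commons Codec, which is already on the classpath as a transitive dependency of Spark; any other Base64 implementation would work equally well.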

3. Utility for Gnip Input Stream :-

Finally, we need a small utility object for the Gnip input stream which will create an input stream that returns Tweets received from Gnip.

object GnipUtils {

 /**
 * Create an input stream that returns tweets received from Gnip.
 * @param ssc          StreamingContext object
 * @param url          Gnip Powertrack Url
 * @param username     Gnip account username
 * @param password     Gnip account password
 * @param storageLevel Storage level to use for storing the received objects
 */
 def createStream(
   ssc: StreamingContext,
   url: String,
   username: String,
   password: String,
   storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[String] =
   new GnipInputDStream(ssc, url, username, password, storageLevel)
}


4. Example :-

Now we are ready to use our Gnip input stream receiver with the Apache Spark Streaming API. Here is a short example of how to use the above utility:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.StreamingContext
import com.knoldus.spark.streaming.gnip.GnipUtils

object GnipStreaming extends App {

 val sparkConf: SparkConf = new SparkConf().setAppName("spark-streaming-gnip").setMaster("local[*]")
 val sc: SparkContext = new SparkContext(sparkConf)
 val ssc: StreamingContext = new StreamingContext(sc, Seconds(10))
 val gnipDstream = GnipUtils.createStream(ssc, "https://stream.gnip.com:443/accounts/Knoldus/publishers/twitter/streams/track/prod.json", "knoldus@knoldus.com", "knoldus")

 gnipDstream.print()

 ssc.start()
 ssc.awaitTermination()

}

As the example above shows, this utility lets us easily integrate Spark Streaming with Gnip’s PowerTrack just by providing the Gnip authentication credentials and the PowerTrack resource URL.
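Since the DStream is just a stream of raw JSON strings, all of Spark Streaming's transformations apply to it directly. As a rough illustration — a sketch, assuming each line is one Tweet's JSON and hashtags appear as #-prefixed tokens somewhere in it — we could count hashtags per batch instead of merely printing the stream:

```scala
// A sketch: count #hashtag-like tokens per 10-second batch on the raw Tweet JSON lines.
// (A real application would parse the JSON and read the hashtag entities properly.)
val hashtagCounts = gnipDstream
  .flatMap(_.split("\\s+"))     // split each JSON line into whitespace-separated tokens
  .filter(_.startsWith("#"))    // keep only hashtag-like tokens
  .map(tag => (tag, 1))
  .reduceByKey(_ + _)           // count occurrences of each hashtag within the batch

hashtagCounts.print()
```

This would replace the `gnipDstream.print()` line in the example above, before `ssc.start()` is called.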

The full source code for the utility package can be downloaded from here.
