Finding the Impact of a Tweet using Spark GraphX


Social Network Analysis (SNA), the process of investigating social structures using networks and graphs, has become a popular topic. Using it, we can answer questions such as:

  • How many connections does an individual have?
  • How much can an individual influence a network?
  • and so on…

The answers can be used for conducting marketing research, running ad campaigns, and spotting the latest trends. It is therefore crucial to measure the impact of individuals in a social network, so that we can identify the key individuals, or Alpha Users (a term used in SNA).

In this post we are going to see how to find the impact of an individual in a social network like Twitter, i.e., how many Twitter users an individual can influence via his/her tweet up to N levels: followers, followers of followers, followers of followers of followers, and so on. For this post we have downloaded data from Stanford University’s SNAP portal. For anonymity, we have assigned random names to the users in the Twitter data.

Now, let's get on to the task of finding the Alpha User. First we have to extract the data from the text file, like this:

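import org.apache.spark.graphx.VertexId
import org.apache.spark.rdd.RDD

// As the replace calls below imply, each line is expected to look like
// ((followeeName,followeeId),(followerName,followerId)).
val data = sparkContext.textFile("twitter-graph-data.txt")

// First pair on each line: the user being followed.
val followees: RDD[(VertexId, String)] = data.map(_.split(",")).map { arr =>
  val user = arr(0).replace("((", "")
  val id = arr(1).replace(")", "")
  (id.toLong, user)
}

// Second pair on each line: the user who follows.
val followers: RDD[(VertexId, String)] = data.map(_.split(",")).map { arr =>
  val user = arr(2).replace("(", "")
  val id = arr(3).replace("))", "")
  (id.toLong, user)
}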
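
To make the parsing step easier to check in isolation, here is a minimal plain-Scala sketch that applies the same split and replace calls to a single line, without Spark. The line format is inferred from the code above, and the object `EdgeLineParser`, the case class `User`, and the sample names are hypothetical, introduced only for illustration.

```scala
object EdgeLineParser {
  // Hypothetical container for one parsed user (not part of the original code).
  final case class User(id: Long, name: String)

  // Parses one line of the assumed form "((followeeName,followeeId),(followerName,followerId))".
  def parseLine(line: String): (User, User) = {
    val arr = line.split(",")
    val followee = User(arr(1).replace(")", "").toLong, arr(0).replace("((", ""))
    val follower = User(arr(3).replace("))", "").toLong, arr(2).replace("(", ""))
    (followee, follower)
  }

  def main(args: Array[String]): Unit = {
    // Made-up sample line, not taken from the SNAP data.
    val (followee, follower) = parseLine("((alice,1),(bob,2))")
    println(s"followee=$followee, follower=$follower")
  }
}
```

Trying the parser on a few real lines before building the RDDs is a cheap way to confirm that the file really has this shape.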

Then we have to create the graph from data extracted above using Spark GraphX API.

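import org.apache.spark.graphx.{Edge, Graph}

// A user can appear many times after the union; Graph() keeps one of the duplicates.
val vertices = followees.union(followers)

// Each edge points from the followee (source) to the follower (destination).
val edges = data.map(_.split(",")).map { arr =>
  val followeeId = arr(1).replace(")", "").toLong
  val followerId = arr(3).replace("))", "").toLong
  Edge(followeeId, followerId, "follow")
}

val defaultUser = ""
val graph = Graph(vertices, edges, defaultUser)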

In the above code we created the vertices and edges from the followers and followees RDDs. We also have to provide a default vertex attribute, defaultUser, which GraphX assigns to any vertex that appears in an edge but not in the vertex RDD.

Now comes the main task of finding the most influential (alpha) user, the one with maximum influence up to 2 levels, i.e., the one who has the most followers and followers of followers in total. For this, we are going to use the Pregel API of Spark GraphX and the Breadth First Traversal algorithm.

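import org.apache.spark.graphx.EdgeDirection

val subGraph = graph.pregel("", 2, EdgeDirection.In)(
  (id, attr, msg) => attr + "," + msg,                   // vprog: append incoming names
  triplet => Iterator((triplet.srcId, triplet.dstAttr)), // sendMsg: follower to followee
  (a, b) => a + "," + b)                                 // mergeMsg: join messages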

The above code is hard to digest at first, so let's walk through it.

  • The first argument of pregel is the initial message, which every vertex in the graph receives before the first iteration.
  • The second argument is the maximum number of iterations, which tells the Pregel API how many rounds of messages are exchanged between connected vertices.
  • The third argument is the active direction, which decides on which edges messages are sent after the first round. With EdgeDirection.In, only the in-edges of vertices that received a message in the previous round take part. We selected the inward direction because we have to travel from the leaf nodes toward the root node, so that we collect all followers of followers for every user in the graph.
  • Then comes the vprog argument, the vertex program that computes a vertex's new attribute from its current attribute and the incoming message. Here we append the message to the attribute, so that each vertex accumulates the names of all users following it.
  • After that comes the sendMsg argument, which computes the messages to be delivered in the next iteration. Here each edge sends the follower's attribute (dstAttr) to the followee (srcId).
  • Last comes the mergeMsg argument, which merges all the messages sent to the same vertex during one iteration into a single message.
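
The rounds described above can be imitated on a tiny in-memory graph. The following is only a plain-Scala sketch of the idea, not GraphX's implementation: the object `FollowerPropagation`, its `propagate` method, and the sample users are hypothetical, and the loop assumes that only edges whose follower received a message in the previous round are active, which is how EdgeDirection.In is described above.

```scala
object FollowerPropagation {
  // names: vertex id -> user name; edges: (followeeId, followerId) pairs.
  def propagate(names: Map[Long, String],
                edges: Seq[(Long, Long)],
                maxIterations: Int): Map[Long, String] = {
    // Every vertex first runs vprog on the initial message "".
    var attrs: Map[Long, String] = names.map { case (id, name) => id -> (name + "," + "") }
    // All vertices received the initial message, so all start out active.
    var active: Set[Long] = names.keySet
    var i = 0
    while (active.nonEmpty && i < maxIterations) {
      // sendMsg: each active follower sends its attribute to its followee;
      // mergeMsg: messages for the same followee are joined with ",".
      val messages: Map[Long, String] = edges
        .filter { case (_, follower) => active(follower) }
        .groupBy { case (followee, _) => followee }
        .map { case (followee, es) => followee -> es.map { case (_, f) => attrs(f) }.mkString(",") }
      // vprog: vertices that received a message append it to their attribute.
      attrs = attrs.map { case (id, attr) => id -> messages.get(id).fold(attr)(m => attr + "," + m) }
      active = messages.keySet
      i += 1
    }
    attrs
  }

  def main(args: Array[String]): Unit = {
    // Made-up graph: bob follows alice; carol and dave follow bob.
    val names = Map(1L -> "alice", 2L -> "bob", 3L -> "carol", 4L -> "dave")
    val edges = Seq((1L, 2L), (2L, 3L), (2L, 4L))
    propagate(names, edges, 2).toSeq.sortBy(_._1).foreach(println)
  }
}
```

After two rounds, the string stored for alice contains bob (a follower) as well as carol and dave (followers of a follower), which is the information the next step counts.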

Now we just have to find the user with the maximum number of followers of followers. For that we can use the following code:

val alphaUser = subGraph.vertices
  // Count the distinct names collected at each vertex. The 2 that is subtracted
  // covers the user's own name and the empty string left by the initial message.
  .map { case (id, names) => (id, names.split(",").distinct.length - 2) }
  // Pick the (id, count) pair with the highest count.
  .max()(Ordering.by[(VertexId, Int), Int](_._2))

// Look up the attribute (the user's name) stored for the winning vertex id.
val alphaUserId = graph.vertices.filter(_._1 == alphaUser._1).map(_._2).collect().head
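
To see what the counting expression does, here is a small plain-Scala sketch that applies it to hand-written strings of the shape the Pregel step produces. The object `ReachCount`, the method `reach`, and the sample strings are hypothetical and serve only as an illustration.

```scala
object ReachCount {
  // Same expression as in the vertex mapping above, applied to a plain string.
  def reach(accumulated: String): Int =
    accumulated.split(",").distinct.length - 2

  def main(args: Array[String]): Unit = {
    // Made-up attributes: the user's own name first, then names received in messages.
    val scores = Map(
      1L -> reach("alice,,bob,,bob,,carol,,dave,"),
      2L -> reach("bob,,carol,,dave,")
    )
    // Choose the vertex id with the highest count, as max() does on the RDD.
    println(scores.maxBy(_._2))
  }
}
```

The distinct call matters because the same name can reach a vertex more than once, as "bob" does in the first sample string.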

The variable alphaUserId, despite its name, holds the vertex attribute of the Alpha User, i.e., the name assigned to that user; the vertex id itself is available as alphaUser._1. In the same way, if we want to find the most influential user up to N levels, we just pass N instead of 2 as the number of iterations to the Pregel API. It is as easy as it gets 😉

I hope you liked this post and want some hands-on experience with this example. Just download or clone the code from https://github.com/knoldus/spark-graphx-twitter and run it.

