Spark – LDA: A complete example of a clustering algorithm for topic discovery

In this blog we will demonstrate a full ML pipeline applied to a set of documents, in this case 10 books from the internet.

What is Clustering?

Clustering is the task of grouping a set of objects in such a way that objects in the same group (called a cluster) are more similar (in some sense or another) to each other than to those in other groups (clusters). It is a main task of exploratory data mining, and a common technique for statistical data analysis, used in many fields, including machine learning, pattern recognition, image analysis, information retrieval, bioinformatics, data compression, and computer graphics.

When clustering is applied to textual data, it is known as document clustering.

It has applications in automatic document organization, topic extraction and fast information retrieval or filtering.

What is the basic difference between Clustering and Classification?

Clustering algorithms in computational text analysis group documents into subsets, or clusters, where the algorithm’s goal is to create internally coherent clusters that are distinct from one another.[4] Classification, on the other hand, is a form of supervised learning where the features of the documents are used to predict the “type” of each document.

The basic difference is that in clustering we simply tell the algorithm how many clusters we want and let it form them, without any labeled examples. In classification, we first train a model on data labeled with classes that we have already defined, and then use that trained model to classify new data.

A basic example of a clustering algorithm is LDA, and a basic example of a classification algorithm is SVM.

How does LDA actually work?

For this I would recommend you go through these:

In a nutshell, what it does is:

Step 1

You tell the algorithm how many topics you think there are. You can either use an informed estimate (e.g. results from a previous analysis), or simply trial-and-error. In trying different estimates, you may pick the one that generates topics to your desired level of interpretability, or the one yielding the highest statistical certainty (i.e. log likelihood). In our example above, the number of topics might be inferred just by eyeballing the documents.

Step 2

The algorithm will assign every word to a temporary topic. Topic assignments are temporary as they will be updated in Step 3. Temporary topics are assigned to each word in a semi-random manner (according to a Dirichlet distribution, to be exact). This also means that if a word appears twice, each word may be assigned to different topics. Note that in analyzing actual documents, function words (e.g. “the”, “and”, “my”) are removed and not assigned to any topics.

Step 3 (iterative)

The algorithm will check and update topic assignments, looping through each word in every document. For each word, its topic assignment is updated based on two criteria:

• How prevalent is that word across topics?
• How prevalent are topics in the document?
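The three steps above can be sketched in a few lines of plain Scala. This is only a toy illustration of the idea, written as a collapsed Gibbs sampler; it is not what Spark runs (Spark's `LDA` uses the EM or online optimizers shown later). The object name `ToyLda`, the smoothing values `alpha` and `beta`, and the uniform random initialization are assumptions made for this sketch.

```scala
import scala.util.Random

object ToyLda {
  /** Toy collapsed Gibbs sampler. Each document is a sequence of word ids in [0, vocabSize).
    * Returns (per-document topic counts, per-topic word counts). */
  def fit(docs: Seq[Seq[Int]], vocabSize: Int, k: Int, iterations: Int,
          alpha: Double = 0.1, beta: Double = 0.01, seed: Long = 42L)
      : (Array[Array[Int]], Array[Array[Int]]) = {
    val rng = new Random(seed)
    val docTopic = Array.fill(docs.length, k)(0)   // how often topic t occurs in document d
    val topicWord = Array.fill(k, vocabSize)(0)    // how often word w is assigned to topic t
    val topicTotal = Array.fill(k)(0)              // total number of words assigned to topic t

    // Step 2: give every word occurrence a temporary topic (uniformly at random here).
    val assignments = docs.zipWithIndex.map { case (doc, d) =>
      doc.map { w =>
        val t = rng.nextInt(k)
        docTopic(d)(t) += 1; topicWord(t)(w) += 1; topicTotal(t) += 1
        t
      }.toArray
    }.toArray

    // Step 3: loop over every word in every document and resample its topic.
    for (_ <- 0 until iterations; d <- docs.indices; i <- docs(d).indices) {
      val w = docs(d)(i)
      val old = assignments(d)(i)
      // Take the current assignment out of the counts before resampling.
      docTopic(d)(old) -= 1; topicWord(old)(w) -= 1; topicTotal(old) -= 1

      val weights = Array.tabulate(k) { t =>
        val wordInTopic = (topicWord(t)(w) + beta) / (topicTotal(t) + vocabSize * beta)
        val topicInDoc = docTopic(d)(t) + alpha
        wordInTopic * topicInDoc // the two criteria from Step 3, multiplied together
      }

      // Draw the new topic with probability proportional to its weight.
      var u = rng.nextDouble() * weights.sum
      var t = 0
      while (t < k - 1 && u >= weights(t)) { u -= weights(t); t += 1 }

      assignments(d)(i) = t
      docTopic(d)(t) += 1; topicWord(t)(w) += 1; topicTotal(t) += 1
    }
    (docTopic, topicWord)
  }
}
```

Calling `ToyLda.fit(Seq(Seq(0, 1, 0, 1), Seq(2, 3, 2, 3)), vocabSize = 4, k = 2, iterations = 50)` returns the count tables; dividing each row by its sum gives rough topic proportions per document and word proportions per topic.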

What are we going to do in this blog?

We are going to perform the following steps for document clustering:

1.  Spark RegexTokenizer : For Tokenization

2.  Stanford NLP Morphology : For Stemming and lemmatization

3.  Spark StopWordsRemover : For removing stop words and punctuation

4.  Spark LDA : For Clustering of documents.
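To make the input to LDA concrete, here is a plain-Scala sketch (no Spark, no lemmatization) of what tokenization, stop-word removal and term counting produce for a single document. The object name `ToyPreprocess`, the tiny stop-word set and the split-on-non-letters rule are illustrative assumptions; the real pipeline below uses the Spark transformers and a stop-word file instead.

```scala
object ToyPreprocess {
  // Hypothetical, very small stop-word list used only for this illustration.
  val stopWords: Set[String] = Set("the", "and", "my", "is", "a", "of")

  /** Lower-cases the text, splits on anything that is not a letter,
    * drops stop words, and counts how often each remaining term occurs. */
  def termCounts(text: String): Map[String, Int] =
    text.toLowerCase
      .split("[^a-z]+")
      .filter(token => token.nonEmpty && !stopWords.contains(token))
      .groupBy(identity)
      .map { case (term, occurrences) => term -> occurrences.length }
}
```

For example, `ToyPreprocess.termCounts("The cat and the hat: my cat!")` keeps only `cat` (twice) and `hat` (once). LDA works on exactly this kind of per-document term-count representation.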

Code:

So let’s get started with the Code:

build.sbt:

```
name := "spark-ml-example"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" % "spark-core_2.11" % "2.0.0",
  "org.apache.spark" % "spark-sql_2.11" % "2.0.0",
  "org.apache.spark" % "spark-mllib_2.11" % "2.0.0",
  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0",
  // The "models" classifier pulls in the CoreNLP model files needed for POS tagging.
  "edu.stanford.nlp" % "stanford-corenlp" % "3.6.0" classifier "models"
)
```

Now comes the pipeline, which should look like this:

```
import edu.stanford.nlp.process.Morphology
import edu.stanford.nlp.simple.Document
import org.apache.log4j.{Level, Logger}
import scala.collection.JavaConversions._

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature._
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.clustering.{DistributedLDAModel, EMLDAOptimizer, LDA, OnlineLDAOptimizer}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}

case class Params(
input: String = "",
k: Int = 5,
maxIterations: Int = 10,
docConcentration: Double = -1,
topicConcentration: Double = -1,
vocabSize: Int = 2900000,
stopwordFile: String = "src/main/resources/stopWords.txt",
algorithm: String = "em",
checkpointDir: Option[String] = None,
checkpointInterval: Int = 10)

class LDAExample(sc: SparkContext, spark: SparkSession) {

def run(params: Params): Unit = {

Logger.getRootLogger.setLevel(Level.WARN)

// Load documents, and prepare them for LDA.
val preprocessStart = System.nanoTime()
val (corpus, vocabArray, actualNumTokens) =
preprocess(sc, params.input, params.vocabSize, params.stopwordFile)
val actualCorpusSize = corpus.count()
val actualVocabSize = vocabArray.length
val preprocessElapsed = (System.nanoTime() - preprocessStart) / 1e9
corpus.cache()
println()
println("Corpus summary:")
println(s"\t Training set size: $actualCorpusSize documents")
println(s"\t Vocabulary size: $actualVocabSize terms")
println(s"\t Training set size: $actualNumTokens tokens")
println(s"\t Preprocessing time: $preprocessElapsed sec")
println()

// Run LDA.
val lda = new LDA()

val optimizer = params.algorithm.toLowerCase match {
case "em" => new EMLDAOptimizer
// Add (1.0 / actualCorpusSize) to the mini-batch fraction to be more robust on tiny datasets.
case "online" => new OnlineLDAOptimizer().setMiniBatchFraction(0.05 + 1.0 / actualCorpusSize)
case _ => throw new IllegalArgumentException(
s"Only em, online are supported but got ${params.algorithm}.")
}

lda.setOptimizer(optimizer)
.setK(params.k)
.setMaxIterations(params.maxIterations)
.setDocConcentration(params.docConcentration)
.setTopicConcentration(params.topicConcentration)
.setCheckpointInterval(params.checkpointInterval)
if (params.checkpointDir.nonEmpty) {
sc.setCheckpointDir(params.checkpointDir.get)
}
val startTime = System.nanoTime()
val ldaModel = lda.run(corpus)
val elapsed = (System.nanoTime() - startTime) / 1e9

println("Finished training LDA model.  Summary:")
println(s"\t Training time: $elapsed sec")

if (ldaModel.isInstanceOf[DistributedLDAModel]) {
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
val avgLogLikelihood = distLDAModel.logLikelihood / actualCorpusSize.toDouble
println(s"\t Training data average log likelihood: $avgLogLikelihood")
println()
}

// Print the topics, showing the top-weighted terms for each topic.
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 20)
val topics = topicIndices.map { case (terms, termWeights) =>
terms.zip(termWeights).map { case (term, weight) => (vocabArray(term.toInt), weight) }
}
println(s"${params.k} topics:")
topics.zipWithIndex.foreach { case (topic, i) =>
println(s"TOPIC $i")
topic.foreach { case (term, weight) =>
println(s"$term\t$weight")
}
println()
}
sc.stop()
}

import org.apache.spark.sql.functions._

/**
* Load documents, tokenize them, create vocabulary, and prepare documents as term count vectors.
*
* @return (corpus, vocabulary as array, total token count in corpus)
*/
def preprocess(
sc: SparkContext,
paths: String,
vocabSize: Int,
stopwordFile: String): (RDD[(Long, Vector)], Array[String], Long) = {

import spark.implicits._
val initialrdd = spark.sparkContext.wholeTextFiles(paths).map(_._2)
initialrdd.cache()
val rdd = initialrdd.mapPartitions { partition =>
val morphology = new Morphology()
partition.map { value =>
LDAHelper.getLemmaText(value, morphology)
}
}.map(LDAHelper.filterSpecialCharacters)
rdd.cache()
initialrdd.unpersist()
val df = rdd.toDF("docs")
val customizedStopWords: Array[String] = if (stopwordFile.isEmpty) {
Array.empty[String]
} else {
val stopWordText = sc.textFile(stopwordFile).collect()
stopWordText.flatMap(_.stripMargin.split(","))
}
//Tokenizing using the RegexTokenizer
val tokenizer = new RegexTokenizer().setInputCol("docs").setOutputCol("rawTokens")

//Removing the Stop-words using the Stop Words remover
val stopWordsRemover = new StopWordsRemover().setInputCol("rawTokens").setOutputCol("tokens")
stopWordsRemover.setStopWords(stopWordsRemover.getStopWords ++ customizedStopWords)

//Converting the Tokens into the CountVector
val countVectorizer = new CountVectorizer().setVocabSize(vocabSize).setInputCol("tokens").setOutputCol("features")

//Setting up the pipeline
val pipeline = new Pipeline().setStages(Array(tokenizer, stopWordsRemover, countVectorizer))

val model = pipeline.fit(df)
val documents = model.transform(df).select("features").rdd.map {
case Row(features: MLVector) => Vectors.fromML(features)
}.zipWithIndex().map(_.swap)

(documents,
model.stages(2).asInstanceOf[CountVectorizerModel].vocabulary, // vocabulary
documents.map(_._2.numActives).sum().toLong) // total token count
}
}

object LDAExample extends App {

val conf = new SparkConf().setAppName(s"LDAExample").setMaster("local[*]").set("spark.executor.memory", "2g")
val spark = SparkSession.builder().config(conf).getOrCreate()
val sc = spark.sparkContext
val lda = new LDAExample(sc, spark)
val defaultParams = Params().copy(input = "src/main/resources/docs/")
lda.run(defaultParams)
}

object LDAHelper {

def filterSpecialCharacters(document: String) = document.replaceAll("""[! @ # \$ % ^ & * ( ) _ + - − , " ' ; : . ` ? --]""", " ")

def getStemmedText(document: String) = {
val morphology = new Morphology()
new Document(document).sentences().toList.flatMap(_.words().toList.map(morphology.stem)).mkString(" ")
}

def getLemmaText(document: String, morphology: Morphology): String = {
// Lemmatize every (word, POS tag) pair, sentence by sentence.
val lemmas = new Document(document).sentences().toList.flatMap { sentence =>
val words = sentence.words().toList
val tags = sentence.posTags().toList
// Zip without .toMap so that repeated words in a sentence are still counted.
words.zip(tags).map { case (word, tag) => morphology.lemma(word, tag) }
}
// Keep only lemmas longer than 3 characters and join them back into one string.
lemmas.filter(_.length > 3).mkString(" ")
}
}
```

Currently, we are applying the algorithm to just a few sample books and trying to work out the topic of each cluster.

Our LDA output looks something like this:

and so on…

We can improve the results by tuning the parameters of LDA, and hence get a better set of related terms.

The next task I am working on is finding the core of each topic, i.e. assigning a label to the topic by finding the core of its cluster.

If anyone has any comments or suggestions on how to find the topic label from the related terms, I would be happy to hear from you. Currently what I have in mind is finding collocations using the PMI (pointwise mutual information) approach, but I have not found a good package for this in Scala. There is one in NLTK for Python, but maybe something better will come up.

Any comments or suggestions are welcome here or on Twitter: @shiv4nsh

You can find the code here on my GitHub: @shiv4nsh
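As a starting point for the PMI idea, here is a minimal plain-Scala sketch that scores adjacent word pairs with PMI(x, y) = log(p(x, y) / (p(x) · p(y))). It is not taken from any library; the object name `Pmi`, the `minCount` cut-off and the choice of adjacent bigrams as candidate collocations are assumptions for illustration.

```scala
object Pmi {
  /** Scores adjacent word pairs by pointwise mutual information, highest first.
    * Pairs seen fewer than minCount times are dropped, because PMI is unreliable for rare pairs. */
  def bigramPmi(tokens: Seq[String], minCount: Int = 2): Seq[((String, String), Double)] = {
    val unigramCounts: Map[String, Double] =
      tokens.groupBy(identity).map { case (word, occ) => word -> occ.size.toDouble }
    val bigrams = tokens.zip(tokens.drop(1))
    val bigramCounts: Map[(String, String), Double] =
      bigrams.groupBy(identity).map { case (pair, occ) => pair -> occ.size.toDouble }
    val n = tokens.size.toDouble   // number of single tokens
    val m = bigrams.size.toDouble  // number of adjacent pairs

    bigramCounts.toSeq
      .filter { case (_, count) => count >= minCount }
      .map { case ((x, y), count) =>
        val pxy = count / m
        val px = unigramCounts(x) / n
        val py = unigramCounts(y) / n
        ((x, y), math.log(pxy / (px * py)))
      }
      .sortBy { case (_, score) => -score }
  }
}
```

Running `Pmi.bigramPmi("new york is big new york is old big city".split(" ").toSeq)` keeps only the pairs that occur at least twice. On a real corpus, the highest-scoring pairs among a topic's top terms could serve as candidate labels.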

References:

7 thoughts on “Spark – LDA : A Complete example of clustering algorithm for topic discovery.”

1. vitaly says:

I tried to run the code but have a problem with
import edu.stanford.nlp.simple.Document
For Core NLP jar it says edu.stanford.nlp does not have simple package.
What jar did you use for Document?

2. If I want to classify the documents based on the topics, how should I do in spark?

3. Vitaly says:

Regarding “Topic Label by using the related terms”. I am also very interested in labeling topics. Do you assume that you have possible labels in advance or they should be somehow generated?