A sample ML Pipeline for Clustering in Spark


A machine learning task often involves several steps, such as extracting features from raw data, training models on those features, and running predictions with the trained models. The pipeline API provided by Spark makes it easier to combine and tune multiple ML algorithms in a single workflow.

What is in the blog?

We will create a sample ML pipeline that extracts features from raw data and applies the K-Means clustering algorithm to group data points.

The code examples used in the blog can be executed in spark-shell running Spark 1.5 or higher (the KMeans class in the spark.ml package is not available in earlier releases).

Basics of Spark ML pipeline API

DataFrames

A DataFrame is a Spark SQL data type that serves as the dataset in an ML pipeline. A DataFrame stores structured data in named columns. It can be created from structured data files, Hive tables, external databases, or existing RDDs.
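
As a small illustration of the last option, the sketch below builds a DataFrame from an existing RDD. It assumes a spark-shell session in which `sc` and `sqlContext` are predefined; the name `customersRdd` and the sample values are made up for this example.

```scala
// Assumption: running inside spark-shell, where `sc` and `sqlContext` already exist.
import sqlContext.implicits._

// An RDD of (email, income) tuples; the values are illustrative only.
val customersRdd = sc.parallelize(Seq(
  ("a@email.com", 12000),
  ("b@email.com", 43000)
))

// Convert the RDD into a DataFrame and give its columns names.
val customers = customersRdd.toDF("email", "income")

customers.printSchema() // prints the column names and inferred types
customers.show()        // prints the rows as a table
```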

Transformers

A Transformer converts a DataFrame into another DataFrame, typically by appending one or more columns. For example, OneHotEncoder transforms a column of label indices into a column of binary vectors. Every Transformer has a transform() method, which is called to convert one DataFrame into another.
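
As a minimal sketch of a transform() call, the snippet below uses VectorAssembler, a Transformer that needs no fitting. It assumes a spark-shell session where `sqlContext` is predefined; the `age` column and the sample values are hypothetical and only serve the illustration.

```scala
import org.apache.spark.ml.feature.VectorAssembler

// Assumption: `sqlContext` is provided by spark-shell. Columns and values are illustrative.
val people = sqlContext.createDataFrame(Seq(
  (1, 12000.0, 35.0),
  (2, 43000.0, 52.0)
)).toDF("id", "income", "age")

// A Transformer is configured with its input and output column names...
val ageIncomeAssembler = new VectorAssembler()
  .setInputCols(Array("income", "age"))
  .setOutputCol("features")

// ...and transform() returns a new DataFrame with the extra column appended.
// The input DataFrame itself is left unchanged.
val withFeatures = ageIncomeAssembler.transform(people)
withFeatures.show()
```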

Estimators

An Estimator is a learning algorithm that learns from training data. An Estimator accepts a dataset to train on and produces a model, which is a Transformer. For example, KMeans is an Estimator that accepts a training DataFrame and produces a KMeansModel, which is a Transformer. Every Estimator has a fit() method, which is invoked to learn from the data.
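
The fit() then transform() round trip can be sketched with StringIndexer, an Estimator whose result is easier to inspect than a clustering model. The snippet assumes a spark-shell session with `sqlContext` predefined; the DataFrame `genders` and its values are made up for the example.

```scala
import org.apache.spark.ml.feature.StringIndexer

// Assumption: `sqlContext` is provided by spark-shell. The rows are illustrative.
val genders = sqlContext.createDataFrame(Seq(
  (0, "M"),
  (1, "M"),
  (2, "F")
)).toDF("id", "gender")

// The Estimator only holds configuration until fit() is called.
val genderIndexer = new StringIndexer()
  .setInputCol("gender")
  .setOutputCol("genderIndex")

// fit() learns the label-to-index mapping and returns a model (a Transformer).
val genderIndexerModel = genderIndexer.fit(genders)

// By default indices follow descending label frequency, so the most
// frequent label here ("M") is mapped to 0.0 and "F" to 1.0.
val indexedGenders = genderIndexerModel.transform(genders)
indexedGenders.show()
```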

Pipeline

A Pipeline consists of an array of stages, where each stage is either an Estimator or a Transformer that operates on DataFrames.
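
To make the idea of mixed stages concrete, here is a short sketch with one Estimator (StringIndexer) and one Transformer (VectorAssembler). It assumes a spark-shell session with `sqlContext` predefined; the names `smallInput`, `smallPipeline` and `smallModel` are introduced only for this illustration.

```scala
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}

// Assumption: `sqlContext` is provided by spark-shell. The rows are illustrative.
val smallInput = sqlContext.createDataFrame(Seq(
  (12000, "M"),
  (43000, "M"),
  (5000, "F")
)).toDF("income", "gender")

// Stage 1 is an Estimator, stage 2 is a Transformer.
val stage1 = new StringIndexer().setInputCol("gender").setOutputCol("genderIndex")
val stage2 = new VectorAssembler()
  .setInputCols(Array("income", "genderIndex"))
  .setOutputCol("features")

val smallPipeline = new Pipeline().setStages(Array(stage1, stage2))

// Fitting the pipeline fits each Estimator in order and yields a PipelineModel
// whose stages are all Transformers (the StringIndexer becomes a StringIndexerModel).
val smallModel: PipelineModel = smallPipeline.fit(smallInput)
smallModel.stages.foreach(stage => println(stage.getClass.getSimpleName))

smallModel.transform(smallInput).show()
```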

Input Dataset for Pipeline

We will use the following sample DataFrame as our input data. Each row in the DataFrame represents a customer with three attributes: email, income and gender.

val input = sqlContext.createDataFrame(Seq(
 ("a@email.com", 12000,"M"),
 ("b@email.com", 43000,"M"),
 ("c@email.com", 5000,"F"),
 ("d@email.com", 60000,"M")
)).toDF("email", "income","gender")

The aim is to cluster this dataset into similar groups using the K-Means clustering algorithm available in Spark MLlib. The sequence of tasks involves:

  1. Converting categorical attribute labels into label indexes
  2. Converting categorical label indexes into numerical vectors
  3. Combining all numerical features into a single feature vector
  4. Fitting a K-Means model on the extracted features
  5. Predicting using K-Means model to get clusters for each data row

Creating Pipeline of Tasks:

The following code creates a Pipeline with StringIndexer, OneHotEncoder, VectorAssembler and KMeans as a sequence of stages to accomplish the above mentioned tasks.

import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.Pipeline

val indexer = new StringIndexer().setInputCol("gender").setOutputCol("genderIndex")
val encoder = new OneHotEncoder().setInputCol("genderIndex").setOutputCol("genderVec")
val assembler = new VectorAssembler().setInputCols(Array("income","genderVec")).setOutputCol("features")
val kmeans = new KMeans().setK(2).setFeaturesCol("features").setPredictionCol("prediction")

val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, kmeans))

val kMeansPredictionModel = pipeline.fit(input)

val predictionResult = kMeansPredictionModel.transform(input)

 

Pipeline stages and Output:

The above code generates the following pipeline stages. At each stage, the DataFrame is transformed and becomes the input to the next stage:

[Figure: Pipeline stages]
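
The stage-by-stage output is easiest to see by running it, so here is a rough sketch that fits growing prefixes of the same four stages and prints the columns present after each one. It assumes a spark-shell session with `sqlContext` predefined; the names `data`, `allStages` and `stageOutputs` are introduced here for illustration and are not part of the code above.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

// Assumption: `sqlContext` is provided by spark-shell. Same sample rows as the input dataset.
val data = sqlContext.createDataFrame(Seq(
  ("a@email.com", 12000, "M"),
  ("b@email.com", 43000, "M"),
  ("c@email.com", 5000, "F"),
  ("d@email.com", 60000, "M")
)).toDF("email", "income", "gender")

// The same four stages as in the pipeline, in order.
val allStages: Array[PipelineStage] = Array(
  new StringIndexer().setInputCol("gender").setOutputCol("genderIndex"),
  new OneHotEncoder().setInputCol("genderIndex").setOutputCol("genderVec"),
  new VectorAssembler().setInputCols(Array("income", "genderVec")).setOutputCol("features"),
  new KMeans().setK(2).setFeaturesCol("features").setPredictionCol("prediction")
)

// Fit a pipeline made of the first n stages, for n = 1..4, so that each
// result shows the DataFrame as it looks after stage n.
val stageOutputs = (1 to allStages.length).map { n =>
  new Pipeline().setStages(allStages.take(n)).fit(data).transform(data)
}

stageOutputs.zip(allStages).foreach { case (df, stage) =>
  println(stage.getClass.getSimpleName + " -> " + df.columns.mkString(", "))
  df.show()
}
```

Each stage appends one column (genderIndex, genderVec, features, prediction). Which cluster id a row receives can differ between runs, because K-Means starts from randomly chosen centers. Note also that income is left unscaled in this sketch, so it dominates the distance computation.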


											

About Manish Mishra

Manish is a Scala Developer at Knoldus Software LLP. He loves to learn and share about Functional Programming, Scala, Akka, Spark.

11 Responses to A sample ML Pipeline for Clustering in Spark

  2. vbalkrushn says:

    How can I use this library with Spark 2.0 + Scala 2.11.8?

    My SBT file is:

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.8"
    libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.11" % "2.0.0",
    "org.apache.spark" % "spark-sql_2.11" % "2.0.0",
    "org.apache.spark" % "spark-mllib_2.11" % "2.0.0",
    "knoldus" % "k-means-pipeline" % "0.0.1"
    )

    It's giving me the following exception:

    org.scalatest:scalatest _2.11, _2.10
    java.lang.RuntimeException: Conflicting cross-version suffixes in: org.scalatest:scalatest
    at scala.sys.package$.error(package.scala:27)
    at sbt.ConflictWarning$.processCrossVersioned(ConflictWarning.scala:46)
    at sbt.ConflictWarning$.apply(ConflictWarning.scala:32)
    at sbt.Classpaths$$anonfun$69.apply(Defaults.scala:1222)
    at sbt.Classpaths$$anonfun$69.apply(Defaults.scala:1219)
    at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
    at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
    at sbt.std.Transform$$anon$4.work(System.scala:63)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
    at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
    at sbt.Execute.work(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
    at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

  3. vbalkrushn says:

    I looked into the above issue. It happens because k-means-pipeline is compiled against Scala 2.10, so it cannot be used with Scala 2.11.

    So we need to use the Scala 2.10 dependencies as below, and it works fine with this:

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.10.5"
    libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.10" % "1.6.2",
    "org.apache.spark" % "spark-sql_2.10" % "1.6.2",
    "org.apache.spark" % "spark-mllib_2.10" % "1.6.2",
    "knoldus" % "k-means-pipeline" % "0.0.1"
    )

    Correct me if I am wrong.

    Thanks,
    Balkrushn.

  4. Piotrek says:

    Where can I find sample code?

  5. kurts73 says:

    Where can I find code samples for this tutorial?

  6. Due to some problems, the editor is not showing the code. Here is the code snippet I found:
    val input = sqlContext.createDataFrame(Seq(
    ("a@email.com", 12000,"M"),
    ("b@email.com", 43000,"M"),
    ("c@email.com", 5000,"F"),
    ("d@email.com", 60000,"M")
    )).toDF("email", "income","gender")

    import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.ml.clustering.KMeans
    import org.apache.spark.ml.Pipeline

    val indexer = new StringIndexer().setInputCol("gender").setOutputCol("genderIndex")
    val encoder = new OneHotEncoder().setInputCol("genderIndex").setOutputCol("genderVec")
    val assembler = new VectorAssembler().setInputCols(Array("income","genderVec")).setOutputCol("features")
    val kmeans = new KMeans().setK(2).setFeaturesCol("features").setPredictionCol("prediction")

    val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, kmeans))

    val kMeansPredictionModel = pipeline.fit(input)

    val predictionResult = kMeansPredictionModel.transform(input)

  7. Isaac says:

    How could this code be extended to the case where both email and gender need to be transformed? Generally I am looking for applying this method to more than one categorical variable for clustering.

  8. Isaac says:

    Thanks Manish. Do you have an example of doing these steps with a training phase (KMeans.train) and cost calculation (KMeansModel.computeCost), so that one can iterate and find the best value of K? Thanks in advance. I am working with PySpark 1.6.2, FYI.

  9. Isaac says:

    Another question, if you could perhaps flesh it out: how can I get the cluster centers from kMeansPredictionModel in their original scale (meaning something like (2000, Male)), not in the converted one-hot encoded form? Appreciated.
