A sample ML Pipeline for Clustering in Spark

A machine learning task often involves several steps, such as extracting features from raw data, training a model on those features, and running predictions with the trained model. The Pipeline API provided by Spark makes it easier to combine and tune multiple ML algorithms in a single workflow.

What is in the blog?

We will create a sample ML pipeline that extracts features from raw data and applies the K-Means clustering algorithm to group the data points.

The code examples used in the blog can be executed in spark-shell running Spark 1.5 or higher, the release in which KMeans was added to the spark.ml package.

Basics of Spark ML pipeline API

DataFrames

A DataFrame is a Spark SQL data type that serves as the dataset in an ML pipeline. A DataFrame stores structured data in named columns. It can be created from structured data files, Hive tables, external databases, or existing RDDs.

Transformers

A Transformer converts a DataFrame into another DataFrame, usually by appending one or more columns to it. For example, OneHotEncoder (a Transformer in Spark 1.x and 2.x; from Spark 3.0 it is an Estimator) transforms a column of label indexes into a column of binary feature vectors. Every Transformer has a transform() method, which is called to turn one DataFrame into another.
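As a minimal sketch of a Transformer in action, the snippet below uses VectorAssembler rather than OneHotEncoder, because VectorAssembler is a plain Transformer in every Spark version. It assumes Spark 2.0 or higher (for SparkSession); the local master, the app name, and the two sample rows are illustrative assumptions, not part of the original post.

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.0+. In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("transformer-sketch").getOrCreate()
import spark.implicits._

// Illustrative rows with two numeric columns
val df = Seq(
  ("a@email.com", 12000, 0.0),
  ("c@email.com", 5000, 1.0)
).toDF("email", "income", "genderIndex")

// A Transformer is configured with input and output column names
val assembler = new VectorAssembler()
  .setInputCols(Array("income", "genderIndex"))
  .setOutputCol("features")

// transform() returns a new DataFrame with the "features" column appended
val assembled = assembler.transform(df)
assembled.show(false)
```

The input DataFrame is left unchanged; transform() only returns a new DataFrame with the extra column.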

Estimators

An Estimator is a learning algorithm that learns from training data. An Estimator accepts a dataset to train on and produces a model, which is a Transformer. For example, KMeans is an Estimator that accepts a training DataFrame and produces a KMeansModel, which is a Transformer. Every Estimator has a fit() method, which is invoked to learn from the data.
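The Estimator and model relationship can be sketched as follows. This is an illustration under assumptions, not code from the original post: it assumes Spark 2.0 or higher, and the four two-dimensional points, the seed, and the app name are made up for the example.

```scala
import org.apache.spark.ml.clustering.{KMeans, KMeansModel}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.0+. In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("estimator-sketch").getOrCreate()
import spark.implicits._

// Illustrative training data: a single vector column named "features"
val training = Seq(
  Vectors.dense(1.0, 1.0),
  Vectors.dense(1.5, 2.0),
  Vectors.dense(10.0, 10.0),
  Vectors.dense(10.5, 9.0)
).map(Tuple1.apply).toDF("features")

// KMeans is the Estimator; the seed is fixed only to make runs repeatable
val kmeans = new KMeans().setK(2).setSeed(1L)

// fit() learns from the data and returns a KMeansModel, which is a Transformer
val model: KMeansModel = kmeans.fit(training)

// The model's transform() appends a "prediction" column with the cluster id
val clustered = model.transform(training)
clustered.show(false)
```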

Pipeline

A Pipeline consists of an array of stages, where each stage is either an Estimator or a Transformer that operates on DataFrames.

Input Dataset for Pipeline

Our input data is a small sample DataFrame. Each row in the DataFrame represents a customer with the attributes email, income and gender.
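A sketch of how such an input DataFrame could be built is shown here. The four rows are the ones quoted in the reader comments further down; the SparkSession setup assumes Spark 2.0 or higher (on Spark 1.x, sqlContext.createDataFrame can be used instead, as in that comment), and the app name is an arbitrary choice.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.0+. In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("input-sketch").getOrCreate()
import spark.implicits._

// One row per customer: email, income, gender
val input = Seq(
  ("a@email.com", 12000, "M"),
  ("b@email.com", 43000, "M"),
  ("c@email.com", 5000, "F"),
  ("d@email.com", 60000, "M")
).toDF("email", "income", "gender")

input.printSchema()
input.show()
```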

The aim is to cluster this dataset into similar groups using the K-Means clustering algorithm available in Spark MLlib. The sequence of tasks involves:

  1. Converting categorical attribute labels into label indexes
  2. Converting categorical label indexes into numerical vectors
  3. Combining all numerical features into a single feature vector
  4. Fitting a K-Means model on the extracted features
  5. Predicting using K-Means model to get clusters for each data row

Creating Pipeline of Tasks:

The following code creates a Pipeline with StringIndexer, OneHotEncoder, VectorAssembler and KMeans as a sequence of stages to accomplish the tasks listed above.
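A minimal sketch of such a pipeline, based on the snippet a reader quotes in the comments below. It assumes Spark 2.0 or higher for SparkSession; the local master, the app name, and the fixed seed are additions for the sake of a self-contained, repeatable example and are not from the original post.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.0+. In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("kmeans-pipeline-sketch").getOrCreate()
import spark.implicits._

val input = Seq(
  ("a@email.com", 12000, "M"),
  ("b@email.com", 43000, "M"),
  ("c@email.com", 5000, "F"),
  ("d@email.com", 60000, "M")
).toDF("email", "income", "gender")

// Step 1: categorical labels -> label indexes
val indexer = new StringIndexer().setInputCol("gender").setOutputCol("genderIndex")
// Step 2: label indexes -> numerical vectors
val encoder = new OneHotEncoder().setInputCol("genderIndex").setOutputCol("genderVec")
// Step 3: all numerical features -> one feature vector
val assembler = new VectorAssembler().setInputCols(Array("income", "genderVec")).setOutputCol("features")
// Step 4: K-Means on the assembled features (seed added here for repeatability)
val kmeans = new KMeans().setK(2).setSeed(1L).setFeaturesCol("features").setPredictionCol("prediction")

val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, kmeans))

// fit() runs every stage in order and returns a PipelineModel
val kMeansPredictionModel = pipeline.fit(input)

// Step 5: transform() assigns a cluster to each row
val predictionResult = kMeansPredictionModel.transform(input)
predictionResult.show(false)
```

Because the stages are only ever invoked through the Pipeline, the same code should work whether OneHotEncoder is a Transformer (Spark 1.x and 2.x) or an Estimator (Spark 3.0 and later).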

 

Pipeline stages and Output:

The code above produces the following pipeline stages. At each stage, the DataFrame is transformed and becomes the input to the next stage:
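One way to watch the DataFrame change from stage to stage is to apply the fitted stages one at a time. The sketch below does this by folding over the stages of the fitted PipelineModel; it assumes Spark 2.0 or higher, and the session setup, seed, and printed message format are illustrative choices rather than part of the original post.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}
import org.apache.spark.sql.SparkSession

// Assumption: Spark 2.0+. In spark-shell, getOrCreate() returns the existing session.
val spark = SparkSession.builder().master("local[*]").appName("pipeline-stages-sketch").getOrCreate()
import spark.implicits._

val input = Seq(
  ("a@email.com", 12000, "M"),
  ("b@email.com", 43000, "M"),
  ("c@email.com", 5000, "F"),
  ("d@email.com", 60000, "M")
).toDF("email", "income", "gender")

val pipeline = new Pipeline().setStages(Array(
  new StringIndexer().setInputCol("gender").setOutputCol("genderIndex"),
  new OneHotEncoder().setInputCol("genderIndex").setOutputCol("genderVec"),
  new VectorAssembler().setInputCols(Array("income", "genderVec")).setOutputCol("features"),
  new KMeans().setK(2).setSeed(1L)
))

// After fit(), every stage of the PipelineModel is a Transformer
val model = pipeline.fit(input)

// Feed each stage's output into the next stage, reporting the column it adds
val finalDf = model.stages.foldLeft(input) { (df, stage) =>
  val out = stage.transform(df)
  val added = out.columns.diff(df.columns).mkString(", ")
  println(s"${stage.getClass.getSimpleName} added: $added")
  out
}
finalDf.show(false)
```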

 

Pipeline stages

Written by Manish Mishra

Manish Mishra is a Lead Software Consultant with more than 7 years of experience. His primary development technology was Java. He fell for the Scala language and found it innovative, interesting and fun to code with. He has also co-authored a journal paper titled Economy Driven Real Time Deadline Based Scheduling. His interests include learning cloud computing products and technologies, and algorithm design. He finds books and literature to be favorite companions in solitude, and his favorites are stories, spiritual fiction and time-travel fiction.

12 thoughts on “A sample ML Pipeline for Clustering in Spark”

  1. How can I use this library with Spark 2.0 + Scala 2.11.8?

    My SBT file is:

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.11.8"
    libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.11" % "2.0.0",
    "org.apache.spark" % "spark-sql_2.11" % "2.0.0",
    "org.apache.spark" % "spark-mllib_2.11" % "2.0.0",
    "knoldus" % "k-means-pipeline" % "0.0.1"
    )

    It's giving me the following exception:

    org.scalatest:scalatest _2.11, _2.10
    java.lang.RuntimeException: Conflicting cross-version suffixes in: org.scalatest:scalatest
    at scala.sys.package$.error(package.scala:27)
    at sbt.ConflictWarning$.processCrossVersioned(ConflictWarning.scala:46)
    at sbt.ConflictWarning$.apply(ConflictWarning.scala:32)
    at sbt.Classpaths$$anonfun$69.apply(Defaults.scala:1222)
    at sbt.Classpaths$$anonfun$69.apply(Defaults.scala:1219)
    at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
    at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
    at sbt.std.Transform$$anon$4.work(System.scala:63)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
    at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
    at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
    at sbt.Execute.work(Execute.scala:237)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
    at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
    at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
    at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

  2. I looked into the above issue at length. It occurs because k-means-pipeline is compiled against Scala 2.10, so it cannot be used with Scala 2.11.

    So we need to use the Scala 2.10 dependencies as below, and it works fine with this:

    name := "Simple Project"
    version := "1.0"
    scalaVersion := "2.10.5"
    libraryDependencies ++= Seq(
    "org.apache.spark" % "spark-core_2.10" % "1.6.2",
    "org.apache.spark" % "spark-sql_2.10" % "1.6.2",
    "org.apache.spark" % "spark-mllib_2.10" % "1.6.2",
    "knoldus" % "k-means-pipeline" % "0.0.1"
    )

    Correct me if I am wrong.

    Thanks,
    Balkrushn.

  3. Due to some problem, the editor is not showing the code. Here is the code snippet I found:

    // Sample input: one row per customer
    val input = sqlContext.createDataFrame(Seq(
      ("a@email.com", 12000, "M"),
      ("b@email.com", 43000, "M"),
      ("c@email.com", 5000, "F"),
      ("d@email.com", 60000, "M")
    )).toDF("email", "income", "gender")

    import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.ml.clustering.KMeans
    import org.apache.spark.ml.Pipeline

    // Pipeline stages: index gender, one-hot encode it, assemble features, cluster
    val indexer = new StringIndexer().setInputCol("gender").setOutputCol("genderIndex")
    val encoder = new OneHotEncoder().setInputCol("genderIndex").setOutputCol("genderVec")
    val assembler = new VectorAssembler().setInputCols(Array("income", "genderVec")).setOutputCol("features")
    val kmeans = new KMeans().setK(2).setFeaturesCol("features").setPredictionCol("prediction")

    val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, kmeans))

    // Fit all stages, then assign a cluster to every row
    val kMeansPredictionModel = pipeline.fit(input)

    val predictionResult = kMeansPredictionModel.transform(input)

  4. How could this code be extended to the case where both email and gender need to be transformed? Generally I am looking for applying this method to more than one categorical variable for clustering.

  5. Thanks Manish. Do you have an example of doing these steps with a training phase (KMeans.train) and cost calculation (KMeansModel.computeCost), so that one can iterate and find the best value of K? Thanks in advance. I am working with PySpark 1.6.2, FYI.

  6. Another question, if you could perhaps flesh it out: how can I get the cluster centers from kMeansPredictionModel in their original scale (meaning something like (2000, Male)), not in the converted one-hot-encoded form? Appreciate it.

  7. Can a pipeline be made only of transformers (and no estimator), and if so, how? The output of every pipeline.fit() seems to be an estimator object even though none of my stages involved an estimator (only Transformers). How do I get transformed data after applying my pipeline [T1 T2 T3] to new instances of a dataset?
