A machine learning task often involves several steps, such as extracting features from raw data, training a model on those features, and running predictions with the trained model. The pipeline API provided by Spark makes it easier to combine and tune multiple ML algorithms in a single workflow.
What is in the blog?
We will create a sample ML pipeline that extracts features from raw data and applies the K-Means clustering algorithm to group data points.
The code examples used in the blog can be executed in spark-shell running Spark 1.5 or higher (the KMeans estimator in the spark.ml package was added in Spark 1.5).
Basics of Spark ML pipeline API
DataFrames
DataFrame is a Spark SQL data type that serves as the dataset in an ML pipeline. A DataFrame stores structured data in named columns and can be created from structured data files, Hive tables, external databases, or existing RDDs.
Transformers
A Transformer converts a DataFrame into another DataFrame, usually by appending one or more feature columns. For example, OneHotEncoder transforms a column of label indexes into a column of binary vectors. Every Transformer has a transform() method, which is called to turn one DataFrame into another.
Estimators
An Estimator is a learning algorithm that learns from training data. An Estimator accepts a dataset to train on and produces a model, which is a Transformer. For example, KMeans is an Estimator that accepts a training DataFrame and produces a K-Means model, which is a Transformer. Every Estimator has a fit() method, which is invoked to learn from the data.
Pipeline
A Pipeline consists of an array of stages, where each stage is either an Estimator or a Transformer that operates on DataFrames.
Input Dataset for Pipeline
We will use the following sample DataFrame as our input data. Each row in the DataFrame represents a customer with three attributes: email, income and gender.
The aim is to cluster this dataset into similar groups using the K-Means clustering algorithm available in Spark MLlib. The sequence of tasks involves:
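The sample table itself is missing from this copy of the post. As a sketch, the rows below are taken from the snippet quoted in the comments further down; `sqlContext` is assumed to be the one that spark-shell predefines in Spark 1.x.

```scala
// Four sample customers: (email, income, gender).
// Assumes spark-shell on Spark 1.x, where `sqlContext` is already defined.
val input = sqlContext.createDataFrame(Seq(
  ("a@email.com", 12000, "M"),
  ("b@email.com", 43000, "M"),
  ("c@email.com", 5000, "F"),
  ("d@email.com", 60000, "M")
)).toDF("email", "income", "gender")

// Print the rows to check the data before building the pipeline.
input.show()
```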
- Converting categorical attribute labels into label indexes
- Converting categorical label indexes into numerical vectors
- Combining all numerical features into a single feature vector
- Fitting a K-Means model on the extracted features
- Predicting using K-Means model to get clusters for each data row
Creating Pipeline of Tasks:
The following code creates a Pipeline with StringIndexer, OneHotEncoder, VectorAssembler and KMeans as a sequence of stages to accomplish the tasks listed above.
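The listing did not survive in this copy of the post, so the block below is reconstructed from the snippet a reader quoted in the comments. It is a sketch that assumes spark-shell on Spark 1.x (with `sqlContext` predefined) and re-creates the four sample rows so that it can be pasted on its own.

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

// Sample customers: (email, income, gender).
val input = sqlContext.createDataFrame(Seq(
  ("a@email.com", 12000, "M"),
  ("b@email.com", 43000, "M"),
  ("c@email.com", 5000, "F"),
  ("d@email.com", 60000, "M")
)).toDF("email", "income", "gender")

// gender (string label) -> genderIndex (label index)
val indexer = new StringIndexer().setInputCol("gender").setOutputCol("genderIndex")
// genderIndex -> genderVec (one-hot vector)
val encoder = new OneHotEncoder().setInputCol("genderIndex").setOutputCol("genderVec")
// income + genderVec -> features (single feature vector)
val assembler = new VectorAssembler()
  .setInputCols(Array("income", "genderVec"))
  .setOutputCol("features")
// K-Means with two clusters, reading `features` and writing `prediction`
val kmeans = new KMeans().setK(2).setFeaturesCol("features").setPredictionCol("prediction")

// Chain the four stages, fit them on the input, then assign a cluster to every row.
val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, kmeans))
val kMeansPredictionModel = pipeline.fit(input)
val predictionResult = kMeansPredictionModel.transform(input)
predictionResult.show()
```

Calling `fit` runs the stages in order and returns a fitted pipeline model; calling `transform` on that model adds the intermediate columns and the `prediction` column to the input rows.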
Pipeline stages and Output:
The above code produces the following pipeline stages. At each stage, the DataFrame is transformed and becomes the input to the next stage:
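The stage-by-stage output is missing from this copy of the post. One way to see how the DataFrame flows from stage to stage is to apply the stages by hand instead of through a Pipeline. This is only a sketch: it assumes spark-shell on Spark 1.x or 2.x, where OneHotEncoder is a plain Transformer, and the intermediate names (`indexed`, `encoded`, `assembled`, `clustered`) are made up for illustration.

```scala
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val input = sqlContext.createDataFrame(Seq(
  ("a@email.com", 12000, "M"),
  ("b@email.com", 43000, "M"),
  ("c@email.com", 5000, "F"),
  ("d@email.com", 60000, "M")
)).toDF("email", "income", "gender")

// Stage 1 (Estimator): fit() learns the label-to-index mapping,
// transform() appends the genderIndex column.
val indexed = new StringIndexer()
  .setInputCol("gender").setOutputCol("genderIndex")
  .fit(input).transform(input)

// Stage 2 (Transformer): appends the one-hot genderVec column.
val encoded = new OneHotEncoder()
  .setInputCol("genderIndex").setOutputCol("genderVec")
  .transform(indexed)

// Stage 3 (Transformer): appends the assembled features column.
val assembled = new VectorAssembler()
  .setInputCols(Array("income", "genderVec")).setOutputCol("features")
  .transform(encoded)

// Stage 4 (Estimator): fit() trains the K-Means model,
// transform() appends the prediction column.
val clustered = new KMeans()
  .setK(2).setFeaturesCol("features").setPredictionCol("prediction")
  .fit(assembled).transform(assembled)

// Each stage added one column on top of the original three.
clustered.printSchema()
```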

How can I use this library with Spark 2.0 + Scala 2.11.8?
My SBT file is:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.11" % "2.0.0",
"org.apache.spark" % "spark-sql_2.11" % "2.0.0",
"org.apache.spark" % "spark-mllib_2.11" % "2.0.0",
"knoldus" % "k-means-pipeline" % "0.0.1"
)
It's giving me the following exception:
org.scalatest:scalatest _2.11, _2.10
java.lang.RuntimeException: Conflicting cross-version suffixes in: org.scalatest:scalatest
at scala.sys.package$.error(package.scala:27)
at sbt.ConflictWarning$.processCrossVersioned(ConflictWarning.scala:46)
at sbt.ConflictWarning$.apply(ConflictWarning.scala:32)
at sbt.Classpaths$$anonfun$69.apply(Defaults.scala:1222)
at sbt.Classpaths$$anonfun$69.apply(Defaults.scala:1219)
at scala.Function1$$anonfun$compose$1.apply(Function1.scala:47)
at sbt.$tilde$greater$$anonfun$$u2219$1.apply(TypeFunctions.scala:40)
at sbt.std.Transform$$anon$4.work(System.scala:63)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
at sbt.Execute$$anonfun$submit$1$$anonfun$apply$1.apply(Execute.scala:228)
at sbt.ErrorHandling$.wideConvert(ErrorHandling.scala:17)
at sbt.Execute.work(Execute.scala:237)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
at sbt.Execute$$anonfun$submit$1.apply(Execute.scala:228)
at sbt.ConcurrentRestrictions$$anon$4$$anonfun$1.apply(ConcurrentRestrictions.scala:159)
at sbt.CompletionService$$anon$2.call(CompletionService.scala:28)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
I have looked into the above issue at length. It occurs because k-means-pipeline is compiled for Scala 2.10, so it cannot be used with Scala 2.11.
So we need to use the Scala 2.10 dependencies as below, and it works fine with these:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies ++= Seq(
"org.apache.spark" % "spark-core_2.10" % "1.6.2",
"org.apache.spark" % "spark-sql_2.10" % "1.6.2",
"org.apache.spark" % "spark-mllib_2.10" % "1.6.2",
"knoldus" % "k-means-pipeline" % "0.0.1"
)
Correct me if I am wrong.
Thanks,
Balkrushn.
Where can I find sample code?
Where can I find code samples for this tutorial?
Due to some problem, the editor is not showing the code. Here is what I found as the code snippet:
val input = sqlContext.createDataFrame(Seq(
("a@email.com", 12000, "M"),
("b@email.com", 43000, "M"),
("c@email.com", 5000, "F"),
("d@email.com", 60000, "M")
)).toDF("email", "income", "gender")
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.Pipeline
val indexer = new StringIndexer().setInputCol("gender").setOutputCol("genderIndex")
val encoder = new OneHotEncoder().setInputCol("genderIndex").setOutputCol("genderVec")
val assembler = new VectorAssembler().setInputCols(Array("income", "genderVec")).setOutputCol("features")
val kmeans = new KMeans().setK(2).setFeaturesCol("features").setPredictionCol("prediction")
val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, kmeans))
val kMeansPredictionModel = pipeline.fit(input)
val predictionResult = kMeansPredictionModel.transform(input)
How could this code be extended to the case where both email and gender need to be transformed? More generally, I am looking to apply this method to more than one categorical variable for clustering.
Hi Isaac,
Actually, you can set any number of categorical features. Please look at this plugin: https://github.com/knoldus/k-means-pipeline and let me know.
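Independently of the plugin, the plain pipeline API can handle several categorical columns by generating one StringIndexer and one OneHotEncoder per column. The block below is a minimal sketch under that assumption, for spark-shell on Spark 1.x; `categoricalCols` and `encodingStages` are illustrative names, and email is included only because the question asks for it (a column that is unique per row adds little to clustering).

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val input = sqlContext.createDataFrame(Seq(
  ("a@email.com", 12000, "M"),
  ("b@email.com", 43000, "M"),
  ("c@email.com", 5000, "F"),
  ("d@email.com", 60000, "M")
)).toDF("email", "income", "gender")

// Columns to index and one-hot encode (illustrative choice).
val categoricalCols = Seq("email", "gender")

// One indexer and one encoder per categorical column: <col> -> <col>Index -> <col>Vec.
val encodingStages: Seq[PipelineStage] = categoricalCols.flatMap { c =>
  Seq[PipelineStage](
    new StringIndexer().setInputCol(c).setOutputCol(c + "Index"),
    new OneHotEncoder().setInputCol(c + "Index").setOutputCol(c + "Vec")
  )
}

// Combine the numeric column with every encoded vector.
val assembler = new VectorAssembler()
  .setInputCols((Seq("income") ++ categoricalCols.map(_ + "Vec")).toArray)
  .setOutputCol("features")

val kmeans = new KMeans().setK(2).setFeaturesCol("features").setPredictionCol("prediction")

val stages: Array[PipelineStage] =
  (encodingStages ++ Seq[PipelineStage](assembler, kmeans)).toArray

val model = new Pipeline().setStages(stages).fit(input)
val result = model.transform(input)
result.select("email", "gender", "features", "prediction").show()
```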
Thanks Manish. Do you have an example of doing these steps but with a training phase (KMeans.train) and cost calculation (KMeansModel.computeCost), so that one can iterate and find the best value of K? Thanks in advance. I am working with PySpark 1.6.2, FYI.
Another question, if you could perhaps flesh it out: how can I get the cluster centers from kMeansPredictionModel in their original scale (meaning something like (2000, Male)), not in the one-hot-encoded form? Much appreciated.
Can a pipeline be made only of transformers (and no estimator), and if so, how? The output of pipeline.fit() always seems to be an estimator object, even though none of my stages is an estimator (only Transformers). How do I get transformed data when applying my pipeline [T1 T2 T3] to new instances of a dataset?
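A Pipeline can consist of Transformers only. The Pipeline object itself is an Estimator, but the object returned by fit() is a PipelineModel, which is a Transformer, so calling transform() on it applies every stage to new data. The block below is a sketch for spark-shell on Spark 1.x; the Binarizer stage, its threshold, and the `newCustomers` rows are made up for illustration, and income is stored as a Double because Binarizer expects a double column.

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage, Transformer}
import org.apache.spark.ml.feature.{Binarizer, VectorAssembler}

val input = sqlContext.createDataFrame(Seq(
  ("a@email.com", 12000.0),
  ("b@email.com", 43000.0),
  ("c@email.com", 5000.0),
  ("d@email.com", 60000.0)
)).toDF("email", "income")

// T1 (Transformer): 1.0 if income is above the threshold, else 0.0.
val binarizer = new Binarizer()
  .setInputCol("income").setOutputCol("highIncome").setThreshold(20000.0)
// T2 (Transformer): pack both numeric columns into one vector.
val assembler = new VectorAssembler()
  .setInputCols(Array("income", "highIncome")).setOutputCol("features")

// No Estimator among the stages, so fit() has nothing to learn,
// but it still returns a PipelineModel wrapping the stages.
val pipelineModel = new Pipeline()
  .setStages(Array[PipelineStage](binarizer, assembler))
  .fit(input)

// Apply the fitted pipeline to new rows with the same schema.
val newCustomers = sqlContext.createDataFrame(Seq(
  ("e@email.com", 30000.0)
)).toDF("email", "income")
val transformed = pipelineModel.transform(newCustomers)
transformed.show()
```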