ML Pipelines and PipelineStages (spark.ml)
The ML Pipeline API (aka Spark ML or spark.ml, after the package the API lives in) lets Spark users quickly and easily assemble and configure practical, distributed Machine Learning pipelines (aka workflows) by standardizing the APIs for different Machine Learning concepts.
The ML Pipeline API is a new DataFrame-based API developed under the org.apache.spark.ml package and is the primary API for MLlib as of Spark 2.0. The previous RDD-based API under the org.apache.spark.mllib package is in maintenance mode.
The key concepts of the Pipeline API (aka spark.ml Components) are Pipelines and PipelineStages, Transformers (and Models), and Estimators.
Use of a machine learning algorithm is only one component of a predictive analytics workflow. There are usually additional pre-processing steps required for the machine learning algorithm to work.
|Just as an RDD computation is the main data abstraction in Spark Core, a Dataset manipulation in Spark SQL, and a continuous DStream computation in Spark Streaming, an ML Pipeline is the main data abstraction in Spark MLlib.|
A typical standard machine learning workflow is as follows:
Loading data (aka data ingestion)
Extracting features (aka feature extraction)
Training model (aka model training)
Evaluating model (aka model evaluation or prediction)
You may also think of a few additional steps before the final model becomes production-ready and hence of any use (see the sketch after this list):
Testing model (aka model testing)
Selecting the best model (aka model selection or model tuning)
Deploying model (aka model deployment and integration)
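As an illustration of the testing, selection and deployment steps, here is a minimal sketch, assuming Spark 2.x, a SparkSession in scope as spark, and a hypothetical input DataFrame dataset with features and label columns; it splits the data, fits two candidate models, keeps the better one according to an evaluator, and persists it:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// Model testing starts with a held-out test set
val Array(training, test) = dataset.randomSplit(Array(0.8, 0.2), seed = 42)

// Two candidate models differing only in regularization
val candidates = Seq(0.0, 0.1).map { reg =>
  new LogisticRegression().setRegParam(reg).fit(training)
}

// Model selection: score each candidate on the test set (areaUnderROC by default)
val evaluator = new BinaryClassificationEvaluator()
val best = candidates.maxBy(model => evaluator.evaluate(model.transform(test)))

// Model deployment: persist the selected model for later use (hypothetical path)
best.write.overwrite().save("/tmp/best-model")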
|The Pipeline API lives under the org.apache.spark.ml package.|
Given the Pipeline Components, a typical machine learning pipeline is as follows:
You use a collection of Transformer instances to prepare the input DataFrame, i.e. the dataset with proper input data (in columns) for a chosen ML algorithm.

You then fit (aka build) a Model.

With a Model you can calculate predictions (in the prediction column) on the features input column through DataFrame transformation.
Example: In text classification, preprocessing steps like n-gram extraction and TF-IDF feature weighting are often necessary before training a classification model like an SVM.
Upon deploying a model, your system must not only know the SVM weights to apply to input features, but also how to transform raw data into the format the model was trained on.
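A minimal sketch of such a text-classification pipeline, assuming Spark 2.x, a hypothetical training DataFrame with text and label columns, a hypothetical newTexts DataFrame with a text column, and LogisticRegression standing in for the SVM mentioned above:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, IDF, NGram, Tokenizer}

// Pre-processing stages followed by the classifier
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val ngrams = new NGram().setN(2).setInputCol("words").setOutputCol("ngrams")
val hashingTF = new HashingTF().setInputCol("ngrams").setOutputCol("rawFeatures")
val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
val lr = new LogisticRegression()  // stands in for an SVM here

val pipeline = new Pipeline().setStages(Array(tokenizer, ngrams, hashingTF, idf, lr))

// Fitting learns the IDF weights and the model coefficients in one go
val model = pipeline.fit(training)

// The fitted PipelineModel applies the very same pre-processing to new raw data
val predictions = model.transform(newTexts)

The point of the sketch: the PipelineModel keeps the feature-transformation steps together with the trained model, so raw data is transformed consistently at training and prediction time.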
(figure) Pipeline for text categorization
(figure) Pipeline for image classification
Pipelines are like query plans in a database system.
Components of ML Pipeline:
Pipeline Construction Framework – A DSL for the construction of pipelines that includes concepts of Nodes and Pipelines.
Nodes are data transformation steps (Transformers)
Pipelines are a DAG of Nodes.
Pipelines become objects that can be saved out and applied in real-time to new data.
It can help create domain-specific feature transformers, general-purpose transformers, statistical utilities, and nodes.
You could eventually save or load machine learning components as described in Persisting Machine Learning Components.
|A machine learning component is any object that belongs to the Pipeline API, e.g. Pipeline, LinearRegressionModel.|
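A minimal sketch of persisting and re-loading such components, assuming Spark 2.x, hypothetical paths under /tmp, and a pipeline and a fitted model as in the sketch above:

import org.apache.spark.ml.{Pipeline, PipelineModel}

// Save an (unfitted) Pipeline definition...
pipeline.write.overwrite().save("/tmp/my-pipeline")

// ...and load it back later
val samePipeline = Pipeline.load("/tmp/my-pipeline")

// A fitted PipelineModel is persisted the same way
model.write.overwrite().save("/tmp/my-pipeline-model")
val sameModel = PipelineModel.load("/tmp/my-pipeline-model")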
The features of the Pipeline API in Spark MLlib:
pipeline: DataFrame =[fit]=> DataFrame (using transformers and estimators)
A pipeline is represented by the Pipeline class.

Pipeline is also an Estimator (so it is acceptable to set up a Pipeline with other Pipelines).

The Pipeline object can read or load pipelines (refer to the Persisting Machine Learning Components page).

read: MLReader[Pipeline]
load(path: String): Pipeline
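Because a Pipeline is itself an Estimator (and hence a PipelineStage), it can be used as a stage of another Pipeline. A minimal sketch, assuming Spark 2.x and a hypothetical text input column:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")

// An inner Pipeline with the feature-preparation stages only
val featurePipeline = new Pipeline().setStages(Array(tokenizer, hashingTF))

// The inner Pipeline is just another PipelineStage, so it can be nested
val fullPipeline = new Pipeline().setStages(Array(featurePipeline, new LogisticRegression()))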
You can create a Pipeline with an optional uid identifier. It is of the format pipeline_[randomUid] when unspecified.
val pipeline = new Pipeline()

scala> println(pipeline.uid)
pipeline_94be47c3b709

val pipeline = new Pipeline("my_pipeline")

scala> println(pipeline.uid)
my_pipeline
The uid is used to create an instance of PipelineModel to return from the fit(dataset: DataFrame): PipelineModel method.
scala> val pipeline = new Pipeline("my_pipeline")
pipeline: org.apache.spark.ml.Pipeline = my_pipeline

scala> val df = (0 to 9).toDF("num")
df: org.apache.spark.sql.DataFrame = [num: int]

scala> val model = pipeline.setStages(Array()).fit(df)
model: org.apache.spark.ml.PipelineModel = my_pipeline
The mandatory stages parameter can be set using the setStages(value: Array[PipelineStage]): this.type method.
fit(dataset: DataFrame): PipelineModel
The fit method returns a PipelineModel that holds a collection of Transformer objects that are the results of the Estimator.fit method for every Estimator in the Pipeline (with a possibly-modified dataset), or simply the input Transformer objects. The input dataset DataFrame is passed to transform for every Transformer instance in the Pipeline.

It first transforms the schema of the input dataset DataFrame.

It then searches for the index of the last Estimator, to calculate a Transformer for every Estimator (and simply return every input Transformer) up to that index in the pipeline. For each Estimator the fit method is called with the input dataset. The result DataFrame is passed to the next Transformer in the chain, i.e. the transform method is called for every Transformer calculated but the last one (that is, the result of executing fit on the last Estimator).

The calculated Transformers are collected.

After the last Estimator there can only be Transformer stages.

The method returns a PipelineModel with the uid and the transformers. The parent Estimator is the Pipeline itself.
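A minimal sketch of what fit does in practice, assuming Spark 2.x, a SparkSession in scope as spark, and spark.implicits._ imported:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")                           // Transformer
val hashingTF = new HashingTF().setNumFeatures(1000).setInputCol("words").setOutputCol("features")  // Transformer
val lr = new LogisticRegression()                                                                   // Estimator

val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

val training = Seq(("spark rocks", 1.0), ("boring text", 0.0)).toDF("text", "label")

// fit transforms the dataset with tokenizer and hashingTF, then calls lr.fit on the result
val model = pipeline.fit(training)

// The PipelineModel holds the input Transformers plus the fitted LogisticRegressionModel
model.stages.foreach(stage => println(stage.getClass.getSimpleName))
// Tokenizer, HashingTF, LogisticRegressionModel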
PipelineStage has a number of direct implementations (of which a few are abstract classes, too), e.g. Estimator and Transformer.
A PipelineStage transforms the schema using the transformSchema family of methods:

transformSchema(schema: StructType): StructType
transformSchema(schema: StructType, logging: Boolean): StructType
|StructType describes a schema of a DataFrame.|
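For illustration, a minimal sketch of a custom PipelineStage (here a Transformer) that declares its output schema via transformSchema, assuming Spark 2.x APIs and a hypothetical num input column:

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Hypothetical Transformer that adds a "doubled" column computed from "num"
class Doubler(override val uid: String) extends Transformer {
  def this() = this(Identifiable.randomUID("doubler"))

  // Declares the output schema up front, before any data is processed
  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields :+ StructField("doubled", DoubleType, nullable = false))

  override def transform(dataset: Dataset[_]): DataFrame =
    dataset.withColumn("doubled", col("num") * 2.0)

  override def copy(extra: ParamMap): Doubler = defaultCopy(extra)
}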
(video) Apache Spark MLlib 2.0 Preview: Data Science and Production by Joseph K. Bradley (Databricks)