The pipeline object in sklearn makes it easy to sequentially apply a list of transforms and a final estimator [1]. This makes it easy to organize models. PySpark has a similar pipeline API but there are some differences.
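For contrast, the sklearn side looks roughly like this (a minimal sketch with toy data; the scaler, estimator, and parameters are arbitrary placeholders, not from any particular example):

import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor

# Toy arrays standing in for real training data.
rng = np.random.default_rng(0)
X_train, y_train = rng.normal(size=(100, 3)), rng.normal(size=100)
X_test = rng.normal(size=(10, 3))

# Transforms run in order; the last stage is the final estimator.
sk_pipeline = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("rf", RandomForestRegressor(n_estimators=50, random_state=0)),
])
sk_pipeline.fit(X_train, y_train)     # fit on (X, y) arrays
y_pred = sk_pipeline.predict(X_test)  # sklearn pipelines have .predict()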
The main differences:

- The .fit() and .transform() methods work in the same way as in sklearn, but they operate on a pyspark.sql.DataFrame.
- There is no .predict() method that takes only X, but you can do the same thing with a subset of the DataFrame: call the .transform() method on a fitted model object, which adds extra column(s) based on the model.
- Input features must be assembled into a single Vector object. You usually use the VectorAssembler in the first step of the pipeline to assemble the input features into this object; a short standalone example of what that produces is shown below.
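A minimal sketch of the VectorAssembler on its own (the column names x1 and x2 are hypothetical):

from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.getOrCreate()

# Two plain numeric columns to assemble.
df = spark.createDataFrame([(1.0, 2.0), (3.0, 4.0)], ["x1", "x2"])

assembler = VectorAssembler(inputCols=["x1", "x2"], outputCol="features")
assembler.transform(df).show(truncate=False)
# The new `features` column holds vectors such as [1.0,2.0] and [3.0,4.0],
# which is the form the ML estimators expect.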
A full sample code snippet:
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor

# Assemble the raw feature columns into a single `features` vector column.
assembler = VectorAssembler(
    inputCols=feature_cols,
    outputCol="features"
)

# The estimator reads the assembled `features` column and the label column.
rf = RandomForestRegressor(
    featuresCol="features",
    labelCol=response_variable,
    predictionCol="prediction",
    **parameters
)

pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(train)
test_with_predictions = model.transform(test)  # This will add a `prediction` column AND a `features` column to test.

# Save the model for later use.
model.write().overwrite().save('/tmp/my_great_model')

# Load the model later.
model = PipelineModel.load('/tmp/my_great_model')
The features column is added by the VectorAssembler.
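To see what .transform() added, you can inspect those columns directly (a small sketch reusing the names from the snippet above):

# Look at the columns that model.transform() added to `test`.
test_with_predictions.select("features", "prediction").show(5, truncate=False)

# The assembled vector is only needed by the model; drop it if you don't want to carry it around.
test_with_predictions = test_with_predictions.drop("features")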
Feel free to comment below. A GitHub account is required.