PySpark ML - How to save pipeline and RandomForestClassificationModel
I am unable to save the random forest model generated using the ml package of Python/Spark.

>>> rf = RandomForestClassifier(labelCol="label", featuresCol="features")
>>> pipeline = Pipeline(stages=early_stages + [rf])
>>> model = pipeline.fit(trainingData)
>>> model.save("fittedpipeline")

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'PipelineModel' object has no attribute 'save'

>>> rfModel = model.stages[8]
>>> print(rfModel)

RandomForestClassificationModel (uid=rfc_46c07f6d7ac8) with 20 trees

>>> rfModel.save("rfmodel")

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'RandomForestClassificationModel' object has no attribute 'save'

I also tried passing 'sc' as the first parameter to the save method.

Deach answered 8/7, 2017 at 0:36 Comment(3)
What version of Spark are you using? – Capercaillie
I am using Spark 1.6.0. Unfortunately, I cannot upgrade to a higher version for certain reasons. Is there a workaround to save the model in 1.6.0? – Deach
Nothing out of the box for < 2.0.0 in PySpark. – Capercaillie

The main issue with your code is that you are using a version of Apache Spark prior to 2.0.0, so save is not yet available for the Pipeline API.
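
If you are stuck on a pre-2.0.0 version, as discussed in the comments, one possible workaround is to drop down to the RDD-based pyspark.mllib API, whose tree models do implement save and load (there is no Pipeline persistence there, so any feature-preparation stages would have to be re-applied manually). A minimal sketch, assuming an active SparkContext sc and the sample LIBSVM data shipped with Spark:

from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils

# Load LIBSVM data as an RDD of LabeledPoint (RDD-based API).
data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# Train an RDD-based random forest; an empty categoricalFeaturesInfo
# dict means all features are treated as continuous.
model = RandomForest.trainClassifier(trainingData, numClasses=2,
                                     categoricalFeaturesInfo={},
                                     numTrees=10)

# In this API, save and load take the SparkContext as the first argument.
model.save(sc, "rf_mllib_model")
sameModel = RandomForestModel.load(sc, "rf_mllib_model")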

Assuming you can upgrade to Spark 2.0+, here is a full example adapted from the official documentation. Let's create our pipeline first:

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer

# Load and parse the data file, converting it to a DataFrame.
data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
label_indexer = StringIndexer(inputCol="label", outputCol="indexedLabel")
labels = label_indexer.fit(data).labels

# Automatically identify categorical features, and index them.
# Set maxCategories so features with > 4 distinct values are treated as continuous.
feature_indexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4)

early_stages = [label_indexer, feature_indexer]

# Split the data into training and test sets (30% held out for testing)
(train, test) = data.randomSplit([0.7, 0.3])

# Train a RandomForest model.
rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=10)

# Convert indexed labels back to original labels.
label_converter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labels)

# Chain indexers and forest in a Pipeline
pipeline = Pipeline(stages=early_stages + [rf, label_converter])

# Train model. This also runs the indexers.
model = pipeline.fit(train)

You can now save your pipeline:

>>> model.save("/tmp/rf")
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
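
To load the fitted pipeline back later, use PipelineModel.load, the classmethod counterpart of save:

>>> from pyspark.ml import PipelineModel
>>> same_model = PipelineModel.load("/tmp/rf")
>>> predictions = same_model.transform(test)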

You can also save the fitted RandomForestClassificationModel stage on its own (it is the third stage of the pipeline, hence index 2):

>>> rf_model = model.stages[2]
>>> print(rf_model)
RandomForestClassificationModel (uid=rfc_b368678f4122) with 10 trees
>>> rf_model.save("/tmp/rf_2")
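
And load it back with the matching class:

>>> from pyspark.ml.classification import RandomForestClassificationModel
>>> same_rf_model = RandomForestClassificationModel.load("/tmp/rf_2")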
Capercaillie answered 8/7, 2017 at 8:11 Comment(2)
Hi, I'm not really understanding how to set the path. If I just do .save(), why doesn't it save to the current working directory? I've also tried adding the full path, with no luck. – Bifrost
You can save it wherever you want. The path here was not the issue. – Capercaillie

You can save both pipelines and models, but when loading them back you need to know a priori which class each saved artifact corresponds to. For example (note that OneHotEncoderEstimator is the Spark 2.3 name; it was renamed to OneHotEncoder in Spark 3.0):

from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoderEstimator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel

df = ...  # your DataFrame, with categorical columns "A", "B", "C" and label column "id_imp"
categoricalColumns = ["A", "B", "C"]
stages = []

for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol="id_imp", outputCol="label")
stages += [label_stringIdx]

assemblerInputs = [c + "classVec" for c in categoricalColumns]
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

pipeline = Pipeline(stages=stages)

pipelineModel = pipeline.fit(df)
pipelineModel.save("/path")

In the previous case, I saved a fitted Pipeline (a PipelineModel) with several stages.

Now, if you want to load it back and use it, it must be loaded as a PipelineModel (the fitted variant), not as a Pipeline:

from pyspark.ml import PipelineModel

pipelineModel = PipelineModel.load("/path")
df = pipelineModel.transform(df)

You can do the same for other cases. For example, with a CrossValidator:

# Assumes lr (an estimator), paramGrid (built with ParamGridBuilder) and
# evaluator (e.g. a BinaryClassificationEvaluator) are already defined.
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2)

(trainingData, testData) = df.randomSplit([0.7, 0.3], seed=100)
cvModel = cv.fit(trainingData)
predictions = cvModel.transform(testData)

# Save the fitted CrossValidatorModel and load it back with the same class.
cvModel.save("/path")
cvM = CrossValidatorModel.load("/path")
predictions2 = cvM.transform(testData)

In brief, if you want to load a saved model you need to use the corresponding class.
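
A short recap of the pairings as a sketch (the paths are placeholders):

from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.tuning import CrossValidatorModel
from pyspark.ml.classification import RandomForestClassificationModel

# An unfitted estimator reloads with its estimator class ...
pipeline = Pipeline.load("/path/to/pipeline")

# ... while fitted artifacts reload with their model class.
pipeline_model = PipelineModel.load("/path/to/pipeline_model")
cv_model = CrossValidatorModel.load("/path/to/cv_model")
rf_model = RandomForestClassificationModel.load("/path/to/rf_model")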

Narbada answered 8/5, 2018 at 14:7 Comment(0)
