Custom Transformer in PySpark Pipeline with Cross Validation

I wrote a custom transformer as described here.
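Roughly, it follows the pattern below: a Transformer mixing in HasInputCol and HasOutputCol, with an extra stopwords Param and a _transform implemented via a UDF. This is only a simplified sketch against the Spark 1.4 Python API, not necessarily the exact class from the linked answer:

import nltk

from pyspark import keyword_only
from pyspark.ml import Transformer
from pyspark.ml.param import Param
from pyspark.ml.param.shared import HasInputCol, HasOutputCol
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType

class NLTKWordPunctTokenizer(Transformer, HasInputCol, HasOutputCol):
    """Splits the input column into words with NLTK and drops stopwords."""

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None, stopwords=None):
        super(NLTKWordPunctTokenizer, self).__init__()
        self.stopwords = Param(self, "stopwords", "words to filter out")
        self._setDefault(stopwords=set())
        # Spark 1.4/1.5 style; newer releases expose self._input_kwargs instead.
        kwargs = self.__init__._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None, stopwords=None):
        kwargs = self.setParams._input_kwargs
        return self._set(**kwargs)

    def setStopwords(self, value):
        self._paramMap[self.stopwords] = value
        return self

    def getStopwords(self):
        return self.getOrDefault(self.stopwords)

    def _transform(self, dataset):
        stopwords = self.getStopwords()

        def f(s):
            # Tokenize the sentence and drop stopwords (case-insensitive).
            tokens = nltk.tokenize.wordpunct_tokenize(s)
            return [t for t in tokens if t.lower() not in stopwords]

        out_type = ArrayType(StringType())
        out_col = self.getOutputCol()
        in_col = dataset[self.getInputCol()]
        return dataset.withColumn(out_col, udf(f, out_type)(in_col))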

When creating a pipeline with my transformer as the first step, I am able to train a (logistic regression) model for classification.

However, when I want to perform cross validation with this pipeline like so:

from pyspark.ml.feature import HashingTF
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

sentenceDataFrame = sqlContext.createDataFrame([
  (1.0, "Hi I heard about Spark"),
  (1.0, "Spark is awesome"),
  (0.0, "But there seems to be a problem"),
  (0.0, "And I don't know why...")
], ["label", "sentence"])

tokenizer = NLTKWordPunctTokenizer(
    inputCol="sentence", outputCol="words",
    stopwords=set(nltk.corpus.stopwords.words('english')))

hasher = HashingTF(inputCol="words", outputCol="features")
lr = LogisticRegression()

pipeline = Pipeline(stages=[tokenizer, hasher, lr])

paramGrid = ParamGridBuilder().addGrid(lr.regParam, (0.01, 0.1))\
                              .addGrid(lr.tol, (1e-5, 1e-6))\
                              .build()

cv = CrossValidator(estimator=pipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=4)

model = cv.fit(sentenceDataFrame)

I get the following error:

Py4JJavaError: An error occurred while calling o59.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 3, localhost): org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
    at org.apache.spark.rdd.RDD$$anonfun$zip$1$$anonfun$apply$27$$anon$1.hasNext(RDD.scala:812)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:276)
    at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
    at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

And the Python stacktrace:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-11-780e2ee6bae8> in <module>()
     22                     numFolds=4)
     23 
---> 24 model = cv.fit(sentenceDataFrame)

~/spark-1.4.1-bin-hadoop2.6/python/pyspark/ml/pipeline.pyc in fit(self, dataset, params)
     63                 return self.copy(params)._fit(dataset)
     64             else:
---> 65                 return self._fit(dataset)
     66         else:
     67             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/spark-1.4.1-bin-hadoop2.6/python/pyspark/ml/tuning.pyc in _fit(self, dataset)
    220             train = df.filter(~condition)
    221             for j in range(numModels):
--> 222                 model = est.fit(train, epm[j])
    223                 # TODO: duplicate evaluator to take extra params from input
    224                 metric = eva.evaluate(model.transform(validation, epm[j]))

~/spark-1.4.1-bin-hadoop2.6/python/pyspark/ml/pipeline.pyc in fit(self, dataset, params)
     61         elif isinstance(params, dict):
     62             if params:
---> 63                 return self.copy(params)._fit(dataset)
     64             else:
     65                 return self._fit(dataset)

~/spark-1.4.1-bin-hadoop2.6/python/pyspark/ml/pipeline.pyc in _fit(self, dataset)
    196                     dataset = stage.transform(dataset)
    197                 else:  # must be an Estimator
--> 198                     model = stage.fit(dataset)
    199                     transformers.append(model)
    200                     if i < indexOfLastEstimator:

~/spark-1.4.1-bin-hadoop2.6/python/pyspark/ml/pipeline.pyc in fit(self, dataset, params)
     63                 return self.copy(params)._fit(dataset)
     64             else:
---> 65                 return self._fit(dataset)
     66         else:
     67             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

~/spark-1.4.1-bin-hadoop2.6/python/pyspark/ml/wrapper.pyc in _fit(self, dataset)
    129 
    130     def _fit(self, dataset):
--> 131         java_model = self._fit_java(dataset)
    132         return self._create_model(java_model)
    133 

~/spark-1.4.1-bin-hadoop2.6/python/pyspark/ml/wrapper.pyc in _fit_java(self, dataset)
    126         """
    127         self._transfer_params_to_java()
--> 128         return self._java_obj.fit(dataset._jdf)
    129 
    130     def _fit(self, dataset):

~/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539 
    540         for temp_arg in temp_args:

~/spark-1.4.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

I worked around this error by transforming my DataFrame beforehand, i.e. applying my transformer outside of the pipeline, as sketched below. But I'd really like to keep all steps inside the processing pipeline, so that I can apply it to unseen data without any preceding steps and also tune the feature extraction parameters. Any help is appreciated.
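For reference, the workaround looks roughly like this (an illustrative sketch reusing the objects defined above; tokenized and reducedPipeline are just names chosen for this example):

# Apply the custom tokenizer once, outside of the pipeline.
tokenized = tokenizer.transform(sentenceDataFrame)

# Cross-validate only the built-in stages.
reducedPipeline = Pipeline(stages=[hasher, lr])

cv = CrossValidator(estimator=reducedPipeline,
                    estimatorParamMaps=paramGrid,
                    evaluator=BinaryClassificationEvaluator(),
                    numFolds=4)

model = cv.fit(tokenized)

# Downside: unseen data must be passed through tokenizer.transform(...) by hand
# before model.transform(...), and the tokenizer's parameters cannot be tuned
# as part of the grid.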

Coypu asked 22/9, 2015 at 10:44 – Comments (12)
I cannot reproduce the problem. Could you provide the full stack trace? It is not clear where exactly zip is called. – Snifter
I edited my question to also include the Python stack trace. Does that help? – Coypu
A little. The problem is not reproducible on 1.5.0 but indeed fails on 1.4.1, so I guess the solution is straightforward: a Spark update. – Snifter
Well, with Spark 1.5 I do not get that error and the model seems to get fitted. However, I get a java.lang.ArrayIndexOutOfBoundsException. Did you run the code exactly as it is shown here? Additionally, 1.5 is not yet available on Amazon EMR, where I'm going to run my application eventually... – Coypu
Yep, I used exactly the same code and both fit and transform seem to work just fine. – Snifter
And so did I. I might have to ask a new question for this if I can't figure out why that happens. Maybe some other dependency? – Coypu
Well, there is not a single working Transformer implemented in pure Python in the Spark source, tests are almost non-existent and there is no documentation, so truth be told it is hard to tell what can go wrong :) It is quite possible I've messed something up writing the example. – Snifter
If I were you I would try [email protected]. – Snifter
I've answered a question about adding a version of Spark that is not yet offered on EMR. Let me try to find it on SO. – Entertain
Try checking the part concerning AWS in this question here. – Entertain
I think I'll get it running the way I want it to now. Thank you both for your help! – Coypu
I'm seeing the same issue in 1.6.0 with Naive Bayes, but a similar pipeline setup. – Farcical
