Spark ML Pipeline with RandomForest takes too long on 20MB dataset
I am using Spark ML to run some ML experiments. On a small 20 MB dataset (the Poker dataset), a Random Forest with a parameter grid takes 1 hour and 30 minutes to finish, while the same experiment with scikit-learn takes much, much less time.

In terms of environment, I was testing with 2 slaves with 15 GB of memory each and 24 cores. I assume it was not supposed to take that long, and I am wondering whether the problem lies in my code, since I am fairly new to Spark.

Here it is:

import pandas as pd

from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# The Poker data file has no header row; name the columns so 'label' exists
columns = ["S1", "C1", "S2", "C2", "S3", "C3", "S4", "C4", "S5", "C5", "label"]
df = pd.read_csv("http://archive.ics.uci.edu/ml/machine-learning-databases/poker/poker-hand-testing.data",
                 names=columns)
# assumes an existing SQLContext named sqlContext (e.g. from the pyspark shell)
dataframe = sqlContext.createDataFrame(df)

train, test = dataframe.randomSplit([0.7, 0.3])

# Split the feature columns into categorical (string) and numeric ones
categoricalCols = []
numericCols = []
for name, dtype in dataframe.dtypes:
    if name == 'label':
        continue
    if dtype == 'string':
        categoricalCols.append(name)
    else:
        numericCols.append(name)

stages = []

# Index each categorical column; the append must be inside the loop,
# otherwise only the last indexer ends up in the pipeline
for categoricalCol in categoricalCols:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    stages += [stringIndexer]

assemblerInputs = [c + "Index" for c in categoricalCols] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel', handleInvalid='skip')
stages += [labelIndexer]

estimator = RandomForestClassifier(labelCol="indexedLabel", featuresCol="features")
stages += [estimator]

parameters = {"maxDepth": [3, 5, 10, 15], "maxBins": [6, 12, 24, 32], "numTrees": [3, 5, 10]}

paramGrid = ParamGridBuilder()
for key, value in parameters.items():  # iteritems() is Python 2 only
    paramGrid.addGrid(estimator.getParam(key), value)
estimatorParamMaps = paramGrid.build()

pipeline = Pipeline(stages=stages)

crossValidator = CrossValidator(estimator=pipeline,
                                estimatorParamMaps=estimatorParamMaps,
                                evaluator=MulticlassClassificationEvaluator(
                                    labelCol='indexedLabel',
                                    predictionCol='prediction',
                                    metricName='f1'),
                                numFolds=3)

pipelineModel = crossValidator.fit(train)

predictions = pipelineModel.transform(test)

# getEvaluator() lives on the CrossValidator, not the Pipeline
f1 = crossValidator.getEvaluator().evaluate(predictions)

Thanks in advance; any comments/suggestions are highly appreciated :)

Rog answered 2/7, 2017 at 0:27 Comment(8)
Cross-validation is a heavy and long task, as its cost is proportional to the number of combinations of your 3 hyper-parameters times the number of folds times the time spent training each model. You might want to cache your data for a start, for example, but it will still not gain you much time. I believe that Spark is overkill for this amount of data. You might want to use scikit-learn instead and maybe github.com/databricks/spark-sklearn to distribute local model trainingRhyton
Hi @Rhyton, thank you for your comment. In fact, I am doing just that with spark-sklearn and getting good results. However, I just wanted to compare execution time between sklearn and Spark, and these numbers seemed odd to me, since one takes seconds while the other takes hoursRog
Because Spark learns each model separately and sequentially, under the assumption that the data is distributed and big.Rhyton
There is a huge overhead in using Spark on small data, but in your case it would be interesting to distribute just the CV and not the model training.Rhyton
OK, thanks, I will try with datasets over 15 GB to compare. Sorry for the naive question, but does the data have to be in HDFS, or does it make no difference as long as I read it into a Spark DataFrame?Rog
Well, you can gain performance by reading optimized file formats like Parquet. Tuning Spark itself also helps, but that's too broad to discuss here.Rhyton
@Rhyton I will check out Parquet then, thanks. I tried with 7 slaves and the execution time even went up by a few minutes. Is this really possible?Rog
Yes. It's possible.Rhyton

The following may not solve your problem completely, but it should give you some pointers to start with.

The first problem you are facing is the disproportion between the amount of data and your resources.

Since you are parallelizing a local collection (a pandas DataFrame), Spark will use the default parallelism configuration, which here most likely results in 48 partitions with less than 0.5 MB per partition. (Spark doesn't do well with small files or small partitions.)
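For illustration, a minimal sketch of compacting those partitions before training; the target count of 4 is an arbitrary choice for data this small, and dataframe is the DataFrame from the question:

# Merge the ~48 default partitions into a few larger ones; coalesce()
# avoids a full shuffle (repartition(4) would shuffle, but also works)
dataframe = dataframe.coalesce(4)
print(dataframe.rdd.getNumPartitions())  # now 4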

The second problem is related to the expensive optimization/approximation techniques used by tree models in Spark.

Spark's tree models use some tricks to bucket continuous variables optimally, mainly approximate quantiles in this case. With small data it is much cheaper to just compute the exact splits.

Usually, in a single-machine framework like scikit-learn, a tree model uses the unique values of each continuous feature as split candidates when calculating the best fit. In Apache Spark, by contrast, the tree model uses quantiles of each feature as split candidates.
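For the Poker data specifically, every feature is a small integer (suits 1-4, ranks 1-13), so most of the maxBins grid buys nothing. A quick check, reusing the dataframe from the question:

from pyspark.sql.functions import countDistinct

# Distinct values per feature column: at most 13 for this dataset, so
# maxBins values above 13 (24 and 32 in the grid) cannot add split candidates
dataframe.agg(*[countDistinct(c).alias(c)
                for c in dataframe.columns if c != "label"]).show()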

Also, don't forget that cross-validation is a heavy and long task, as its cost is proportional to the number of combinations of your 3 hyper-parameters times the number of folds times the time spent training each model (the grid-search approach). You might want to cache your data for a start, but it will still not gain you much time. I believe that Spark is overkill for this amount of data. You might want to use scikit-learn instead, and maybe use spark-sklearn to distribute local model training.
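To put a number on that, reusing names from the question's code: the grid has 4 x 4 x 3 = 48 parameter combinations, each trained on 3 folds:

# 48 combinations * 3 folds = 144 model fits, plus one final refit
n_fits = len(estimatorParamMaps) * crossValidator.getNumFolds()
print(n_fits)  # 144

# Caching at least avoids recomputing the input on every fit
train.cache()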

Spark learns each model separately and sequentially, under the assumption that the data is distributed and big.
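As a side note, this assumes a newer Spark than the question likely used: from Spark 2.3 on, CrossValidator accepts a parallelism argument that lets several of those fits run at once:

# Spark 2.3+ only: fit up to 4 models in parallel instead of sequentially
crossValidator = CrossValidator(estimator=pipeline,
                                estimatorParamMaps=estimatorParamMaps,
                                evaluator=MulticlassClassificationEvaluator(
                                    labelCol='indexedLabel',
                                    predictionCol='prediction',
                                    metricName='f1'),
                                numFolds=3,
                                parallelism=4)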

You can of course optimize performance further by using columnar file formats like Parquet and by tuning Spark itself, but that's too broad to cover here.
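A minimal sketch of the Parquet round trip (the path is a placeholder):

# Write the data once as Parquet, then read the columnar, typed file back
dataframe.write.mode("overwrite").parquet("/tmp/poker.parquet")
dataframe = sqlContext.read.parquet("/tmp/poker.parquet")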

You can read more about the scalability of tree models in spark-mllib in the following blog post:

Rhyton answered 4/7, 2017 at 12:11 Comment(2)
How might columnar file formats like Parquet help performance, or how might Parquet help specifically? I'm using Parquet with the Scala Spark API, but only for the advantages of saving my data's types, using a smaller memory footprint, and having the data partitioned. I haven't thought about how Parquet might help iterative algorithms; I only thought it was useful for IO, but I know I'm wrongAramanta
@Angelito The underlying data serialization with Parquet improves shuffle time for partitions because it “slims down” the data structureRhyton
