How to force DataFrame evaluation in Spark

Sometimes (e.g. for testing and benchmarking) I want to force the execution of the transformations defined on a DataFrame. AFAIK, calling an action like count does not ensure that all Columns are actually computed; show may only compute a subset of all Rows (see the examples below).

My current workaround is to write the DataFrame to HDFS using df.write.saveAsTable, but this "clutters" my system with tables I don't want to keep.
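For illustration, a minimal sketch of that workaround (the table name is a placeholder I made up); the DROP afterwards is exactly the cleanup step I would like to avoid:

// Force evaluation by persisting to a throwaway table, then drop it again.
df.withColumn("test", myUDF($"id"))
  .write.mode("overwrite")
  .saveAsTable("tmp_force_eval")                        // materializes all rows and columns
sqlContext.sql("DROP TABLE IF EXISTS tmp_force_eval")   // the unwanted cleanup side effect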

So what is the best way to trigger the evaluation of a DataFrame?

Edit:

Note that there is also a recent discussion on the Spark developer list: http://apache-spark-developers-list.1001551.n3.nabble.com/Will-count-always-trigger-an-evaluation-of-each-row-td21018.html

I made a small example which shows that count on a DataFrame does not evaluate everything (tested using Spark 1.6.3 and spark-master = local[2]):

val df = sc.parallelize(Seq(1)).toDF("id")
val myUDF = udf((i:Int) => {throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).count // runs fine
df.withColumn("test",myUDF($"id")).show() // gives Exception

Using the same logic, here is an example showing that show does not evaluate all rows:

val df = sc.parallelize(1 to 10).toDF("id")
val myUDF = udf((i:Int) => {if(i==10) throw new RuntimeException;i})

df.withColumn("test",myUDF($"id")).show(5) // runs fine
df.withColumn("test",myUDF($"id")).show(10) // gives Exception

Edit 2 (for Eliasah): The exception says:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 6.0 failed 1 times, most recent failure: Lost task 0.0 in stage 6.0 (TID 6, localhost): java.lang.RuntimeException
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply$mcII$sp(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:68)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
    at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
...

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1500)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2087)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1499)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1506)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1376)
    at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2100)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1375)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1457)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:170)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:350)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:311)
    at org.apache.spark.sql.DataFrame.show(DataFrame.scala:319)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
...
Capping answered 10/3, 2017 at 9:4 Comment(11)
My answer here will help you, as your assumption is wrong: #31384404 – Pawnshop
Secondly, I'm not sure that I understand your problem: "this 'clutters' my system with tables I don't want to keep any further." What does that mean? – Pawnshop
Do you need unit testing on DataFrame transformations? – Getaway
@Pawnshop I would prefer calling an action on my DataFrame which has no side effects; writing Hive tables is an (unwanted) side effect. That's what I meant by cluttering. – Capping
But count doesn't have a side effect. Your assumption is wrong, Raphael: count ensures that all columns are computed. – Pawnshop
@Pawnshop Can you elaborate on which of my assumptions is wrong? The answer you linked does not really relate to my question; I'm well aware of the difference between transformations and actions. – Capping
@Pawnshop According to this discussion (apache-spark-developers-list.1001551.n3.nabble.com/…, see the post by Matei Zaharia), count on a DataFrame seems not to evaluate all columns. But I'm happy if this is not true. – Capping
OK, I withdraw my claim then. I'm sorry about that. Let me look back at it. With all due respect, I believe the definition given by Matei is kind of misleading. – Pawnshop
I'm sorry, I don't believe that what he says is correct either. – Pawnshop
Let us continue this discussion in chat. – Pawnshop
The discussion link that you have given is very interesting. – Pawnshop

I guess simply getting the underlying RDD from the DataFrame and triggering an action on it should achieve what you're looking for:

df.withColumn("test",myUDF($"id")).rdd.count // this gives proper exceptions
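As an aside: on newer Spark versions (3.0+) there is also a built-in "noop" data source meant for exactly this kind of benchmarking; it evaluates every row without writing anything anywhere. A minimal sketch (not applicable to the Spark 1.6 setup from the question):

df.withColumn("test", myUDF($"id"))
  .write
  .format("noop")      // Spark 3.0+ no-op sink: full evaluation, no output
  .mode("overwrite")
  .save()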
Nowise answered 10/3, 2017 at 13:18 Comment(1)
This is weird. Looks like this answer worked for the original question, but someone still managed to give a couple of downvotes without mentioning any reason or feedback. :) – Nowise

It's a bit late, but here's the fundamental reason: count does not act the same on an RDD as on a DataFrame.

In DataFrames there's an optimization: in some cases you do not need to load the data to know how many elements it has (especially in a case like yours, where no data shuffling is involved). Hence, the DataFrame materialized when count is called will not load any data and will never reach your exception-throwing UDF. You can easily do the experiment by defining your own DefaultSource and Relation and observing that calling count on a DataFrame always ends up in the method buildScan with no requiredColumns, no matter how many columns you selected (cf. org.apache.spark.sql.sources.interfaces to understand more). It's actually a very efficient optimization ;-)
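For illustration, here is a minimal sketch of such an experiment against the Spark 1.x data source API (the package and class names are made up); buildScan prints whichever columns Spark actually requests:

package example

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, PrunedScan, RelationProvider}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new ProbeRelation(sqlContext)
}

class ProbeRelation(val sqlContext: SQLContext)
    extends BaseRelation with PrunedScan {

  override def schema: StructType =
    StructType(Seq(StructField("id", IntegerType), StructField("value", IntegerType)))

  // For df.count, requiredColumns comes in empty: Spark only needs row counts.
  override def buildScan(requiredColumns: Array[String]): RDD[Row] = {
    println(s"buildScan called with requiredColumns = [${requiredColumns.mkString(", ")}]")
    sqlContext.sparkContext.parallelize(1 to 10)
      .map(i => Row.fromSeq(requiredColumns.map(_ => i)))
  }
}

Reading it as sqlContext.read.format("example").load().count() should print buildScan called with requiredColumns = [].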

In RDDs, though, there are no such optimizations (which is why one should always try to use DataFrames when possible). Hence count on an RDD executes the whole lineage and returns the sum of the sizes of the iterators over all partitions.

Calling dataframe.count falls under the first explanation, but calling dataframe.rdd.count falls under the second, since you did build an RDD out of your DataFrame. Note that calling dataframe.cache().count forces the DataFrame to be materialized, as you required Spark to cache the results (so it needs to load all the data and transform it). But it does have the side effect of caching your data...
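If the lingering cache is the problem, one possible pattern (a sketch, not part of the original answer) is to unpersist right after the forced count:

val materialized = df.withColumn("test", myUDF($"id"))
materialized.cache().count()  // forces full materialization of every row
materialized.unpersist()      // drops the cached blocks again afterwards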

Oppilate answered 10/8, 2017 at 8:57 Comment(1)
Isn't cache a suggestion to Spark rather than forcing it to materialize? – Surprisal

It appears that df.cache.count is the way to go:

scala> val myUDF = udf((i:Int) => {if(i==1000) throw new RuntimeException;i})
myUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(IntegerType)))

scala> val df = sc.parallelize(1 to 1000).toDF("id")
df: org.apache.spark.sql.DataFrame = [id: int]

scala> df.withColumn("test",myUDF($"id")).show(10)
[rdd_51_0]
+---+----+
| id|test|
+---+----+
|  1|   1|
|  2|   2|
|  3|   3|
|  4|   4|
|  5|   5|
|  6|   6|
|  7|   7|
|  8|   8|
|  9|   9|
| 10|  10|
+---+----+
only showing top 10 rows

scala> df.withColumn("test",myUDF($"id")).count
res13: Long = 1000

scala> df.withColumn("test",myUDF($"id")).cache.count
org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (int) => int)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
...
Caused by: java.lang.RuntimeException

Source

Gq answered 10/3, 2017 at 18:15 Comment(2)
Although this seems to work, it has a side effect (i.e. the caching). – Capping
Spark does not always have to execute everything you may intend it to in order to perform count; this will sometimes work. – Normie

I prefer to use df.write.parquet(). This does add disk I/O time that you can estimate and subtract out later, but you can be positive that Spark performed each step you expected and did not trick you with lazy evaluation.
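A rough sketch of that measurement (the timing helper and the paths are made up for illustration):

def timed[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(f"$label%s took ${(System.nanoTime() - start) / 1e9}%.2f s")
  result
}

// Baseline: how long does writing the untransformed df take?
timed("baseline write") { df.write.mode("overwrite").parquet("/tmp/baseline") }

// Forced evaluation of the transformations, disk I/O included:
val dfPrime = df.withColumn("test", myUDF($"id"))
timed("transformed write") { dfPrime.write.mode("overwrite").parquet("/tmp/transformed") }

Subtracting the baseline from the second timing gives a rough estimate of the cost of the transformations alone.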

Normie answered 21/9, 2018 at 20:52 Comment(2)
How can I estimate the disk I/O time, Dan? Also, will the collect function work? Or will it also add some overhead time? – Katsuyama
I assume you have a df and you are doing X actions on it. To get the baseline time, just do df.write.parquet() and time that. Then apply your methods to df and time df_prime.write.parquet(). The first one will let you know how long it takes to simply save the df. – Normie
