Spark SQL: Why two jobs for one query?

Experiment

I tried the following snippet on Spark 1.6.1.

val soDF = sqlContext.read.parquet("/batchPoC/saleOrder") // This has 45 files
soDF.registerTempTable("so")
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/out/")

The Physical Plan is:

== Physical Plan ==
Sort [cnt#59L ASC], true, 0
+- ConvertToUnsafe
   +- Exchange rangepartitioning(cnt#59L ASC,200), None
      +- ConvertToSafe
         +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Final,isDistinct=false)], output=[dpHour#38,cnt#59L])
            +- TungstenExchange hashpartitioning(dpHour#38,200), None
               +- TungstenAggregate(key=[dpHour#38], functions=[(count(1),mode=Partial,isDistinct=false)], output=[dpHour#38,count#63L])
                  +- Scan ParquetRelation[dpHour#38] InputPaths: hdfs://hdfsNode:8020/batchPoC/saleOrder
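
(A plan like this can also be printed directly, without triggering the write, roughly as follows; this is just a sketch against the same temp table.)

// Sketch: print the logical and physical plans for the same query without running it
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").explain(true)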

For this query, I got two jobs: Job 9 and Job 10.

[Spark UI screenshot: the Jobs view listing Job 9 and Job 10]

For Job 9, the DAG is:

[Spark UI screenshot: DAG for Job 9]

For Job 10, the DAG is:

[Spark UI screenshot: DAG for Job 10]

Observations

  1. Apparently, there are two jobs for one query.
  2. Stage-16 (marked as Stage-14 in Job 9) is skipped in Job 10.
  3. Stage-15's last RDD[48] is the same as Stage-17's last RDD[49]. How? I saw in the logs that after Stage-15 finished, RDD[48] was registered as RDD[49].
  4. Stage-17 is shown in the driver logs but was never executed on the executors. The driver logs show the task execution, but when I looked at the YARN container logs, there was no evidence of any task from Stage-17 being received.

Logs supporting these observations (driver logs only; I lost the executor logs in a later crash). They show that RDD[49] is registered before Stage-17 starts:

16/06/10 22:11:22 INFO TaskSetManager: Finished task 196.0 in stage 15.0 (TID 1121) in 21 ms on slave-1 (199/200)
16/06/10 22:11:22 INFO TaskSetManager: Finished task 198.0 in stage 15.0 (TID 1123) in 20 ms on slave-1 (200/200)
16/06/10 22:11:22 INFO YarnScheduler: Removed TaskSet 15.0, whose tasks have all completed, from pool 
16/06/10 22:11:22 INFO DAGScheduler: ResultStage 15 (parquet at <console>:26) finished in 0.505 s
16/06/10 22:11:22 INFO DAGScheduler: Job 9 finished: parquet at <console>:26, took 5.054011 s
16/06/10 22:11:22 INFO ParquetRelation: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO DefaultWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
16/06/10 22:11:22 INFO FileOutputCommitter: File Output Committer Algorithm version is 1
16/06/10 22:11:22 INFO SparkContext: Starting job: parquet at <console>:26
16/06/10 22:11:22 INFO DAGScheduler: Registering RDD 49 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Got job 10 (parquet at <console>:26) with 25 output partitions
16/06/10 22:11:22 INFO DAGScheduler: Final stage: ResultStage 18 (parquet at <console>:26)
16/06/10 22:11:22 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 17)
16/06/10 22:11:22 INFO DAGScheduler: Submitting ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26), which has no missing parents
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 17.4 KB, free 512.3 KB)
16/06/10 22:11:22 INFO MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 8.9 KB, free 521.2 KB)
16/06/10 22:11:22 INFO BlockManagerInfo: Added broadcast_25_piece0 in memory on 172.16.20.57:44944 (size: 8.9 KB, free: 517.3 MB)
16/06/10 22:11:22 INFO SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006
16/06/10 22:11:22 INFO DAGScheduler: Submitting 200 missing tasks from ShuffleMapStage 17 (MapPartitionsRDD[49] at parquet at <console>:26)
16/06/10 22:11:22 INFO YarnScheduler: Adding task set 17.0 with 200 tasks
16/06/10 22:11:23 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 1125, slave-1, partition 0,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 1.0 in stage 17.0 (TID 1126, slave-2, partition 1,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 2.0 in stage 17.0 (TID 1127, slave-1, partition 2,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 3.0 in stage 17.0 (TID 1128, slave-2, partition 3,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 4.0 in stage 17.0 (TID 1129, slave-1, partition 4,NODE_LOCAL, 1988 bytes)
16/06/10 22:11:23 INFO TaskSetManager: Starting task 5.0 in stage 17.0 (TID 1130, slave-2, partition 5,NODE_LOCAL, 1988 bytes)

Questions

  1. Why two jobs? What is the intention of breaking one DAG into two jobs?
  2. Job 10's DAG looks complete for the query execution. Is there anything specific that Job 9 is doing?
  3. Why is Stage-17 not skipped? It looks like dummy tasks are created; do they have any purpose?
  4. Later, I tried another, rather simpler query. Unexpectedly, it created 3 jobs.

    sqlContext.sql("select dpHour from so order by dphour").write.parquet("/out2/")

Vauban answered 10/6, 2016 at 19:37

When you are using the high-level dataframe/dataset APIs, you leave it up to Spark to determine the execution plan, including the job/stage chunking. These depend on many factors such as execution parallelism, cached/persisted data structures, etc. In future versions of Spark, as the optimizer sophistication increases, you may see even more jobs per query as, for example, some data sources are sampled to parameterize cost-based execution optimization.
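
For instance, the 200-partition exchanges in your plan come from spark.sql.shuffle.partitions. Here is a rough sketch (the new value and the output path below are made up) of how a single setting changes the layout Spark picks, while the job/stage chunking itself stays in Spark's hands:

// Sketch only: lowering the shuffle parallelism changes the exchanges Spark plans,
// but Spark still decides how the work is split into jobs and stages.
sqlContext.setConf("spark.sql.shuffle.partitions", "50")  // hypothetical value
sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt").write.parquet("/outLowShuffle/")  // hypothetical path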

For example, I have frequently, though not always, seen writes generate jobs separate from the processing that involves shuffles.
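
One way to see that split (a rough sketch against your table; the cache is only there to force the boundary) is to materialize the aggregation before writing it:

// Sketch: run the shuffle-heavy aggregation as its own job, then write the result.
val agg = sqlContext.sql("select dpHour, count(*) as cnt from so group by dpHour order by cnt")
agg.cache()
agg.count()                 // triggers the scan/shuffle/aggregation job(s)
agg.write.parquet("/out/")  // the write then shows up as a separate job in the UI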

Bottom line, if you are using the high-level APIs, unless you have to do extremely detailed optimization with huge data volumes, it rarely pays to dig into the specific chunking. Job startup costs are extremely low compared to processing/output.

If, on the other hand, you are curious about the Spark internals, read the optimizer code and engage on the Spark developer mailing list.

Multipartite answered 11/6, 2016 at 18:35
This is curious: why can't the second job's stages be in the first job? – Horodko
Good question. It may have to do with intermediate result generation. The important question is: why does it matter how a DAG is mapped to stages and jobs? – Multipartite
Yeah, it's hard to really understand how Spark is doing this: a mix of available resources, data, ... – Horodko
My observation is that the RDD stuff was easier to follow, and most documentation is based on that. The DF stuff is indeed harder to relate to the initial discussion of jobs, apps, etc. – Slake
@Slake RDD plans are easier to follow because there is no optimization: whatever you write is what Spark does. The downside is obvious: no optimization, no advanced SQL(-like) operations, greater serialization/deserialization overhead, etc. That's why, in most cases, Spark executes faster using datasets than RDDs, even if datasets use RDDs under the covers. – Multipartite
Sure, but the blurb is all about jobs, apps, etc., and it is indeed a little different with DFs. Thanks. – Slake
