Spark job restarted after showing all jobs completed and then fails (TimeoutException: Futures timed out after [300 seconds])

I'm running a Spark job. It shows that all of the jobs were completed.

However, after a couple of minutes the entire job restarts. This time it again shows that all jobs and tasks were completed, but after a couple of minutes it fails. I found this exception in the logs:

java.util.concurrent.TimeoutException: Futures timed out after [300 seconds]

This happens when I try to join two fairly big tables: one of 3B rows and one of 200M rows. When I run show(100) on the resulting DataFrame, everything gets evaluated and I hit this issue.

I tried increasing and decreasing the number of partitions, and I switched the garbage collector to G1 with an increased number of threads. I also changed spark.sql.broadcastTimeout to 600 (which changed the timeout message to 600 seconds).

I also read that this might be a communication issue. However, other show() calls that run before this code segment work without problems, so that's probably not it.

This is the submit command:

/opt/spark/spark-1.4.1-bin-hadoop2.3/bin/spark-submit  --master yarn-cluster --class className --executor-memory 12g --executor-cores 2 --driver-memory 32g --driver-cores 8 --num-executors 40 --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:ConcGCThreads=20" /home/asdf/fileName-assembly-1.0.jar

You can get an idea of the Spark version and the resources used from there.

Where do I go from here? Any help will be appreciated, and code segments/additional logging will be provided if needed.

Paranoid answered 29/3, 2016 at 16:56 Comment(1)
Related?: issues.apache.org/jira/browse/SPARK-46516 - Surly

What eventually solved this was persisting both data frames before the join.

I looked at the execution plan before and after persisting the data frames. The strange thing was that before persisting, Spark tried to perform a BroadcastHashJoin, which clearly failed due to the large size of the data frame. After persisting, the execution plan showed a ShuffleHashJoin, which completed without any issues whatsoever. A bug? Maybe. I'll try a newer Spark version when I get to it.
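
The workaround described above might look roughly like the following. This is only a minimal sketch against the Spark 1.4.x DataFrame API, not the original job: the table names `big_table` and `smaller_table`, the join column `id`, and the storage level are all placeholders and assumptions, and the tables are assumed to be already registered with the SQLContext.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.storage.StorageLevel

object PersistBeforeJoin {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("PersistBeforeJoin"))
    val sqlContext = new SQLContext(sc)

    // Hypothetical inputs: table names and the join column "id" are placeholders.
    val big: DataFrame = sqlContext.table("big_table")         // the ~3B-row side
    val smaller: DataFrame = sqlContext.table("smaller_table") // the ~200M-row side

    // Mark both sides as persisted before building the join.
    // MEMORY_AND_DISK is an assumption; the answer does not say which level was used.
    big.persist(StorageLevel.MEMORY_AND_DISK)
    smaller.persist(StorageLevel.MEMORY_AND_DISK)

    val joined = big.join(smaller, big("id") === smaller("id"))

    // Print the physical plan to check which join strategy was chosen
    // (BroadcastHashJoin vs. ShuffledHashJoin) before triggering evaluation.
    joined.explain()
    joined.show(100)
  }
}
```

Calling `explain()` before `show(100)` makes it possible to compare the chosen join strategy with and without the two `persist` calls without waiting for the timeout.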

Paranoid answered 5/4, 2016 at 11:23 Comment(2)
If it's that large, what was the size? You can increase spark.sql.autoBroadcastJoinThreshold (maybe <2 GB) and see the effect. That is, if the broadcast piece of data (the small DataFrame) fits within that property's value, Spark still does a broadcast join. Also see my answer to understand this better. - Halloran
How did you do it? Can you share the code? I am pretty new to Spark and I have the same problem when I run val df_before_visits = sparkSession.sqlContext.sql("SELECT fields FROM two tables join each other") - Illustrious
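
Regarding the spark.sql.autoBroadcastJoinThreshold setting mentioned in the comments: besides raising it, the threshold can also be set to -1, which disables automatic broadcast joins altogether. A minimal sketch for the Spark 1.x `SQLContext` API follows; the helper object and method names are hypothetical, and whether this resolves the timeout in the original job is untested here.

```scala
import org.apache.spark.sql.SQLContext

object BroadcastSettings {
  // Hypothetical helper: turns off automatic broadcast joins for queries
  // planned through the given SQLContext by setting the threshold to -1.
  def disableAutoBroadcast(sqlContext: SQLContext): Unit =
    sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "-1")
}
```

On Spark 2.x and later, where a `SparkSession` is used as in the comment above, the same property can be set with `sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")`.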
