What are possible reasons for receiving TimeoutException: Futures timed out after [n seconds] when working with Spark [duplicate]

I'm working on a Spark SQL program and I'm receiving the following exception:

16/11/07 15:58:25 ERROR yarn.ApplicationMaster: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:190)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:190)
    at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:107)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Project.doExecute(basicOperators.scala:46)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
    at org.apache.spark.sql.execution.Union$$anonfun$doExecute$1.apply(basicOperators.scala:144)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.execution.Union.doExecute(basicOperators.scala:144)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.buildBuffers(InMemoryColumnarTableScan.scala:129)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation.<init>(InMemoryColumnarTableScan.scala:118)
    at org.apache.spark.sql.execution.columnar.InMemoryRelation$.apply(InMemoryColumnarTableScan.scala:41)
    at org.apache.spark.sql.execution.CacheManager$$anonfun$cacheQuery$1.apply(CacheManager.scala:93)
    at org.apache.spark.sql.execution.CacheManager.writeLock(CacheManager.scala:60)
    at org.apache.spark.sql.execution.CacheManager.cacheQuery(CacheManager.scala:84)
    at org.apache.spark.sql.DataFrame.persist(DataFrame.scala:1581)
    at org.apache.spark.sql.DataFrame.cache(DataFrame.scala:1590)
    at com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56)
    at com.somecompany.ml.modeling.NewModel.generateArtifacts(FlowForNewModel.scala:32)
    at com.somecompany.ml.modeling.Flow$class.run(Flow.scala:52)
    at com.somecompany.ml.modeling.lowForNewModel.run(FlowForNewModel.scala:15)
    at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
    at com.somecompany.ml.Main$$anonfun$2.apply(Main.scala:54)
    at scala.Option.getOrElse(Option.scala:121)
    at com.somecompany.ml.Main$.main(Main.scala:46)
    at com.somecompany.ml.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:542)
16/11/07 15:58:25 INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 15, (reason: User class threw exception: java.util.concurrent.TimeoutException: Futures timed out after [3000 seconds])

The last part of my code that I recognize from the stack trace is com.somecompany.ml.modeling.NewModel.getTrainingSet(FlowForNewModel.scala:56), which points me to this line: profilesDF.cache(). Before the caching I perform a union between two DataFrames. I've seen an answer here about persisting both DataFrames before the join, but I still need to cache the unioned DataFrame since I use it in several of my transformations.
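
Roughly, that part of getTrainingSet looks like this (dfA and dfB stand in for the two DataFrames being unioned):

    val profilesDF = dfA.unionAll(dfB)  // union of the two DataFrames
    profilesDF.cache()                  // FlowForNewModel.scala:56, the call seen in the stack trace
    // profilesDF is then reused in several later transformations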

I was wondering what may cause this exception to be thrown. Searching for it led me to a link dealing with an RPC timeout exception and to some security issues, neither of which is my problem. If you also have any idea how to solve it I'd obviously appreciate it, but even just understanding the problem would help me solve it.

Thanks in advance

Ronnironnica answered 7/11, 2016 at 20:31

Question: I was wondering what may cause this exception to be thrown?

Answer:

spark.sql.broadcastTimeout (default: 300): timeout in seconds for the broadcast wait time in broadcast joins.

spark.network.timeout (default: 120s): default timeout for all network interactions. spark.network.timeout (spark.rpc.askTimeout), spark.sql.broadcastTimeout, spark.kryoserializer.buffer.max (if you are using Kryo serialization), etc. are tuned with larger-than-default values in order to handle complex queries. You can start with these values and adjust according to your SQL workloads.

Note: the doc says that

The following options (the spark.sql.* properties) can also be used to tune the performance of query execution. It is possible that these options will be deprecated in a future release as more optimizations are performed automatically.
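
By way of illustration, here is one way these values could be raised, sketched against the Spark 1.6-era API that matches the stack trace above (the app name and the numbers are only placeholders to tune for your workload):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext

    val conf = new SparkConf()
      .setAppName("FlowForNewModel")                    // placeholder app name
      .set("spark.network.timeout", "600s")             // default is 120s
      .set("spark.kryoserializer.buffer.max", "512m")   // only relevant if you use Kryo

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Timeout in seconds for the broadcast wait in broadcast joins (default 300)
    sqlContext.setConf("spark.sql.broadcastTimeout", "3600")

    // The same keys can also be passed at submit time, for example:
    //   spark-submit --conf spark.sql.broadcastTimeout=3600 --conf spark.network.timeout=600s ...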

Also, for a better understanding, you can look at BroadcastHashJoin, whose doExecute method is the trigger point for the above stack trace.

protected override def doExecute(): RDD[Row] = {
    val broadcastRelation = Await.result(broadcastFuture, timeout)

    streamedPlan.execute().mapPartitions { streamedIter =>
      hashJoin(streamedIter, broadcastRelation.value)
    }
  }
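
In other words, doExecute blocks on Await.result until the broadcast side of the join has been built and shipped to the executors; the timeout passed to Await.result is derived from spark.sql.broadcastTimeout, so when that future does not complete in time you get exactly the TimeoutException shown in the question.
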
Queridas answered 21/11, 2016 at 14:28

Good to know that the suggestion from Ram works in some cases. I'd like to mention that I stumbled on this exception a couple of times (including the one described here).

Much of the time, it was due to almost silent OOMs on some executor. Check the Spark UI for failed tasks (the last column of the per-stage task table); you may notice OOM messages there.

If I understand Spark internals correctly, the broadcast data passes through the driver: the driver has a thread mechanism that collects the data from the executors and then sends it back to all of them. If an executor fails at some point, you may end up with these timeouts.

Abscond answered 25/1, 2018 at 18:19

I had set the master to local[n] in code when I submitted the job to a YARN cluster.

Do not set the master in code when running on a cluster; instead, pass it with --master when submitting.
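
A small sketch of the difference (the jar name and the local[4] value are placeholders; the main class is the one from the stack trace):

    import org.apache.spark.SparkConf

    // Problematic when submitting to YARN: a master hard-coded in the application
    val badConf = new SparkConf()
      .setAppName("FlowForNewModel")
      .setMaster("local[4]")   // set directly on SparkConf, so it takes precedence over --master

    // Preferred: leave the master out of the code ...
    val goodConf = new SparkConf().setAppName("FlowForNewModel")

    // ... and choose it when submitting, e.g.
    //   spark-submit --master yarn --deploy-mode cluster --class com.somecompany.ml.Main my-app.jar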

Omphale answered 21/9, 2018 at 13:36

Comment from Bred: Can you explain why that is a problem?

If you have dynamic allocation enabled, try disabling it (spark.dynamicAllocation.enabled=false). You can set this Spark configuration in conf/spark-defaults.conf, pass it as --conf, or set it within the code.
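
For instance, a sketch of the same setting expressed each way (the application name is a placeholder):

    import org.apache.spark.SparkConf

    // In code:
    val conf = new SparkConf()
      .setAppName("FlowForNewModel")
      .set("spark.dynamicAllocation.enabled", "false")

    // As a submit-time flag:
    //   spark-submit --conf spark.dynamicAllocation.enabled=false ...
    //
    // Or as a line in conf/spark-defaults.conf:
    //   spark.dynamicAllocation.enabled false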

See also:

https://issues.apache.org/jira/browse/SPARK-22618

https://issues.apache.org/jira/browse/SPARK-23806

Heilbronn answered 31/3, 2018 at 12:54
