Why is "Error communicating with MapOutputTracker" reported when Spark tries to send GetMapOutputStatuses?

I'm using Spark 1.3 to do an aggregation on a lot of data. The job consists of 4 steps:

  1. Read a big (1TB) sequence file (corresponding to 1 day of data)
  2. Filter out most of it and get about 1GB of shuffle write
  3. keyBy customer
  4. aggregateByKey() into a custom structure that builds a profile for that customer, essentially a HashMap[Long, Float] per customer. The Long keys are unique, and there are never more than 50K distinct entries (a rough sketch of the pipeline follows this list).
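
To make the shape of the job concrete, here is a minimal sketch of such a pipeline against the Spark 1.3 RDD API. The Record class, parseRecord, keep and the argument paths are simplified, hypothetical stand-ins rather than the actual job code:

    import org.apache.spark.{SparkConf, SparkContext}
    import scala.collection.mutable

    object GeoExtract {
      // Hypothetical record shape; the real job's schema is not shown in the question.
      case class Record(customerId: Long, featureId: Long, value: Float)

      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("geo-extract"))

        // 1. Read the big (1TB) sequence file for one day of data.
        val raw = sc.sequenceFile[String, String](args(0))

        // 2. Filter out most of it, leaving roughly 1GB of shuffle write.
        val records = raw.flatMap { case (_, line) => parseRecord(line) }
                         .filter(keep)

        // 3. Key by customer, then 4. aggregate into a per-customer profile,
        //    essentially a HashMap[Long, Float] with at most ~50K distinct keys.
        val profiles = records
          .keyBy(_.customerId)
          .aggregateByKey(mutable.HashMap.empty[Long, Float])(
            (profile, r) => { profile(r.featureId) = profile.getOrElse(r.featureId, 0f) + r.value; profile },
            (a, b)       => { b.foreach { case (k, v) => a(k) = a.getOrElse(k, 0f) + v }; a }
          )

        profiles.saveAsObjectFile(args(1))
        sc.stop()
      }

      // Hypothetical helpers, not part of the original post.
      def parseRecord(line: String): Option[Record] = ???
      def keep(r: Record): Boolean = ???
    }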

I'm running the job with this configuration:

--name geo-extract-$1-askTimeout \
--executor-cores 8 \
--num-executors 100 \
--executor-memory 40g \
--driver-memory 4g \
--driver-cores 8 \
--conf 'spark.storage.memoryFraction=0.25' \
--conf 'spark.shuffle.memoryFraction=0.35' \
--conf 'spark.kryoserializer.buffer.max.mb=1024' \
--conf 'spark.akka.frameSize=1024' \
--conf 'spark.akka.timeout=200' \
--conf 'spark.akka.askTimeout=111' \
--master yarn-cluster \

And getting this error:

    org.apache.spark.SparkException: Error communicating with MapOutputTracker
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:117)
        at org.apache.spark.MapOutputTracker.getServerStatuses(MapOutputTracker.scala:164)
        at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.fetch(BlockStoreShuffleFetcher.scala:42)
        ...
    Caused by: org.apache.spark.SparkException: Error sending message [message = GetMapOutputStatuses(0)]
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:209)
        at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113)
        ... 21 more
    Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
        at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
        at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
        at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
        at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
        at scala.concurrent.Await$.result(package.scala:107)
        at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:195)

The job and the logic have been shown to work with a small test set, and I can even run this job for some dates but not for others. I've googled around and found hints that "Error communicating with MapOutputTracker" is related to internal Spark messages, but I have already increased "spark.akka.frameSize", "spark.akka.timeout" and "spark.akka.askTimeout" (this last one does not even appear in the Spark documentation, but was mentioned on the Spark mailing list), to no avail. There is still some timeout firing at 30 seconds that I have no clue how to identify or fix.

I see no reason for this to fail due to data size, as the filtering step and the fact that aggregateByKey performs local partial (map-side) aggregation should be enough to keep the shuffled data small. The number of tasks is 16K (derived automatically from the original input), far more than the 800 cores running this across 100 executors, so it is not as simple as the usual "increase the number of partitions" tip. Any clues would be greatly appreciated! Thanks!
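
For reference, the shuffle's partition count could also be set explicitly rather than inherited from the ~16K input splits. This is only a hedged sketch, reusing the hypothetical records RDD and Record class from the sketch above; the value 800 (roughly one task per available core) is purely illustrative:

    import scala.collection.mutable

    val targetPartitions = 800  // illustrative: roughly one task per available core

    // Same aggregation functions as in the sketch above, factored into vals.
    val seqOp = (m: mutable.HashMap[Long, Float], r: Record) => {
      m(r.featureId) = m.getOrElse(r.featureId, 0f) + r.value; m
    }
    val combOp = (a: mutable.HashMap[Long, Float], b: mutable.HashMap[Long, Float]) => {
      b.foreach { case (k, v) => a(k) = a.getOrElse(k, 0f) + v }; a
    }

    // Option A: give the shuffle itself an explicit partition count.
    val profilesA = records
      .keyBy(_.customerId)
      .aggregateByKey(mutable.HashMap.empty[Long, Float], targetPartitions)(seqOp, combOp)

    // Option B: shrink the already-filtered (~1GB) RDD before shuffling.
    val profilesB = records
      .coalesce(targetPartitions)
      .keyBy(_.customerId)
      .aggregateByKey(mutable.HashMap.empty[Long, Float])(seqOp, combOp)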

Dorindadorine answered 9/9, 2015 at 18:51 Comment(9)
'16/01/13 13:19:30 WARN util.AkkaUtils: Error sending message [message = GetMapOutputStatuses(214)] in 1 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) at scala.concurrent.Await$.result(package.scala:107)'Milch
I am not facing this issue all the time. I am using Spark with the Amazon Kinesis service, and I only hit it on long runs, say around 20 hours. Any help on how to debug it further would be a great help. ThanksMilch
Hi Sam. I still don't have a clear answer for this, but try actually decreasing the number of partitions. A smaller number of tasks apparently requires less sync memory and sometimes works.Dorindadorine
Hi Daniel, I strongly believe this needs to be taken care of by the framework's job orchestration: launching the job and getting it done, or speculative task execution as in MapReduce, to make sure we get the output of specific tasks. If map output is lost and the shuffle hangs on a missing block until it times out, that leads me to think there is a bug in Spark's memory management, at least with my current understanding of Spark. Once we have allocated resources to Spark and it has set up the job execution, running out of such sync memory should at worst spill to disk (making the job slow), but not this.Milch
Not sure, but have you tried increasing the timeouts you mentioned to some very high values, as mentioned here: mail-archives.apache.org/mod_mbox/spark-user/201505.mbox/…Finespun
I don't think I did, and I share your perceptions, but as you can tell, I did not get much of an answer myself ;-)Dorindadorine
Just a long shot, but are you sure that there is no error prior to this one? Also, sometimes the data is not uniformly distributed, and that can cause problems too. Check the logs on the executors for anything suspicious going on there.Untwist
Err, I don't have the capability to test this again. IIRC yes, there was no other error, and the code worked in independent unit tests and on some other datasets.Dorindadorine
I have recently upgraded Spark to 1.5.2 and am now not able to recreate it.Milch

I had a similar issue: my job would work fine with a smaller dataset, but would fail with larger ones.

After a lot of configuration changes, I found that changing the driver memory settings has much more of an impact than changing the executor memory settings. Using the new (G1) garbage collector also helps a lot. I am using the following configuration for a cluster of 3 nodes with 40 cores each; I hope it helps:

spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g -XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions

spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:NewRatio=3 -XX:InitiatingHeapOccupancyPercent=35 -XX:+PrintGCDetails -XX:MaxPermSize=4g -XX:PermSize=1G -XX:+PrintGCTimeStamps -XX:+UnlockDiagnosticVMOptions


spark.driver.memory=8g
spark.driver.cores=10
spark.driver.maxResultSize=8g

spark.executor.memory=16g
spark.executor.cores=25

spark.default.parallelism=50
spark.eventLog.dir=hdfs://mars02-db01/opt/spark/logs
spark.eventLog.enabled=true

spark.kryoserializer.buffer=512m
spark.kryoserializer.buffer.max=1536m

spark.rdd.compress=true
spark.storage.memoryFraction=0.15
spark.storage.MemoryStore=12g
Joslin answered 20/1, 2016 at 19:4 Comment(1)
If it does not work, set spark.executor.cores=2 (a low number); by doing that you will have more containers running the process.Joslin

What's going on in the driver at the time of this failure? It could be that memory pressure on the driver is making it unresponsive. If I recall correctly, the MapOutputTracker that it's trying to reach when it calls GetMapOutputStatuses runs in the Spark driver process.

If that process is hitting long GC pauses or other stalls for some reason, that would cause the exceptions you're seeing above.

One thing to try would be to run jstack against the driver process when you start seeing these errors and see what comes back. If jstack doesn't respond, it could be that your driver isn't sufficiently responsive.

16K tasks does sound like a lot for the driver to keep track of; any chance you can increase the driver memory past 4g?

Normally answered 20/1, 2016 at 16:4 Comment(0)

Try setting the following property:

spark.shuffle.reduceLocality.enabled=false

Refer to this link: https://issues.apache.org/jira/browse/SPARK-13631
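
For what it's worth, here is a minimal sketch of setting it programmatically, assuming a Spark version that supports this property; it can equally be passed on spark-submit as --conf spark.shuffle.reduceLocality.enabled=false:

    import org.apache.spark.{SparkConf, SparkContext}

    // Turn off reduce-task locality preferences (see the linked JIRA for context).
    val conf = new SparkConf()
      .setAppName("geo-extract")
      .set("spark.shuffle.reduceLocality.enabled", "false")
    val sc = new SparkContext(conf)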

Antineutron answered 16/11, 2016 at 14:25 Comment(1)
Why would that help? Could you elaborate?Tc
