How to fix "MetadataFetchFailedException: Missing an output location for shuffle"?

If I increase the size of my word2vec model, I start to get this kind of exception in my log:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 6
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:542)
    at org.apache.spark.MapOutputTracker$$anonfun$org$apache$spark$MapOutputTracker$$convertMapStatuses$2.apply(MapOutputTracker.scala:538)
    at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
    at org.apache.spark.MapOutputTracker$.org$apache$spark$MapOutputTracker$$convertMapStatuses(MapOutputTracker.scala:538)
    at org.apache.spark.MapOutputTracker.getMapSizesByExecutorId(MapOutputTracker.scala:155)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:47)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1$$anonfun$12.apply(RDD.scala:927)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I tried to write my own "save model" version, which looks like this:

  // Imports this snippet needs (not shown in the original post):
  import org.apache.hadoop.fs.Path
  import org.apache.spark.SparkContext
  import org.apache.spark.mllib.feature.Word2VecModel
  import scala.collection.mutable

  // Assumed holder for a (word, vector) pair; its definition is not in the post.
  case class Data(word: String, vector: Array[Float])

  def save(model: Word2VecModel, sc: SparkContext, path: String): Unit = {

    println("Saving model as CSV ..")

    val vectorSize = model.getVectors.values.head.size

    println("vectorSize="+vectorSize)

    val SEPARATOR_TOKEN = " "
    // One Data record per vocabulary word.
    val dataArray = model.getVectors.toSeq.map { case (w, v) => Data(w, v) }

    println("Got dataArray ..")
    println("parallelize(dataArray, 10)")
    val par = sc.parallelize(dataArray, 10)
          .map(d => {

            val sb = new mutable.StringBuilder()
            sb.append(d.word)
            sb.append(SEPARATOR_TOKEN)

            for(v <- d.vector) {
              sb.append(v)
              sb.append(SEPARATOR_TOKEN)
            }
            sb.setLength(sb.length - 1)
            sb.append("\n")
            sb.toString()
          })
    println("repartition(1)")
    val rep = par.repartition(1)
    println("collect()")
    val vectorsAsString = rep.collect()

    println("Collected serialized vectors ..")    

    val cfile = new mutable.StringBuilder()

    cfile.append(vectorsAsString.length)
    cfile.append(" ")
    cfile.append(vectorSize)
    cfile.append("\n")

    // CSV header row. Note: the data rows appended below use SEPARATOR_TOKEN
    // (a space), so they do not actually match this comma-separated header.
    val sb = new StringBuilder
    sb.append("word,")
    for(i <- 0 until vectorSize) {
      sb.append("v")
      sb.append(i.toString)
      sb.append(",")
    }
    sb.setLength(sb.length - 1)
    sb.append("\n")

    for(vectorString <- vectorsAsString) {
      sb.append(vectorString)
      cfile.append(vectorString)
    }

    println("Saving file to " + new Path(path, "data").toUri.toString)
    sc.parallelize(sb.toString().split("\n"), 1).saveAsTextFile(new Path(path+".csv", "data").toUri.toString)
    sc.parallelize(cfile.toString().split("\n"), 1).saveAsTextFile(new Path(path+".cs", "data").toUri.toString)
  }

Apparently it behaves just like Spark's current implementation: it doesn't work either.

I'd just like to get a saved word2vec model. Saving works with small files, but not once the model gets larger.
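For comparison, here is a sketch of a save that skips the repartition(1).collect() step and writes straight from the executors (untested, and the header/data output layout is just an assumption of mine):

    // Sketch: serialize each (word, vector) pair on the executors and write in
    // parallel, so the driver never has to hold all serialized vectors at once.
    def saveDistributed(model: Word2VecModel, sc: SparkContext, path: String): Unit = {
      val vectorSize = model.getVectors.values.head.size
      val vocabSize  = model.getVectors.size
      val lines = sc.parallelize(model.getVectors.toSeq, 10)
        .map { case (word, vector) => (word +: vector.map(_.toString)).mkString(" ") }
      // word2vec text-format header, written as its own tiny output (assumed layout)
      sc.parallelize(Seq(s"$vocabSize $vectorSize"), 1).saveAsTextFile(path + "/header")
      lines.saveAsTextFile(path + "/data")
    }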

Barron answered 23/4, 2016 at 19:38 Comment(1)
That error usually means that an executor died (mostly because of OOM). You should look at executor logs to verify if that is the case. If so, you need to increase memory (and/or overhead). – Forswear
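One way to verify that, assuming the app runs on YARN (the application id is a placeholder):

    yarn logs -applicationId <appId> | grep -iE "killed|OutOfMemory|Executor lost"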

MetadataFetchFailedException is thrown when the MapOutputTracker on an executor cannot find the requested shuffle map outputs for some partitions in its local cache, and the attempt to fetch them remotely from the driver's MapOutputTracker does not provide them either.

That points to a few possible causes:

  1. The driver's memory issues
  2. The executors' memory issues
  3. Executors being lost

Please review the logs for "Executor lost" INFO messages, and/or check the web UI's Executors page to see how the executors are doing.

The root cause of executors being lost may also be that the cluster manager has decided to kill ill-behaved executors (that may have used up more memory than requested).

See the other question FetchFailedException or MetadataFetchFailedException when processing big data set for more insights.
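If digging through raw logs is tedious, a listener can surface executor losses as they happen. A minimal sketch, assuming you can register it on your SparkContext before running the job:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorRemoved}

    sc.addSparkListener(new SparkListener {
      // Fires whenever the driver learns an executor is gone; the reason string
      // usually says whether the cluster manager killed it and why.
      override def onExecutorRemoved(removed: SparkListenerExecutorRemoved): Unit =
        println(s"Executor ${removed.executorId} removed: ${removed.reason}")
    })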

Wallop answered 23/12, 2016 at 18:5 Comment(2)
repartition(10) worked for me – Matabele
@SushilVerma Can you show the query you used? Did repartition(10) help before or after the groupBy? How do you submit your Spark app? – Wallop
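To illustrate the repartition(10) suggestion from the comment above: spreading the data over more partitions before a wide operation means each shuffle task holds less in memory. The input path and job here are made up:

    val counts = sc.textFile("hdfs:///some/input")  // hypothetical input
      .repartition(10)                              // more, smaller shuffle tasks
      .map(line => (line, 1))
      .reduceByKey(_ + _)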

In short, these configs might help:

--conf spark.blacklist.enabled=true # blacklist bad machine
--conf spark.reducer.maxReqsInFlight=10 # limit concurrent requests from reducer 
--conf spark.shuffle.io.retryWait=10s # increase retry wait
--conf spark.shuffle.io.maxRetries=10 # increase retry times
--conf spark.shuffle.io.backLog=4096 # increase tcp connection wait queue length

A longer explanation follows.

MetadataFetchFailedException usually happens when an executor is suddenly killed or terminated while it still holds shuffle output; when another executor then tries to fetch the metadata of that shuffle output, the exception is thrown.

  1. In most cases, this is caused by the container being killed by YARN for exceeding memory limits, so double-check this in the logs first.

  2. The most common fix is to increase memoryOverhead. The default value is 0.1 * executor memory, which is too small for most cases; I would suggest making it 0.2 * executor memory. If you have a large number of executors, or run other sub-processes inside the container, you need an even bigger value. A worked example follows.
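For instance, with 10g executors the suggested 0.2 ratio works out to 2g of overhead. The flag name below is the Spark 2.3+ one; older YARN deployments use spark.yarn.executor.memoryOverhead instead:

--conf spark.executor.memory=10g
--conf spark.executor.memoryOverhead=2g # ~0.2 * executor memory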

Spiracle answered 8/2, 2022 at 0:49 Comment(0)
