Spark Streaming - Stopped worker throws FileNotFoundException

I am running a Spark Streaming application on a cluster of three nodes, each with one worker and three executors (so nine executors in total). I am using Spark standalone mode (version 2.1.1).

The application is launched with spark-submit using --deploy-mode client and --conf spark.streaming.stopGracefullyOnShutdown=true. The submit command is run from one of the nodes, let's call it node 1.
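
For reference, the submit command looks roughly like this (the class name, jar and resource flags are placeholders, not my exact values; the two options above are the ones relevant here):

    # launched from node 1; class, jar and resource settings are placeholders
    $SPARK_HOME/bin/spark-submit \
      --master spark://node1:7077 \
      --deploy-mode client \
      --conf spark.streaming.stopGracefullyOnShutdown=true \
      --class com.example.StreamingApp \
      --executor-cores 2 \
      --executor-memory 2g \
      streaming-app.jar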

As a fault-tolerance test, I stop the worker on node 2 by calling the stop-slave.sh script.
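
Concretely, on node 2 I run something like:

    # run on node 2; stops the standalone worker daemon on this machine
    $SPARK_HOME/sbin/stop-slave.sh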

In the executor logs on node 2 I can see several errors related to a FileNotFoundException during a shuffle operation:

ERROR Executor: Exception in task 5.0 in stage 5531241.0 (TID 62488319)
java.io.FileNotFoundException: /opt/spark/spark-31c5b4b0-56e1-45d2-88dc-772b8712833f/executor-0bad0669-57fe-43f9-a77e-1b69cd284523/blockmgr-2aa295ac-78ca-4df6-ab89-51d422e8860e/1c/shuffle_2074211_5_0.index.ecb8e397-c3a3-4c1a-96ba-e153ed92b05c (No such file or directory)
    at java.io.FileOutputStream.open(Native Method)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:206)
    at java.io.FileOutputStream.<init>(FileOutputStream.java:156)
    at org.apache.spark.shuffle.IndexShuffleBlockResolver.writeIndexFileAndCommit(IndexShuffleBlockResolver.scala:144)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I can see 4 errors of this kind on the same task in each of the 3 executors on node 2.

In the driver logs I can see:

ERROR TaskSetManager: Task 5 in stage 5531241.0 failed 4 times; aborting job
 ...
ERROR JobScheduler: Error running job streaming job 1503995015000 ms.1
org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 5531241.0 failed 4 times, most recent failure: Lost task 5.3 in stage 5531241.0 (TID 62488335, 10.7.94.68, executor 2): java.io.FileNotFoundException: /opt/spark/spark-31c5b4b0-56e1-45d2-88dc-772b8712833f/executor-0bad0669-57fe-43f9-a77e-1b69cd284523/blockmgr-2aa295ac-78ca-4df6-ab89-51d422e8860e/1c/shuffle_2074211_5_0.index.9e6148da-6ce2-4de5-94ab-d95db2c8f9f7 (No such file or directory)

This takes the application down, as expected: a single task reached spark.task.maxFailures failures, so the job is aborted and the application stops.

I ran several tests, and all but one ended with the application stopped. My guess is that the behaviour can vary depending on the exact point in the streaming process at which I ask the worker to stop. In any case, every other test failed with the error described above.

Increasing spark.task.maxFailures to 8 did not help either; the TaskSetManager simply reported the task as failed 8 times instead of 4.
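
The limit was raised by adding the corresponding --conf to the submit command, along these lines (class and jar are placeholders, as above):

    $SPARK_HOME/bin/spark-submit \
      --master spark://node1:7077 \
      --deploy-mode client \
      --conf spark.streaming.stopGracefullyOnShutdown=true \
      --conf spark.task.maxFailures=8 \
      --class com.example.StreamingApp \
      streaming-app.jar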

What if the worker is killed?

I also ran a different test: I killed the worker and the three executor processes on node 2 with kill -9. In this case, the streaming app adapted to the remaining resources and kept working.
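
Roughly, this is what I did on node 2 (jps lists the standalone worker as Worker and each executor as CoarseGrainedExecutorBackend; the PIDs below are placeholders):

    # on node 2: list the worker and executor JVMs, then kill them hard
    jps | grep -E 'Worker|CoarseGrainedExecutorBackend'
    kill -9 <worker_pid> <executor_pid_1> <executor_pid_2> <executor_pid_3>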

In the driver log we can see the driver noticing the lost executors:

ERROR TaskSchedulerImpl: Lost executor 0 on 10.7.94.68: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.

Then, we see a long series of the following error:

17/08/29 14:43:19 ERROR ReceiverTracker: Deregistered receiver for stream 5: Error starting receiver 5 - org.jboss.netty.channel.ChannelException: Failed to bind to: /X.X.X.X:40001
    at org.jboss.netty.bootstrap.ServerBootstrap.bind(ServerBootstrap.java:272)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:106)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:119)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:74)
    at org.apache.avro.ipc.NettyServer.<init>(NettyServer.java:68)
    at org.apache.spark.streaming.flume.FlumeReceiver.initServer(FlumeInputDStream.scala:162)
    at org.apache.spark.streaming.flume.FlumeReceiver.onStart(FlumeInputDStream.scala:169)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:607)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:597)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2028)
    at org.apache.spark.SparkContext$$anonfun$33.apply(SparkContext.scala:2028)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.BindException: Cannot assign requested address
    at sun.nio.ch.Net.bind0(Native Method)
    at sun.nio.ch.Net.bind(Net.java:414)
    at sun.nio.ch.Net.bind(Net.java:406)
    at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214)
    at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
    at org.jboss.netty.channel.socket.nio.NioServerBoss$RegisterTask.run(NioServerBoss.java:193)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.processTaskQueue(AbstractNioSelector.java:372)
    at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:296)
    at org.jboss.netty.channel.socket.nio.NioServerBoss.run(NioServerBoss.java:42)
    ... 3 more

This error keeps appearing in the log until the killed worker is started again.
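
Restarting the worker on node 2 is done with something like this (the master URL is a placeholder for my actual master):

    # run on node 2; once the worker re-registers, the bind errors stop
    $SPARK_HOME/sbin/start-slave.sh spark://node1:7077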

Conclusion

Stopping a worker with the dedicated command behaves unexpectedly: the app should be able to cope with the missing worker, adapting to the remaining resources and continuing to work (as it does when the worker is killed).

What are your observations on this issue?

Thank you, Davide
