Why is Akka Streams swallowing my exceptions?
Asked Answered
D

3

19

Why is the exception in

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

object TestExceptionHandling {
  def main(args: Array[String]): Unit = {
    implicit val actorSystem = ActorSystem()
    implicit val materializer = ActorMaterializer()(defaultActorSystem)

    Source(List(1, 2, 3)).map { i =>
      if (i == 2) {
        throw new RuntimeException("Please, don't swallow me!")
      } else {
        i
      }
    }.runForeach { i =>
      println(s"Received $i")
    }
  }
}

silently ignored? I can see that the stream gets stopped after printing Received 1, but nothing is logged. Note that the problem is not the logging configuration in general, as I see a lot of output if I set akka.log-config-on-start = on in my application.conf file.

Davila answered 25/2, 2016 at 15:38 Comment(2)
You're throwing the exception away since you ignore the return value of runForeach.Long
@ViktorKlang thanks for pointing that out, I've just updated my answer!Davila
D
15

I'm now using a custom Supervision.Decider that makes sure exceptions are properly logged, that can be set up like this:

val decider: Supervision.Decider = { e =>
  logger.error("Unhandled exception in stream", e)
  Supervision.Stop
}

implicit val actorSystem = ActorSystem()
val materializerSettings = ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider)
implicit val materializer = ActorMaterializer(materializerSettings)(actorSystem)

Also, as has been pointed out by Vikor Klang, in the example given above, the exception could also be "caught" via

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.runForeach { i =>
  println(s"Received $i")
}.onComplete {
  case Success(_) =>
    println("Done")
  case Failure(e) =>
    println(s"Failed with $e")
}

Note however, that this approach won't help you with

Source(List(1, 2, 3)).map { i =>
  if (i == 2) {
    throw new RuntimeException("Please, don't swallow me!")
  } else {
    i
  }
}.to(Sink.foreach { i =>
  println(s"Received $i")
}).run()

since run() returns Unit.

Davila answered 25/2, 2016 at 15:54 Comment(1)
run() only returns Unit because it keeps the materialied value of the "left" side (Keep.left) by default. if you had used: toMat(Sink.foreach(…))(Keep.right) then it would work again.Long
B
4

I had similar questions when I started using akk-streams. Supervision.Decider helps but not always.

Unfortunately it doesn't catch exceptions thrown in ActionPublisher. I see it handled, ActorPublisher.onError is called but it doesn't reach Supervision.Decider. It works with simple Stream provided in documentation.

Errors also don't reach actor if I use Sink.actorRef.

And for the sake of experiment I tried following sample

val stream = Source(0 to 5).map(100 / _)
stream.runWith(Sink.actorSubscriber(props))

In this case exception was caught by Decider but never reached actor subscriber.

Overall I think it's inconsistent behavior. I cannot use one mechanism for handling errors in Stream.

My original SO question: Custom Supervision.Decider doesn't catch exception produced by ActorPublisher

And here is akka issue where it's tracked: https://github.com/akka/akka/issues/18359

Binocular answered 25/2, 2016 at 21:20 Comment(0)
C
0

I had a different issue with Akka Streams swallowing my exceptions. I'll post it here since this is the top Google result.

In a case like this, where source is a Source[ByteString, Any]:

source.runWith(StreamConverters.fromOutputStream(() => outputStream))

This returns a Future[IOResult]. If the write to the output stream fails (for example, the source fails), then the Future will still return a Success. In this case, you actually have to check the IOResult for the error:

source.runWith(StreamConverters.fromOutputStream(() => output)).
      map(ior => {
        if (!ior.wasSuccessful) 
          throw new RuntimeException(ior.getError)
      })

The result of this will be a failed Future with the correct exception.

Chaing answered 8/5, 2020 at 17:31 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.