Resolving Akka futures from ask in the event of a failure
Asked Answered
L

1

10

I am calling an Actor using the ask pattern within a Spray application, and returning the result as the HTTP response. I map failures from the actor to a custom error code.

val authActor = context.actorOf(Props[AuthenticationActor])

callService((authActor ? TokenAuthenticationRequest(token)).mapTo[LoggedInUser]) { user =>
  complete(StatusCodes.OK, user)
}

def callService[T](f: => Future[T])(cb: T => RequestContext => Unit) = {
 onComplete(f) {
  case Success(value: T) => cb(value)
  case Failure(ex: ServiceException) => complete(ex.statusCode, ex.errorMessage)
  case e => complete(StatusCodes.InternalServerError, "Unable to complete the request. Please try again later.")
  //In reality this returns a custom error object.
 }
}

This works correctly when the authActor sends a failure, but if the authActor throws an exception, nothing happens until the ask timeout completes. For example:

override def receive: Receive = {
  case _ => throw new ServiceException(ErrorCodes.AuthenticationFailed, "No valid session was found for that token")
}

I know that the Akka docs say that

To complete the future with an exception you need send a Failure message to the sender. This is not done automatically when an actor throws an exception while processing a message.

But given that I use asks for a lot of the interface between the Spray routing actors and the service actors, I would rather not wrap the receive part of every child actor with a try/catch. Is there a better way to achieve automatic handling of exceptions in child actors, and immediately resolve the future in the event of an exception?

Edit: this is my current solution. However, it's quite messy to do this for every child actor.

override def receive: Receive = {
case default =>
  try {
    default match {
      case _ => throw new ServiceException("")//Actual code would go here
    }
  }
  catch {
    case se: ServiceException =>
      logger.error("Service error raised:", se)
      sender ! Failure(se)
    case ex: Exception =>
      sender ! Failure(ex)
      throw ex
  }
}

That way if it's an expected error (i.e. ServiceException), it's handled by creating a failure. If it's unexpected, it returns a failure immediately so the future is resolved, but then throws the exception so it can still be handled by the SupervisorStrategy.

Latricialatrina answered 22/4, 2015 at 10:18 Comment(8)
Well... send a failure message before throwing the exception.Bipolar
That's what I didn't want to do - I said I would rather not wrap the receive part of every child actor with a try/catch. This is a toy example, it's perfectly possible that I don't control where the exception is thrown.Latricialatrina
Well... you know... one of the fundamental roadways of resilent distributed systems is to "make errors explicit". Think of various errors that can happen... make them explicit. If you can have "TypeSafe" errors... even better.Bipolar
There are always going to be potential uncaught exceptions in library code. You can't make them explicit using a failure unless you either know what the exception is, or do a blanket catch of Exception, which is terrible. Of course I can explicitly fail on expected errors but this doesn't help me much.Latricialatrina
Well... writing resilient systems is all about being prepared for exceptions. Yes there can always be some "unexpected" exceptions. But you know... Since you "have" to decide the behaviour of system in case of these exceptions.... you "need" to think and plan carefully for them. This is the only way to reliably predict the behaviour of system. Any "unexpected" exception is a bug and hence an opportunity to improve the reliability of your system.Bipolar
So I can answer accordingly, are you looking for code to handle unexpected exceptions, like those thrown by other libs or things not explicitly checked or are you looking for how to gracefully handle explicitly checked conditions?Sidneysidoma
@Sidneysidoma mostly unchecked from library code. For example, the persistence layer using Squeryl.Latricialatrina
@SarveshKumarSingh even these exceptions happen, though. Yes, long-term it's an opportunity to improve the system. But: 1. Explicitly writing catch blocks in EVERY service actor and mapping them to failures is incredibly boilerplatey 2. When they do happen and they havent been checked yet (they shouldn't, but of course they do), the expected behaviour should be to fail fast. Long-term they should be handled but before we have discovered the bug it should immediately fail and resolve the future and not wait for the ask to time out.Latricialatrina
S
17

If you want a way to provide automatic sending of a response back to the sender in case of an unexpected exception, then something like this could work for you:

trait FailurePropatingActor extends Actor{
  override def preRestart(reason:Throwable, message:Option[Any]){
    super.preRestart(reason, message)
    sender() ! Status.Failure(reason)
  }
}

We override preRestart and propagate the failure back to the sender as a Status.Failure which will cause an upstream Future to be failed. Also, it's important to call super.preRestart here as that's where child stopping happens. Using this in an actor looks something like this:

case class GetElement(list:List[Int], index:Int)
class MySimpleActor extends FailurePropatingActor {  
  def receive = {
    case GetElement(list, i) =>
      val result = list(i)
      sender() ! result
  }  
}

If I was to call an instance of this actor like so:

import akka.pattern.ask
import concurrent.duration._

val system = ActorSystem("test")
import system.dispatcher
implicit val timeout = Timeout(2 seconds)
val ref = system.actorOf(Props[MySimpleActor])
val fut = ref ? GetElement(List(1,2,3), 6)

fut onComplete{
  case util.Success(result) => 
    println(s"success: $result")

  case util.Failure(ex) => 
    println(s"FAIL: ${ex.getMessage}")
    ex.printStackTrace()    
}     

Then it would properly hit my Failure block. Now, the code in that base trait works well when Futures are not involved in the actor that is extending that trait, like the simple actor here. But if you use Futures then you need to be careful as exceptions that happen in the Future don't cause restarts in the actor and also, in preRestart, the call to sender() will not return the correct ref because the actor has already moved into the next message. An actor like this shows that issue:

class MyBadFutureUsingActor extends FailurePropatingActor{
  import context.dispatcher

  def receive = {
    case GetElement(list, i) => 
      val orig = sender()
      val fut = Future{
        val result = list(i)
        orig ! result
      }      
  } 
}

If we were to use this actor in the previous test code, we would always get a timeout in the failure situation. To mitigate that, you need to pipe the results of futures back to the sender like so:

class MyGoodFutureUsingActor extends FailurePropatingActor{
  import context.dispatcher
  import akka.pattern.pipe

  def receive = {
    case GetElement(list, i) => 
      val fut = Future{
        list(i)
      }

      fut pipeTo sender()
  } 
}

In this particular case, the actor itself is not restarted because it did not encounter an uncaught exception. Now, if your actor needed to do some additional processing after the future, you can pipe back to self and explicitly fail when you get a Status.Failure:

class MyGoodFutureUsingActor extends FailurePropatingActor{
  import context.dispatcher
  import akka.pattern.pipe

  def receive = {
    case GetElement(list, i) => 
      val fut = Future{
        list(i)
      }

      fut.to(self, sender())

    case d:Double =>
      sender() ! d * 2

    case Status.Failure(ex) =>
      throw ex
  } 
}

If that behavior becomes common, you can make it available to whatever actors need it like so:

trait StatusFailureHandling{ me:Actor =>
  def failureHandling:Receive = {
    case Status.Failure(ex) =>
      throw ex      
  }
}

class MyGoodFutureUsingActor extends FailurePropatingActor with StatusFailureHandling{
  import context.dispatcher
  import akka.pattern.pipe

  def receive = myReceive orElse failureHandling

  def myReceive:Receive = {
    case GetElement(list, i) => 
      val fut = Future{
        list(i)
      }

      fut.to(self, sender())

    case d:Double =>
      sender() ! d * 2        
  } 
}  
Sidneysidoma answered 22/4, 2015 at 12:40 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.