Wait on asynchronous Future call before processing next message in Akka
L

5

19

When receiving events, Akka Actors process one message at a time, blocking until a request has been completed before moving on to the next message.

This works well for synchronous/blocking tasks. However, if I want to perform an asynchronous/non-blocking request, Akka will continue processing without waiting for the task to complete.

For example:

 def doThing():Future[Unit] = /* Non blocking request here */

 def receive = {
     case DoThing => doThing() pipeTo sender
 } 

This will call doThing() and start processing the future, but will not wait for it to complete before processing the next message. It will simply execute the next messages in the queue as fast as possible.

In essence, it appears that Akka considers "returning a future" to be "finished processing" and moves onto the next message.

To process one message at a time, it appears I need to actively block the Actor thread:

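def receive = {
    // Await.result needs an explicit timeout; Duration.Inf (scala.concurrent.duration) waits indefinitely
    case DoThing => sender ! blocking(Await.result(doThing(), Duration.Inf))
}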

This feels like a very wrong approach - It's artificially blocking a thread in code that should otherwise be completely non-blocking.

In Elixir, by comparison, actors avoid this problem in the first place: a tail call requests the next message, so there is no need to block artificially.

Is there any way in Akka to either:

a) Wait for a Future to complete before processing the next message, without blocking the thread?

b) Use an explicit tail call or some other mechanism to get a pull-based workflow instead of a push-based one?

Loraine answered 13/4, 2016 at 7:58 Comment(2)
Perhaps a work-pulling pattern would suit your needs: when the Future completes, you can have it queue a new message to process the result. – Predation
The actor has to either block, or deal with the incoming messages somehow, to keep up with demand. If stopping the processing is the most natural thing to do, you could use different behaviours based on the processing state and stash away messages for the duration of the asynchronous action, see doc.akka.io/docs/akka/snapshot/scala/… and doc.akka.io/docs/akka/snapshot/scala/actors.html#Stash – Carlenacarlene
O
12

As suggested in the comments, you can use the Stash (http://doc.akka.io/docs/akka/current/scala/actors.html#Stash) trait to store incoming messages while you wait for the Future to resolve.

You need to save the current sender so that you don't improperly close over the sender actor reference. You can achieve this through a simple case class like the one defined below.

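import akka.actor.{Actor, ActorRef, Stash, Status}
import akka.pattern.pipe
import scala.concurrent.Future

class MyActor extends Actor with Stash {

  import context.dispatcher

  // Save the correct sender ref by using something like
  // case class WrappedFuture(senderRef: ActorRef, result: Any)
  def doThing(): Future[WrappedFuture] = ???

  override def receive: Receive = {
    case msg: DoThing =>
      // Send the Future's result back to this actor as a message
      doThing() pipeTo self

      // Stash further DoThing messages until the result arrives
      context.become({
        case WrappedFuture(senderRef, result) =>
          senderRef ! result
          unstashAll()
          context.unbecome()
        case Status.Failure(_) =>
          // pipeTo delivers a failed Future as Status.Failure; resume so the actor does not stay stuck
          unstashAll()
          context.unbecome()
        case newMsg: DoThing =>
          stash()
      }, discardOld = false)
  }
}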
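
Why the sender has to travel with the result can be shown without Akka at all. The following is only a sketch in plain Scala: FakeActor and its currentSender field are hypothetical stand-ins for an actor whose sender changes with every message, and the Await calls exist purely to inspect the outcome.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for an actor: currentSender changes with every message
final class FakeActor {
  @volatile var currentSender: String = "nobody"

  // Unsafe: reads currentSender when the Future completes, by which time it may have changed
  def replyLate(work: Future[Int]): Future[(String, Int)] =
    work.map(result => (currentSender, result))

  // Safe: captures the sender in a local val before going asynchronous
  def replyCaptured(work: Future[Int]): Future[(String, Int)] = {
    val senderRef = currentSender
    work.map(result => (senderRef, result))
  }
}

val actor = new FakeActor
val work = Promise[Int]()

actor.currentSender = "alice"
val late = actor.replyLate(work.future)
val captured = actor.replyCaptured(work.future)

actor.currentSender = "bob" // the next message arrives before the work finishes
work.success(42)

val lateReply = Await.result(late, 1.second)         // ("bob", 42): wrong recipient
val capturedReply = Await.result(captured, 1.second) // ("alice", 42)
```

Carrying senderRef inside WrappedFuture plays the same role as the local val here: the reference is fixed before the asynchronous work starts.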
Oleograph answered 12/5, 2016 at 19:8 Comment(1)
The problem with this approach is that while you are waiting for the Future(s) to return, you are effectively blocking all incoming messages, even if that would not be necessary (almost like a blocking IO call). So you have to be extra careful not to make this a bottleneck in your application. – Lorin
D
3

With Akka Streams, you could use mapAsync:

import akka.actor.ActorSystem
import akka.stream._
import akka.stream.scaladsl._
import scala.concurrent.Future

implicit val system = ActorSystem("ThingDoer")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher

case object DoThing

def doThing(): Future[Unit] = Future {
  println("doing its thing")
}

Source((1 to 10).map(_ => DoThing))
  .mapAsync(parallelism = 1)(_ => doThing())
  .runWith(Sink.ignore)
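
For comparison, the one-at-a-time behaviour that mapAsync(parallelism = 1) gives can be approximated with plain Futures by chaining each task onto the previous one. This is a different technique from Akka Streams, shown only as a rough sketch: SerialRunner is a hypothetical helper, not an Akka API, and the log and Await are there just to observe the ordering.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper: starts each submitted task only after the previous one has completed
final class SerialRunner(implicit ec: ExecutionContext) {
  private var tail: Future[Any] = Future.unit

  def submit[A](task: () => Future[A]): Future[A] = synchronized {
    // transformWith runs the next task whether the previous one succeeded or failed
    val next = tail.transformWith(_ => task())
    tail = next
    next
  }
}

implicit val ec: ExecutionContext = ExecutionContext.global

val log = ListBuffer.empty[String]
def record(entry: String): Unit = log.synchronized { log += entry }

def doThing(id: Int): Future[Unit] = Future {
  record(s"start $id")
  Thread.sleep(20) // simulated work
  record(s"end $id")
}

val runner = new SerialRunner
val all = Future.sequence((1 to 3).map(id => runner.submit(() => doThing(id))))

// Blocking here only to inspect the log; the chaining itself never blocks a thread
Await.result(all, 5.seconds)
```

Unlike the stream, this sketch has no backpressure: submitted tasks simply queue up behind the current one.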
Duct answered 10/5, 2018 at 19:35 Comment(0)
L
0

Instead of having one actor dealing with this problem, have a chain of two:

  • Actor 1 receives the initial messages, starts all the IO calls and merges them into a Future.
  • Actor 2 receives the results of the merged Futures.

This does not guarantee the preservation of message ordering, so if you need that then Actor 2 will have to be aware of the messages Actor 1 has seen, and potentially stash early messages on itself.
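
One possible way for Actor 2 to restore ordering is sketched below in plain Scala. It assumes Actor 1 tags every message with an increasing sequence number; that tagging scheme, and the ReorderBuffer class itself, are hypothetical and not part of Akka.

```scala
import scala.collection.mutable

// Hypothetical helper: releases results in sequence order,
// holding back any that arrive before their predecessors
final class ReorderBuffer[A](firstSeqNr: Long = 0L) {
  private var nextSeqNr = firstSeqNr
  private val pending = mutable.Map.empty[Long, A]

  // Accepts a result tagged with its sequence number and returns
  // every result that is now ready to be handled, in order
  def offer(seqNr: Long, result: A): List[A] = {
    pending.update(seqNr, result)
    val ready = List.newBuilder[A]
    while (pending.contains(nextSeqNr)) {
      ready += pending.remove(nextSeqNr).get
      nextSeqNr += 1
    }
    ready.result()
  }
}

val buffer = new ReorderBuffer[String]()
val afterSecond = buffer.offer(1, "second") // List(): still waiting for 0
val afterFirst = buffer.offer(0, "first")   // List("first", "second")
val afterThird = buffer.offer(2, "third")   // List("third")
```

Inside an actor, the buffer would be private state of Actor 2, and each released result would be handled as if it had just arrived.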

I am not aware of anything in Akka that solves this problem. Maybe there is a library that implements a pattern like this?

Lorin answered 17/1, 2017 at 16:23 Comment(0)
T
0

This follows David's suggestion, but stashes all incoming messages rather than only DoThing:

class MyActor extends Actor with Stash {
  import context.dispatcher // ExecutionContext for onComplete

  def doThing(): Future[WrappedFuture] = ???

  override def receive: Receive = {
    case msg: DoThing =>
      val future = doThing() pipeTo sender

      // When the future finishes (success or failure), unstash all messages.
      // Caution: this callback runs outside the actor's message processing,
      // where unstashAll() and context.unbecome() are not thread safe.
      future.onComplete { _ =>
        unstashAll()
        context.unbecome()
      }
      // Stash incoming messages until finished with the future
      context.become({
        case _ => stash()
      }, discardOld = false)
  }
}
Thermoelectric answered 10/5, 2018 at 9:2 Comment(0)
Z
0

context.become and context.unbecome are not thread safe, so we must not call them directly from the completion callback of a local future. Instead, we can send ourselves a message and change the behaviour inside a receive function:

  // StopStash is an internal message this actor sends to itself,
  // e.g. case object StopStash

  override def receive: Receive = {
    case DoThing =>
      val future = doThing() pipeTo sender

      // When the future finishes (success or failure), tell ourselves to stop stashing
      future.onComplete(_ => self ! StopStash)

      // Stash incoming messages until finished with the future
      context.become(stashing, discardOld = false)
  }

  def stashing: Receive = {
    case StopStash =>
      // Safe here: we are back inside the actor's own message processing
      context.unbecome()
      unstashAll()
    case _ =>
      stash()
  }
Zorazorah answered 29/3, 2023 at 5:34 Comment(0)
