Pausing an actor in Akka

I have an actor in Akka that will process messages to create certain entities. Some fields on these entities are computed based on the state of other entities in the database at the moment of creation.

I would like to avoid a race condition in which the actor processes messages faster than the database can persist the entities. This could lead to inconsistent data, for example:

  • Actor creates a Foo and sends it to other actors for further processing and saving
  • The actor is asked to create another Foo. Since the first one is not yet saved, the new one is created based on the old content of the DB, thereby creating a wrong Foo.

Now, this possibility is quite remote, since the creation of the Foos will be triggered manually. But it is still conceivable that a double click may cause problems under high load. And who knows if tomorrow Foo will be created automatically.

Hence, what I need is some way to tell the actor to wait, and resume its operations only after confirmation that the Foos have been saved.

Is there a way to put an actor in an idle state, and tell it to resume its operations after a while?

Basically, I would like to use the mailbox as a message queue, and have control over the processing speed of the queue.

Runin answered 28/1, 2013 at 14:48 Comment(5)
Have you looked at Ask: Send And Receive Future? - Bewail
@EmilH Well, this use case does not look as simple as sending futures. The problem is not being notified when the Foo has been saved, but rather waiting to process other messages until then. I would like to block my actor for a while, so that its mailbox acts as a queue while I am still processing the Foo. But it is possible that I have misunderstood your comment, and maybe this is exactly what I need. In that case, could you please expand the comment into an answer? - Runin
You could use become/unbecome to re-queue the messages that need to wait, until the actor receives the proper FooSaved message that restores the previous behaviour. I'm a little baffled by your case, because, unless the saving operation occurs rarely, the actor's asynchronous nature becomes useless since it's bound to a persistence layer's response time... what's the use of the actor's high throughput if it's slowed down by some db operation? - Tatiana
@Tatiana In fact I implemented a solution which turns out to agree with your suggestion. You are right that there is no reason to put this work into an actor, in isolation. But we have many other tasks that do not require such a blocking interface, hence our system is actor based, and it would be nice to incorporate such an operation in the same paradigm. - Runin
I guessed it was something along these lines... - Tatiana

No, you cannot suspend an actor: actors always pull messages from their mailbox as quickly as possible. This leaves only the possibility that incoming requests are stashed away, to be processed later:

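import akka.actor.{Actor, ActorRef, ReceiveTimeout, Stash}
import java.util.concurrent.TimeoutException
import scala.concurrent.duration._

// Request, Persist, Persisted and doWork() stand for your own messages and logic.
class A(db: ActorRef) extends Actor with Stash {
  def receive = {
    case Request =>
      doWork()
      db ! Persist
      // Fail if no confirmation (or any other message) arrives in time.
      context.setReceiveTimeout(5.seconds)
      context.become({
        // Hold back further requests until the write is confirmed.
        case Request        => stash()
        case Persisted      =>
          context.setReceiveTimeout(Duration.Undefined) // switch the timeout off again
          context.unbecome()
          unstashAll()
        case ReceiveTimeout => throw new TimeoutException("not persisted")
      }, discardOld = false)
  }
}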

Please note that message delivery is not guaranteed (or the database may be down) and therefore the timeout is recommended practice.

The underlying problem

This problem mostly shows up when the actor model and the domain model are not well aligned: the actor is the unit of consistency, but in your use case a consistent view requires an up-to-date external entity (the database) for the actor to do the right thing. I cannot recommend a solution without knowing more about the use case, but try to remodel your problem taking this into account.

Elysha answered 28/1, 2013 at 17:5 Comment(0)

It turns out that this only requires a few lines. This is the solution I came up with, which agrees with pagoda_5b's suggestion:

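import akka.actor.{Actor, ActorRef, Stash}

class QueueingActor(nextActor: ActorRef) extends Actor with Stash {
  import QueueingActor._

  def receive = {
    case message =>
      // Switch to "waiting" mode, then hand the message on.
      context.become({
        case Resume =>
          // Put the stashed messages back into the mailbox and resume normal processing.
          unstashAll()
          context.unbecome()
        case _ => stash() // hold everything else until Resume arrives
      })
      nextActor ! message
  }
}

object QueueingActor {
  // A case object: with `case class Resume()`, the pattern `case Resume` would
  // match only the companion object, never an instance created as `Resume()`.
  case object Resume
}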
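
To show how the other side of this might look, here is a rough sketch of a downstream actor that releases the queue once it has saved an entity. It assumes the QueueingActor above is in scope and a classic Akka API of version 2.3 or later (for `sender()`). SavingActor and its `save` function are hypothetical names made up for illustration, and `save` is assumed to return only once the write has completed.

```scala
import akka.actor.{Actor, ActorSystem, Props}

// Hypothetical downstream actor: saves whatever it receives, then tells
// the sender (the QueueingActor) that it may release the next message.
class SavingActor(save: Any => Unit) extends Actor {
  def receive = {
    case entity =>
      save(entity)                     // assumed to block until the write is done
      sender() ! QueueingActor.Resume  // let the queue continue
  }
}

object Wiring {
  // Builds the two-actor chain; callers send their messages to the returned ref.
  def start(system: ActorSystem, save: Any => Unit) = {
    val saver = system.actorOf(Props(new SavingActor(save)))
    system.actorOf(Props(new QueueingActor(saver)))
  }
}
```

With this wiring, messages sent to the reference returned by `Wiring.start` are forwarded one at a time, each only after the previous one has been saved. If the save is itself asynchronous, or if the message is passed on to further actors, `Resume` would have to be sent from wherever the confirmation finally arrives.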
Runin answered 28/1, 2013 at 16:25 Comment(0)
