Handling out of order events in CQRS read side

I've read this nice post from Jonathan Oliver about handling out of order events.

http://blog.jonathanoliver.com/cqrs-out-of-sequence-messages-and-read-models/

The solution that we use is to dequeue a message and to place it in a “holding table” until all messages with a previous sequence are received. When all previous messages have been received we take all messages out of the holding table and run them in sequence through the appropriate handlers. Once all handlers have been executed successfully, we remove the messages from the holding table and commit the updates to the read models.

This works for us because the domain publishes events and marks them with the appropriate sequence number. Without this, the solution below would be much more difficult—if not impossible.

This solution is using a relational database as a persistence storage mechanism, but we’re not using any of the relational aspects of the storage engine. At the same time, there’s a caveat in all of this. If message 2, 3, and 4 arrive but message 1 never does, we don’t apply any of them. The scenario should only happen if there’s an error processing message 1 or if message 1 somehow gets lost. Fortunately, it’s easy enough to correct any errors in our message handlers and re-run the messages. Or, in the case of a lost message, to re-build the read models from the event store directly.
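A minimal in-memory sketch of the holding-table idea quoted above, assuming sequence numbers are contiguous and start at a known value. The quoted post uses a relational table and shows no code, so the class name `HoldingBuffer`, its methods and the `String` payload are hypothetical stand-ins.

```java
import java.util.TreeMap;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for the "holding table" (not the post's actual code). */
final class HoldingBuffer {
    private final TreeMap<Long, String> held = new TreeMap<>(); // sequence -> payload
    private final Consumer<String> handler;                     // applies one event to the read model
    private long nextExpected;                                  // next sequence number to apply

    HoldingBuffer(long firstSequence, Consumer<String> handler) {
        this.nextExpected = firstSequence;
        this.handler = handler;
    }

    /** Holds the event, then applies every held event that is now contiguous. */
    void receive(long sequence, String payload) {
        if (sequence < nextExpected) {
            return; // already applied earlier: ignore the duplicate
        }
        held.put(sequence, payload);
        while (held.containsKey(nextExpected)) {
            handler.accept(held.get(nextExpected)); // run the handler first
            held.remove(nextExpected);              // remove only after it succeeded
            nextExpected++;
        }
    }

    /** Number of events still waiting for an earlier event. */
    int heldCount() {
        return held.size();
    }
}
```

With a first sequence of 1, receiving 2, 4 and 3 applies nothing; receiving 1 afterwards releases all four in order. If 1 never arrives, the buffer keeps holding the rest, which is the caveat the post mentions.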

I have a few questions, particularly about his claim that we can always go back to the event store for missing events.

  1. Does the write side of CQRS have to expose a service for the read side to "demand" a replay of events? For example, if event 1 was not received but 2, 4 and 3 have been, can we ask the event store through a service to republish events starting from 1?
  2. Is this service the responsibility of the write side of CQRS?
  3. How do we re-build the read model using this?
Kahn answered 20/11, 2016 at 11:15 Comment(3)
We used a "retries" approach with RabbitMQ and it worked fine. If the event still cannot be processed after several retries, you put it in a dead-letter queue and reset the sequence number so that further events can be processed correctly. What usually causes out-of-order events in your application? - Consider
I have some specific commands that generate multiple events. I haven't implemented anything yet but am wary of possible out-of-order events. My event publisher works asynchronously too, so some events might not get published in order either. I'm relying on my event sequence number to help me put them back together. I will give the retries approach a try. If you can elaborate a bit on it, I can mark it as an answer. - Kahn
I've added a more detailed explanation in the comments section of my answer. - Consider
C
5

If you have a sequence number, you can detect when the current event is out of order, e.g. currentEventNumber != lastReceivedEventNumber + 1

Once you've detected that, you just throw an exception. If your subscriber has a mechanism for retries, it will try to process this event again in a second or so. There is a pretty good chance that the earlier events will have been processed by then and the sequence will be correct. This is a workable solution if out-of-order events happen rarely.
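A rough sketch of that check in Java, assuming sequence numbers are per aggregate, start at 1 and have no gaps. The names `SequenceGuard` and `OutOfOrderEventException` are made up for illustration, and the `HashMap` stands in for whatever table actually stores the last sequence number.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical exception: tells the subscriber's retry mechanism to try again later. */
final class OutOfOrderEventException extends RuntimeException {
    OutOfOrderEventException(String message) {
        super(message);
    }
}

/** Keeps the last applied sequence number per aggregate (in memory, for illustration). */
final class SequenceGuard {
    private final Map<String, Long> lastApplied = new HashMap<>();

    /**
     * Returns true if the event is the direct successor and was recorded,
     * false if it duplicates an already applied event,
     * and throws if at least one earlier event is still missing.
     */
    boolean accept(String aggregateId, long sequence) {
        long last = lastApplied.getOrDefault(aggregateId, 0L);
        if (sequence <= last) {
            return false; // redelivered duplicate: safe to skip
        }
        if (sequence != last + 1) {
            throw new OutOfOrderEventException(
                "aggregate " + aggregateId + ": expected " + (last + 1) + " but got " + sequence);
        }
        lastApplied.put(aggregateId, sequence);
        return true;
    }
}
```

Treating `sequence <= last` as a duplicate rather than an error is a design choice of this sketch; without it, a redelivered event would be retried forever.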

If you face this situation regularly, you need to implement a global locking mechanism that forces certain events to be processed sequentially. For example, we used sp_getapplock in MSSQL to achieve global "critical section" behaviour in certain situations. Apache ZooKeeper offers a framework for even more complicated scenarios, where multiple parts of a distributed application require something more than a simple lock.

Consider answered 21/11, 2016 at 4:26 Comment(7)
I was looking at how certain multiplayer games handle this situation. The game has a built-in cache that lasts for about 100ms. It waits 100ms before applying the event in case a previous event is missing. I'm a bit hesitant to use any locks because of potential scalability issues. Btw, how do you ask the event store for a missing event in your implementation? - Kahn
If you want to make this system robust, you have to be very careful with all kinds of caches on the aggregate side. What will happen if your application suddenly fails? From my experience, the approach with cached events will not scale. In our business case, we were building a distributed 24/7 fault-tolerant server, which means that you have to have at least 2 instances of the aggregator process on separate physical machines. And if you want to avoid a split-brain scenario, you have to consider having 3 separate instances running in parallel. - Consider
>> how do you ask the event store for a missing event in your implementation? Well, we were using RabbitMQ and later switched to Azure Service Bus. Both services provide delivery guarantees. You basically let the queue service know that your event has been processed successfully at the end of the transaction on the aggregator side. - Consider
And if you need to know whether the event is out of sequence, just store the last event sequence number for every aggregate in some table. I'm not aware of any one-size-fits-all solution that makes your life easy when it comes to ES/CQRS and distributed systems. Everything depends very much on your specific business needs. - Consider
The idea is to use manual acknowledgement mode when receiving messages. If your sequence number is invalid, just throw an exception, which means that the acknowledgement will not happen, which means that the message will be redelivered. You can play with the delay by catching, sleeping and rethrowing unhandled exceptions in your client implementation. This is how acknowledgement happens in dotnet. The screenshot was made from this tutorial page. - Consider
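To make the redelivery behaviour concrete, here is a small simulation in plain Java. It does not use the RabbitMQ or Azure Service Bus client APIs: the queue is an `ArrayDeque`, "no acknowledgement" is modelled as the handler throwing, and the names `RedeliverySimulation` and `maxAttempts` are assumptions of this sketch. Messages that keep failing end up in a dead-letter list, as in the retries approach mentioned in the comments on the question.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Simulates manual acknowledgement with redelivery (not a real broker client). */
final class RedeliverySimulation {
    /** A message plus the number of delivery attempts made so far. */
    private static final class Delivery {
        final long sequence;
        int attempts;

        Delivery(long sequence) {
            this.sequence = sequence;
        }
    }

    /**
     * Delivers each sequence number to the handler. A handler exception counts as
     * "not acknowledged": the message goes back to the end of the queue, or into
     * the returned dead-letter list once maxAttempts is reached.
     */
    static List<Long> run(List<Long> arrivals, Consumer<Long> handler, int maxAttempts) {
        Deque<Delivery> queue = new ArrayDeque<>();
        for (long sequence : arrivals) {
            queue.add(new Delivery(sequence));
        }
        List<Long> deadLetters = new ArrayList<>();
        while (!queue.isEmpty()) {
            Delivery delivery = queue.poll();
            delivery.attempts++;
            try {
                handler.accept(delivery.sequence); // returning normally == acknowledged
            } catch (RuntimeException notAcknowledged) {
                if (delivery.attempts >= maxAttempts) {
                    deadLetters.add(delivery.sequence); // give up on this message
                } else {
                    queue.add(delivery); // redeliver after the other messages
                }
            }
        }
        return deadLetters;
    }
}
```

A real subscriber would also wait between attempts (the catch, sleep, rethrow trick above); the simulation skips the delay to stay short.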
The check currentEventNumber != lastReceivedEventNumber + 1 is not always valid. Take the case where lastReceivedEventNumber = 5 and currentEventNumber = 8, which is a totally valid case. Would this absolutely mean that events with versions 6 and 7 have not arrived yet? Should they actually arrive? Nope. - Sorayasorb
@CristianE. Why do you think 6 and 7 will not arrive? Why would the sequence skip numbers, and when can that happen? And if those messages are lost, then that is an invalid state. - Arielle
I
1

Timestamp based solution:

The incoming messages are:

{
 id: 1,
 timestamp: T2,
 name: Samuel
}
{
 id: 1,
 timestamp: T1,
 name: Sam,
 age: 26
}
{
 id: 1,
 timestamp: T3,
 name: Marlon Samuels,
 contact: 123
}

And what we expect to see irrespective of the ORDER in the database is:

{
 id: 1,
 timestamp: T3,
 name: Marlon Samuels,
 age: 26,
 contact: 123
}

For every incoming message, do the following:

  1. Get the persisted record and evaluate the timestamp.
  2. Whichever of the two (persisted or incoming) has the greater timestamp is the target.

Now let's go through the messages:

  1. T2 arrives first: Store it in the database as it's the first one.
  2. T1 arrives next: compare the persisted record (T2) with the incoming one (T1); T2 is greater, so T2 is the target.
  3. T3 arrives: compare the persisted record (T2) with the incoming one (T3); T3 is greater, so T3 is the target.

The following deepMerge(source, target) should give us the resulting record:

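import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

// Copies into target every key that target lacks; values already in target win.
public static JsonObject deepMerge(JsonObject source, JsonObject target) {
    for (String key : source.keySet()) {
        JsonElement srcValue = source.get(key);
        if (!target.has(key)) { // add only when target doesn't have it already
            target.add(key, srcValue);
        } else if (srcValue.isJsonObject() && target.get(key).isJsonObject()) {
            // one possible policy: merge nested objects recursively;
            // adjust this branch according to your requirement
            deepMerge(srcValue.getAsJsonObject(), target.getAsJsonObject(key));
        }
        // otherwise keep target's value, because target is the newer record
    }
    return target;
}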
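For a version that runs without Gson, here is a rough equivalent on plain `Map`s that replays the three example messages. It assumes flat records and models T1, T2 and T3 as the numbers 1, 2 and 3; the class name `TimestampMerge` and the `"timestamp"` key handling are assumptions of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Timestamp-based merge on flat records (illustrative, standard library only). */
final class TimestampMerge {
    /**
     * Merges an incoming message into the persisted record. The side with the
     * greater "timestamp" becomes the target; the other side only fills in keys
     * the target lacks. On equal timestamps the persisted record wins.
     */
    static Map<String, Object> merge(Map<String, Object> persisted, Map<String, Object> incoming) {
        if (persisted == null) {
            return new LinkedHashMap<>(incoming); // first message: store it as-is
        }
        long persistedTs = ((Number) persisted.get("timestamp")).longValue();
        long incomingTs = ((Number) incoming.get("timestamp")).longValue();
        Map<String, Object> newer = incomingTs > persistedTs ? incoming : persisted;
        Map<String, Object> older = (newer == incoming) ? persisted : incoming;

        Map<String, Object> result = new LinkedHashMap<>(newer);
        for (Map.Entry<String, Object> entry : older.entrySet()) {
            result.putIfAbsent(entry.getKey(), entry.getValue()); // never overwrite newer values
        }
        return result;
    }
}
```

Feeding it the messages in arrival order T2, T1, T3 keeps `age` from T1 and takes `name`, `contact` and the timestamp from T3, matching the expected record above.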

Let me know in the comments if you need the full version of deepMerge().

Inaccuracy answered 12/9, 2020 at 6:43 Comment(0)
U
0

Another alternative would be to feed the service that you're reading events from (S1) in such a way that it can only produce in-order events for your service (S2).

For example if you have loads of events for many different sessions coming in, have an ordering service (O1) at the front end responsible for order. It ensures only one event for each session gets passed to (S1) and only when (S1) and (S2) have both processed it successfully does (O1) allow a new event for that session to pass to (S1). Throw in a bit of queuing too for performance.
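A minimal sketch of such an ordering service in Java, assuming a single process and an explicit callback when (S1) and (S2) are done. The names `SessionGate`, `submit` and `completed` are invented for illustration; a real (O1) would need durable queues and failure handling.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.function.BiConsumer;

/** Lets at most one event per session be in flight downstream at any time. */
final class SessionGate {
    private final Map<String, Queue<String>> waiting = new HashMap<>(); // session -> queued events
    private final Set<String> inFlight = new HashSet<>();               // sessions with an event downstream
    private final BiConsumer<String, String> downstream;                // (sessionId, event) -> pass to S1

    SessionGate(BiConsumer<String, String> downstream) {
        this.downstream = downstream;
    }

    /** Passes the event on if the session is idle, otherwise queues it. */
    synchronized void submit(String sessionId, String event) {
        if (inFlight.add(sessionId)) {
            downstream.accept(sessionId, event);
        } else {
            waiting.computeIfAbsent(sessionId, key -> new ArrayDeque<>()).add(event);
        }
    }

    /** Call once S1 and S2 have both processed the session's in-flight event. */
    synchronized void completed(String sessionId) {
        Queue<String> queue = waiting.get(sessionId);
        String next = (queue == null) ? null : queue.poll();
        if (next == null) {
            inFlight.remove(sessionId); // session is idle again
            waiting.remove(sessionId);
        } else {
            downstream.accept(sessionId, next); // release the next event in arrival order
        }
    }
}
```

Events for different sessions still flow in parallel; only events within one session are serialized, which keeps the cost of ordering local to that session.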

Ulani answered 28/3, 2018 at 13:48 Comment(3)
Thinking about it, the front-end service (O1) only has to tag the events it passes on with a version for that session; then downstream services have what they need to complete each session/version pair. Whatever is at the far end that may care about versions can then ensure it uses the latest. - Ulani
What you're describing is a scenario better handled by an actor model design pattern. Both Service Fabric and Akka.net have actors that do this exact thing. - Kahn
Dasith, I've looked at Service Fabric recently and really like its reliable collections (well done MS), and yes, the sessions I mention could be implemented as actors. I'm not fully sure, though, that Service Fabric always preserves the order of execution if I invoke lots of async methods on an actor. - Ulani
