CQRS read side, multiple event stream topics, concurrency / race conditions

I'm having an issue with (re)applying events from multiple topics in the correct order on the read / query side.

Example:

On the write / command side, we have 2 Aggregates with an n:m relationship:

  • Contact
  • Group

Those Aggregates produce the following events on 2 separate event stream topics (because best practice says: one topic per aggregate, and I totally agree):

  • Contact Topic:

    1. ContactCreated (contactId: "123", name: "Peter")
    2. ContactAddedToGroup (contactId: "123", groupId: "456")
  • Group Topic:

    3. GroupCreated (groupId: "456", name: "Customers")

On the read / query side (e.g. Elasticsearch) I'd like to execute this query:

  • Find all Contacts that belong to any Group whose name begins with Custo...
  • Find all Groups whose name begins with Custo... (this shouldn't be a problem at all)

To achieve this, there are 2 read models. Example Data:

  • {contactId: "123", name: "Peter", groups: [{id: "456", name: "Customers"}]}
  • {groupId: "456", name: "Customers"}

The Problem:

The order of events can only be guaranteed within a single event topic (as in Apache Kafka). So the 3 events can be consumed by the read / query side in several orders: 1,2,3 or 1,3,2 or 3,1,2

How to handle 1,2,3? Database pseudo statements example:

  1. INSERT Contact (contactId: "123", name: "Peter")
  2. FIND Group WHERE (groupId: "456") (doesn't work, because Group wasn't inserted yet)
     UPDATE Contact WHERE (contactId: "123") ADD Group (groupId: "456", name: "???") (here's the problem)
  3. INSERT Group (groupId: "456", name: "Customers")

Idea(s):

  • I could extend the algorithm and append one more statement. This would look up all Contacts that have been added to the Group and add the Group name to them (to make the search query work):

    1. UPDATE Contact WHERE (groupId: "456") REPLACE Group (groupId: "456", name: "Customers")
  • Another idea (which I don't like) would be to use only a single event stream topic. Then the order of events would always be correct. But there will be cases where this isn't easily possible. (Also, best practices say that one should use one topic per aggregate.)

  • Ignore the problem, since it's pretty unlikely to happen live: the user naturally provides the necessary delay between Create Group and Add Contact To Group. But during event replay there's no such delay, and event topics can be consumed in parallel / 'random' order.
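
The first idea (store a placeholder name, then back-fill it when GroupCreated arrives) can be sketched as follows. This is only a minimal in-memory illustration, not Elasticsearch code: `ContactDoc`, `ContactProjection`, and the dict-shaped events are hypothetical names made up for the example.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class ContactDoc:
    contact_id: str
    name: str
    # group_id -> group name; None means "GroupCreated not seen yet"
    groups: Dict[str, Optional[str]] = field(default_factory=dict)


class ContactProjection:
    """In-memory stand-in for the Contact read model (hypothetical)."""

    def __init__(self) -> None:
        self.contacts: Dict[str, ContactDoc] = {}
        self.group_names: Dict[str, str] = {}

    def apply(self, event: dict) -> None:
        kind = event["type"]
        if kind == "ContactCreated":
            self.contacts[event["contactId"]] = ContactDoc(
                event["contactId"], event["name"]
            )
        elif kind == "ContactAddedToGroup":
            group_id = event["groupId"]
            # The name may still be unknown; store None as a placeholder.
            doc = self.contacts[event["contactId"]]
            doc.groups[group_id] = self.group_names.get(group_id)
        elif kind == "GroupCreated":
            group_id = event["groupId"]
            self.group_names[group_id] = event["name"]
            # Back-fill contacts that referenced the group before it arrived.
            for doc in self.contacts.values():
                if group_id in doc.groups:
                    doc.groups[group_id] = event["name"]


EVENTS = {
    1: {"type": "ContactCreated", "contactId": "123", "name": "Peter"},
    2: {"type": "ContactAddedToGroup", "contactId": "123", "groupId": "456"},
    3: {"type": "GroupCreated", "groupId": "456", "name": "Customers"},
}


def project(order):
    """Apply the three example events in the given order."""
    projection = ContactProjection()
    for number in order:
        projection.apply(EVENTS[number])
    return projection.contacts["123"].groups
```

With this shape, `project([1, 2, 3])`, `project([1, 3, 2])` and `project([3, 1, 2])` all end with the same group entry; the only difference is that in the 1,2,3 order the name is temporarily `None` between events 2 and 3.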

Question(s):

This scenario should be fairly common. But unfortunately there are very few real-world CQRS examples on the web, and most of them don't explain the small / hidden pitfalls.

How do you solve those problems?

Wayfarer answered 25/11, 2017 at 5:28 Comment(2)
A similar SO question can be found here: #53688839 - Wayfarer
Keep in mind that order is only guaranteed inside a partition, so putting all events in one topic might not result in the correct order. - Wear

In your example, you are guaranteed that the GroupCreated event (3) has been added before ContactAddedToGroup (2), since obviously the user cannot add a contact to a group before the group has been created. So the GroupCreated event will be available to be read, even if you happen to read the ContactAddedToGroup event first.

Sticking with the 2 separate streams (which is definitely correct, since Groups and Contacts are separate aggregates), this is one approach:

  • The contact can maintain its own table of group names (just id and name columns are needed in your example). Or if you're happy to couple Groups and Contacts (they sound like the same bounded context), you can just have the single event handler dealing with both Group and Contacts projections.
  • The projection handler subscribes to both Group and Contact events (from a single process and thread).
  • If it reads a contact added message for a group that it doesn't have in its group names table, it immediately performs a catch-up on the group events (or at least catches up far enough that it does get that group) and then processes the contact event again.
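
A rough sketch of the catch-up step described above, assuming a single-threaded handler and an in-memory list standing in for the Group topic. `GroupStream`, `CatchUpProjection`, and the event dicts are hypothetical; a real consumer would read from the broker or event store instead.

```python
class GroupStream:
    """Hypothetical ordered log of Group events with a read cursor."""

    def __init__(self, events):
        self.events = list(events)
        self.position = 0

    def read_next(self):
        """Return the next unread event, or None if fully caught up."""
        if self.position >= len(self.events):
            return None
        event = self.events[self.position]
        self.position += 1
        return event


class CatchUpProjection:
    def __init__(self, group_stream):
        self.group_stream = group_stream
        self.group_names = {}  # the contact projection's own group table
        self.contacts = {}

    def on_group_event(self, event):
        if event["type"] == "GroupCreated":
            self.group_names[event["groupId"]] = event["name"]

    def catch_up_until(self, group_id):
        """Read Group events until group_id is known or the log is exhausted."""
        while group_id not in self.group_names:
            event = self.group_stream.read_next()
            if event is None:
                return False
            self.on_group_event(event)
        return True

    def on_contact_event(self, event):
        if event["type"] == "ContactCreated":
            self.contacts[event["contactId"]] = {
                "name": event["name"],
                "groups": [],
            }
        elif event["type"] == "ContactAddedToGroup":
            group_id = event["groupId"]
            # Unknown group: catch up on the Group stream before applying.
            if not self.catch_up_until(group_id):
                raise LookupError(f"group {group_id} not found in Group stream")
            self.contacts[event["contactId"]]["groups"].append(
                {"id": group_id, "name": self.group_names[group_id]}
            )
```

Raising on a missing group is just one choice for the sketch; a real handler might instead wait and retry, since the GroupCreated event may not have reached the log yet.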

This approach will work during replay as well as during live processing. During replay, you can also choose just to consume parent streams completely (e.g. Group) before you even start consuming the projection's main stream (Contact in this case), though you still need to be prepared to catch up again on the Group stream if necessary, since new events may be coming in during catch up.

The single thread also ensures there is no race condition if you have GroupRenamed events - you can be sure you will rename the column in all contacts, whereas with multiple threads you might have a race between inserting a contact with the old group name and the query that updates all group names in contacts that use that group. If you need crazy scale, you'll have to shard your contacts, and have each shard maintain its own group name table, to avoid race conditions.

The other approach is to decide that Group Name is allowed to be null, and just do the contacts update when you read the event (your first idea). So you'll treat new groups and group renames (if allowed) in much the same way, but your clients will need to deal with temporarily null group names in contacts, which may be an unwelcome complication.

Sliding answered 27/11, 2017 at 12:21 Comment(2)
Another approach to deal with null group names is to simply remove them before sending them to the client. E.g. 1. retrieve contact from DB, 2. remove groups with name == null, 3. send result to client. In this case it should be easy, but there may be aggregates that could have 10 or 20 of those cases. - Wayfarer
Even though the GroupCreated event was published before the ContactAddedToGroup event, there is no guarantee that GroupCreated will be handled first, because they were published to different topics. The GroupCreated topic can fail at some point, and while it is recovering, ContactAddedToGroup can reach its destination earlier. Or there may be network / infrastructure latency on the first topic. So ordering is not guaranteed. - Ramify

How do you solve those problems?

One remedy is to avoid trying to rebuild images from unstable representations of the event history. When you are loading state into the write model, you will normally do so by querying a "document" that has all of the history of your aggregate in the order in which it was written.

Taking the same approach in the read model, whereby you read the stable event history for each topic, avoids the problems that you might face because the topic events arrive out of order.

See Greg Young's talk on polyglot data.

You can take the same approach when building a read model from multiple topics, which gives you a consistent history for each topic... but not necessarily a synchronized whole.

So, to use your specific example, you might have ContactCreated (contactId: "123", name: "Peter") and ContactAddedToGroup (contactId: "123", groupId: "456"), but without the GroupCreated event that belongs in the "middle". So now what?

One possible answer is to build the view using the unaligned histories - you have Contact information as of 00:15, and Group information as of 00:00, and you make that temporal discrepancy part of the read model. This might include using a variation of the NullObject pattern to represent objects that don't exist yet.
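
As an illustration only, building a view from unaligned histories with a null-object placeholder might look like the sketch below. `GroupRef`, `unknown_group`, `build_view`, and the `as_of` field are hypothetical names, and the "as of" positions are simply the number of events read per topic rather than wall-clock times.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class GroupRef:
    group_id: str
    name: str
    known: bool = True


def unknown_group(group_id):
    """Null object: stands in for a group whose events were not read yet."""
    return GroupRef(group_id, name="", known=False)


def build_view(contact_history, group_history):
    """Build the contact view from two per-topic histories of any length."""
    group_names = {
        e["groupId"]: e["name"]
        for e in group_history
        if e["type"] == "GroupCreated"
    }
    contacts = {}
    for e in contact_history:
        if e["type"] == "ContactCreated":
            contacts[e["contactId"]] = {"name": e["name"], "groups": []}
        elif e["type"] == "ContactAddedToGroup":
            gid = e["groupId"]
            if gid in group_names:
                ref = GroupRef(gid, group_names[gid])
            else:
                ref = unknown_group(gid)
            contacts[e["contactId"]]["groups"].append(ref)
    return {
        # How far each topic had been read when the view was built.
        "as_of": {"contact": len(contact_history), "group": len(group_history)},
        "contacts": contacts,
    }
```

Clients can then check `known` (or compare the `as_of` positions) instead of handling a bare null name.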

Another possibility would be to use something like a Lamport clock to keep track of the dependencies between events in different topics. That might look like metadata in ContactAddedToGroup that lets the consumer know that the event is consequent to GroupCreated. The consumer could then decide whether or not to ignore events that are missing precedents.
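
A simplified sketch of the dependency-metadata idea. Note that this is not a full Lamport clock: it assumes each event carries a hypothetical `id` and an optional `dependsOn` list naming events from other topics, and it holds events back rather than ignoring them. `DependencyAwareConsumer` is an invented name.

```python
from collections import defaultdict, deque


class DependencyAwareConsumer:
    """Holds back events until the events they depend on have been applied."""

    def __init__(self, handler):
        self.handler = handler
        self.applied = set()              # ids of events already handled
        self.queues = defaultdict(deque)  # topic -> pending events, in topic order

    def receive(self, topic, event):
        self.queues[topic].append(event)
        self._drain()

    def _ready(self, event):
        return all(dep in self.applied for dep in event.get("dependsOn", ()))

    def _drain(self):
        progressed = True
        while progressed:
            progressed = False
            for queue in self.queues.values():
                # Only the head of each topic is eligible, so per-topic
                # ordering is preserved while a blocked event waits.
                while queue and self._ready(queue[0]):
                    event = queue.popleft()
                    self.handler(event)
                    self.applied.add(event["id"])
                    progressed = True


# Usage: ContactAddedToGroup ("c2") declares that it follows GroupCreated ("g1").
handled = []
consumer = DependencyAwareConsumer(lambda e: handled.append(e["id"]))
consumer.receive("contact", {"id": "c1", "type": "ContactCreated"})
consumer.receive("contact", {"id": "c2", "type": "ContactAddedToGroup",
                             "dependsOn": ["g1"]})
consumer.receive("group", {"id": "g1", "type": "GroupCreated"})
```

Here the handler sees the events in the order c1, g1, c2 even though c2 arrived before g1. The trade-off is that a blocked event stalls its whole topic until the precedent shows up.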

Capel answered 25/11, 2017 at 12:55 Comment(0)
