CQRS Repository/event publisher

I am using CQRSlite for a CQRS-style project. The Save method of the concrete Repository implementation looks like this (with irrelevant lines omitted):

    public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);

        var i = 0;
        foreach (var @event in aggregate.GetUncommittedChanges())
        {
            // ... [irrelevant code removed] ...
            _eventStore.Save(typeof(T), @event);
            _publisher.Publish(@event);
        }
        aggregate.MarkChangesAsCommitted();
    }

What's troubling me is that this method publishes events to subscribers BEFORE the aggregate is told to mark them as committed. Thus, if an event handler that observes a given event throws, the aggregate will not have committed changes that earlier event handlers may already have been notified of.

Why shouldn't I move _publisher.Publish(@event) to after aggregate.MarkChangesAsCommitted(), like so? What am I missing?

    public void Save<T>(T aggregate, int? expectedVersion = null) where T : AggregateRoot
    {
        if (expectedVersion != null && _eventStore.Get(typeof(T), aggregate.Id, expectedVersion.Value).Any())
            throw new ConcurrencyException(aggregate.Id);

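        // Snapshot the changes so they are still available after MarkChangesAsCommitted()
        var events = aggregate.GetUncommittedChanges().ToArray();
        foreach (var @event in events)
        {
            // ... [irrelevant code removed] ...
            _eventStore.Save(typeof(T), @event);
        }
        aggregate.MarkChangesAsCommitted();
        // Publish each event individually; the publisher takes one event at a time
        foreach (var @event in events)
            _publisher.Publish(@event);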
    }
Cato asked 6/11, 2015 at 14:56

Both approaches are problematic because an error can occur between Save and Publish, no matter in which order the two methods are called. This can lead to unsaved events being published or to saved events never being published. In-memory state can also be corrupted (in the aggregate objects), although that could be handled simply by catching errors thrown by event handlers.
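The last point, catching handler errors, can be sketched as follows. This is a minimal sketch under stated assumptions: FaultTolerantPublisher is a hypothetical class (not CQRSlite's IEventPublisher) that calls each handler inside a try/catch, so one failing handler neither stops the remaining handlers nor escapes into Save before MarkChangesAsCommitted() runs. It only protects the in-memory state; it does nothing about the gap between Save and Publish.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical publisher for illustration (not CQRSlite's IEventPublisher).
// Each handler runs inside try/catch, so a failing subscriber neither stops
// the remaining handlers nor propagates into the repository's Save method.
public sealed class FaultTolerantPublisher
{
    private readonly List<Action<object>> _handlers = new List<Action<object>>();
    private readonly List<Exception> _failures = new List<Exception>();

    // Failures collected so far; a real system would log or retry them instead.
    public IReadOnlyList<Exception> Failures => _failures;

    public void Subscribe(Action<object> handler) => _handlers.Add(handler);

    public void Publish(object @event)
    {
        foreach (var handler in _handlers)
        {
            try
            {
                handler(@event);
            }
            catch (Exception ex)
            {
                _failures.Add(ex); // record and swallow; do not rethrow
            }
        }
    }
}
```

Note that swallowing the exception means the failed handler simply misses that event unless something retries it, which is the kind of gap the pull-based approach avoids.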

One solution to this problem would be to use two-phase commit (available, e.g., if your event store is SQL Server-based and the publisher is MSMQ-based). However, this has performance, scalability, and operational implications, and it doesn't allow late subscribers (see below).

The better approach is to allow parties interested in events to pull them out of the event store (ideally, combining this with some sort of notification mechanism or long polling to make it more "reactive"). This moves the responsibility of tracking the last received event to the subscriber, allowing

  • late subscribers (joining in long after an event was stored) to receive old events as well as new ones,
  • reliability without two-phase commits.

You can find more about this approach by searching for something like "using the event store as a queue", and the video from Greg's answer probably adds a lot to this as well.

A common algorithm is this one:

  1. the event store assigns every saved event a checkpoint token (e.g., a sequence number);
  2. subscribers ask the event store for new events (periodically, via long polling, in reaction to push notifications, etc.), starting from the last checkpoint token they know (if any);
  3. the event store returns the events newer than that checkpoint token, together with a new checkpoint token;
  4. the subscribers handle the events and, if possible, store the new checkpoint token atomically with whatever side effects they produced;
    • if atomic saving is not possible, they can store the new checkpoint token after producing their side effects, but then they need a way to cope with seeing an event again if an error occurs in between (the event handling must then be "idempotent");
  5. subscribers start again at #2.
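The five steps above can be sketched in a few lines. This is a minimal, single-process sketch under stated assumptions: InMemoryEventStore, ReadSince, and ProjectionSubscriber are hypothetical names, events are plain strings, and the checkpoint token is simply the 1-based position of the event in the store's log. The read model is keyed by checkpoint, so handling the same event twice overwrites the same entry, which is one simple way to make handling idempotent.

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory event store (not a real library API).
public sealed class InMemoryEventStore
{
    private readonly List<string> _log = new List<string>();

    // Step 1: every saved event gets a checkpoint token, here its 1-based sequence number.
    public long Append(string @event)
    {
        _log.Add(@event);
        return _log.Count;
    }

    // Step 3: the events newer than 'after', each paired with its checkpoint token.
    public IReadOnlyList<(long Checkpoint, string Event)> ReadSince(long after) =>
        _log.Select((e, i) => (Checkpoint: (long)(i + 1), Event: e))
            .Where(x => x.Checkpoint > after)
            .ToList();
}

// Hypothetical subscriber that tracks its own checkpoint (steps 2, 4 and 5).
public sealed class ProjectionSubscriber
{
    private readonly InMemoryEventStore _store;

    // Read model keyed by checkpoint: re-applying an event overwrites the same
    // entry, so seeing an event twice is harmless (idempotent handling).
    public SortedDictionary<long, string> ReadModel { get; } = new SortedDictionary<long, string>();

    // Last checkpoint this subscriber has handled; 0 means "start from the beginning".
    public long Checkpoint { get; private set; }

    public ProjectionSubscriber(InMemoryEventStore store) => _store = store;

    // One round of steps 2-4; call it on a timer, after a long poll, or on a push notification (step 5).
    public void Poll()
    {
        foreach (var (checkpoint, @event) in _store.ReadSince(Checkpoint))
        {
            ReadModel[checkpoint] = @event; // side effect
            Checkpoint = checkpoint;        // then remember the new token
        }
    }
}
```

A subscriber created long after events were appended starts at checkpoint 0 and therefore receives the entire history first, which is the "late subscriber" benefit mentioned above. In a real system the checkpoint would be persisted, ideally in the same transaction as the read model.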

I'd like to add that I don't consider event stores that ignore the Save/Publish problem production-ready. For alternatives, see Greg Young's Event Store or the (currently more or less unmaintained) NEventStore.

Incinerate answered 10/11, 2015 at 7:49
Thanks for taking the time to reply. I am confused by the answer because two-phase commits and long-polling pull operations seem to run contrary to the principles of reactive, push-based messaging architectures. I see that once an event has been saved in the event store it becomes a persistent fact, but message delivery itself should be a trait of the messaging infrastructure, shouldn't it? I would be very keen to discuss this further to understand it better. - Cato
I mentioned two-phase commit only because it is one technical way to achieve reliability while relying on a push-based message publishing infrastructure. There is one school of CQRS (Udi Dahan), I think, which makes use of this approach. The other branch of CQRS (Greg Young) has moved away from the idea that events are "published" over one event bus. Instead, as I understand it, they see the event store as the primary data sink of the domain model (the "write" side of CQRS). All parties interested in events subscribe to the event store. - Incinerate
This is not contrary to push-based or reactive designs; it just moves that concern away from the domain model and its infrastructure. - Incinerate
The algorithm I mentioned above is the one subscribers implement to be able to catch up on events. If you use Greg's event store, for example, it will implement all of this for you, so for subscribers it will feel very reactive and push-based indeed. - Incinerate

No matter which order of Commit and Publish you choose, you will have issues if the second one fails.

There are multiple approaches to resolve this issue. Here are your main options:

  1. Use a messaging infrastructure that uses the same database as your application, so that a single transaction can be used. This solution is viable when a very simple messaging infrastructure suffices, and the team decides to build it themselves.

  2. Use two-phase commit. This has a performance impact, which may or may not be relevant for your application.

  3. Manually ensure you don't get into inconsistent states. See this answer of mine for more information; I don't think it makes sense to copy the whole answer over.
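Option 1 is often implemented as what is commonly called a transactional outbox. The following is a minimal in-memory illustration under stated assumptions: AppDatabase, CommitEvents, and DispatchOutbox are hypothetical names, and a lock stands in for the single database transaction that a real implementation (e.g., both tables in the same SQL database) would use. Events and pending messages are written together; a dispatcher later forwards pending messages and removes each one only after publishing succeeded.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for one application database holding both the event
// table and the outbox table. The lock plays the role of a single database
// transaction; a real implementation would use the database's own transaction.
public sealed class AppDatabase
{
    private readonly object _transaction = new object();
    private readonly List<string> _events = new List<string>();
    private readonly Queue<string> _outbox = new Queue<string>();

    public IReadOnlyCollection<string> Events => _events;
    public IReadOnlyCollection<string> Outbox => _outbox;

    // Save: each event is stored AND queued for publishing in one unit of work.
    public void CommitEvents(IEnumerable<string> events)
    {
        lock (_transaction)
        {
            foreach (var e in events)
            {
                _events.Add(e);
                _outbox.Enqueue(e);
            }
        }
    }

    // Dispatcher: publish pending messages, removing each one only after the
    // publish call returned. If publish throws, the message stays queued and
    // is retried on the next dispatch (at-least-once delivery).
    public void DispatchOutbox(Action<string> publish)
    {
        lock (_transaction)
        {
            while (_outbox.Count > 0)
            {
                publish(_outbox.Peek());
                _outbox.Dequeue();
            }
        }
    }
}
```

Because a message is removed only after a successful publish, a crash between publishing and removal would redeliver it, so subscribers still need to handle duplicates.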

Albertinaalbertine answered 11/11, 2015 at 8:24

This is an anti-pattern and you shouldn't be doing it.

Apparently putting a link prevents me from signing in, as it's a "suspicious request".

https://www.youtube.com/watch?v=GbM1ghLeweU

Freshet answered 9/11, 2015 at 0:34