EventSourced Saga Implementation

I have written an Event Sourced Aggregate and have now implemented an Event Sourced Saga. I noticed the two are similar, so I created an event sourced object as a base class from which both derive.

I have seen one demo here http://blog.jonathanoliver.com/cqrs-sagas-with-event-sourcing-part-ii-of-ii/ but I feel there may be an issue: could commands be lost if the process crashes, since they are sent outside the write transaction?

public void Save(ISaga saga)
{
    var events = saga.GetUncommittedEvents();
    eventStore.Write(new UncommittedEventStream
    {
        Id = saga.Id,
        Type = saga.GetType(),
        Events = events,
        ExpectedVersion = saga.Version - events.Count
    });

    foreach (var message in saga.GetUndispatchedMessages())
        bus.Send(message); // can be done in different ways

    saga.ClearUncommittedEvents();
    saga.ClearUndispatchedMessages();
}

Instead I am using Greg Young's EventStore and when I save an EventSourcedObject (either an aggregate or a saga) the sequence is as follows:

  1. Repository gets list of new MutatingEvents.
  2. Writes them to stream.
  3. EventStore publishes the new events once they have been written and committed to the stream.
  4. We listen for the events from the EventStore and handle them in EventHandlers.

I am implementing the two aspects of a saga:

  1. To take in events, which may transition state, which in turn may emit commands.
  2. To have an alarm, so that at some point in the future we can be called back (via an external timer service).

Questions

  1. As I understand it, event handlers should not emit commands (what happens if the command fails?). But am I OK with the above, since the Saga is the thing that actually controls the creation of commands (in reaction to events) via this event proxy, and any failure to send a command can be handled externally (in the external EventHandler that deals with CommandEmittedFromSaga and resends the command if it fails)?

  2. Or do I forget wrapping events and store native Commands and Events in the same stream (intermixed with a base class Message - the Saga would consume both Commands and Events, an Aggregate would only consume Events)?

  3. Any other reference material on the net for implementation of event sourced Sagas? Anything I can sanity check my ideas against?

Some background code is below.

Saga issues a command to Run (wrapped in a CommandEmittedFromSaga event)

Command below is wrapped inside event:

public class CommandEmittedFromSaga : Event
{
    public readonly Command Command;
    public readonly Identity SagaIdentity;
    public readonly Type SagaType;

    public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
    {
        Command = command;
        SagaType = sagaType;
        SagaIdentity = sagaIdentity;
    }
}
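
The external handler mentioned in question 1 might look roughly like the following. This is only a minimal sketch: `SagaCommandDispatcher`, the `send` delegate, `maxAttempts`, the `Run` command and the stand-in base types are all hypothetical names introduced for illustration, not part of any library. It also assumes command handling is idempotent, because a retry or a redelivered event can send the same command more than once.

```csharp
using System;

// Stand-ins for the question's base types so the sketch compiles on its own.
public abstract class Command { }
public abstract class Event { }
public sealed class Identity
{
    public readonly string Value;
    public Identity(string value) { Value = value; }
}
public sealed class Run : Command { } // hypothetical concrete command

public class CommandEmittedFromSaga : Event
{
    public readonly Command Command;
    public readonly Identity SagaIdentity;
    public readonly Type SagaType;

    public CommandEmittedFromSaga(Identity sagaIdentity, Type sagaType, Command command)
    {
        Command = command;
        SagaType = sagaType;
        SagaIdentity = sagaIdentity;
    }
}

// Hypothetical handler subscribed to CommandEmittedFromSaga events.
public class SagaCommandDispatcher
{
    private readonly Action<Command> _send; // assumption: wraps whatever bus is in use
    private readonly int _maxAttempts;

    public SagaCommandDispatcher(Action<Command> send, int maxAttempts = 3)
    {
        if (send == null) throw new ArgumentNullException(nameof(send));
        if (maxAttempts < 1) throw new ArgumentOutOfRangeException(nameof(maxAttempts));
        _send = send;
        _maxAttempts = maxAttempts;
    }

    // Returns true once the command was sent. On false the caller should not
    // advance its subscription checkpoint, so the event is handled again later.
    public bool Handle(CommandEmittedFromSaga e)
    {
        for (var attempt = 1; attempt <= _maxAttempts; attempt++)
        {
            try
            {
                _send(e.Command);
                return true;
            }
            catch (Exception)
            {
                // Treated as transient in this sketch; loop and try again.
            }
        }
        return false;
    }
}
```

Because the command is stored in the stream before anything is sent, a crash between the write and the send loses nothing: the handler sees the event again when the subscription resumes from its last checkpoint.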

Saga requests a callback at some point in future (AlarmRequestedBySaga event)

The alarm callback request is wrapped inside an event, and the wrapped event will be fired back to the Saga on or after the requested time:

public class AlarmRequestedBySaga : Event
{
    public readonly Event Event;
    public readonly DateTime FireOn;
    public readonly Identity Identity;
    public readonly Type SagaType;

    public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
    {
        Identity = identity;
        SagaType = sagaType;
        Event = @event;
        FireOn = fireOn;
    }
}
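
A timer service consuming these requests could be sketched as below. This is an assumption about how such a service might work, not the actual implementation: `InMemoryAlarmService`, `FireDue`, the `deliver` callback and the `TimedOut` event are hypothetical names. The pending list lives only in memory here, so a real service would need to rebuild it after a restart, for example by re-reading the AlarmRequestedBySaga events.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Stand-ins for the question's base types so the sketch compiles on its own.
public abstract class Event { }
public sealed class Identity
{
    public readonly string Value;
    public Identity(string value) { Value = value; }
}
public sealed class TimedOut : Event { } // hypothetical event to fire back

public class AlarmRequestedBySaga : Event
{
    public readonly Event Event;
    public readonly DateTime FireOn;
    public readonly Identity Identity;
    public readonly Type SagaType;

    public AlarmRequestedBySaga(Identity identity, Type sagaType, Event @event, DateTime fireOn)
    {
        Identity = identity;
        SagaType = sagaType;
        Event = @event;
        FireOn = fireOn;
    }
}

// Hypothetical timer service: collects alarm requests and fires the due ones.
public class InMemoryAlarmService
{
    private readonly List<AlarmRequestedBySaga> _pending = new List<AlarmRequestedBySaga>();

    public void Handle(AlarmRequestedBySaga request)
    {
        _pending.Add(request);
    }

    // Called periodically. Delivers every alarm whose FireOn has passed,
    // oldest first, and returns how many were fired.
    public int FireDue(DateTime now, Action<Identity, Type, Event> deliver)
    {
        var due = _pending.Where(a => a.FireOn <= now).OrderBy(a => a.FireOn).ToList();
        foreach (var alarm in due)
        {
            deliver(alarm.Identity, alarm.SagaType, alarm.Event);
            _pending.Remove(alarm);
        }
        return due.Count;
    }
}
```

Passing the current time in as a parameter, rather than reading the clock inside the service, keeps the "on or after the requested time" rule easy to test.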

Alternatively I can store both Commands and Events in the same stream of base type Message

public abstract class EventSourcedSaga
{
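    // A brand-new saga has no history to replay, so it must record commands
    // from the start; otherwise SendCommand would ignore every command.
    protected EventSourcedSaga() { _constructing = false; }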

    protected EventSourcedSaga(Identity id, IEnumerable<Message> messages)
    {
        Identity = id;

        if (messages == null) throw new ArgumentNullException(nameof(messages));

        var count = 0;

        foreach (var message in messages)
        {
            var ev = message as Event;
            var command = message as Command;

            if(ev != null) Transition(ev);
            else if(command != null) _messages.Add(command);
            else throw new Exception($"Unsupported message type {message.GetType()}");

            count++;
        }

        if (count == 0)
            throw new ArgumentException("No messages provided");

        // All we need to know is the original number of events this
        // entity has had applied at time of construction.
        _unmutatedVersion = count;
        _constructing = false;
    }

    readonly IEventDispatchStrategy _dispatcher = new EventDispatchByReflectionStrategy("When");
    readonly List<Message> _messages = new List<Message>();
    readonly int _unmutatedVersion;
    private readonly bool _constructing = true;
    public readonly Identity Identity;

    public IList<Message> GetMessages()
    {
        return _messages.ToArray();
    }

    public void Transition(Event e)
    {
        _messages.Add(e);
        _dispatcher.Dispatch(this, e);
    }

    protected void SendCommand(Command c)
    {
        // Don't add a command whilst we are in the constructor. Message
        // state transitions during construction must not generate new
        // commands, as those commands will already be in the message list.
        if (_constructing) return;

        _messages.Add(c);
    }

    public int UnmutatedVersion() => _unmutatedVersion;
}
Godroon answered 30/10, 2015 at 6:2 Comment(3)
In my opinion sagas can issue commands, usually based on events happening across multiple aggregate roots. The issue I see with recording commands is the logic needed to replay those commands. You'll need some sort of command idempotency: how do you know whether the command should be replayed? Replaying the command later, after other events have happened, may change results. If the command triggers interaction with third-party systems, what is the impact of replaying it? - Satirize
@Satirize Correct. GetEventStore publishes all events one at a time, and we can request a replay from a specific position. To prevent commands being replayed, we would just ask the EventStore to replay from the last known event forwards (so we store the position of the last dispatched event/message/command once it is sent). - Godroon
The issue is that the commands and events are all interleaved in a stream, so how do you know to skip a command? The events generated as a result of executing the command you are looking at are somewhere ahead in the stream (or not there at all if the command/process failed). - Satirize

I believe the first two questions are the result of a wrong understanding of Process Managers (aka Sagas, see note on terminology at bottom).

Shift your thinking

It seems like you are trying to model it (as I once did) as an inverse aggregate. The problem with that: the "social contract" of an aggregate is that its inputs (commands) can change over time (because systems must be able to change over time), but its outputs (events) cannot. Once written, events are a matter of history and the system must always be able to handle them. With that condition in place, an aggregate can be reliably loaded from an immutable event stream.

If you try to just reverse the inputs and outputs as a process manager implementation, its output cannot be a matter of record, because commands can be deprecated and removed from the system over time. When you try to load a stream that contains a removed command, it will crash. Therefore a process manager modeled as an inverse aggregate could not be reliably reloaded from an immutable message stream. (Well, I'm sure you could devise a way... but is it wise?)

So let's think about implementing a Process Manager by looking at what it replaces. Take for example an employee who manages a process like order fulfillment. The first thing you do for this user is set up a view in the UI for them to look at. The second thing you do is make buttons in the UI so the user can perform actions in response to what they see on the view. For example: "This row has PaymentFailed, so I click CancelOrder. This row has PaymentSucceeded and OrderItemOutOfStock, so I click ChangeToBackOrder. This order is Pending and 1 day old, so I click FlagOrderForReview"... and so forth. Once the decision process is well-defined and starts requiring too much of the user's time, you are tasked with automating it. To automate it, everything else can stay the same (the view, even some of the UI so you can check on it), but the user is now a piece of code.

"Go away or I will replace you with a very small shell script."

The process manager code now periodically reads the view and may issue commands if certain data conditions are present. Essentially, the simplest version of a Process Manager is some code that runs on a timer (e.g. every hour) and depends on particular view(s). That's the place where I would start... with stuff you already have (views/view updaters) and minimal additions (code that runs periodically). Even if you decide later that you need different capability for certain use cases, "Future You" will have a better idea of the specific shortcomings that need addressing.
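
As a rough illustration of that timer-driven approach, the three decisions from the order fulfillment example can be written as a pure function over the view. This is a sketch under assumptions: `OrderRow`, `PaymentStatus`, `OrderFulfillmentProcess.Decide` and the command classes are hypothetical names derived from the example above, not an existing API. It also assumes that handling a command changes the view (or that the handlers are idempotent), otherwise the same command would be issued again on every run.

```csharp
using System;
using System.Collections.Generic;

public enum PaymentStatus { Pending, Failed, Succeeded }

// One row of the read model that the employee used to look at.
public sealed class OrderRow
{
    public string OrderId;
    public PaymentStatus Payment;
    public bool ItemOutOfStock;
    public bool IsPending;      // order status is still Pending
    public DateTime PlacedOn;
}

public abstract class Command { public string OrderId; }
public sealed class CancelOrder : Command { }
public sealed class ChangeToBackOrder : Command { }
public sealed class FlagOrderForReview : Command { }

public static class OrderFulfillmentProcess
{
    // Runs on a timer (e.g. every hour): reads the view and returns the
    // commands a human operator would have issued by clicking buttons.
    public static List<Command> Decide(IEnumerable<OrderRow> view, DateTime now)
    {
        var commands = new List<Command>();
        foreach (var row in view)
        {
            if (row.Payment == PaymentStatus.Failed)
                commands.Add(new CancelOrder { OrderId = row.OrderId });
            else if (row.Payment == PaymentStatus.Succeeded && row.ItemOutOfStock)
                commands.Add(new ChangeToBackOrder { OrderId = row.OrderId });
            else if (row.IsPending && now - row.PlacedOn >= TimeSpan.FromDays(1))
                commands.Add(new FlagOrderForReview { OrderId = row.OrderId });
        }
        return commands;
    }
}
```

Nothing here needs to be loaded from a message stream: the process manager keeps no history of its own, so removing or renaming a command later cannot break a reload.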

And this is a great place to remind you of Gall's law and probably also YAGNI.

  3. Any other reference material on the net for implementation of event sourced Sagas? Anything I can sanity check my ideas against?

Good material is hard to find as these concepts have very malleable implementations, and there are diverse examples, many of which are over-engineered for general purposes. However, here are some references that I have used in the answer.

DDD - Evolving Business Processes
DDD/CQRS Google Group (lots of reading material)


Note that the term Saga has a different implication than a Process Manager. A common saga implementation is basically a routing slip with each step and its corresponding failure compensation included on the slip. This depends on each receiver of the routing slip performing what is specified on the routing slip and successfully passing it on to the next hop or performing the failure compensation and routing backward. This may be a bit too optimistic when dealing with multiple systems managed by different groups, so process managers are often used instead. See this SO question for more information.
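
To make the routing slip idea concrete, here is a deliberately simplified sketch. `Step`, `RoutingSlip.Run` and the log strings are hypothetical names. In a real routing slip the slip travels between separate services, each of which performs its own step; this sketch collapses that into a single in-process loop just to show the forward path and the backward compensation.

```csharp
using System;
using System.Collections.Generic;

// One entry on the slip: the work to do and how to undo it.
public sealed class Step
{
    public string Name;
    public Func<bool> Execute;   // returns false when the step fails
    public Action Compensate;    // undoes a previously completed step
}

public static class RoutingSlip
{
    // Executes steps in order. On the first failure, compensates the
    // completed steps in reverse order and returns false.
    public static bool Run(IList<Step> steps, IList<string> log)
    {
        var completed = new Stack<Step>();
        foreach (var step in steps)
        {
            if (step.Execute())
            {
                completed.Push(step);
                log.Add("done:" + step.Name);
                continue;
            }

            log.Add("failed:" + step.Name);
            while (completed.Count > 0)
            {
                var undone = completed.Pop();
                undone.Compensate();
                log.Add("undone:" + undone.Name);
            }
            return false;
        }
        return true;
    }
}
```

The sketch relies on every step reporting its outcome and every compensation succeeding, which is exactly the optimism the note above questions when multiple groups own the systems involved.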

Aecium answered 6/11, 2015 at 20:7 Comment(1)
It took me a while to get around to this and then to see your thinking. I agree: instead of getting worked up about the commands that have come in, we should worry about storing the fact that state has transitioned within the saga, and just replay those transition events (as opposed to the commands that caused them). - Godroon
