What's the StreamId in EventSourcing when a domain event affects multiple aggregates in the same Bounded Context?

Streams

Some authors suggest classifying events into "streams", and many of them identify a "stream" with an "aggregate ID".

Say we have an event car.repainted, by which we mean we repainted the car with ID 12345 to {color:red}.

In this example the stream ID would probably be something like car.12345, or, if you use universally unique IDs, just 12345.

Some authors in fact suggest storing the event stream in a table with a structure more or less like the following (if you go relational):

| writeIndex | event | cachedEventId | cachedTimeStamp | cachedType    | cachedStreamId |
|------------|-------|---------------|-----------------|---------------|----------------|
| 1          | JSON  | abcd          | xxxx            | car.repainted | 12345          |
  • The event column holds the "original" value object of the event, most probably serialized to JSON if it's a relational DB.
  • The writeIndex is just for DB administration and has nothing to do with the domain itself. You can "dump" your events into another DB and have writeIndex rewritten with no side effects.
  • The cached* fields are there to make finding and filtering events easy, and they can all be calculated from the event itself.
  • Of special mention is cachedStreamId, which -according to some authors- maps to the "aggregate ID to which the event belongs". In this case, "the car identified by 12345".

If you don't go relational, you'd probably store your event "as a document" in a data lake / event store / document warehouse / call-it-what-you-want (mongo, redis, elasticsearch...) and then you make buckets or groups or selections or filters to retrieve events by some criteria (one of those criteria being "which entity/aggregate ID am I interested in" => streamId again).
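
For illustration only, such an envelope plus a minimal append/read interface could be sketched like this in C# (StoredEvent, IEventStore and all member names are my own, not taken from any particular event-store library):

using System;
using System.Collections.Generic;

// Hypothetical envelope for one stored event; the cached* columns are
// derived from the serialized event purely to make querying easier.
public sealed class StoredEvent
{
    public long WriteIndex { get; set; }            // storage-level sequence, not domain data
    public string EventJson { get; set; }           // the "original" event, serialized
    public Guid CachedEventId { get; set; }
    public DateTimeOffset CachedTimeStamp { get; set; }
    public string CachedType { get; set; }          // e.g. "car.repainted"
    public string CachedStreamId { get; set; }      // e.g. "car.12345" -- the subject of this question
}

// Minimal append-only store; appending always targets exactly one stream.
public interface IEventStore
{
    void Append(string streamId, StoredEvent storedEvent);
    IEnumerable<StoredEvent> ReadStream(string streamId);   // in write order
}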

Replaying

When replaying the events to create fresh projections, you just have a bunch of subscribers keyed by event type (and probably version). If the event is for you, you read the full original document of the event, process it, and calculate and update the projection. If the event is not for you, you just skip it.

When replaying, you restore the aggregate read tables you want to rebuild to a known initial state (maybe "all empty"), then select one or more streams, read their events in chronological order, and iteratively update the state of the aggregates.
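
As a sketch of that loop, reusing the hypothetical StoredEvent / IEventStore types from above (the projection names are mine, purely illustrative):

// Reuses the hypothetical StoredEvent / IEventStore sketch from the "Streams" section.
using System.Collections.Generic;

public interface IProjection
{
    void Reset();                            // restore the read model to its known initial state
    bool Handles(string eventType);          // "is this event for me?"
    void Apply(StoredEvent storedEvent);     // recalculate / update the read model
}

public static class Replayer
{
    // Rebuilds the given projections from the selected streams, one stream at a time,
    // reading each stream's events in write (chronological) order.
    public static void Replay(IEventStore store, IEnumerable<string> streamIds, IReadOnlyList<IProjection> projections)
    {
        foreach (var projection in projections)
            projection.Reset();

        foreach (var streamId in streamIds)
            foreach (var storedEvent in store.ReadStream(streamId))
                foreach (var projection in projections)
                    if (projection.Handles(storedEvent.CachedType))
                        projection.Apply(storedEvent);   // events not handled are simply skipped
    }
}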

Okay...

All this seems reasonable to me. Nothing new so far.

Question

But... I now have a short circuit in my head... It's such a basic short circuit that the answer is probably so obvious that I'll feel silly for not being able to see it right now...

What happens... if an event is "equally important" to two aggregates of different types (assuming they are inside the same bounded context), or if it even refers to two instances of the same aggregate type?

Example of 2 equally-important different aggregates:

Imagine you are in the train industry and you have these aggregates:

Locomotive
Wagon

For one moment, just imagine that one locomotive can carry 0 or 1 wagons, but not many wagons.

And you have these commands:

Attach( locomotiveId, wagonId )
Detach( locomotiveId, wagonId )

Attach can be rejected if the locomotive and wagon were already attached to something, and Detach can be rejected if the command is issued when they are not attached.

The events are obviously the corresponding ones:

AttachedEvent( locomotiveId, wagonId )
DetachedEvent( locomotiveId, wagonId )
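
For concreteness, those events could be sketched as plain C# records (illustrative only); note that both identifiers sit in the event with equal weight:

using System;

// Neither identifier is an obviously "better" stream ID than the other,
// which is exactly the point of the question.
public sealed record AttachedEvent(Guid LocomotiveId, Guid WagonId);
public sealed record DetachedEvent(Guid LocomotiveId, Guid WagonId);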

Q:

What's the stream ID there? Both the loco and the wagon are of equal importance; it's not an event "of the loco" or "of the wagon". It's an event of the domain that affects both! Which one is the streamId, and why?

Example with 2 aggregates of the same type

Say an issue tracker. You have this aggregate:

Issue

And these commands:

MarkAsRelated( issueAId, issueBId )
UnmarkAsRelated( issueAId, issueBId )

And mark is rejected if the mark was already there, and unmark is rejected if there was no previous mark.

And these are the events:

MarkedAsRelatedEvent( issueAId, issueBId )
UnmarkedAsRelatedEvent( issueAId, issueBId )

Q:

Same question here: it's not that the relationship "belongs" to issue A or to issue B. Either they are related or they are not. But it's bidirectional: if A is related to B, then B is related to A. What's the streamId here, and why?

History is written once

In any case, I don't see creating TWO events, one for each. That's a matter for the calculators...

If we look at the definition of "history" (not in computing, in general!), it says "a sequence of events that happened". The Free Dictionary says: "A chronological record of events" (https://www.thefreedictionary.com/history)

So when there's a war between social group A and social group B and, say, B beats A, you do not write 2 events, lost(A) and won(B). You just write one event: warFinished( wonBy:B, lostBy:A ).

Question

So how do you handle event streams when the event affects multiple entities at the same time, and it's not that it "belongs" to one with the other being a complement to that, but it is truly equal to both?

Hyde asked 1/9, 2018 at 0:57 Comment(0)

What happens... if an event is "equally important" to two aggregates of different types (assuming they are inside the same bounded context), or if it even refers to two instances of the same aggregate type

Event sourcing is a simple (note: not easy) idea. Instead of overwriting previous state when we save an aggregate to our stable storage, we write a new version, linked back to the previous version. Furthermore, instead of writing out an entire copy of the new version, we write out a diff, and the diff is expressed in a domain-specific way.

Saving an aggregate to a stream, therefore, is analogous to saving a representation of the aggregate as a document in a key value store, or as rows in a relational database.

When you ask "which stream" does it belong in: it belongs in the stream of the aggregate that changed, just as it would in either of those other storage strategies.

If you aren't sure which aggregate changed, then what you have is a modeling problem, not an event sourcing problem.

Both of your examples describe introducing a relation between two aggregates; it's analogous to having a many to many relationship between two tables in a database. So who owns the M2M table?

Well, if neither aggregate needs that information to ensure its own invariant, then the M2M table might be an aggregate all by itself.

Imagine a representation of a contract between two parties - it might turn out that the two parties are incidental, and "Contract" is the important idea, worthy of being modeled as its own thing.
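
As a minimal sketch of that option (all names are mine, applied to the train example from the question), the relation itself becomes the aggregate, so the stream ID is the coupling's own ID rather than the locomotive's or the wagon's:

using System;

// Hypothetical: the locomotive/wagon relation modeled as its own aggregate.
public sealed record CouplingAttached(Guid CouplingId, Guid LocomotiveId, Guid WagonId);

public sealed class Coupling
{
    public Guid Id { get; private set; }             // this is the stream ID for the relation
    public Guid LocomotiveId { get; private set; }
    public Guid WagonId { get; private set; }

    public static (Coupling Aggregate, CouplingAttached Event) Attach(Guid locomotiveId, Guid wagonId)
    {
        // The invariant "each side may be attached at most once" would be checked
        // against the parties' (possibly cached) state before emitting the event.
        var coupling = new Coupling
        {
            Id = Guid.NewGuid(),
            LocomotiveId = locomotiveId,
            WagonId = wagonId
        };
        var @event = new CouplingAttached(coupling.Id, locomotiveId, wagonId);
        return (coupling, @event);   // the event is appended to the stream identified by coupling.Id
    }
}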

If the relation is clearly "part of" one aggregate (that aggregate is guarding invariants that depend on the state of the relation), then that aggregate will be responsible for editing the new table, and the other aggregate will ignore it.

If both aggregates care about the relations, then you have one of two problems

1) Your analysis of the domain is wrong - you've drawn your aggregate boundaries in the wrong place. Get thee to a white board and start drawing things out.

2) You have two copies of the relation -- one for each aggregate, but those copies aren't necessarily consistent with each other.

Here's an important heuristic: if you really have two different aggregates, you should be able to store them in two completely different databases. They can't share each other's data, but they can keep their own versioned/timestamped/cached copy of the other guy's data.

So left-hand-aggregate makes a change, and the "plumbing" sends a "left-hand-aggregate-changed" message to right-hand-aggregate, then right-hand-aggregate updates his cache.

Note how this would work in the case where we think the contract is a first class concern that manages its own state. The model updates the contract, saving the changes to its state, and then the plumbing comes along and delivers a copy of the changes to each of left-hand-aggregate and right-hand-aggregate.
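
A sketch of that plumbing (hypothetical message and handler names): each side subscribes to the contract's change notifications and maintains its own, possibly stale, cached copy:

using System;
using System.Collections.Generic;

// Hypothetical notification published by the plumbing after the contract saves its changes.
public sealed record ContractChanged(Guid ContractId, Guid LeftAggregateId, Guid RightAggregateId, long Version);

// The right-hand side keeps its own versioned copy; it never edits the contract itself.
public sealed class RightHandContractCache
{
    private readonly Dictionary<Guid, ContractChanged> _byContract = new();

    public void Handle(ContractChanged message)
    {
        // Only move forward in version; stale or duplicate deliveries are ignored.
        if (!_byContract.TryGetValue(message.ContractId, out var existing) || message.Version > existing.Version)
            _byContract[message.ContractId] = message;
    }
}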

Simple. Not necessarily easy.

Olander answered 1/9, 2018 at 14:52 Comment(0)

I don't think it has anything to do with event sourcing per se. Perhaps the design can be tinkered with a bit.

I would go with something like this for the locomotive:

using System;

// Assumed shape of the event; not spelled out in the original answer.
public class WagonAttached
{
    public Guid Id { get; set; }   // the wagon that was attached
}

public class Locomotive
{
    public Guid Id { get; private set; }
    public Guid? AttachedWagonId { get; private set; }

    public WagonAttached Attach(Guid wagonId)
    {
        // an invariant check that no wagon is already attached would go here

        return On(
            new WagonAttached
            {
                Id = wagonId
            });
    }

    private WagonAttached On(WagonAttached wagonAttached)
    {
        AttachedWagonId = wagonAttached.Id;

        return wagonAttached;
    }
}

The event stream for the Locomotive is where the WagonAttached event will reside. In what way the Wagon aggregate is dependent on this event is what is up for debate. I would argue that the wagon probably doesn't care much, in the same way that a Product isn't too concerned with which Order (there may be many in this case) it is related to. The aggregate Order is the side that seems more appropriate for the OrderItem associative entity. I'm guessing that your locomotive-to-wagon relationship would probably follow the same pattern, given that a locomotive would have more than one wagon attached. There is probably a bit more to the design, but I'm going to assume that these are hypothetical examples.

The same goes for the Issue. If one could attach multiple issues then the Order-to-Product concept comes into play. Even though two issues are involved, there is a direction of sorts, given that one issue, as a subordinate, is attached to the primary issue. Perhaps even with a RelationshipType such as Dependency, Impediment, etc. In such a case one would probably go with a value object to represent that:

using System;
using System.Collections.Generic;
using System.Collections.ObjectModel;

// Assumed shape of the event; not spelled out in the original answer.
public class IssueRelated
{
    public Guid Id { get; set; }
    public int Type { get; set; }
}

public class Issue
{
    public class RelatedIssue
    {
        public enum RelationshipType
        {
            Dependency = 0,
            Impediment = 1
        }

        public Guid Id { get; private set; }
        public RelationshipType Type { get; private set; }

        public RelatedIssue(Guid id, RelationshipType type)
        {
            Id = id;
            Type = type;
        }
    }

    private readonly List<RelatedIssue> _relatedIssues = new List<RelatedIssue>();

    public Guid Id { get; private set; }

    public IEnumerable<RelatedIssue> GetRelatedIssues()
    {
        return new ReadOnlyCollection<RelatedIssue>(_relatedIssues);
    }

    public IssueRelated Relate(Guid id, RelatedIssue.RelationshipType type)
    {
        // probably an invariant to check for existence of related issue

        return On(
            new IssueRelated
            {
                Id = id,
                Type = (int)type
            });
    }

    private IssueRelated On(IssueRelated issueRelated)
    {
        _relatedIssues.Add(
            new RelatedIssue(
                issueRelated.Id,
                (RelatedIssue.RelationshipType)issueRelated.Type));

        return issueRelated;
    }
}

The point is that the event belongs to a single aggregate but still represents the relationship. You just need to identify the side that makes the most sense.

The events can (or should) also be published using some event-driven architecture method (say a service bus) in order for other interested parties to be notified.
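
For instance (hypothetical IEventBus abstraction, reusing the Locomotive/WagonAttached classes above and the IEventStore/StoredEvent types sketched earlier), an application-level handler would persist first and then publish:

using System;
using System.Threading.Tasks;

// Hypothetical bus abstraction; any concrete service bus client could sit behind it.
public interface IEventBus
{
    Task PublishAsync(object domainEvent);
}

// Sketch of the application-level flow: append the event to its stream, then notify others.
public sealed class AttachWagonHandler
{
    private readonly IEventStore _store;   // the append-only store sketched earlier
    private readonly IEventBus _bus;

    public AttachWagonHandler(IEventStore store, IEventBus bus)
    {
        _store = store;
        _bus = bus;
    }

    public async Task Handle(Locomotive locomotive, Guid wagonId)
    {
        WagonAttached @event = locomotive.Attach(wagonId);

        // 1. Persist the event in the locomotive's stream.
        _store.Append($"locomotive.{locomotive.Id}", new StoredEvent
        {
            CachedType = "WagonAttached",
            CachedStreamId = locomotive.Id.ToString(),
            CachedTimeStamp = DateTimeOffset.UtcNow
            // EventJson / CachedEventId would be filled from the serialized event
        });

        // 2. Publish the domain event so other interested parties are notified.
        await _bus.PublishAsync(@event);
    }
}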

Flagellate answered 1/9, 2018 at 6:25 Comment(10)
Hmmm... I'm not so sure. My reasoning: for me, "the model" is the nearest-to-reality representation and should be infrastructure-agnostic. Here we are "blending" the events because we want a "streamId" in the infrastructure to search and filter by. Are we sure? Could it be that editing events because of the infrastructure is a code smell that should trigger an alert for us?Hyde
Internal question: if it came to the point that I supported "multiple streams" for a single event, would I have mutated my original event structure? Would I have "forced" a directionality between issues where there is no such directionality in my example? I think I probably should not... But I'm still unsure... exploring new points of view.Hyde
Over the last year I've been putting under judgement whether we should "tag" events into zero or more streams instead of "classifying" them into a specific one... so events would appear in "all the channels" that are affected. Instead of having a streamId, have an array of zero-to-N streamIds. Zero would be useful for state events that do not belong to a specific aggregate but may change how the calculators need to calculate, like newTariffDocumentSetReceived(). Again: unsure. Just pushing the classical EventSourcing structures to the limit and putting everything under judgement.Hyde
Interesting. Are you perhaps regarding events as more "general" in a sense? As in more system-level? I do still regard service bus "system" events as useful, but they certainly are more on an infrastructure level. For example, EMailSentEvent. However, in event sourcing the events are specifically for hydrating an aggregate, which means they have a strong affinity to the domain model and the events produced from it. Thoughts?Flagellate
I'm talking exclusively about domain events in this question. I actually do have in my systems 2 more "event loggers", one at the application level and another at the infrastructure level:Hyde
At the application level, I log all the "application execution events". There I can know whether "Alice (for example, a user) did it on the web, with which cookies, from which IP..." or "Bob (say, a system administrator) did it on the command line, on which server, in what directory...". No change in the domain can exist unless an application triggered it. It's always an incoming API call, an HTML web form submission, a command-line invocation... In the domain event I always reference the "applicationExecutionId", so given a domain event I can go to the application level to audit who, when, how...Hyde
At the infrastructure level, I record all the outgoing API calls. If, to validate a phone number via SMS, I have 4 "code sendings via SMS" because the user clicks "re-send" and only the last one got "validated", then I'll have 4 api-call events "sms.sent" and 1 domain event "phone.validated". For this question I'm exclusively talking about the "domain", leaving application and infrastructure aside. In your example, EMailSentEvent would only be in the domain if it's, for example, the "sending of a quotation" that the domain expert needs to know about. Although it'd probably be named "QuotationSentEvent".Hyde
Application event loggers and infrastructure event loggers are completely separate from domain persistence. They may share the same DB connection and write to different tables, but they could perfectly well live in other DBs or other systems. I'm only concerned with "domain events" in my analysis of the stream ID in this question.Hyde
Ah. Thanks for clarifying that :) --- I'd like to understand why you think one event should reside in two aggregates though. That seems to be the sticking point. Perhaps this can become a chat of sorts but I don't know how to do that on SO...Flagellate
Opened a gist, so we avoid cluttering SO with a dialog. - Follow up here: gist.github.com/xmontero/869447b285cb1b93995bf72a6259d7d8Hyde
