Understanding of Axon Event Sourcing
Asked Answered
C

2

5

I have been learning about axon and event sourcing and I think I have finally understood part of it and it makes sense in my head, but I want to make sure that my understanding of it is correct and that I am not making any mistakes. The code works, and I can see the events in my DOMAIN_EVENT_ENTRY table also.

I will post my code below (simple GiftCard example from the docs) and explain my thought process. If I have not understood it correctly, please could you help me on understanding that part in the correct way.

I've not included the commands/events as they are very simple with fields for id,amount

First my code:

TestRunner.java

package com.example.demoaxon;

import java.util.UUID;

import org.axonframework.commandhandling.gateway.CommandGateway;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import lombok.extern.slf4j.Slf4j;

@Component
@Slf4j
public class TestRunner implements CommandLineRunner {
    
    private final CommandGateway commandGateway;
    
    @Autowired
    public TestRunner(CommandGateway commandGateway) {
        this.commandGateway = commandGateway;
    }
    
    @Override
    public void run(String... args) throws Exception {
        log.info("send command");
        String id = UUID.randomUUID().toString();
        commandGateway.sendAndWait(new IssueCardCommand(id,100));
        commandGateway.sendAndWait(new RedeemCardCommand(id,90));
        
    }
}

GiftCard.java

package com.example.demoaxon;

import org.axonframework.commandhandling.CommandHandler;
import org.axonframework.eventsourcing.EventSourcingHandler;
import org.axonframework.modelling.command.AggregateIdentifier;
import static org.axonframework.modelling.command.AggregateLifecycle.apply;
import org.axonframework.spring.stereotype.Aggregate;

import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@NoArgsConstructor
@Aggregate
@Slf4j
public class GiftCard {
    
    @AggregateIdentifier
    private String giftCardId;
    private Integer amount;
    
    @CommandHandler
    public GiftCard(IssueCardCommand cmd) {
        log.info("handling {}",cmd);
        apply(new CardIssuedEvent(cmd.getCardId(),cmd.getAmount()));
    }
    
    @EventSourcingHandler
    public void onCardIssuedEvent(CardIssuedEvent evt) {
        log.info("applying {}",evt);
        this.giftCardId = evt.getCardId();
        this.amount = evt.getAmount();
    }
    
    @CommandHandler
    public void redeemCardCommandHandler(RedeemCardCommand cmd) {
        log.info("handling {}",cmd);
        this.amount -= cmd.getAmount();
        apply(new CardRedeemedEvent(cmd.getCardId(),cmd.getTransactionId(),this.amount));
    }
    
    @EventSourcingHandler
    public void onCardRedeemedEvent(CardRedeemedEvent evt) {
        log.info("applying {}",evt);
        this.amount = evt.getAmount();
    }   
}

From what I can understand:

  1. In my TestRunner class, the Command Gateway dispatches the issueCardCommand to it's CommandHandler using the command bus, which is then creating a new instance of the GiftCard aggregate. In this CommandHandler we can perform any logic, and then we use this apply method.

  2. The apply(event) method is used to publish the CardIssuedEvent as an EventMessage within the scope of the GiftCard aggregate and it also invokes the EventSourcingHandler for that particular event, so in this case onCardIssuedEvent. It publishes the EventMessage to the EventBus and is sent to EventHandlers.

  3. In the @EventSourcingHandler onCardIssuedEvent, we can make any state changes to the GiftCard aggregate and we are also persisting the event to the DOMAIN_EVENT_ENTRY table using spring Jpa.

  4. Once this CommandHandler is finished executing, the aggregate object doesn't exist anymore.

  5. Now in my TestRunner class again, the Command Gateway dispatches the RedeemCardCommand to its CommandHandler and as the first command no longer exists, the empty no args constructor is used to create the objects. The axon framework retrieves all events from this DOMAIN_EVENT_ENTRY table and it replays all the events (EventSourcingHandlers) for the GiftCard aggregate instance in order to get it's current state (which is why the @AggregateIdentifier is important).

  6. The RedeemCardCommandHandler method is then executed, it performs any logic and applies the event, which is published within the aggregate and it invokes it's EventSourcingHandler. This EventSourcingHandler then updates the state of the GiftCard aggregate / persists to the DOMAIN_EVENT_ENTRYtable.

Is my understanding of how the event sourcing works correct?

Clymer answered 7/1, 2021 at 13:52 Comment(0)
T
8

let me try to guide you through this journey!

Your understanding is almost completely correct. I would just like to split it into Event Sourcing and Axon for a better understanding.

  • Event Sourcing

In short, Event Sourcing is a way of storing an application’s state through the history of events that have happened in the past. Keep in mind that you have other patterns as well, for example State Stored Aggregates. In your example you are using Event Sourcing hence why @EventSourcingHandlers are in place. Now to the next topic.

  • Axon

I recommend you reading this awesome blog by one of our colleagues, specially the slide decks included on it, and you will see the journey of a message inside Axon Framework!

Now to your points, there are things I want to clarify upfront:

  1. Correct, you dispatch a Command and since the method annotated is a constructor, it will create an Aggregate for you. CommandHanlders are the right place for business logic and validations.
  2. Here the apply will publish the message internally (to this Aggregate but also to their Entities/AggregateMembers) and later on to the EventBus. From the javadoc:

The event should be applied to the aggregate immediately and scheduled for publication to other event handlers.

  1. Since we are talking about Event Sourcing, all the EventSourcingHandlers will be called and the state of the Aggregate modified/updated. This is important because, as stated before, this is how you reconstruct your Aggregate state whenever needed. But the persisting of the Event does not occur here, it was already scheduled to happen when this process is done.

  2. Correct.

  3. Also correct. This has to do with Event Sourcing in general and how it reconstructs your Aggregate.

  4. Also correct with the observations made on point 3, regarding when the Event is published/saved/persisted.


Another small side note on your code: you are doing this on your @CommandHandler

@CommandHandler
public void redeemCardCommandHandler(RedeemCardCommand cmd) {
    log.info("handling {}", cmd);
    this.amount -= cmd.getAmount(); // you should not do this here
    apply(new CardRedeemedEvent(cmd.getCardId(), cmd.getTransactionId(), this.amount));
}

This state change should be made in an @EventSourcingHandler. On the @CommandHandler you should only do validation if this Aggregate has enough 'money' to be redeemed :)

Thirdrate answered 11/1, 2021 at 10:5 Comment(3)
Thanks @Lucas Campos , that has helped to make things clearer. I will check out that blog from your colleague also.Clymer
On #4: why go to the trouble of rebuilding the state only to throw it away? The queries (in most examples) don't use that state, but use state maintained in their own (projection) databases. So is this ephemeral, event-sourced aggregate instance only useful for validating commands before firing state change events? Or is there another purpose?Cerebellum
Yes, it makes sure your state is up to date when handling commands! This is what Event Sourcing is all about... you can improve performance, when needed, introducing both Snapshots and Caches! Do you want to do it beforehand? I would say no but, as always, it depends =)Thirdrate
R
2

yes, it looks like you are using the command gateway correctly, and the aggregates have the right methods annotated with the right stuff! You are using the Saga design pattern for Distributed Transactions which is much better than a 2PC (Two-phase commit), so each step calling the next step is the right way of doing it.

As long as the starting saga method is annotated with @StartSaga, and the end one is annotated with @EndSaga, then the steps will work in order, as well as rollback should work properly

The saveAndWait function actually returns a CompletableFuture, so if you wanted to, you can step through the threads in debug mode and pause the thread until the whole saga is complete if that's what you'd like to do.

My only concern with is is - are you using the Axon Server as the Event Sourcer, or another Event Bus or sourcer that Axon Supports? The only problem with the Standard Axon Server is that it's a Free Tier product, and they are offering a better, more supported version like Axon Server Enterprise (which offers more features - better for infrastructure) - so if you are doing this for a company, then they might be willing to pay for the extra support in higher tier...

Axon Framework does actually support Spring AMQP, which is useful because then, you can have better control over your own infrastructure rather than being tied down to a paid-for Axon Server - which I don't is actually good or not.

If you can get it to work with a custom Event Bus/Sourcer, then your system will be very good - in combination with the ease of developing on Axon.

As long as the Aggregates have the right annotations on them, and you command gateway is autowired correctly, then everything should work fine. My only worry is that there isn't yet enough support for Axon Framework yet...

Recent answered 10/1, 2021 at 14:59 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.