Implementing sagas with Kafka
I am using Kafka for Event Sourcing and I am interested in implementing sagas using Kafka.

Any best practices on how to do this? The Commander pattern mentioned here seems close to the architecture I am trying to build but sagas are not mentioned anywhere in the presentation.

Vo answered 8/5, 2017 at 10:21 Comment(0)

This talk from this year's DDD eXchange is the best resource I came across wrt Process Manager/Saga pattern in event-driven/CQRS systems: https://skillsmatter.com/skillscasts/9853-long-running-processes-in-ddd (requires registering for a free account to view)

The demo shown there lives on github: https://github.com/flowing/flowing-retail

I've given it a spin and I quite like it. I do recommend watching the video first to set the stage.

Although the approach shown is message-bus agnostic, the demo uses Kafka for the Process Manager to send commands to and listen to events from other bounded contexts. It does not use Kafka Streams but I don't see why it couldn't be plugged into a Kafka Streams topology and become part of the broader architecture like the one depicted in the Commander presentation you referenced.

I hope to investigate this further for our own needs, so please feel free to start a thread on the Kafka users mailing list, that's a good place to collaborate on such patterns.

Hope that helps :-)

Humane answered 10/5, 2017 at 14:36 Comment(0)

I would like to add something here about sagas and Kafka.

In general


In general, Kafka is a tad different from a normal message queue. It's especially good at scaling, and that strength can actually cause some complications.

As one of the means to accomplish scaling, Kafka partitions the data stream. Data is placed in partitions, each of which can be consumed at its own rate, independently of the other partitions of the same topic. Here is some info on it: how-choose-number-topics-partitions-kafka-cluster. I'll come back to why this is important.

The most common ways to ensure the order within Kafka are:

  • Use 1 partition for the topic
  • Use a message key to "assign" each message to a specific partition

In both scenarios your chronologically dependent messages need to stream through the same topic, ending up in the same partition.
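
For example, here is a minimal producing sketch, assuming the Confluent.Kafka client and a hypothetical "booking-saga" topic (the names and payload are made up for illustration): all messages that share the same key end up in the same partition, which preserves their relative order.

using System;
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<string, string>(config).Build();

// Use the saga/correlation id as the message key: every event belonging to
// the same saga instance is then routed to the same partition and stays ordered.
var sagaInstanceId = Guid.NewGuid().ToString();

await producer.ProduceAsync("booking-saga", new Message<string, string>
{
    Key = sagaInstanceId,
    Value = $"{{\"type\":\"RoomReserved\",\"sagaInstanceId\":\"{sagaInstanceId}\"}}"
});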

Also, as @pranjal thakur points out, make sure the delivery method is set to "exactly once", which has a performance impact but ensures you will not receive the messages multiple times.
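
As a rough sketch of what that might look like with the Confluent.Kafka client (the exact route to exactly-once depends on your client and on whether you use transactions or Kafka Streams; the ids below are made up):

using Confluent.Kafka;

// Producer side: idempotence plus a transactional id are the building blocks
// for exactly-once writes.
var producerConfig = new ProducerConfig
{
    BootstrapServers = "localhost:9092",
    EnableIdempotence = true,
    TransactionalId = "booking-saga-producer-1"
};

// Consumer side: only read messages from committed transactions, and commit
// offsets yourself (ideally as part of the same transaction).
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "booking-saga",
    IsolationLevel = IsolationLevel.ReadCommitted,
    EnableAutoCommit = false
};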

The caveat


Now, here's the caveat: when you change the number of partitions, the distribution of messages over the partitions (when using a key) changes as well.

Under normal conditions this can be handled easily. But in a high-traffic situation, migrating to a different number of partitions can create a window in which a single saga "flow" is handled across multiple partitions, and the order is not guaranteed at that point.
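
A simplified illustration of why this happens (the real partitioner hashes the key, e.g. murmur2 in the Java client, modulo the partition count; the hash below is only for demonstration):

using System;

// Key-based assignment: partition = hash(key) % partitionCount.
static int PartitionFor(string key, int partitionCount) =>
    (key.GetHashCode() & int.MaxValue) % partitionCount;

// The same saga key can land on a different partition once the count changes,
// so messages produced before and after the change may end up on different partitions.
Console.WriteLine(PartitionFor("saga-42", 3));
Console.WriteLine(PartitionFor("saga-42", 6));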

It's up to you whether this will be an issue in your scenario.

Here are some questions you can ask to determine if this applies to your system:

  • What will happen if you somehow need to migrate/copy data to a new system, using Kafka?
    (high traffic scenario)
  • Can you send your data to 1 topic?
  • What will happen after a temporary outage of your saga service?
    (low availability scenario/high traffic scenario)
  • What will happen when you need to replay a bunch of messages?
    (high traffic scenario)
  • What will happen if you need to increase the number of partitions?
    (high traffic scenario/outage & recovery scenario)

The alternative


If you're thinking of setting up a saga based on sequential steps, like a state machine, I would challenge you to rethink your design a bit.

I'll give an example:

Let's consider a booking-a-hotel-room process:

Simplified, it might consist of the following steps:

  • Handle room reserved (incoming event)
  • Handle room paid (incoming event)
  • Send acknowledgement of the booking (after payment and some processing)

Now, if your saga is not able to handle the payment event when the reservation event hasn't come in yet, then you are relying on the order of events.

In this case you should ask yourself: when will this break?


If you conclude that you want to avoid the chronological dependency, consider a system without a saga, or a saga that does not depend on the order of events, i.e. one that accepts all messages, even when it is not yet their turn in the process.

Some examples:

  • aggregators
  • Modeled as business process: parallel gateways (parallel process flows)

Do note that in such a setup it is even more crucial that every action has an implemented compensating action (rollback action).

I know this is often hard to accomplish, but if you start small, you might start to like it :-)
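
To make the compensating-action idea concrete, here is a rough, hypothetical sketch (all types are made up for illustration and not taken from any of the linked examples):

using System;
using System.Threading.Tasks;

// Hypothetical types, for illustration only.
public record BookingSagaData(Guid BookingId, decimal Amount);

public interface IPaymentService
{
    Task ChargeAsync(Guid bookingId, decimal amount);
    Task RefundAsync(Guid bookingId, decimal amount);
}

// Every forward action is paired with a compensating (rollback) action,
// so the process can undo its work when a later step fails.
public class ChargePaymentStep
{
    private readonly IPaymentService _payments;

    public ChargePaymentStep(IPaymentService payments) => _payments = payments;

    // Forward action: charge the guest.
    public Task ExecuteAsync(BookingSagaData data) =>
        _payments.ChargeAsync(data.BookingId, data.Amount);

    // Compensating action: refund the charge if, say, the room reservation
    // ultimately cannot be honoured.
    public Task CompensateAsync(BookingSagaData data) =>
        _payments.RefundAsync(data.BookingId, data.Amount);
}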

Pooler answered 7/11, 2019 at 11:54 Comment(4)
I’m not familiar with Kafka, but I don’t think the example is valid in systems I’ve used. You’d have a data store that retains the state of the saga. When related events are raised, your saga would be triggered in response, then check your data store for current state. It’ll eventually be triggered at the appropriate time. – Byssus
That is possible, and possibly your systems were just well designed. The thing I am stating is that if your logic relies on a state machine which depends on the order of events for validating transitions to different states, then, when using Kafka, you'll run into some serious issues. – Pooler
Kafka delivers messages in the correct order if they come in on the same partition, and all messages having the same key go to the same partition. So if you use the same key (I am not sure if this is a general practice, but we always have a correlation id for tracking in our sagas), your messages should get delivered in the correct order for a particular transaction. I get your point that carelessly using Kafka without a good understanding of it could lead to problems, but it would be far more work to consider other queues if you are already using Kafka and Kafka Streams. – Saltant
@Pooler This looks much better now :) I would also like to add one more point to the challenges of using Kafka for sagas: we should configure Kafka to process messages "exactly once" (the default for Kafka is "at least once"). In our system it was already configured that way, which reduces performance a bit, but if your system doesn't have it configured that way, you need to decide whether the drop in performance is really worth it. – Saltant

There is an example of a saga in Rebus.Kafka! The saga looks like this:

public class TestSaga : IdempotentSaga<TestSagaData>,
                        IAmInitiatedBy<KickoffSagaMessages>,
                        IHandleMessages<SagaMessageEarth>,
                        IHandleMessages<SagaMessageWind>,
                        IHandleMessages<SagaMessageFire>,
                        IHandleMessages<KickoffSagaMessages>,
                        IHandleMessages<IFailed<ISagaMessage>>
{
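    // Note: TestSagaData, the message types (KickoffSagaMessages, SagaMessageEarth,
    // SagaMessageWind, SagaMessageFire, ISagaMessage) and the Rebus wiring are
    // defined elsewhere in the Rebus.Kafka example and are not shown here.
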
    public async Task Handle(KickoffSagaMessages message)
    {
        _logger.Info("Processing Kickoff - {id}", Data.SagaInstanceId);
        Data.SagaInstanceId = message.SagaInstanceId;
        Data.StuffDone += "Initiated;";
        await _bus.Publish(new SagaMessageEarth()
        {
            SagaInstanceId = message.SagaInstanceId
        });
        _logger.Info("Published Earth....Done Processing Kickoff - {id}", Data.SagaInstanceId);
    }

    public async Task Handle(SagaMessageEarth message)
    {
        try
        {
            if (!Data.Task1Processed)
            {
                _logger.Info("Processing Earth - {id}", Data.SagaInstanceId);
                Data.StuffDone += "Earth;";
                Data.Task1Processed = true;
            }
            await _bus.Publish(new SagaMessageWind()
            {
                SagaInstanceId = message.SagaInstanceId
            });
            PossiblyDone();
            _logger.Info("Published Wind...Done Processing Earth - {id}", Data.SagaInstanceId);
        }
        catch (Exception e)
        {
            _logger.Error(e, "WHAT Earth? - {id}", Data.SagaInstanceId);
            throw;
        }
    }

    public async Task Handle(SagaMessageWind message)
    {
        try
        {
            if (!Data.Task2Processed)
            {
                _logger.Info("Processing Wind - {id}", Data.SagaInstanceId);
                Data.StuffDone += "Wind;";
                Data.Task2Processed = true;
            }
            await _bus.Publish(new SagaMessageFire()
            {
                SagaInstanceId = message.SagaInstanceId
            });
            PossiblyDone();

            _logger.Info("Published Fire...Done Processing Wind - {id}", Data.SagaInstanceId);
        }
        catch (Exception e)
        {
            _logger.Error(e, "WHAT Wind? - {id}", Data.SagaInstanceId);
            throw;
        }
    }

    public Task Handle(SagaMessageFire message)
    {
        try
        {
            if (!Data.Task3Processed)
            {
                _logger.Info("Processing Fire - {id}", Data.SagaInstanceId);
                Data.StuffDone += "Fire;";
                Data.Task3Processed = true;
            }
            PossiblyDone();
            _logger.Info("Done Processing Fire - {id}", Data.SagaInstanceId);
        }
        catch (Exception e)
        {
            _logger.Error(e, "WHAT Fire? - {id}", Data.SagaInstanceId);
            throw;
        }
        return Task.CompletedTask;
    }

    private void PossiblyDone()
    {
        if (Data.Task1Processed && Data.Task2Processed && Data.Task3Processed)
        {
            _logger.Info("Completed everything for {id}: {msg}", Data.SagaInstanceId, Data.StuffDone);
            MarkAsComplete();
        }
        else
        {
            _logger.Info("NOT Completed everything for {id}: {task1},{task2},{task3}", Data.SagaInstanceId, Data.Task1Processed, Data.Task2Processed, Data.Task3Processed);
        }
    }

    public Task Handle(IFailed<ISagaMessage> message)
    {
        _logger.Error("Unable to handle the message of type {msgtype} with error message {errMsg}", message.Message.GetType().Name, message.ErrorDescription);
        return Task.CompletedTask;
    }

    protected override void CorrelateMessages(ICorrelationConfig<TestSagaData> config)
    {
        config.Correlate<KickoffSagaMessages>(m => m.SagaInstanceId, d => d.SagaInstanceId);
        config.Correlate<SagaMessageFire>(m => m.SagaInstanceId, d => d.SagaInstanceId);
        config.Correlate<SagaMessageEarth>(m => m.SagaInstanceId, d => d.SagaInstanceId);
        config.Correlate<SagaMessageWind>(m => m.SagaInstanceId, d => d.SagaInstanceId);
        config.Correlate<IFailed<ISagaMessage>>(m => m.Message.SagaInstanceId, d => d.SagaInstanceId);
    }

    protected override Task ResolveConflict(TestSagaData otherSagaData)
    {
        Data.Task1Processed = Data.Task1Processed || otherSagaData.Task1Processed;
        Data.Task2Processed = Data.Task2Processed || otherSagaData.Task2Processed;
        Data.Task3Processed = Data.Task3Processed || otherSagaData.Task3Processed;
        return Task.CompletedTask;
    }

    private IBus _bus;
    private ILog _logger;
    public TestSaga(IBus bus, ConsoleLoggerFactory loggerFactory)
    {
        _bus = bus ?? throw new ArgumentNullException();
        _logger = loggerFactory.GetLogger<TestSaga>();
    }
}
Cycling answered 7/2, 2024 at 18:28 Comment(0)
