RabbitMQ: ConversationId vs CorrelationId. Which is more appropriate for tracking a specific request?

RabbitMQ seems to have two properties that are very similar, and I don't entirely understand the difference. ConversationId and CorrelationId.

My use case is as follows. I have a website that generates a Guid. The website calls an API, adding that unique identifier to the HttpRequest headers. This in turn publishes a message to RabbitMQ. That message is processed by the first consumer and passed off elsewhere to another consumer, and so on.

For logging purposes I want to log an identifier that ties the initial request together with all of the subsequent actions. It should be unique for that journey through the different parts of the application. That way, when everything is logged to something like Serilog/Elasticsearch, it becomes easy to see which request started the journey, and all of the log entries for that request throughout the application can be correlated.

I have created a provider that looks at the incoming HttpRequest for an identifier. I've called this a "CorrelationId", but I'm starting to wonder if this should really be named a "ConversationId". In terms of RabbitMQ, does the idea of a "ConversationId" fit better to this model, or is "CorrelationId" better?

What is the difference between the two concepts?

In terms of code, I've done the following. First, I register the bus in my API and configure the publish pipeline to set the CorrelationId on every outgoing message from the provider:

// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();

var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });
    cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
    {
        // which one is more appropriate
        //sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
        sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
    }));
});

For reference, this is my simple provider interface:

using System;

// define the interface
public interface ICorrelationIdProvider
{
    Guid GetCorrelationId();
}

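And the AspNetCore implementation, which extracts the unique ID set by the calling client (e.g. a website):

using System;
using System.Linq;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Primitives;

public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
    private readonly IHttpContextAccessor _httpContextAccessor;

    public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
    }

    public Guid GetCorrelationId()
    {
        // HttpContext is null when this is called outside of an HTTP request
        var requestHeaders = _httpContextAccessor.HttpContext?.Request.Headers;

        if (requestHeaders != null
            && requestHeaders.TryGetValue("correlation-Id", out StringValues headers))
        {
            var header = headers.FirstOrDefault();
            if (Guid.TryParse(header, out Guid headerCorrelationId))
            {
                return headerCorrelationId;
            }
        }

        // no valid header from the caller: note that each call then returns a different Guid
        return Guid.NewGuid();
    }
}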

Finally, my service hosts are simple Windows service applications that consume published messages. They use the following provider to read the CorrelationId, and may in turn publish messages to consumers in other service hosts.

using System;
using MassTransit;

public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
    /// <summary>
    /// The consume context
    /// </summary>
    private readonly ConsumeContext _consumeContext;

    /// <summary>
    /// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
    /// </summary>
    /// <param name="consumeContext">The consume context.</param>
    public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }

    /// <summary>
    /// Gets the correlation identifier.
    /// </summary>
    /// <returns>The CorrelationId of the consumed message, or a new Guid if none is set.</returns>
    public Guid GetCorrelationId()
    {
        // CorrelationId or ConversationId?
        if (_consumeContext.CorrelationId.HasValue && _consumeContext.CorrelationId != Guid.Empty)
        {
            return _consumeContext.CorrelationId.Value;
        }

        return Guid.NewGuid();
    }
}

I then have a logger in my consumer that uses that provider to extract the CorrelationId:

public async Task Consume(ConsumeContext<IMyEvent> context)
{
    var correlationId = _correlationProvider.GetCorrelationId();
    _logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");

    try
    {
        await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject = context.Message.SomeObject });
    }
    catch (Exception e)
    {
        _logger.Exception(e, correlationId, $"Exception:{e}");
        throw;
    }

    _logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}

The docs say the following about a "ConversationId":

The conversation is created by the first message that is sent or published, in which no existing context is available (such as when a message is sent or published by using IBus.Send or IBus.Publish). If an existing context is used to send or publish a message, the ConversationId is copied to the new message, ensuring that a set of messages within the same conversation have the same identifier.

Now I'm starting to think that I've got my terminology mixed up, and technically this is a conversation (although the 'conversation' is like 'the telephone game').

So, CorrelationId in this use case, or ConversationId? Please help me get my terminology right!!

Zoellick asked 21/3, 2019 at 16:42
Haim: It seems ConversationId is a MassTransit/NServiceBus thing? I don't find any mention of it in the RabbitMQ or AMQP docs. CorrelationId is a specific AMQP thing. The spec says "no formal behaviour but may hold the name of a private response queue, when used in request messages", and it's commonly used for the response queue name when doing an RPC request. So I think your use case is ConversationId, not CorrelationId.
Pippin: ConversationId is limited to a single message sequence, where you get one message and it goes through several consumers. CorrelationId is more long-lived; you can have multiple conversations with one correlation id.
Pippin: In addition to that, the conversation id is auto-generated and the correlation id is something arbitrary, which you can take from your domain. For example, in our ShoppingCart saga we use the order id as the correlation id.
Haim: I stand corrected on ConversationId. Of course you can add your own header, if you want to keep CorrelationId for its normal purpose of doing RPC.
Counterproposal: When I ran into this I just decided to use a term from my business domain. It ended up being called lineId and I didn't use the header. I just used my own custom correlation id, which helped because different RabbitMQ clients can decide to do different things with those headers. I could also include multiple lineIds per message if I wanted to be less chatty.
Fascista: Suggest reading the docs: masstransit-project.com/MassTransit/usage/…
Zoellick: @ChrisPatterson I literally quoted the docs and the difference between the two isn't clear! That's why I'm asking the question!!

In a message conversation (cue foreboding musical score), there can be a single message (I told you to do something, or I told everyone who is listening that something happened) or multiple messages (I told you to do something, and you told someone else, or I told everyone who is listening that something happened and those listeners told their friends, and so on, and so on).

With MassTransit, used properly, every single one of those messages, from the first to the last, has the same ConversationId. MassTransit copies the property from the ConsumeContext, unmodified, to every outgoing message produced while consuming a message. This makes everything part of the same trace: a conversation.
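To make the copying rule concrete, here is a minimal sketch that models it in plain C#. `Envelope` and `ConversationModel` are hypothetical types invented for illustration, not MassTransit's API; they only mimic the behaviour described above and in the docs quoted in the question (no consume context starts a new conversation, a consume context passes its ConversationId on unchanged).

```csharp
using System;

// Hypothetical, simplified model of the ConversationId rule described above.
// Envelope and ConversationModel are illustrative types, NOT MassTransit's API.
public sealed record Envelope(string MessageType, Guid MessageId, Guid ConversationId);

public static class ConversationModel
{
    // No consume context available (e.g. IBus.Publish from the API controller):
    // a brand-new conversation starts here.
    public static Envelope PublishWithoutContext(string messageType) =>
        new Envelope(messageType, Guid.NewGuid(), Guid.NewGuid());

    // Producing a message while consuming another one: the new message gets its
    // own MessageId, but the ConversationId is copied over unmodified.
    public static Envelope PublishFromContext(Envelope consumed, string messageType) =>
        new Envelope(messageType, Guid.NewGuid(), consumed.ConversationId);
}
```

In this model, a message published from the API followed by two consumer hops yields three envelopes with distinct MessageIds but a single shared ConversationId, so filtering log entries on that one value would return every hop of the journey described in the question.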

The CorrelationId, however, is not set by default by MassTransit. It can be automatically set if a message property is named CorrelationId (or CommandId, or EventId), or you can add your own names too.

If the CorrelationId is present on a consumed message, it is copied to the InitiatorId property of every outgoing message (cause and effect: the consumed message initiated the creation of the subsequent messages). This forms a chain (or span, in trace terminology) that can be followed to show the spread of messages from the initial message.
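The cause-and-effect chain can be sketched the same way. Again, `MessageHeaders` and `InitiatorModel` are hypothetical types, not MassTransit's; they assume each produced message carries its own CorrelationId and takes the consumed message's CorrelationId as its InitiatorId. When the consumed message has no CorrelationId, this model simply leaves InitiatorId empty, which is an assumption, since the answer does not say what happens in that case.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical model (NOT MassTransit's types) of the InitiatorId rule above.
public sealed record MessageHeaders(string MessageType, Guid ConversationId, Guid? CorrelationId, Guid? InitiatorId);

public static class InitiatorModel
{
    // A message produced while consuming `consumed`.
    public static MessageHeaders Produce(MessageHeaders consumed, string messageType, Guid? newCorrelationId) =>
        new MessageHeaders(
            messageType,
            consumed.ConversationId,   // conversation copied unchanged
            newCorrelationId,          // identifier of this command/event
            consumed.CorrelationId);   // cause -> effect link (null if the cause had none)

    // Follow InitiatorId links from a message back to the root of its chain.
    public static List<MessageHeaders> ChainToRoot(MessageHeaders leaf, IReadOnlyDictionary<Guid, MessageHeaders> byCorrelationId)
    {
        var chain = new List<MessageHeaders> { leaf };
        var current = leaf;
        while (current.InitiatorId is Guid parentId && byCorrelationId.TryGetValue(parentId, out var parent))
        {
            chain.Add(parent);
            current = parent;
        }
        return chain;
    }
}
```

Unlike the ConversationId, which only tells you that messages belong together, the InitiatorId links let you reconstruct which message caused which.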

The CorrelationId should be thought of as the identifier for a command or event, such that the effects of that command can be seen throughout the system logs.

It sounds to me like the identifier coming in over HTTP might be the initiator, in which case you would copy it into InitiatorId and create a new CorrelationId for the message. Alternatively, you could use the same identifier as the initial CorrelationId and let the subsequent messages carry it as their InitiatorId.
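The two options differ only in which header the HTTP identifier lands in on the first message. The sketch below compares them using a hypothetical `FirstMessageHeaders` record, not a MassTransit type. In a real bus these values would be assigned on the send context (for example in the question's `UseSendExecute` callback); which of those properties are settable depends on the MassTransit version, so treat that mapping as an assumption to verify.

```csharp
using System;

// Hypothetical header model (NOT MassTransit's types) for the first message
// published from the API, given the id from the "correlation-Id" HTTP header.
public sealed record FirstMessageHeaders(Guid? CorrelationId, Guid? InitiatorId);

public static class HttpIdMapping
{
    // Option 1: the HTTP request is the initiator; the message gets a fresh CorrelationId.
    public static FirstMessageHeaders AsInitiator(Guid httpRequestId) =>
        new FirstMessageHeaders(CorrelationId: Guid.NewGuid(), InitiatorId: httpRequestId);

    // Option 2: the HTTP id becomes the first message's CorrelationId; messages
    // produced by its consumers would then carry it as their InitiatorId.
    public static FirstMessageHeaders AsCorrelationId(Guid httpRequestId) =>
        new FirstMessageHeaders(CorrelationId: httpRequestId, InitiatorId: null);
}
```

Option 1 keeps the HTTP request as a separate link at the top of the chain; option 2 saves a link but makes the first message's identity the same as the request's.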

Fascista answered 22/3, 2019 at 12:56