RabbitMQ seems to have two properties that are very similar, and I don't entirely understand the difference: ConversationId and CorrelationId.
My use case is as follows. I have a website that generates a Guid. The website calls an API, adding that unique identifier to the HttpRequest headers. The API in turn publishes a message to RabbitMQ. That message is processed by the first consumer, passed on to another consumer, and so on.
For logging purposes I want to log an identifier that ties the initial request together with all of the subsequent actions. It should be unique to that journey through the different parts of the application. When logged to something like Serilog/Elasticsearch, it then becomes easy to see which initial request triggered each action, and all of the log entries for that request can be correlated across the application.
I have created a provider that looks at the incoming HttpRequest for an identifier. I've called this a "CorrelationId", but I'm starting to wonder if it should really be named a "ConversationId". In terms of RabbitMQ, does the idea of a "ConversationId" fit this model better, or is "CorrelationId" the better name?
What is the difference between the two concepts?
In terms of code, I've done the following. First, I register the bus in my API and configure publishing (ConfigurePublish) to use the CorrelationId from the provider.
// bus registration in the API
var busSettings = context.Resolve<BusSettings>();
// using AspNetCoreCorrelationIdProvider
var correlationIdProvider = context.Resolve<ICorrelationIdProvider>();
var busControl = Bus.Factory.CreateUsingRabbitMq(cfg =>
{
    cfg.Host(
        new Uri(busSettings.HostAddress),
        h =>
        {
            h.Username(busSettings.Username);
            h.Password(busSettings.Password);
        });
    cfg.ConfigurePublish(x => x.UseSendExecute(sendContext =>
    {
        // which one is more appropriate
        //sendContext.ConversationId = correlationIdProvider.GetCorrelationId();
        sendContext.CorrelationId = correlationIdProvider.GetCorrelationId();
    }));
});
For reference, this is my simple provider interface:
// define the interface
public interface ICorrelationIdProvider
{
    Guid GetCorrelationId();
}
And the AspNetCore implementation, which extracts the unique ID set by the calling client (i.e. a website).
public class AspNetCoreCorrelationIdProvider : ICorrelationIdProvider
{
    private IHttpContextAccessor _httpContextAccessor;
    public AspNetCoreCorrelationIdProvider(IHttpContextAccessor httpContextAccessor)
    {
        _httpContextAccessor = httpContextAccessor;
    }
    public Guid GetCorrelationId()
    {
        if (_httpContextAccessor.HttpContext.Request.Headers.TryGetValue("correlation-Id", out StringValues headers))
        {
            var header = headers.FirstOrDefault();
            if (Guid.TryParse(header, out Guid headerCorrelationId))
            {
                return headerCorrelationId;
            }
        }
        return Guid.NewGuid();
    }
}
Finally, my service hosts are simple Windows service applications that sit and consume published messages. They use the following provider to grab the CorrelationId, and might well publish to other consumers in other service hosts as well.
public class MessageContextCorrelationIdProvider : ICorrelationIdProvider
{
    /// <summary>
    /// The consume context
    /// </summary>
    private readonly ConsumeContext _consumeContext;
    /// <summary>
    /// Initializes a new instance of the <see cref="MessageContextCorrelationIdProvider"/> class.
    /// </summary>
    /// <param name="consumeContext">The consume context.</param>
    public MessageContextCorrelationIdProvider(ConsumeContext consumeContext)
    {
        _consumeContext = consumeContext;
    }
    /// <summary>
    /// Gets the correlation identifier.
    /// </summary>
    /// <returns></returns>
    public Guid GetCorrelationId()
    {
        // CorrelationId or ConversationId?
        if (_consumeContext.CorrelationId.HasValue && _consumeContext.CorrelationId != Guid.Empty)
        {
            return _consumeContext.CorrelationId.Value;
        }
        return Guid.NewGuid();
    }
}
I then have a logger in my consumer that uses that provider to extract the CorrelationId:
public async Task Consume(ConsumeContext<IMyEvent> context)
{
    var correlationId = _correlationProvider.GetCorrelationId();
    _logger.Info(correlationId, $"#### IMyEvent received for customer:{context.Message.CustomerId}");
    try
    {
        // object initializers assign with '=', not ':'
        await _mediator.Send(new SomeOtherRequest(correlationId) { SomeObject = context.Message.SomeObject });
    }
    catch (Exception e)
    {
        _logger.Exception(e, correlationId, $"Exception:{e}");
        throw;
    }
    _logger.Info(correlationId, $"Finished processing: {DateTime.Now}");
}
Reading the docs, it says the following about a "ConversationId":
The conversation is created by the first message that is sent or published, in which no existing context is available (such as when a message is sent or published by using IBus.Send or IBus.Publish). If an existing context is used to send or publish a message, the ConversationId is copied to the new message, ensuring that a set of messages within the same conversation have the same identifier.
Now I'm starting to think that I've got my terminology mixed up, and technically this is a conversation (although the 'conversation' is like 'the telephone game').
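A minimal sketch of the propagation rule in the quoted passage may make it concrete. The Envelope record and the PublishNew/PublishFrom helpers are hypothetical names invented for illustration, not MassTransit types, and the sketch assumes a CorrelationId is only present when the sender supplies one.

```csharp
using System;

// Hypothetical envelope for illustration only; this is not a MassTransit class.
public sealed record Envelope(Guid MessageId, Guid ConversationId, Guid? CorrelationId, string Body);

public static class ConversationSketch
{
    // No existing context: the first message starts a new conversation.
    public static Envelope PublishNew(string body, Guid? correlationId = null) =>
        new(Guid.NewGuid(), Guid.NewGuid(), correlationId, body);

    // Existing context: the ConversationId is copied from the consumed message.
    // Assumption: the CorrelationId is NOT copied unless the sender passes one.
    public static Envelope PublishFrom(Envelope consumed, string body, Guid? correlationId = null) =>
        new(Guid.NewGuid(), consumed.ConversationId, correlationId, body);

    public static void Main()
    {
        var requestId = Guid.NewGuid(); // stands in for the Guid from the HTTP header

        var first = PublishNew("IMyEvent", correlationId: requestId);
        var second = PublishFrom(first, "SomeOtherEvent");
        var third = PublishFrom(second, "YetAnotherEvent");

        Console.WriteLine(first.ConversationId == second.ConversationId); // True
        Console.WriteLine(second.ConversationId == third.ConversationId); // True
        Console.WriteLine(second.CorrelationId.HasValue);                 // False
    }
}
```

Under these assumptions, every message in the chain shares one ConversationId without any extra work, whereas the CorrelationId only travels if each sender sets it explicitly.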
So, CorrelationId in this use case, or ConversationId? Please help me get my terminology right!
ConversationId is limited to a single message sequence, where you get one message and it goes through several consumers. CorrelationId is longer-lived: you can have multiple conversations with one correlation id. – Pippin