How to override MassTransit default exchange and queue topology convention?
Asked Answered
Y

2

5

As pointed out [in one of my questions on SO] (Why a simple configuration in MassTransit creates 2 queues and 3 exchanges?), MassTransit for RabbitMQ creates automatically a certain number of queues and exchange for a given simple configuration:

Exchanges, all fanouts:

  • ConsoleApp1:Program-YourMessage: Durable
  • VP0003748_dotnet_bus_6n9oyyfzxhyx9ybobdmpj8qeyt: Auto-delete and Durable?
  • test_queue: Durable

Queues:

  • VP0003748_dotnet_bus_6n9oyyfzxhyx9ybobdmpj8qeyt: x-expire 60000
  • test_queue: Durable

However, I found it a bit frustration to not be able to override the naming of those exchanges and queues. Is there anything I can do to change that?

For example, if you refactor some type or a namespace you may endup polluting your RabbitMQ instance with tons of exchanges that are no longer used =/

I understand test_queue cause this is something I decided so fair enough. Types are easily subject to changes / refactoring.

Yseulte answered 10/5, 2019 at 9:30 Comment(4)
You might read the documentation: masstransit-project.com/MassTransit/advanced/topology/… and masstransit-project.com/MassTransit/advanced/topology/… and masstransit-project.com/MassTransit/advanced/topology/rabbitmqAs
@Chris Patterson the documentation does not say how to change the temporary exchanges / queues e.g. VP0003748_dotnet_bus_7qyyyyrfxhbykj9dbdmpks5xd5 VP0003748_dotnet_bus_7qyyyyrfxhbykj9dbdmpks5xd5Yseulte
github.com/MassTransit/MassTransit/blob/develop/src/…As
github.com/MassTransit/MassTransit/blob/develop/src/Transports/… Updated link to method.As
C
7

This is a simple and effective way to do it: https://bartwullems.blogspot.com/2018/09/masstransitchange-exchange-naming.html

But better to drop some dotnet core code here, to help anyone that's starting.

Our configuration based custom formatter:

public class BusEnvironmentNameFormatter : IEntityNameFormatter
{
    private readonly IEntityNameFormatter _original;
    private readonly string _prefix;

    public BusEnvironmentNameFormatter(IEntityNameFormatter original, SomeAppSettingsSection busSettings)
    {
        _original = original;
        _prefix = string.IsNullOrWhiteSpace(busSettings.Environment)
            ? string.Empty // no prefix
            : $"{busSettings.Environment}:"; // custom prefix
    }

    // Used to rename the exchanges
    public string FormatEntityName<T>()
    {
        var original = _original.FormatEntityName<T>();
        return Format(original);
    }

    // Use this one to rename the queue
    public string Format(string original)
    {
        return string.IsNullOrWhiteSpace(_prefix)
            ? original
            : $"{_prefix}{original}";
    }
}

Then to use it, we would do something like this:

var busSettings = busConfigSection.Get<SomeAppSettingsSection>();
var rabbitMqSettings = rabbitMqConfigSection.Get<SomeOtherAppSettingsSection>();

services.AddMassTransit(scConfig =>
{
    scConfig.AddConsumers(consumerAssemblies);

    scConfig.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(rmqConfig =>
    {
        rmqConfig.UseExtensionsLogging(provider.GetRequiredService<ILoggerFactory>());

        // Force serialization of default values: null, false, etc
        rmqConfig.ConfigureJsonSerializer(jsonSettings =>
        {
            jsonSettings.DefaultValueHandling = DefaultValueHandling.Include;
            return jsonSettings;
        });

        var nameFormatter = new BusEnvironmentNameFormatter(rmqConfig.MessageTopology.EntityNameFormatter, busSettings);
        var host = rmqConfig.Host(new Uri(rabbitMqSettings.ConnectionString), hostConfig =>
        {
            hostConfig.Username(rabbitMqSettings.Username);
            hostConfig.Password(rabbitMqSettings.Password);
        });

        // Endpoint with custom naming
        rmqConfig.ReceiveEndpoint(host, nameFormatter.Format(busSettings.Endpoint), epConfig =>
        {
            epConfig.PrefetchCount = busSettings.MessagePrefetchCount;
            epConfig.UseMessageRetry(x => x.Interval(busSettings.MessageRetryCount, busSettings.MessageRetryInterval));
            epConfig.UseInMemoryOutbox();

            //TODO: Bind messages to this queue/endpoint
            epConfig.MapMessagesToConsumers(provider, busSettings);
        });

        // Custom naming for exchanges
        rmqConfig.MessageTopology.SetEntityNameFormatter(nameFormatter);
    }));
});
Cindicindie answered 13/9, 2019 at 12:16 Comment(0)
P
1

Name of the queue can be changed by using OverrideDefaultBusEndpointQueueName method of IRabbitMqBusFactoryConfigurator in the following way

var bus = Bus.Factory.CreateUsingRabbitMq(sbc =>
{
    sbc.Host("rabbitmq://localhost/");

    sbc.OverrideDefaultBusEndpointQueueName("endpoint");
});
Poynter answered 8/2, 2020 at 12:10 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.