MassTransit - Can Multiple Consumers All Receive Same Message?

I have one .NET 4.5.2 service publishing messages to RabbitMQ via MassTransit, and multiple instances of a .NET Core 2.1 service consuming those messages.

At the moment the instances of the .NET Core consumer service compete with each other: the first one to consume a message takes it off the queue, and the other instances never get to consume it.

I want ALL instances to consume the same message.

How can I achieve this?

Publisher Service is configured as follows:

    builder.Register(context =>
        {
            MessageCorrelation.UseCorrelationId<MyWrapper>(x => x.CorrelationId);

            return Bus.Factory.CreateUsingRabbitMq(configurator =>
            {
                configurator.Host(new Uri("rabbitmq://localhost:5671"), host =>
                {
                    host.Username(***);
                    host.Password(***);
                });
                configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
                configurator.Publish<MyWrapper>(x =>
                {
                    x.AutoDelete = true;
                    x.Durable = true;
                    x.ExchangeType = "topic";
                });
            });
        })
        .As<IBusControl>()
        .As<IBus>()
        .SingleInstance();

And the .NET Core Consumer Services are configured as follows:

        serviceCollection.AddScoped<MyWrapperConsumer>();

        serviceCollection.AddMassTransit(serviceConfigurator =>
        {
            serviceConfigurator.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(cfg =>
            {
                var host = cfg.Host(new Uri("rabbitmq://localhost:5671"), hostConfigurator =>
                {
                    hostConfigurator.Username(***);
                    hostConfigurator.Password(***);

                });
                cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
                {
                    exchangeConfigurator.AutoDelete = true;
                    exchangeConfigurator.Durable = true;
                    exchangeConfigurator.ExchangeType = "topic";
                    exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
                });
            }));
        });
        serviceCollection.AddSingleton<IHostedService, BusService>();

And then MyWrapperConsumer looks like this:

public class MyWrapperConsumer :
    IConsumer<MyWrapper>
{
    .
    .

    public MyWrapperConsumer(...) => (..) = (..);

    public async Task Consume(ConsumeContext<MyWrapper> context)
    {
        //Do Stuff 
    }
}
Lazarolazaruk answered 25/7, 2019 at 20:36 Comment(0)
7

Thanks to the Answer from Chris Patterson and the comment from Alexey Zimarev I now believe I have this working.

They pointed out (as I understand it; correct me if I am wrong) that I should stop specifying the exchanges and queues myself and stop being so granular with my configuration.

Instead, I should let MassTransit work out which exchange to create and publish to, and which queues to create and bind to that exchange, based on my message type MyWrapper and my IConsumer implementation type MyWrapperConsumer.

Then, by giving each consumer service instance its own unique ReceiveEndpoint name, we end up with the exchange fanning out messages of type MyWrapper to each uniquely named queue.

So, in my case:

The relevant lines of the PUBLISHER SERVICE config changed FROM:

    configurator.Message<MyWrapper>(x => { x.SetEntityName("my.exchange"); });
    configurator.Publish<MyWrapper>(x =>
    {
        x.AutoDelete = true;
        x.Durable = true;
        x.ExchangeType = "topic";
    });

TO THIS:

    configurator.Message<MyWrapper>(x => { });
    configurator.AutoDelete = true;

And the relevant lines of EACH CONSUMER SERVICE instance config changed FROM:

    cfg.ReceiveEndpoint(host, "my.exchange", exchangeConfigurator =>
    {
        exchangeConfigurator.AutoDelete = true;
        exchangeConfigurator.Durable = true;
        exchangeConfigurator.ExchangeType = "topic";
        exchangeConfigurator.Consumer<MyWrapperConsumer>(provider);
    });

TO THIS:

    cfg.ReceiveEndpoint(host, Environment.MachineName, queueConfigurator =>
    {
        queueConfigurator.AutoDelete = true;
        queueConfigurator.Consumer<MyWrapperConsumer>(provider);
    });

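Note that Environment.MachineName gives each instance a unique queue name only when every instance runs on a different machine; instances that share a machine would share a queue and compete for messages again.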
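
A minimal sketch of one way to build such a name when several instances might share a machine, by appending the process ID to the machine name. The QueueNames class, its ForThisInstance method, and the "my-wrapper" prefix are hypothetical names used only for illustration; they are not part of MassTransit.

```csharp
using System;
using System.Diagnostics;

public static class QueueNames
{
    // Hypothetical helper: returns a queue name that differs per machine and per process.
    public static string ForThisInstance(string prefix)
    {
        using (Process current = Process.GetCurrentProcess())
        {
            // Example shape: "my-wrapper-<machine name>-<process id>", lower-cased.
            return $"{prefix}-{Environment.MachineName}-{current.Id}".ToLowerInvariant();
        }
    }
}
```

The result of QueueNames.ForThisInstance("my-wrapper") could be passed as the queue name in place of Environment.MachineName in the ReceiveEndpoint call above. Because the process ID changes on every restart, this only makes sense together with AutoDelete = true, as in the config above; otherwise queues from earlier runs would pile up on the broker.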

Lazarolazaruk answered 26/7, 2019 at 13:21 Comment(0)
18

It sounds like you want to publish messages and have multiple consumer service instances receive them. In that case, each service instance needs to have its own queue. That way, every published message will result in a copy being delivered to each queue. Then, each receive endpoint will read that message from its own queue and consume it.

All that excessive configuration you're doing is going against what you want. To make it work, remove all that exchange type configuration, and just configure each service instance with a unique queue name (you can generate it from host, machine, whatever) and just call Publish on the message producer.

You can see how RabbitMQ topology is configured: https://masstransit-project.com/advanced/topology/rabbitmq.html
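
The difference between a shared queue and one queue per instance can be illustrated with a toy in-memory model. This is a sketch only: ToyExchange and the queue names are invented for illustration and do not use MassTransit or RabbitMQ, and the strict round-robin choice is a simplification of how a broker distributes messages among the consumers of one queue.

```csharp
using System;
using System.Collections.Generic;

// Toy in-memory model for illustration only; this is not MassTransit or RabbitMQ code.
public class ToyExchange
{
    // Queue name -> consumers attached to that queue.
    private readonly Dictionary<string, List<Action<string>>> _queues =
        new Dictionary<string, List<Action<string>>>();

    // Queue name -> index of the consumer that handles the next message.
    private readonly Dictionary<string, int> _next = new Dictionary<string, int>();

    public void Subscribe(string queueName, Action<string> consumer)
    {
        if (!_queues.TryGetValue(queueName, out var consumers))
        {
            consumers = new List<Action<string>>();
            _queues[queueName] = consumers;
            _next[queueName] = 0;
        }
        consumers.Add(consumer);
    }

    public void Publish(string message)
    {
        // Every queue gets its own copy of the message.
        foreach (var queue in _queues)
        {
            // Within a queue, only one consumer handles each copy (round-robin).
            int index = _next[queue.Key];
            queue.Value[index](message);
            _next[queue.Key] = (index + 1) % queue.Value.Count;
        }
    }
}

public static class Program
{
    public static void Main()
    {
        // Same queue name: the two instances compete, so each message is handled once.
        var competing = new ToyExchange();
        competing.Subscribe("my-service", m => Console.WriteLine($"A got {m}"));
        competing.Subscribe("my-service", m => Console.WriteLine($"B got {m}"));
        competing.Publish("message 1"); // handled by A only
        competing.Publish("message 2"); // handled by B only

        // Unique queue names: every instance handles every message.
        var fanout = new ToyExchange();
        fanout.Subscribe("my-service-host1", m => Console.WriteLine($"A got {m}"));
        fanout.Subscribe("my-service-host2", m => Console.WriteLine($"B got {m}"));
        fanout.Publish("message 3"); // handled by both A and B
    }
}
```

In MassTransit terms, the queue name in this model corresponds to the receive endpoint name: instances that share a name share a queue, and instances with unique names each get their own.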

Insensate answered 25/7, 2019 at 21:29 Comment(5)
Thanks for the reply. I am having a hard time getting this, which is my bad. Newbie. I have tried what I think you are saying by changing my consuming service receive endpoint to the following, but no luck: cfg.ReceiveEndpoint(host, "my.exchange.123", exchangeConfigurator => { exchangeConfigurator.Consumer<MyWrapperConsumer>(provider); }); I would be very grateful if you could answer with a code example of the config changes I should make. I can't figure out how to bind each queue to the publisher exchange. Thank you. - Lazarolazaruk
Forget exchanges for a moment. Each service that has different code to consume that message should have a different endpoint name, and it will get its own copy of the message. Endpoints that have the same name will be competing consumers on one queue. All those patterns are described in the Enterprise Integration Patterns book, and that specific case is mentioned in the MT docs in the Common Gotchas section. - Time
Thanks Alexey. I think I need a concrete code example posted here. I've read the common gotchas but can't translate it into code. When I change the config of the consumer service to what I posted in my previous comment, the consumer stops consuming. - Lazarolazaruk
Gents, I have upvoted your answers and comments, and will mark my own answer as the answer because it contains the specific code (any issues with that, let me know). Thank you for the advice; it took me a while, but I got there with your help. - Lazarolazaruk
@chris-patterson I guess I still misunderstood the difference between publish and send, even though I thought I got it :) ... I thought publish sends the message to all your consumers, whereas send sends it to only one of them. What is the difference, then? - Lieutenant
5

I want to share a slightly different code example, which uses InstanceId:

Specifies an identifier that uniquely identifies the endpoint instance, which is appended to the end of the endpoint name.

  services.AddMassTransit(x => {
    x.SetKebabCaseEndpointNameFormatter();
    Guid instanceId = Guid.NewGuid();
    x.AddConsumer<MyConsumer>()
      .Endpoint(c => c.InstanceId = instanceId.ToString());

    x.UsingRabbitMq((context, cfg) => {
      ...
      cfg.ConfigureEndpoints(context);
    });
  });
Polad answered 11/12, 2020 at 6:37 Comment(0)
3

We can achieve this by giving each consumer service its own queue and binding every queue to a single exchange. When we publish a message to the exchange, it sends a copy of the message to each queue, so each consumer service eventually receives it.

Messages:

namespace Masstransit.Message
{
    public interface ICustomerRegistered
    {
        Guid Id { get; }
        DateTime RegisteredUtc { get; }
        string Name { get; }
        string Address { get; }
    }
}

namespace Masstransit.Message
{
    public interface IRegisterCustomer
    {
        Guid Id { get; }
        DateTime RegisteredUtc { get; }
        string Name { get; }
        string Address { get; }
    }
}

Publisher console app:

namespace Masstransit.Publisher
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.WriteLine("CUSTOMER REGISTRATION COMMAND PUBLISHER");
            Console.Title = "Publisher window";
            RunMassTransitPublisher();
        }

        private static void RunMassTransitPublisher()
        {
            string rabbitMqAddress = "rabbitmq://localhost:5672";
            string rabbitMqQueue = "mycompany.domains.queues";
            Uri rabbitMqRootUri = new Uri(rabbitMqAddress);

            IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
            {
                rabbit.Host(rabbitMqRootUri, settings =>
                {
                    settings.Password("guest");
                    settings.Username("guest");
                });
            });

            Task<ISendEndpoint> sendEndpointTask = rabbitBusControl.GetSendEndpoint(new Uri(string.Concat(rabbitMqAddress, "/", rabbitMqQueue)));
            ISendEndpoint sendEndpoint = sendEndpointTask.Result;

            Task sendTask = sendEndpoint.Send<IRegisterCustomer>(new
            {
                Address = "New Street",
                Id = Guid.NewGuid(),                
                RegisteredUtc = DateTime.UtcNow,
                Name = "Nice people LTD"                            
            }, c =>
            {
                c.FaultAddress = new Uri("rabbitmq://localhost:5672/accounting/mycompany.queues.errors.newcustomers");
            });

            Console.ReadKey();
        }
    }
}

Receiver Management console app:

namespace Masstransit.Receiver.Management
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.Title = "Management consumer";
            Console.WriteLine("MANAGEMENT");
            RunMassTransitReceiver();
        }

        private static void RunMassTransitReceiver()
        {
            IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
            {
                rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
                {
                    settings.Password("guest");
                    settings.Username("guest");
                });

                rabbit.ReceiveEndpoint("mycompany.domains.queues.events.mgmt", conf =>
                {
                    conf.Consumer<CustomerRegisteredConsumerMgmt>();
                });
            });
            rabbitBusControl.Start();
            Console.ReadKey();
            rabbitBusControl.Stop();
        }
    }
}

Receiver Sales console app:

namespace Masstransit.Receiver.Sales
{
    class Program
    {
        static void Main(string[] args)
        {
            Console.Title = "Sales consumer";
            Console.WriteLine("SALES");
            RunMassTransitReceiver();
        }

        private static void RunMassTransitReceiver()
        {
            IBusControl rabbitBusControl = Bus.Factory.CreateUsingRabbitMq(rabbit =>
            {
                rabbit.Host(new Uri("rabbitmq://localhost:5672"), settings =>
                {
                    settings.Password("guest");
                    settings.Username("guest");
                });

                rabbit.ReceiveEndpoint("mycompany.domains.queues.events.sales", conf =>
                {
                    conf.Consumer<CustomerRegisteredConsumerSls>();
                });
            });

            rabbitBusControl.Start();
            Console.ReadKey();

            rabbitBusControl.Stop();
        }
    }
}
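
The two receiver apps reference consumer classes that are not shown above. A minimal sketch of what one of them might look like, assuming it consumes the ICustomerRegistered event; the body of Consume is invented for illustration, and CustomerRegisteredConsumerSls would presumably look the same in the Sales namespace.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;
using Masstransit.Message;

namespace Masstransit.Receiver.Management
{
    // Assumed shape: the original post does not show this class.
    public class CustomerRegisteredConsumerMgmt : IConsumer<ICustomerRegistered>
    {
        public Task Consume(ConsumeContext<ICustomerRegistered> context)
        {
            ICustomerRegistered customer = context.Message;
            Console.WriteLine($"Management received customer {customer.Name} ({customer.Id})");
            return Task.CompletedTask;
        }
    }
}
```

With consumers like this, each of the two queues would receive its own copy of any ICustomerRegistered message that is published. Note that the publisher app shown above sends an IRegisterCustomer command to a specific queue rather than publishing the event, so the event is presumably published by a component that is not shown here.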

You can find a working solution on https://github.com/prasantj409/Masstransit-PublishMultipleConsumer.git

Moult answered 7/6, 2021 at 7:28 Comment(4)
While this link may answer the question, it is better to include the essential parts of the answer here and provide the link for reference. Link-only answers can become invalid if the linked page changes. - From Review - Uncrown
@Labu: This is clearly more than just a link. If you remove the link, it still offers a solution. That said, the author should take care to read Stack Overflow's rules on self-promotion when linking to their own external content. - Minacious
Sure - problem was that this was the only real option I had to choose from for the moderation tasks. The only other relevant one was "no comment", which seemed unhelpful. - Uncrown
@Labu: The reasons are only there to aid in offering common feedback. In a case where none of the canned comments fit, you should add your own comment. That said, in this case, a more appropriate option might have been to reflag the answer as spam since it links to the author’s own repository without disclosing their affiliation. - Minacious
2

By default, RabbitMQ delivers each message on a queue to only one of that queue's consumers, taking the consumers in turn. This kind of dispatching is called "round-robin" and exists for load balancing (you can have multiple instances of your service consuming from the same queue). As Chris pointed out, to ensure that each service instance always receives its own copy of a message, you need to give it a unique queue name.

Guzel answered 6/8, 2019 at 9:7 Comment(0)
1

What you need to do:

  1. Make sure that your consumers implement the IConsumer interface with the same generic type
  2. Register all of these consumers
  3. Use the Publish method to send the message

Generally there are two types of messages in MassTransit: events and commands. In this case your message is an event. When your message is a command, only one consumer receives it, and you need to use the Send method.

Example of Event DTO:

public class OrderChecked
{
    public Guid OrderId { get; set; }
}

Consumers:

public class OrderSuccessfullyCheckedConsumer : IConsumer<OrderChecked>
{
    public async Task Consume(ConsumeContext<OrderChecked> context)
    {
        // your consuming code
    }
}

public class OrderSuccessfullyCheckedConsumer2 : IConsumer<OrderChecked>
{
    public async Task Consume(ConsumeContext<OrderChecked> context)
    {
        // your second consumer's code
    }
}

Configuring:

services.AddMassTransit(c =>
{
    c.AddConsumer<OrderSuccessfullyCheckedConsumer>();
    c.AddConsumer<OrderSuccessfullyCheckedConsumer2>();
            
    c.SetKebabCaseEndpointNameFormatter();
    c.UsingRabbitMq((context, cfg) =>
    {
        cfg.ConfigureEndpoints(context);
    });
});
services.AddMassTransitHostedService(true);

Publishing the message:

await _bus.Publish(new OrderChecked
{
    OrderId = newOrder.Id
});
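
For contrast with the command case mentioned above, here is a hedged sketch of both calls side by side. The CheckOrder command, the OrderMessaging class, and the queue name "check-order" are hypothetical and not part of the original answer; OrderChecked is the event DTO defined earlier, and the "queue:" short address form is assumed to be available in the MassTransit version in use.

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

// Hypothetical command type, not part of the original answer.
public class CheckOrder
{
    public Guid OrderId { get; set; }
}

// Hypothetical wrapper class used only to show the two calls.
public class OrderMessaging
{
    private readonly IBus _bus;

    public OrderMessaging(IBus bus) => _bus = bus;

    // Event: every endpoint that has a consumer of OrderChecked gets a copy.
    public Task AnnounceCheckedAsync(Guid orderId) =>
        _bus.Publish(new OrderChecked { OrderId = orderId });

    // Command: delivered to one specific queue only.
    public async Task RequestCheckAsync(Guid orderId)
    {
        // "queue:check-order" is an assumed queue address, chosen for illustration.
        ISendEndpoint endpoint = await _bus.GetSendEndpoint(new Uri("queue:check-order"));
        await endpoint.Send(new CheckOrder { OrderId = orderId });
    }
}
```

Publish needs no address because the message type determines the exchange, while Send needs the address of the queue that should handle the command.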
Hydrostatics answered 23/2, 2022 at 12:1 Comment(0)
