How to unit test MassTransit consumer with no response
Asked Answered
K

2

7

I want to unit test my MassTransit consumer which does not send a response back. Currently my test does seem to be publishing a message, but the consumer is not being triggered, so my breakpoints are not getting hit within it at all.

The consumer is fairly straight forward, but it does have a service injected via DI.

public class BudgetExceededConsumer : IConsumer<IBudgetExceeded>
{
    private readonly INotificationHubService _notificationHubService;

    public BudgetExceededConsumer(INotificationHubService notificationHubService)
    {
        _notificationHubService = notificationHubService;
    }

    public async Task Consume(ConsumeContext<IBudgetExceeded> context)
    {
        try
        {
            var message = context.Message;

            await _notificationHubService.SendNotificationAsync(context.Message);
        }
        catch (Exception ex)
        {
            throw new Exception("Failed to send push notification for exceeding budget usage", ex);
        }
    }
}

The consumer is added to my Azure function using the following:

        builder.Services.AddMassTransitForAzureFunctions(cfg =>
        {
            cfg.AddConsumersFromNamespaceContaining<ConsumerNamespace>();

        });

And I have a relatively straightforward service that is used by other functions to send the messages:

    private readonly ISendEndpointProvider _sendEndpoint;

    public MessagingService(ISendEndpointProvider sendEndpoint)
    {
        _sendEndpoint = sendEndpoint;
    }

    public async Task SendMessage<T>(string queueName, object messageBody) where T : class, MessageBase
    {
        var endpoint = await _sendEndpoint.GetSendEndpoint(new Uri($"queue:{queueName}"));

        await endpoint.Send<T>(messageBody);
    }

I would like to write a simple test for the consumer so I could mock the service and then verify that the mocked service is being called. However I cannot get to the point of running a test and my consumer being hit by a breakpoint. I am not setting up the service injected into the consumer anywhere in the DI. Currently it is not complaining about that which makes me think I am missing something in the setup.

    public async Task Budget_message_gets_consumed()
    {
        await using var provider = new ServiceCollection()
            .AddMassTransitInMemoryTestHarness(cfg =>
            {
                cfg.AddConsumer<BudgetExceededConsumer>();
                cfg.AddConsumerTestHarness<BudgetExceededConsumer>();
            })
            .BuildServiceProvider(true);

        var harness = provider.GetRequiredService<InMemoryTestHarness>();

        await harness.Start();

        try
        {
            var bus = provider.GetRequiredService<IBus>();

            BudgetExceededMessage message = new BudgetExceededMessage
            {
                UserEmailAddress = "[email protected]",
                Budget = "£20.00",
                TotalSpend = "£23.56"
            };
            await bus.Publish(message);

            var result = await harness.Consumed.Any<IBudgetExceeded>();

            Assert.That(result, Is.True); //This is true
            
            var consumerHarness = provider.GetRequiredService<IConsumerTestHarness<BudgetExceededConsumer>>();
            var result2 = await consumerHarness.Consumed.Any<IBudgetExceeded>();
            Assert.That(result2, Is.True); //This is FALSE. 
        }
        finally
        {
            await harness.Stop();

            await provider.DisposeAsync();
        }
    }

As you can see the second Assert is false. I think if this was true then I would be seeing the breakpoint in my consumer getting hit.

Is there part of the setup here I need to change so the second assert will get evaluated correctly? I know my setup is slightly different to the docs since I am not using the approach that gives a response.

Thanks

Kroll answered 9/6, 2022 at 16:30 Comment(0)
L
4

I ran into more or less the same issue and ended up getting this to work correctly with two changes (second change optional).

1. My consumer was using a named endpoint so I needed to configure this in the setup of the consumer in the harness.

To check if this is your issue, you can look at the logs (note the 3rd and 5th lines).

08:00:18.424-D Starting bus: loopback://localhost/
08:00:18.440-D Endpoint Ready: loopback://localhost/mbp14_testhost_bus_gtwyyyrqpojbqfzhbdpgyrnng6
08:00:18.440-D Endpoint Ready: loopback://localhost/PublishPdp
08:00:18.442-I Bus started: loopback://localhost/
08:00:18.454-D Create send transport: loopback://localhost/publish-pdp?bind=true
08:00:18.507-D SEND loopback://localhost/publish-pdp?bind=true 34680000-8e6c-1217-6974-08da601042eb Ownit.Next.Common.Messages.PublishPdp

Notice the two different addresses: PublishPdp vs publish-pdp. In the test harness, adding this code:

EndpointConvention.Map<PublishPdp>(new Uri("queue:publish-pdp"));

Is not sufficient, I also had to update the consumer configuration:

await using var provider = new ServiceCollection()
    .AddMassTransitTestHarness(cfg =>
    {
        cfg.AddConsumer<PublishPdpConsumer>()
           .Endpoint(e => e.Name = "publish-pdp"); // Explicitly set the name.
    })
    .AddScoped(x => Log.Logger)
    .BuildServiceProvider(true);

or use the default naming:

// This also works by using the default naming scheme
EndpointConvention.Map<PublishPdp>(new Uri("queue:PublishPdp"));

await using var provider = new ServiceCollection()
    .AddMassTransitTestHarness(cfg =>
    {
        cfg.AddConsumer<PublishPdpConsumer>(); // No naming needed
    })
    .AddScoped(x => Log.Logger)
    .BuildServiceProvider(true);

I had assumed that the static EndpointConvention.Map<>() would also affect the consumer, but this is not the case. The consumer even has a static initalizer which sets this explicitly.

It seems that the test case must have explicit EndpointConvention.Map<>(new Url("queue:X")) and AddConsumer<>().Endpoint(e => e.Name = "X") when you are not using the default name. When you are using Send in the test harness, you still need to use EndpointConvention.Map<>(new Url("queue:X")) where X is the name of your message type.

2. Explicitly adding the AddConsumerTestHarness does not seem necessary.

There is some confusion because the docs only show two examples and most of the examples available on the GitHub issues shows the "old" style setup.

There is an ObsoleteAttribute on this method:

Consider migrating to AddMassTransitTestHarness, which does not require this extra configuration

Indeed, it does not appear necessary and the test case can be written like this:

[Fact]
public async Task Publish_Pdp_Is_Consumed_With_DI()
{
    // This mapping is needed in the test
    EndpointConvention.Map<PublishPdp>(new Uri("queue:publish-pdp"));

    await using var provider = new ServiceCollection()
        .AddMassTransitTestHarness(cfg =>
        {
            // Need to explicitly name the endpoint.  Commenting out one
            // or both causes the test to fail.
            cfg.AddConsumer<PublishPdpConsumer>()
               .Endpoint(e => e.Name = "publish-pdp");
        })
        .AddScoped(x => Log.Logger)  // I inject Serilog so I need this.
        .BuildServiceProvider(true);

    var harness = provider.GetRequiredService<ITestHarness>();

    await harness.Start();

    await harness.Bus.Send<PublishPdp>(new
    {
        Id = Guid.NewGuid(),
        CreatedBy = "test_user"
    });

    Assert.True(await harness.Sent.Any<PublishPdp>());
    Assert.True(await harness.Consumed.Any<PublishPdp>());
}

OR

[Fact]
public async Task Publish_Pdp_Is_Consumed_With_DI()
{
    // Map the default name
    EndpointConvention.Map<PublishPdp>(new Uri("queue:PublishPdp"));

    await using var provider = new ServiceCollection()
        .AddMassTransitTestHarness(cfg =>
        {
            // Don't name the endpoint, but explicitly map the default
            cfg.AddConsumer<PublishPdpConsumer>();
        })
        .AddScoped(x => Log.Logger)  // I inject Serilog so I need this.
        .BuildServiceProvider(true);

    var harness = provider.GetRequiredService<ITestHarness>();

    await harness.Start();

    await harness.Bus.Send<PublishPdp>(new
    {
        Id = Guid.NewGuid(),
        CreatedBy = "test_user"
    });

    Assert.True(await harness.Sent.Any<PublishPdp>());
    Assert.True(await harness.Consumed.Any<PublishPdp>());
}

Thanks to Chris for the helpful replies.

Lading answered 7/7, 2022 at 12:25 Comment(0)
T
0

If your consumer has a dependency, and that dependency is not configured in the service collection, it is likely failing when creating your consumer. If you add an ILoggerFactory to your container with a console output, you'd see that in the unit test output.

Or use v8 of MassTransit, with the new version of the test harness that automatically includes a console logger for test output.

Toscana answered 9/6, 2022 at 16:49 Comment(1)
Chris, I'm running into the same issue where the assert for consumerHarness.Consumed.Any<>() is returning false. If I test the code manually, everything works fine. Taking your comment into account, I removed all dependencies in the constructor and body (leaving only Console.WriteLine) and still observing the same. I have a static constructor on my consumer class with a Console.WriteLine and it simply doesn't get hit so I'm thinking the message is being sent (first assert works), but not delivered (second assert fails).Lading

© 2022 - 2025 — McMap. All rights reserved.