Multiple consumers for the same message through Unity not working in MassTransit
I'm having a lot of problems lately because of what seems to be a bug in the MassTransit.UnityIntegration package, primarily due to the fact that registration names are not being considered.

For instance, if I register my classes like this:

var container = new UnityContainer()
    .RegisterType<Consumes<Command1>.All, Handler1>("Handler1")
    .RegisterType<Consumes<Command1>.All, Handler3>("Handler3");

A few lines later, I use the LoadFrom extension method to get the registered consumers in the container like this:

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom("msmq://localhost/MyQueue");
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(_s => _s.LoadFrom(container));
    });

What happens is that my handlers are never called when the associated messages hit the bus.

After pondering for a while, I decided to take a look at the implementation and it became clear why this happens:

This is the main code inside the LoadFrom method:

public static void LoadFrom(this SubscriptionBusServiceConfigurator configurator, IUnityContainer container)
{
    IList<Type> concreteTypes = FindTypes<IConsumer>(container, x => !x.Implements<ISaga>());
    if (concreteTypes.Count > 0)
    {
        var consumerConfigurator = new UnityConsumerFactoryConfigurator(configurator, container);

        foreach (Type concreteType in concreteTypes)
            consumerConfigurator.ConfigureConsumer(concreteType);
    }

    ...

}

Notice that it only finds the types and does not pass any information of the names forward. This is the FindTypes<T> implementation:

static IList<Type> FindTypes<T>(IUnityContainer container, Func<Type, bool> filter)
{
    return container.Registrations
                        .Where(r => r.MappedToType.Implements<T>())
                        .Select(r => r.MappedToType)
                        .Where(filter)
                        .ToList();
}

After a few indirections, it all comes down to this single line, inside the UnityConsumerFactory<T> class, that actually creates the instance of the consumer:

var consumer = childContainer.Resolve<T>();

This cannot work with Unity when there are multiple registrations for the same interface, because the only way to register (and then resolve) multiple implementations in Unity is to give each one a name in the RegisterType call and later specify that name in the Resolve call.
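To illustrate the Unity behavior described above, here is a minimal sketch, assuming the classic Microsoft.Practices.Unity package. IHandler is a hypothetical stand-in for Consumes<Command1>.All so the sample does not depend on MassTransit:

```csharp
using System;
using System.Linq;
using Microsoft.Practices.Unity;

// Hypothetical stand-in for Consumes<Command1>.All.
public interface IHandler { }
public class Handler1 : IHandler { }
public class Handler3 : IHandler { }

public static class NamedRegistrationDemo
{
    public static void Main()
    {
        var container = new UnityContainer()
            .RegisterType<IHandler, Handler1>("Handler1")
            .RegisterType<IHandler, Handler3>("Handler3");

        // A named lookup returns the implementation registered under that name.
        IHandler first = container.Resolve<IHandler>("Handler1");
        Console.WriteLine(first.GetType().Name);

        // ResolveAll enumerates the named registrations of the interface.
        Console.WriteLine(container.ResolveAll<IHandler>().Count());

        // An unnamed lookup finds no default mapping for the interface and throws.
        try
        {
            container.Resolve<IHandler>();
        }
        catch (ResolutionFailedException)
        {
            Console.WriteLine("No default registration for IHandler");
        }
    }
}
```

The point is only that the name is part of the lookup key for an interface mapping; it says nothing about how the integration package itself resolves consumers.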

Perhaps I'm missing something completely basic in all this and the error is on my part? The source for the MassTransit Unity components can be found here. I did not look into the code for the other containers because I'm not familiar with them, but I assume this has been handled in some way? I think having more than one consumer for the same message type inside the same container is actually quite common.

In this particular case, it would be better to pass along not only the Type from the registration in the container, but also the Name used for the registration.
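A rough sketch of what carrying the name forward could look like, again assuming Microsoft.Practices.Unity. FindRegistrations and IHandler are hypothetical names made up for this example; this is not the code of the integration package:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Practices.Unity;

// Hypothetical stand-in for a consumer interface.
public interface IHandler { void Handle(); }
public class Handler1 : IHandler { public void Handle() { Console.WriteLine("Handler1"); } }
public class Handler3 : IHandler { public void Handle() { Console.WriteLine("Handler3"); } }

public static class NamedLookupSketch
{
    // Returns whole registrations instead of bare types, so the name is not lost.
    static IList<ContainerRegistration> FindRegistrations<T>(IUnityContainer container)
    {
        return container.Registrations
            .Where(r => typeof(T).IsAssignableFrom(r.MappedToType))
            .ToList();
    }

    public static void Main()
    {
        IUnityContainer container = new UnityContainer()
            .RegisterType<IHandler, Handler1>("Handler1")
            .RegisterType<IHandler, Handler3>("Handler3");

        foreach (ContainerRegistration registration in FindRegistrations<IHandler>(container))
        {
            // Resolve with the registered type AND the name, exactly as it was registered.
            var handler = (IHandler)container.Resolve(registration.RegisteredType, registration.Name);
            handler.Handle();
        }
    }
}
```

Resolving through RegisteredType plus Name keeps whatever injection members and lifetime were attached to that registration.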

Update

Well the problem is a bit more clear now that Travis took the time to explain it. I should have noticed it earlier.

It seems I should be registering the types directly for them to be correctly resolved inside the factory, like this:

var container = new UnityContainer()
    .RegisterType<Handler1>()
    .RegisterType<Handler3>();

With this approach, I can also omit the registration name, since now their build keys inside the container are different.

Well, this would work perfectly if this was our real scenario, but it isn't. Let me explain what exactly we are doing:

Before we started using MassTransit, we already had an interface used for the command pattern, called ICommandHandler<TCommand>, where TCommand is a base model for commands in the system. When we started considering the use of a service bus, it was clear from the get go that it should be possible to switch later on to another service bus implementation without much hassle. With that in mind, I proceeded to create an abstraction over our commanding interface to behave like one of the consumers that MT expects. This is what I came up with:

public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All
    where T : class, ICommand
{
    private readonly ICommandHandler<T> _commandHandler;

    public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler)
    {
        _commandHandler = commandHandler;
    }

    public void Consume(T _message)
    {
        _commandHandler.Handle(_message);
    }
}

It's a very simple adapter class. It receives an ICommandHandler<T> implementation and makes it behave like a Consumes<T>.All instance. It was unfortunate that MT required message models to be classes, since we did not have that constraint on our commands, but that was a small inconvenience, and we proceeded to add the where T : class constraint to our interfaces.

Then, since our handler interfaces were already registered in the container, it would be a matter of registering the MT interface with this adapter implementation and letting the container inject the real implementations over it. For instance, a more realistic example (taken straight from our code base):

.RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder")
.RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor")
.RegisterType<Consumes<ApplicationInstallationResult>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationResult>>()
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>>
  ("Recorder", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Recorder")))
.RegisterType<Consumes<ApplicationInstallationCommand>.All, CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>>
  ("Executor", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Executor")))

The named registrations there are a bit convoluted but required, since we now have two consumers for the same message. Although not as clean as we hoped, we could live with that since this promotes a huge decoupling of our code from MassTransit specific logic: the adapter class is in a separate assembly, referenced ONLY by the final layer in the system, for container registration purposes. That seems like a very nice idea, but is confirmed unsupported now by the lookup logic behind the container integration classes.

Notice that I'm unable to register the concrete classes here, since there is a generic adapter class in the middle.

Update 2:

After following Travis' advice, I tried this simple code which also does not work (I fail to see why, as it seems perfectly valid). It's an explicit consumer factory registration without any automatic container integration:

_sbc.Consume(() => container.Resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder"))

That resolve call correctly gives me the previously registered CommandHandlerToConsumerAdapter<ApplicationInstallationCommand> instance, which implements the Consumes<ApplicationInstallationCommand>.All, which in turn should be one of THE base interfaces supported. Publishing an ApplicationInstallationCommand right after this does nothing, as if the handler was invalid or something similar.

This works though:

_sbc.Consume(() => (CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>) container.Resolve<Consumes<ApplicationInstallationCommand>.All>("Recorder"))

It seems clear that something deep down in the API is inspecting the compile-time type of the factory's return value in a non-generic way, instead of relying on the generic interface it implements.

I mean... it is workable with this but the registration code is getting convoluted for no apparent reason (due to what I would consider 'non-standard implementation details' on MT's part). Maybe I'm just grasping at straws here? Perhaps this all boils down to 'why does MT not accept its own, already generic, interface?' Why does it need the concrete type at compile time to see that it is a message handler even though the instance that I'm passing to it is typed as Consumes<X>.All, also at compile time?

Update 3:

After discussing with Travis below, I decided to drop the UnityIntegration assembly completely and go with standalone Consumer calls on the subscription.

I've created a small extension class in our MassTransit specific assembly to facilitate things:

public static class CommandHandlerEx
{
    public static CommandHandlerToConsumerAdapter<T> ToConsumer<T>(this ICommandHandler<T> _handler)
        where T : class, ICommand
    {
        return new CommandHandlerToConsumerAdapter<T>(_handler);
    }
}

And finally registered the handlers like this:

var container = new UnityContainer()
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, CommandRecorder>("Recorder")
    .RegisterType<ICommandHandler<ApplicationInstallationCommand>, InstallOperation>("Executor");

IServiceBus massTransitBus = ServiceBusFactory.New(_sbc =>
    {
        _sbc.UseBinarySerializer();
        _sbc.UseControlBus();
        _sbc.ReceiveFrom("msmq://localhost/MyQueue");
        _sbc.UseMsmq(_x =>
            {
                _x.UseSubscriptionService("msmq://localhost/mt_subscriptions");
                _x.VerifyMsmqConfiguration();
            });
        _sbc.Subscribe(RegisterConsumers);
    });

private void RegisterConsumers(SubscriptionBusServiceConfigurator _s)
{
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Recorder").ToConsumer());
    _s.Consumer(() => container.Resolve<ICommandHandler<ApplicationInstallationCommand>>("Executor").ToConsumer());
}

After using the whole day yesterday to try working things out, I strongly suggest that you stay away from the container extension assemblies if you want expected behavior out of the container and/or if you want to customize the classes etc (like I did to decouple our messaging classes from MT specific code) for 2 main reasons:

  1. The logic in the extensions traverses the registrations in the container to find the consumer classes. This is, in my opinion, terrible design. If something wants an implementation from the container, it should just call Resolve or ResolveAll on its interface (or their equivalent in non-Unity terms), without caring about what exactly is registered and what the concrete types are. This can have serious consequences with code that assumes the container can return types that were not explicitly registered. Luckily that is not the case with these classes, but we do have a container extension that automatically creates decorator types based on the build key, and they don't need to be explicitly registered in the container.

  2. The consumer registration uses the MappedToType property on the ContainerRegistration instance to call Resolve on the container. This is just completely wrong in any situation, not just in MassTransit's context. Types in Unity are either registered as a mapping (like in the excerpts above, with a From and a To component) or directly as a single concrete type. In BOTH cases the logic should use the RegisteredType property to resolve from the container. The way it works now, if you happen to register the handlers with their interfaces, MT will completely bypass your registration logic and call Resolve on the concrete type instead, which works in Unity out of the box. This can cause unpredictable behavior: for instance, you think the consumer is a singleton because you registered it that way, but it ends up being a transient object (the default) instead.
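The lifetime issue from the second point can be seen in isolation with a small sketch, assuming the classic Microsoft.Practices.Unity package (other Unity versions may behave differently). IHandler and Handler are hypothetical types used only for this demonstration:

```csharp
using System;
using Microsoft.Practices.Unity;

// Hypothetical types for the demonstration.
public interface IHandler { }
public class Handler : IHandler { }

public static class MappedTypeDemo
{
    public static void Main()
    {
        // The singleton lifetime is attached to the IHandler registration.
        var container = new UnityContainer()
            .RegisterType<IHandler, Handler>(new ContainerControlledLifetimeManager());

        // Each registration exposes both sides of the mapping plus its name.
        foreach (ContainerRegistration r in container.Registrations)
        {
            Console.WriteLine("{0} -> {1} (name: {2})",
                r.RegisteredType.Name, r.MappedToType.Name, r.Name ?? "<default>");
        }

        // Resolving through the registered type honors the configured lifetime.
        bool sameViaInterface = ReferenceEquals(
            container.Resolve<IHandler>(), container.Resolve<IHandler>());

        // Resolving the mapped concrete type directly goes around that registration.
        bool sameViaConcrete = ReferenceEquals(
            container.Resolve<Handler>(), container.Resolve<Handler>());

        Console.WriteLine("Same instance via IHandler: " + sameViaInterface);
        Console.WriteLine("Same instance via Handler:  " + sameViaConcrete);
    }
}
```

Comparing the two flags shows whether a lookup by MappedToType honors the lifetime that was configured for RegisteredType.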

Looking back at it now, I can see it was much more complicated than I originally believed. There was quite a bit of learning in the process too, so that's good.

Update 4:

Yesterday I decided to refactor the whole adapter approach a bit before making the final checkin. I went with MassTransit's interface pattern to create my adapters too, because I think that is a very nice and clean syntax.

Here is the result:

public sealed class CommandHandlerToConsumerAdapter<T>
    where T : class, ICommand
{
    public sealed class All : Consumes<T>.All
    {
        private readonly ICommandHandler<T> m_commandHandler;

        public All(ICommandHandler<T> _commandHandler)
        {
            m_commandHandler = _commandHandler;
        }

        public void Consume(T _message)
        {
            m_commandHandler.Handle(_message);
        }
    }
}

Unfortunately, this breaks MassTransit's code because of an unhandled exception in a utility method of the referenced Magnum library, an extension method called ToShortTypeName.

Here is the exception:

ArgumentOutOfRangeException in MassTransit receive

at System.String.Substring(Int32 startIndex, Int32 length)
at Magnum.Extensions.ExtensionsToType.ToShortTypeName(Type type)
at MassTransit.Pipeline.Sinks.ConsumerMessageSink`2.<>c__DisplayClass1.<Selector>b__0(IConsumeContext`1 context) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Pipeline\Sinks\ConsumerMessageSink.cs:line 51
at MassTransit.Pipeline.Sinks.InboundConvertMessageSink`1.<>c__DisplayClass2.<>c__DisplayClass4.b__1(IConsumeContext x) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Pipeline\Sinks\InboundConvertMessageSink.cs:line 45
at MassTransit.Context.ServiceBusReceiveContext.DeliverMessageToConsumers(IReceiveContext context) in d:\BuildAgent-02\work\aa063b4295dfc097\src\MassTransit\Context\ServiceBusReceiveContext.cs:line 162

Codicodices answered 20/12, 2013 at 19:23 Comment(0)

While I don't know the Unity integration, with all the containers, you must register your consumers as the concrete type in the container and not the Consumes<> interfaces. I assume it's just RegisterType<Handler1, Handler1>() but I'm not totally sure on that.

If you don't like the LoadFrom extension for your container, you don't need to use it anyway. You can always just resolve the consumers yourself and register them via _sbc.Consume(() => container.Resolve<YourConsumerType>()) in the configuration instead. The LoadFrom extension is just a convenience for people who are using the container in a common way.

The following code works and uses the container the way I would expect one to use it, without knowing more about your domain. If you want to understand how messages are bound a little better, I'd suggest using RabbitMQ, because you can easily see where things end up by following the exchange bindings. At this point this is well beyond an SO question; I'd bring it to the mailing list if you have anything further.

using System;
using MassTransit;
using Microsoft.Practices.Unity;

namespace MT_Unity
{
    class Program
    {
        static void Main(string[] args)
        {
            using (var container = new UnityContainer()
                .RegisterType<ICommandHandler<MyCommand>, MyCommandHandler>()
                .RegisterType<CommandHandlerToConsumerAdapter<MyCommand>>())

            using (IServiceBus consumerBus = ServiceBusFactory.New(sbc =>
                    {
                        sbc.ReceiveFrom("rabbitmq://localhost/consumer");
                        sbc.UseRabbitMq();


                        sbc.Subscribe(s => s.Consumer(() => container.Resolve<CommandHandlerToConsumerAdapter<MyCommand>>()));
                    }))
            using (IServiceBus publisherBus = ServiceBusFactory.New(sbc =>
                    {
                        sbc.ReceiveFrom("rabbitmq://localhost/publisher");
                        sbc.UseRabbitMq();
                    }))
            {
                publisherBus.Publish(new MyCommand());

                Console.ReadKey();
            }
        }
    }

    public class CommandHandlerToConsumerAdapter<T> : Consumes<T>.All where T : class, ICommand
    {
        private readonly ICommandHandler<T> _commandHandler;

        public CommandHandlerToConsumerAdapter(ICommandHandler<T> commandHandler)
        {
            _commandHandler = commandHandler;
        }

        public void Consume(T message)
        {
            _commandHandler.Handle(message);
        }
    }

    public interface ICommand { }
    public class MyCommand : ICommand { }

    public interface ICommandHandler<T> where T : class, ICommand
    {
        void Handle(T message);
    }

    public class MyCommandHandler : ICommandHandler<MyCommand>
    {
        public MyCommandHandler()
        {

        }
        public void Handle(MyCommand message)
        {
            Console.WriteLine("Handled MyCommand");
        }
    }

}
Chaps answered 20/12, 2013 at 20:42 Comment(12)
Could you elaborate a bit? I'm not getting what you are saying. What I'm doing there is mapping the Consumes<Command1>.All interface to two concrete types, Handler1 and Handler3. This is extremely usual with Unity, maybe it's just syntax confusion here on your part? By the way, you probably know who actually coded the Unity integration parts. Would it be possible to perhaps contact them directly about this and/or point them here? Thanks for the quick reply as always Travis ;) - Codicodices
github.com/MassTransit/MassTransit/blob/v2.8.0/src/Containers/… We look for types that implement Consumes<> not types of Consumes<>. This is how we do it in all the containers. So when you register, instead of registering as Consumes<>.All register it as its concrete type. - Chaps
Ah ok, now I see what you meant by registering the concrete types. Well, to me, this makes absolutely no sense now. I didn't realize before that you guys were using the .MappedTo type to later on resolve on the container. I'm sorry, but this seems completely wrong. I got the code for these classes on Friday and started modifying the source. Later on today I'll probably finish it with what seems to be correct behavior to me: I'll pass the Registered type and the name to the factory and resolve with those there. I'll edit the question to show you why your approach does not work in my context. - Codicodices
I edited the question to show our real problem now, should make things a little more clear on why the problem exists to begin with. Also, I just created another question about the class constraint that I would also love if you could take a look at. - Codicodices
Wow, now we're really digging into Unity so again I'm totally guessing. I'd suggest joining the mailing list to dig into this more, so maybe someone like Chris, who wrote this integration, could speak to it some more. groups.google.com/group/masstransit-discuss but... can you just .RegisterType< CommandHandlerToConsumerAdapter<ApplicationInstallationCommand>> ("Recorder", new InjectionConstructor(new ResolvedParameter<ICommandHandler<ApplicationInstallationCommand>>("Recorder"))) instead? - Chaps
I could code that, but it would not work. If I specify the same type on the registration like that and keep the names, the code you guys wrote wouldn't pick it up because it tries to .Resolve from the container without passing any names (if it is registered in the container with a name, calling Resolve without it will return nothing). - Codicodices
At the same time, if I just remove the names in the registrations, Unity would not create an entry for each implementation, since the registered type (which generates the 'buildKey' in Unity's terms) would be the same. It would instead overwrite the previous registration and the first implementation would be lost. I REALLY think you guys should be basing yourselves on the Resolve methods and not by iterating the registrations though... That seems to be a very bad design decision on the part of your team, with all due respect, as it breaks every assumption one can make with an IoC container. - Codicodices
We do use Resolve to get the object from the container. It's just the LoadFrom you have a problem with. We look for a way to automatically bind all your consumers without much fuss. At this point I would just skip that and not get into a disagreement about what's the right way to use a container. I've updated my answer to reflect that option. - Chaps
I've been trying to get my own version of the extension to work to no avail. I think there is something deep inside the code that is just completely messing things up. I did not delve too much in your code because it's a tad more confusing than I hoped it would be. For instance, the method you proposed only works if I cast the resolve call explicitly to CommandHandlerToConsumerAdapter<T>. To me, the fact that MT does not recognize the Consumes<T>.All type as a candidate for receiving messages is crazy, since it is one of the very base interfaces! Once again, I'll update the OP to show this. - Codicodices
Updated the question again to showcase the weird behavior. As I said, this is probably why I couldn't make the modifications inside my own LoadFrom and associated classes to work, even though the internal Resolve calls on the container and such were working fine. Could you perhaps point me to the point in the source where MT decides which handlers should handle an incoming message so that I can at least understand what is happening? - Codicodices
Just saw your update, thanks for the concise sample there. A small correction though: drop the string in the resolve call there since in the example you are not naming the registration (just so that if someone copy pastes the code it works without problems). I'm quite bummed by the fact that I had to work around the integration classes to the point of not using them at all, but I'm closing the question anyways since it is working fine with the [typed] Consumer calls. Will update the question again with the final approach. Do you mind upvoting the question though? Is there a reason you didn't? - Codicodices
Travis, I found a bug in the Magnum library that is directly related to a small refactor I made to my approach here. A call to an extension method ToShortTypeName inside the receiving end of MT is throwing an exception and completely killing MT. I would love if you could take a look at that here and potentially update the library on nuget. In the meantime I will keep a workaround here that doesn't use nested classes. Will update the question again to reflect what exactly I ended up with. - Codicodices