MassTransit: Message contracts, polymorphism and dynamic proxy objects
Asked Answered
L

1

0

TL;DR On contract subscription, how can I get the raw message content or the original published object, rather than a dynamic proxy?

I am having a bad time trying to create a modular application based on MassTransit.

My idea is to have a Websocket server connected to queue, it reads events from the socket and inserts it in the queue as a "connection request", and reads events from the queue and send them to the sockets as "connection events". Both have a contract that allows the WS server know to which connection is the event going, and the rest of the system where is coming from:

public interface IConnectionRequest
{
    String ConnectionId { get; set; }
}

public interface IConnectionEvent
{
    String ConnectionId { get; set; }
}

There a object that maintain session and other data. This object accepts requests (like action requests or subscription requests) and push events as a result of the requests or just because the state changed. I want to create objects that listen for a particular event or set of events, and perform actions on the state, so I created this contract:

public interface IConnectionRequestHandler<T> : Consumes<T>.Selected
    where T : class, IConnectionRequest
{
}

For example, I want to create a handler that creates the actual session in the server, and replies to the connection notifying when the session is ready. I create an object that represents the request, other for the event and the handler it self.

public class CreateSessionRequest : IConnectionRequest
{
    public String ConnectionId { get; set; }
}

public class CreatedSessionEvent : IConnectionEvent
{
    public String ConnectionId { get; set; }
    public Guid SessionId { get; set; }
}

public class CreateSessionEventHandler : IConnectionRequestHandler<CreateSessionRequest>
{
    IServiceBus _bus;

    public CreateSessionEventHandler(IServiceBus bus)
    {
        _bus = bus;
    }

    public bool Accept(CreateSessionRequest message)
    {
        return true;
    }

    public void Consume(CreateSessionRequest message)
    {
        // do stuff, create the session
        var evt = new CreatedSessionEvent() { SessionId =Guid.NewGuid(), ConnectionId = message.ConnectionId };
        _bus.Publish(evt, evt.GetType());
    }
}

Now for testing purposes, I create this code that emulates the scenario. Basically, it creates a communication bus and subscribes the request handler:

var bus = ServiceBusFactory.New(sbc =>
{
    sbc.ReceiveFrom("loopback://localhost/queue");
});

bus.SubscribeInstance<CreateSessionEventHandler>(new CreateSessionEventHandler(bus));

Then, simulating the Websocket server, I write the part that reads from the WS and sends it to the queue:

IConnectionRequest e = new CreateSessionRequest() { ConnectionId = "myId" };
bus.Publish(e, e.GetType());

And now the part that is supposed to hear events from the queue and forward them to the appropiate connection:

bus.SubscribeHandler<IConnectionEvent>(evt => Console.WriteLine("Sending event '{0}' to connection: {1}",
                                                                evt.GetType().Name,  
                                                                evt.ConnectionId));

But this last part does not work as expected. The object I get in the subscription is not my original event, it is a dynamic proxy DynamicImpl.IConnectionEvent, so I cannot serialize this object in JSON since it would only contain the members of IConnectionEvent.

If I specify the type in the subscription, it works:

bus.SubscribeHandler<CreatedSessionEvent>(evt => Console.WriteLine("Sending event '{0}' to connection: {1}",
                                                                evt.GetType().FullName,  
                                                                evt.ConnectionId));

But then means that for each new event, I have to touch the websocket server to register that new type.

Is there a way of avoiding this?

Lucie answered 20/8, 2014 at 11:36 Comment(0)
R
1

TL;DR - MT only supports data on the contract for most serializes. There's a bunch of reasons for this but if you want non-proxy type then you need to use the binary serializer.

So XML & JSON serialization (XML actually just uses the JSON serializer under the hood, oddly faster) generates a proxy for all requests if it can. The subscribed type is the contract you're using and expecting to interrogate the object for more details not in the contract leads to a ton of complexity. We suggest you avoid doing that.

So that would mean you would need to go to the websocket server for each type of message it was expected to consume. If the only thing you are doing is forwarding the message off, there's a SingalR backplane someone did (https://github.com/mdevilliers/SignalR.RabbitMq I think is one) that might be better fit than MT consumers.

Ringworm answered 20/8, 2014 at 14:47 Comment(3)
Right, the binary serializer does the trick. To be honest, I struggle to imagine how this will lead to "a ton of complexity". Probably it's something I will learn through the "way of pain" soon but, do you have any example to get me started? Cheers.Lucie
Sure! So to start we have contract changes become brittle because you're dynamically casting to the type after loading the Consumer. You won't notice this being a problem at first, down the line there'll be some changes that you miss in one of Consumers and bug bites. If you literally will only have two consumers (which I've seen), it's not as big of a deal but still can be a problem. Next, if at some point you have to split Consumer for load or other reasons you'll notice there's likely some coupling you now have to overcome. And I'm running out of space.Ringworm
But there's a couple examples that could end up biting you later. It's possible you'll never discover them if your app never hits a level of complexity or doesn't change much over time. Though, even knowing this, sometimes it's worth the trade-offs - no matter what I tell you.Ringworm

© 2022 - 2024 — McMap. All rights reserved.