NServiceBus - How to get separate queue for each message type receiver subscribes to?
Asked Answered
D

2

5

I have a following situation:

enter image description here

So, receiver subscribes to two kind of events: eventA and eventB. NServiceBus creates queue for receiver (Receiver) and places messages of type eventA and eventB to to same queue. Question is, if I can configure NServiceBus to use separate queues (ReceiverEventA and ReceiverEventB) for each type of event for receiver? Or can I have two receivers in single process (and each receiver separate queue). Thing is, that EventA takes much longer to process than EventB, and they are independent - so if they would be in separate queues, they could be processed concurrently.

Update: If i'm going with naive approach like this, receiver fails to start with null reference exception:

 private static IBus GetBus<THandler, TEvent>()
    {                   
        var bus = Configure.With(new List<Type>
                                     {
                                         typeof(THandler), 
                                         typeof(TEvent),
                                         typeof(CompletionMessage)
                                     })
            .Log4Net()
            .DefaultBuilder()
            .XmlSerializer()
            .MsmqTransport()
            .IsTransactional(true)
            .PurgeOnStartup(false)
            .UnicastBus()
            .LoadMessageHandlers()
            .ImpersonateSender(false);

        bus.Configurer.ConfigureProperty<MsmqTransport>(x => x.InputQueue, "Queue" + typeof(THandler).Name);

        return bus.CreateBus().Start();
    }

    [STAThread]
    static void Main()
    {
        Busses = new List<IBus>
                     {
                         GetBus<ItemEventHandlerA, ItemEventA>(),
                         GetBus<ItemEventHandlerB, ItemEventB>()
                     };          

        Application.EnableVisualStyles();
        Application.SetCompatibleTextRenderingDefault(false);
        Application.Run(new TestForm());
    }

Exception stack trace is:

at NServiceBusTest2.WinFormsReceiver.Program.GetBusTHandler,TEvent in C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 57
at NServiceBusTest2.WinFormsReceiver.Program.Main() in C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 26
at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args) at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()

Donnettedonni answered 23/2, 2012 at 10:56 Comment(0)
D
5

I started writing a more drawn-out explanation of why you should NOT have two separate processes, in support of my comment to the answer @stephenl posted. NServiceBus basically enforces one input queue per process.

So normally, you would have two separate processes. EventAService would read EventA from QForEventA, in a separate process from EventBService which would read EventB from QForEventB.

Then I looked more carefully at your example code and realized you were in a Windows Forms app. Duh! Now I feel a bit foolish. Of course you can only have one process. Imagine if after launching Outlook you also had to launch MailService.exe to actually get mail!

So the problem is really that you have drastically different processing times for EventA and EventB within your Windows Forms app. I don't have any way of knowing what that work is, but this is a little bit odd for a client application.

Most of the time it's a service that has big processing to do, and any message received by a client is fairly lightweight - something along the lines of "Entity X has changed so next time you'll need to load it direct from the database" and processing it involves just dropping something out of cache - certainly not a long-running process.

But it sounds like for whatever reason the processing in YOUR client takes longer, which in a WinForms app is concerning because of concerns about UI thread marshalling, blocking the UI thread, etc.

I would suggest a system where you don't do all the processing in the NServiceBus handler in your WinForms app, but marshal it off somewhere else. Throw it over to the ThreadPool as a work item, or something like that. Or put the long-running items into a Queue and have a background thread crunch on those at its own speed. That way all the NServiceBus message handler does is "Yep, got the message. Thank you very much." Then it shouldn't really matter if the NServiceBus messages are processed one at a time.

Update:

In the comments, the OP asks what happens if you throw the work to the ThreadPool after NServiceBus finsihes receiving it. That is, of course, the flip side to this approach - after NServiceBus is done, you're on your own - if it fails in the ThreadPool, then it's up to you to create your own retry logic, or just catch the exception, alert the user of the WinForms app, and let it die.

Obviously that's optimal, but it begs the question - just exactly what sort of work are you trying to accomplish in the WinForms app? If the robustness that NServiceBus offers (the automatic retries and error queue for poison messages) is a critical piece of this puzzle, then why is it going on in a WinForms application in the first place? This logic probably needs to be offloaded to a service external to the WinForms application, where having separate queues for each message type (by deploying separate services) becomes easy, and then only the pieces affecting the UI are ever sent back to the WinForms client. When the messages to the UI only affect the UI, processing them is almost always trivial, and you won't have any need for offloading to the ThreadPool to keep up.

Speaking directly to the situation you described in the GitHub Issue, this really does sound like the kind of scenario where separate processes for each message type are exactly the prescribed solution. I hear that it sounds overwhelming to deploy and manage that many processes, but I think you'll find that it's not as bad as it sounds. There are even advantages - if you have to redeploy your connector with Amazon.com, for instance, you only have to redeploy THAT endpoint, with no downtime for any of the others, or any worries that bugs may have been introduced to the others.

To ease deployment, hopefully you're using a continuous integration server, and then check in to tools like DropkicK that help deployments to be scripted. Personally, my favorite deployment tool is good old Robocopy. Even something as simple as 1) NET STOP ServiceName, 2) ROBOCOPY, 3) NET START ServiceName is quite effective.

Downstream answered 16/3, 2012 at 18:37 Comment(3)
I was thinking about throwing to ThreadPool as work item, but what if it fails there? Handler will be finished it's job, so message will be removed from queue and there's no standard way of getting it back there (or am I missing something?). We're using windows forms as a process management tool - I've mentioned more concrete example here: github.com/NServiceBus/NServiceBus/issues/219Donnettedonni
Thanks for your help. To answer your question - WinForms is for having a dashboard to pause/resume, also to see current status of sales channels - which is what processing. We kind of found a way to have several receivers using separate AppDomains, but in general that is painful workaround. Looks like we will have to either accept workaround with AppDomains, either search for alternative for NServiceBus, either rethink deployment (now we have cc.net + clickOnce) and have dashboard and handlers set as separate pieces.Donnettedonni
I think you can probably already guess my advice would be the latter option. ;-) Good luck!Downstream
E
2

You don't need to, you just need to create a handler for each event in the receiver. For example

public class EventAHandler : IHandleMessages<EventA>{}

public class EventBHandler : IHandleMessages<EventB>{}

If you want to separate queues then you need to put each handler into a separate endpoint. Does that make sense?

Also I think your diagram needs a bit of work. Its a labelling thing I'm sure, but the way I see it is that Host is the actual publisher, not the client endpoints (which you've name Publisher A and Publisher B).

UPDATE: Its a simple example but illustrates what I mean - https://github.com/sliedig/sof9411638

I've extended the Full Duplex sample that comes with NServiceBus to include three additional endpoints, two of which subscribe to events being published by the server respectively and one endpoint that handles both. HTH.

Ebullition answered 23/2, 2012 at 13:27 Comment(5)
regarding naming, that is just sample, so may be you're right, those are more like senders and host is publisher. Regarding queues, if both events are pushed to same queue, isn't that if EventA comes first and it takes long time to process it, EventB will wait till EventA will be processed? I think if they were on separate queues, they could be processed concurently or I'm I wrong?Donnettedonni
If you configured it to use multiple threads, then you probably wouldn't have much of a problem with them blocking each other. That being said, you may still have a major influx of EventAs which will tie up all the threads and block the processing of EventBs. If that's something you're very concerned about, you should have a separate receiving endpoint for EventBs.Organo
Well, configuring it to use multiple threads requires commercial licence as I understand :) And we don't have that much messaging, to buy commercial licence for that, just EventA and EventB average processing times are very different. Could you point me to sample of receiver having two receiving endpoints?Donnettedonni
The receiver wouldn't have two receiving endpoints. There would just be two receivers. In your diagram above, you'd split Receiver so that you'd have ReceiverA (with Input Queue ReceiverAQueue) subscribing to EventA, and ReceiverB (with Input Queue ReceiverBQueue) subscribing to EventB. Then each (logical) receiver could be individually scaled (with number of threads or a distributor and multiple physical worker processes) according to the SLA requirements of each event type.Downstream
@DavidBoike, if you would give very short example how that would look like in code, I would mark it as answer.Donnettedonni

© 2022 - 2024 — McMap. All rights reserved.