NServiceBus - How to get separate queue for each m

2019-06-23 10:23发布

问题:

I have a following situation:

So, receiver subscribes to two kind of events: eventA and eventB. NServiceBus creates queue for receiver (Receiver) and places messages of type eventA and eventB to to same queue. Question is, if I can configure NServiceBus to use separate queues (ReceiverEventA and ReceiverEventB) for each type of event for receiver? Or can I have two receivers in single process (and each receiver separate queue). Thing is, that EventA takes much longer to process than EventB, and they are independent - so if they would be in separate queues, they could be processed concurrently.

Update: If i'm going with naive approach like this, receiver fails to start with null reference exception:

 private static IBus GetBus<THandler, TEvent>()
    {                   
        var bus = Configure.With(new List<Type>
                                     {
                                         typeof(THandler), 
                                         typeof(TEvent),
                                         typeof(CompletionMessage)
                                     })
            .Log4Net()
            .DefaultBuilder()
            .XmlSerializer()
            .MsmqTransport()
            .IsTransactional(true)
            .PurgeOnStartup(false)
            .UnicastBus()
            .LoadMessageHandlers()
            .ImpersonateSender(false);

        bus.Configurer.ConfigureProperty<MsmqTransport>(x => x.InputQueue, "Queue" + typeof(THandler).Name);

        return bus.CreateBus().Start();
    }

    [STAThread]
    static void Main()
    {
        Busses = new List<IBus>
                     {
                         GetBus<ItemEventHandlerA, ItemEventA>(),
                         GetBus<ItemEventHandlerB, ItemEventB>()
                     };          

        Application.EnableVisualStyles();
        Application.SetCompatibleTextRenderingDefault(false);
        Application.Run(new TestForm());
    }

Exception stack trace is:

at NServiceBusTest2.WinFormsReceiver.Program.GetBusTHandler,TEvent in C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 57
at NServiceBusTest2.WinFormsReceiver.Program.Main() in C:\Users\User\Documents\Visual Studio 2010\Projects\NServiceBusTest2\NServiceBusTest2.WinFormsReceiver\Program.cs:line 26
at System.AppDomain._nExecuteAssembly(RuntimeAssembly assembly, String[] args)
at System.AppDomain.ExecuteAssembly(String assemblyFile, Evidence assemblySecurity, String[] args) at Microsoft.VisualStudio.HostingProcess.HostProc.RunUsersAssembly()
at System.Threading.ThreadHelper.ThreadStart_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state)
at System.Threading.ThreadHelper.ThreadStart()

回答1:

I started writing a more drawn-out explanation of why you should NOT have two separate processes, in support of my comment to the answer @stephenl posted. NServiceBus basically enforces one input queue per process.

So normally, you would have two separate processes. EventAService would read EventA from QForEventA, in a separate process from EventBService which would read EventB from QForEventB.

Then I looked more carefully at your example code and realized you were in a Windows Forms app. Duh! Now I feel a bit foolish. Of course you can only have one process. Imagine if after launching Outlook you also had to launch MailService.exe to actually get mail!

So the problem is really that you have drastically different processing times for EventA and EventB within your Windows Forms app. I don't have any way of knowing what that work is, but this is a little bit odd for a client application.

Most of the time it's a service that has big processing to do, and any message received by a client is fairly lightweight - something along the lines of "Entity X has changed so next time you'll need to load it direct from the database" and processing it involves just dropping something out of cache - certainly not a long-running process.

But it sounds like for whatever reason the processing in YOUR client takes longer, which in a WinForms app is concerning because of concerns about UI thread marshalling, blocking the UI thread, etc.

I would suggest a system where you don't do all the processing in the NServiceBus handler in your WinForms app, but marshal it off somewhere else. Throw it over to the ThreadPool as a work item, or something like that. Or put the long-running items into a Queue and have a background thread crunch on those at its own speed. That way all the NServiceBus message handler does is "Yep, got the message. Thank you very much." Then it shouldn't really matter if the NServiceBus messages are processed one at a time.

Update:

In the comments, the OP asks what happens if you throw the work to the ThreadPool after NServiceBus finsihes receiving it. That is, of course, the flip side to this approach - after NServiceBus is done, you're on your own - if it fails in the ThreadPool, then it's up to you to create your own retry logic, or just catch the exception, alert the user of the WinForms app, and let it die.

Obviously that's optimal, but it begs the question - just exactly what sort of work are you trying to accomplish in the WinForms app? If the robustness that NServiceBus offers (the automatic retries and error queue for poison messages) is a critical piece of this puzzle, then why is it going on in a WinForms application in the first place? This logic probably needs to be offloaded to a service external to the WinForms application, where having separate queues for each message type (by deploying separate services) becomes easy, and then only the pieces affecting the UI are ever sent back to the WinForms client. When the messages to the UI only affect the UI, processing them is almost always trivial, and you won't have any need for offloading to the ThreadPool to keep up.

Speaking directly to the situation you described in the GitHub Issue, this really does sound like the kind of scenario where separate processes for each message type are exactly the prescribed solution. I hear that it sounds overwhelming to deploy and manage that many processes, but I think you'll find that it's not as bad as it sounds. There are even advantages - if you have to redeploy your connector with Amazon.com, for instance, you only have to redeploy THAT endpoint, with no downtime for any of the others, or any worries that bugs may have been introduced to the others.

To ease deployment, hopefully you're using a continuous integration server, and then check in to tools like DropkicK that help deployments to be scripted. Personally, my favorite deployment tool is good old Robocopy. Even something as simple as 1) NET STOP ServiceName, 2) ROBOCOPY, 3) NET START ServiceName is quite effective.



回答2:

You don't need to, you just need to create a handler for each event in the receiver. For example

public class EventAHandler : IHandleMessages<EventA>{}

public class EventBHandler : IHandleMessages<EventB>{}

If you want to separate queues then you need to put each handler into a separate endpoint. Does that make sense?

Also I think your diagram needs a bit of work. Its a labelling thing I'm sure, but the way I see it is that Host is the actual publisher, not the client endpoints (which you've name Publisher A and Publisher B).

UPDATE: Its a simple example but illustrates what I mean - https://github.com/sliedig/sof9411638

I've extended the Full Duplex sample that comes with NServiceBus to include three additional endpoints, two of which subscribe to events being published by the server respectively and one endpoint that handles both. HTH.