Serial processing of a certain message type in Reb

2019-08-05 13:05发布

问题:

We have a Rebus message handler that talks to a third party webservice. Due to reasons beyond our immediate control, this WCF service frequently throws an exception because it encountered a database deadlock in its own database. Rebus will then try to process this message five times, which in most cases means that one of those five times will be lucky and not get a deadlock. But it frequently happens that a message does get deadlock after deadlock and ends up in our error queue.

Besides fixing the source of the deadlocks, which would be a longterm goal, I can think of two options:

  1. Keep trying with only this particular message type until it succeeds. Preferably I would be able to set a timeout, so "if five deadlocks then try again in 5 minutes" rather than choke the process up even more by trying continuously. I already do a Thread.Sleep(random) to spread the messages somewhat, but it will still give up after five tries.

  2. Send this particular message type to a different queue that has only one worker that processes the message, so that this happens serially rather than in parallel. Our current configuration uses 8 worker threads, but this just makes the deadlock situation worse as the webservice now gets called concurrently and the messages get in each other's way.

Option #2 has my preference, but I'm not sure if this is possible. Our configuration on the receiving side currently looks like this:

var adapter = new Rebus.Ninject.NinjectContainerAdapter(this.Kernel);

var bus = Rebus.Configuration.Configure.With(adapter)
    .Logging(x => x.Log4Net())
   .Transport(t => t.UseMsmqAndGetInputQueueNameFromAppConfig())
   .MessageOwnership(d => d.FromRebusConfigurationSection())
   .CreateBus().Start();

And the .config for the receiving side:

<rebus inputQueue="app.msg.input" errorQueue="app.msg.error" workers="8">
  <endpoints>
  </endpoints>
</rebus>

From what I can tell from the config, it's only possible to set one input queue to 'listen' to. I can't really find a way to do this via the fluent mapping API either. That seems to take only one input- and error queue as well:

.Transport(t =>t.UseMsmq("input", "error"))

Basically, what I'm looking for is something along the lines of:

<rebus workers="8">
  <input name="app.msg.input" error="app.msg.error" />
  <input name="another.input.queue" error="app.msg.error" />
</rebus>

Any tips on how to handle my requirements?

回答1:

I suggest you make use of a saga and Rebus' timeout service to implement a retry strategy that fits your needs. This way, in your Rebus-enabled web service facade, you could do something like this:

public void Handle(TryMakeWebServiceCall message)
{
    try
    {
        var result = client.MakeWebServiceCall(whatever);

        bus.Reply(new ResponseWithTheResult{ ... });
    }
    catch(Exception e)
    {
        Data.FailedAttempts++;

        if (Data.FailedAttempts < 10)
        {
            bus.Defer(TimeSpan.FromSeconds(1), message);
            return;
        }

        // oh no! we failed 10 times... this is probably where we'd
        // go and do something like this:
        emailService.NotifyAdministrator("Something went wrong!");
    }
}

where Data is the saga data that is made magically available to you and persisted between calls.

For inspiration on how to create a saga, check out the wiki page on coordinating stuff that happens over time where you can see an example on how a service might have some state (i.e. number of failed attempts in your case) stored locally that is made available between handling messages.

When the time comes to make bus.Defer work, you have two options: 1) use an external timeout service (which I usually have installed one of on each server), or 2) just use "yourself" as a timeout service.

At configuration time, you go

Configure.With(...)
    .(...)
    .Timeouts(t => // configure it here)

where you can either StoreInMemory, StoreInSqlServer, StoreInMongoDb, StoreInRavenDb, or UseExternalTimeoutManager.

If you choose (1), you need to check out the Rebus code and build Rebus.Timeout yourself - it's basically just a configurable, Topshelf-enabled console application that has a Rebus endpoint inside.

Please let me know if you need more help making this work - bus.Defer is where your system becomes awesome, and will be capable of overcoming all of the little glitches that make all others' go down :)