在Rebus的特定消息类型的串行处理(Serial processing of a certain

2019-10-18 05:17发布

我们有一个会谈到第三方Web服务一个卤面消息处理程序。 由于我们无法直接控制的原因,因为它遇到了自己的数据库数据库死锁这个WCF服务经常会抛出异常。 然后画谜将尝试处理此消息五次,在大多数情况下是指那些五次之一将是幸运的,并没有得到一个僵局。 但是频繁地发生的消息确实死锁后,得到的僵局,在我们的错误队列结束。

除了固定的死锁的来源,这将是一个长期的目标,我能想到的两个选项:

  1. 保持仅与该特定消息类型尝试,直到成功为止。 最好我将能够设置一个超时,所以“如果有五个死锁然后在5分钟内重试”,而不是不断地尝试更呛的过程了。 我已经做了Thread.Sleep(随机),在一定程度上传播的消息,但它仍然在5次尝试放弃。

  2. 发送该特定消息类型,以不同的队列仅具有一个工人处理该消息,以使这种情况发生串联而非并联连接。 我们目前的配置使用8个工作者线程,但这只是使死锁情况更糟,因为web服务现在被同时调用和消息在对方的方式获得。

方案2有我的偏好,但我不知道这是可能的。 我们在接收端配置目前看起来是这样的:

var adapter = new Rebus.Ninject.NinjectContainerAdapter(this.Kernel);

var bus = Rebus.Configuration.Configure.With(adapter)
    .Logging(x => x.Log4Net())
   .Transport(t => t.UseMsmqAndGetInputQueueNameFromAppConfig())
   .MessageOwnership(d => d.FromRebusConfigurationSection())
   .CreateBus().Start();

而为的.config 接收方

<rebus inputQueue="app.msg.input" errorQueue="app.msg.error" workers="8">
  <endpoints>
  </endpoints>
</rebus>

从我可以从配置来讲,它只能设定一个输入队列“听”到。 我真的不能找到一种方法,通过流畅的地图API来做到这一点无论是。 这似乎只需要一个输入和错误队列,以及:

.Transport(t =>t.UseMsmq("input", "error"))

基本上,我正在寻找的是沿着线的东西:

<rebus workers="8">
  <input name="app.msg.input" error="app.msg.error" />
  <input name="another.input.queue" error="app.msg.error" />
</rebus>

如何处理我的要求有什么建议?

Answer 1:

我建议你使用一个传奇和卤面超时服务实行适合您需求的重试策略。 这样一来,在启用了卤面的Web服务的门面,你可以这样做:

public void Handle(TryMakeWebServiceCall message)
{
    try
    {
        var result = client.MakeWebServiceCall(whatever);

        bus.Reply(new ResponseWithTheResult{ ... });
    }
    catch(Exception e)
    {
        Data.FailedAttempts++;

        if (Data.FailedAttempts < 10)
        {
            bus.Defer(TimeSpan.FromSeconds(1), message);
            return;
        }

        // oh no! we failed 10 times... this is probably where we'd
        // go and do something like this:
        emailService.NotifyAdministrator("Something went wrong!");
    }
}

其中Data是由神奇地提供给您和通话之间持续的传奇数据。

有关如何创建一个传奇,对检查出的wiki页面灵感协调的东西出现这种情况随着时间的推移存储在那里,你可以看到一个服务如何可能有一些状态(即在你的案件数量的尝试失败)为例本地即发可处理消息之间。

在时机成熟时进行bus.Defer工作,你有两个选择:1)使用外部超时服务(我通常已经安装了一个在每个服务器上),或2),只要用“自己”作为超时服务。

在配置时,你走

Configure.With(...)
    .(...)
    .Timeouts(t => // configure it here)

在那里你既可以StoreInMemoryStoreInSqlServerStoreInMongoDbStoreInRavenDb ,或UseExternalTimeoutManager

如果您选择(1),你需要检查的卤面代码和生成Rebus.Timeout自己-它基本上只是一个配置,启用Topshelf的控制台,里面有卤面端点应用。

请让我知道如果你需要更多的帮助,使这项工作- bus.Defer是你的系统变得真棒,并且将能够克服所有的小毛刺,使所有其他人的下去:)



文章来源: Serial processing of a certain message type in Rebus