我使用的是管道模式实现从生产脱钩的消息消费者,以避免缓慢的消费问题。
在对消息的处理阶段的任何异常的情况下, [1]
将丢失,而不是调度到其他服务/层[2]
我该如何处理此类问题[3]
因此消息不会丢失,什么是重要的! 消息的顺序不会被搞混了,因此上层业务/层会得到他们来到的顺序消息。我有一个涉及其他中间一个想法Queue
,但似乎复杂吗? 不幸的是BlockingCollection<T>
不公开的任何类似物Queue.Peek()
方法,所以我可以只是读下一个可用消息和在全成处理的情况下做Dequeue()
private BlockingCollection<IMessage> messagesQueue;
// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in
messagesQueue.GetConsumingEnumerable(cancellation))
{
const int maxRetries = 3;
int retriesCounter = 0;
bool isSent = false;
// On this point a message already is removed from messagesQueue
while (!isSent && retriesCounter++ <= maxRetries)
{
try
{
// [1] Preprocess a message
// [2] Dispatch to an other service/layer
clientProxyCallback.SendMessage(cachedMessage);
isSent = true;
}
catch(Exception exception)
{
// [3]
// logging
if (!isSent && retriesCounter < maxRetries)
{
Thread.Sleep(NSeconds);
}
}
if (!isSent && retriesCounter == maxRetries)
{
// just log, message is lost on this stage!
}
}
}
编辑 :忘了说,这是IIS托管WCF服务,通过客户端回调合同信息分配回Silverlight客户端WCF代理。
EDIT2:下面是我会怎么做这个用Peek()
我缺少的东西?
bool successfullySent = true;
try
{
var item = queue.Peek();
PreProcessItem(item);
SendItem(item);
}
catch(Exception exception)
{
successfullySent = false;
}
finally
{
if (successfullySent)
{
// just remove already sent item from the queue
queue.Dequeue();
}
}
EDIT3:当然,我可以使用while循环,布尔标志,使用旧式的做法Queue
和AutoResetEvent
,但我只是想知道是否同样可能使用BlockingCollection
和GetConsumingEnumerable()
我认为像设施Peek
在与消费枚举一起使用将是非常有益,否则所有的管道模式实现的例子一样新的东西BlockingCollection
和GetConsumingEnumerable()
看起来不耐用,我不得不搬回老办法。