Queue.Peek的模拟()用于BlockingCollection听消耗的IEnumerable

2019-07-04 04:01发布

我使用的是管道模式实现从生产脱钩的消息消费者,以避免缓慢的消费问题。

在对消息的处理阶段的任何异常的情况下, [1]将丢失,而不是调度到其他服务/层[2] 我该如何处理此类问题[3]因此消息不会丢失,什么是重要的! 消息的顺序不会被搞混了,因此上层业务/层会得到他们来到的顺序消息。我有一个涉及其他中间一个想法Queue ,但似乎复杂吗? 不幸的是BlockingCollection<T>不公开的任何类似物Queue.Peek()方法,所以我可以只是读下一个可用消息和在全成处理的情况下做Dequeue()

private BlockingCollection<IMessage> messagesQueue;    

// TPL Task does following:
// Listen to new messages and as soon as any comes in - process it
foreach (var cachedMessage in 
             messagesQueue.GetConsumingEnumerable(cancellation))
{    
    const int maxRetries = 3;
    int retriesCounter = 0;
    bool isSent = false;

    // On this point a message already is removed from messagesQueue
    while (!isSent && retriesCounter++ <= maxRetries)
    {
        try
        {
           // [1] Preprocess a message
           // [2] Dispatch to an other service/layer    
           clientProxyCallback.SendMessage(cachedMessage);
           isSent = true;
        }                                
        catch(Exception exception)
        {
           // [3]   
           // logging
           if (!isSent && retriesCounter < maxRetries)
           {
              Thread.Sleep(NSeconds);
           }
        }            

        if (!isSent && retriesCounter == maxRetries)
        {
           // just log, message is lost on this stage!
        }
    }
}

编辑 :忘了说,这是IIS托管WCF服务,通过客户端回调合同信息分配回Silverlight客户端WCF代理。

EDIT2:下面是我会怎么做这个用Peek()我缺少的东西?

bool successfullySent = true;
try
{
   var item = queue.Peek();
   PreProcessItem(item);
   SendItem(item);       
}
catch(Exception exception)
{
   successfullySent = false;
}
finally
{
   if (successfullySent)
   {
       // just remove already sent item from the queue
       queue.Dequeue();
   }
}

EDIT3:当然,我可以使用while循环,布尔标志,使用旧式的做法QueueAutoResetEvent ,但我只是想知道是否同样可能使用BlockingCollectionGetConsumingEnumerable()我认为像设施Peek在与消费枚举一起使用将是非常有益,否则所有的管道模式实现的例子一样新的东西BlockingCollectionGetConsumingEnumerable()看起来不耐用,我不得不搬回老办法。

Answer 1:

你应该考虑中间队列。

BlockingCollection<T>可以,因为它的性质不是“偷看”项目-可以有一个以上的消费者。 其中一人可以窥视一个项目,而另一个可以它-因此,第一个将尽量采取项目,已被采取。



Answer 2:

如丹尼斯在他的评论说, BlockingCollection<T>提供了一种阻挡包装器的任何实现者IProducerConsumerCollection<T>接口。

正如可以看到, IProducerConsumerCollection<T>通过设计,不限定Peek<T>必要或其他方法来实现的。 这意味着, BlockingCollection<T>不能,因为它代表,提供了一个analouge到Peek

如果你认为,这greately减少由公用事业行业创造了断的concurrencey问题Peek执行。 你怎么能消耗不消耗? 要Peek同时你必须锁定集合的头,直到Peek的操作已经完成,我和设计师BlockingCollection<T>视图作为次优的。 我认为这也将是混乱和难以实施,需要某种形式的一次性偷看上下文。

如果你消耗的消息,其消费失败,你将不得不与它来处理。 你可以把它添加到另一个失败的队列中,将其重新添加到正常处理队列为未来编重试或只需登录它的失败留给后人,或适当的对上下文一些其他的动作。

如果你不想消耗信息并存那么就没有必要使用BlockingCollection<T>因为你不需要同时食用。 你可以使用ConcurrentQueue<T>直接,你仍然会得到同步断增加,并且可以使用TryPeek<T>安全,因为您可以控制单个消费者。 如果消费失败在你的愿望,虽然你可以用无限重试循环停止消费,我认为这需要一些设计思想。



Answer 3:

BlockingCollection<T>是围绕一个包装IProducerConsumerCollection<T>这是比如更为通用ConcurrentQueue并给出实施者不具有实现的自由(Try)Peek方法。

但是,您可以随时拨打TryPeek直接底层队列:

ConcurrentQueue<T> useOnlyForPeeking = new ConcurrentQueue<T>();
BlockingCollection<T> blockingCollection = new BlockingCollection<T>(useOnlyForPeeking);
...
useOnlyForPeeking.TryPeek(...)

但是请注意, 您不能修改通过您的队列useOnlyForPeeking ,否则blockingCollection会感到困惑,并可能会抛出InvalidOperationException小号你,但如果调用非修改我会感到惊讶TryPeek这个并发数据结构将是一个问题。



Answer 4:

可以使用ConcurrentQueue<T>代替,它具有TryDequeue()方法。

ConcurrentQueue<T>.TryDequeue(out T result)试图移除并在并发队列的开始时返回的对象时如果除去的元素和从所述ConcurrentQueue开头成功返回则返回true。

所以,没有必要先检查偷看。

TryDequeue()是线程安全的:

ConcurrentQueue<T> 内部处理所有的同步 。 如果两个线程正好在同一时刻调用TryDequeue(T),无论是操作被阻止。

据我所知返回false只有在队列为空

如果队列用代码填充诸如q.Enqueue(“A”); q.Enqueue( “B”); q.Enqueue( “C”); 两个线程同时试图离队的元素,一个线程将出列与另一线程将出列湾 到TryDequeue(T)两种电话将返回true,因为他们都是能够出列的元素。 如果每个线程再次出队一个额外的元素,一个线程将出列c和返回true,而其他线程会发现队列空,将返回false。

http://msdn.microsoft.com/en-us/library/dd287208%28v=vs.100%29.aspx

UPDATE

也许,最简单的办法是使用TaskScheduler类。 有了它,你可以用你所有的处理任务到队列中的项目,简化同步实施。



文章来源: Analogue of Queue.Peek() for BlockingCollection when listening to consuming IEnumerable