-->

表观BufferBlock.Post/Receive/ReceiveAsync种族/错误(Appar

2019-07-28 21:13发布

跨发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

我知道......我真的不使用TplDataflow其最大潜力。 ATM我只是使用BufferBlock为安全队列消息传递,在生产者和消费者都以不同的速率运行。 我看到一些奇怪的行为让我难倒就如何继续。

private BufferBlock<object> messageQueue = new BufferBlock<object>();

public void Send(object message)
{
    var accepted=messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
    messageQueue.Count,accepted);
}

public async Task<object> GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}

在上面的代码(其是2000线的分布式解决方案的一部分), Send被定期调用每100ms左右。 这意味着一个产品Post编到messageQueue在大约10次。 这验证。 但是,有时似乎ReceiveAsync不超时(即内完成Post不会导致ReceiveAsync完成)和TimeoutException为30秒后升起。 在这一点上, messageQueue.Count是数百人。 这是出乎意料的。 这个问题已经在发布过的速度较慢被观察到(1篇帖子/秒),前1000个项目已经通过了通过通常发生BufferBlock

因此,要解决这个问题,我使用下面的代码,它的工作原理,但偶尔会导致接收时(由于上述发生的错误)1秒的延迟

    public async Task<object> GetMessageAsync()
    {
        try
        {
            object m;
            var attempts = 0;
            for (; ; )
            {
                try
                {
                    m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                }
                catch (TimeoutException)
                {
                    attempts++;
                    if (attempts >= 30) throw;
                    continue;
                }
                break;

            }

            logger.Info("message received");
            //.......
        }
        catch(TimeoutException)
        {
            //do something
        }
   }

这看起来像在TDF竞争条件给我,但我不能得到的,为什么这不会在其他地方,我使用出现底部BufferBlock以类似的方式。 从实验上改变ReceiveAsyncReceive于事无补。 我没有检查,但我想孤立地看,上面的代码完美的作品。 这是我所见记录“介绍TPL数据流”的模式tpldataflow.docx 。

我能做些什么去的这条底线? 是否有可能帮助推断出发生了什么任何的指标? 如果我不能创建一个可靠的测试情况下,有什么更多的信息可以提供我?

救命!

Answer 1:

斯蒂芬似乎认为以下是解决方案

VAR米=等待messageQueue.ReceiveAsync();

代替:

VAR米=等待messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

你能否证实或否认这一点?



文章来源: Apparent BufferBlock.Post/Receive/ReceiveAsync race/bug