Apparent BufferBlock.Post/Receive/ReceiveAsync rac

Posted 2019-04-18 11:59

Question:

cross-posted to http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9

I know... I'm not really using TplDataflow to its maximum potential. At the moment I'm simply using BufferBlock as a safe queue for message passing, where the producer and consumer run at different rates. I'm seeing some strange behaviour that leaves me stumped as to how to proceed.

private BufferBlock<object> messageQueue = new BufferBlock<object>();

public void Send(object message)
{
    var accepted = messageQueue.Post(message);
    logger.Info("Send message was called qlen = {0} accepted={1}",
        messageQueue.Count, accepted);
}

public async Task<object> GetMessageAsync()
{
    try
    {
        var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
        //despite messageQueue.Count>0 next line 
        //occasionally does not execute
        logger.Info("message received");
        //.......
    }
    catch(TimeoutException)
    {
        //do something
    }
}

In the code above (part of a 2000-line distributed solution), Send is called roughly every 100ms, so an item is Posted to messageQueue about 10 times a second. This is verified. However, occasionally ReceiveAsync appears not to complete within the timeout (i.e. the Post does not cause ReceiveAsync to complete) and a TimeoutException is raised after 30s. At this point, messageQueue.Count is in the hundreds. This is unexpected. The problem has also been observed at slower posting rates (1 post/second) and usually happens before 1000 items have passed through the BufferBlock.

So, to work around this issue, I am using the following code, which works but occasionally adds 1s of latency when receiving (whenever the bug above occurs):

    public async Task<object> GetMessageAsync()
    {
        try
        {
            object m;
            var attempts = 0;
            for (; ; )
            {
                try
                {
                    m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
                }
                catch (TimeoutException)
                {
                    attempts++;
                    if (attempts >= 30) throw;
                    continue;
                }
                break;
            }

            logger.Info("message received");
            //.......
        }
        catch(TimeoutException)
        {
            //do something
        }
    }

This looks like a race condition in TDF to me, but I can't get to the bottom of why this doesn't occur in the other places where I use BufferBlock in a similar fashion. Experimentally changing from ReceiveAsync to Receive doesn't help. I haven't checked, but I imagine in isolation, the code above works perfectly. It's a pattern I've seen documented in "Introduction to TPL Dataflow" tpldataflow.docx.
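To check the "works in isolation" assumption, a stand-alone harness along these lines might help. This is only a sketch: the class name BufferBlockRepro and the method RunAsync are hypothetical, and it reproduces just the Post/ReceiveAsync pattern from the question, not the rest of the distributed solution.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockRepro
{
    // Hypothetical harness: posts `total` items, one per `interval`, while a
    // consumer receives each with `timeout`. Returns how many receives timed out.
    public static async Task<int> RunAsync(int total, TimeSpan interval, TimeSpan timeout)
    {
        var queue = new BufferBlock<object>();

        // Producer: mirrors Send() from the question.
        var producer = Task.Run(async () =>
        {
            for (var i = 0; i < total; i++)
            {
                queue.Post(i);
                await Task.Delay(interval);
            }
        });

        // Consumer: mirrors GetMessageAsync(), counting timeouts instead of handling them.
        var timeouts = 0;
        for (var received = 0; received < total; )
        {
            try
            {
                await queue.ReceiveAsync(timeout);
                received++;
            }
            catch (TimeoutException)
            {
                timeouts++;
                Console.WriteLine("timeout: received={0} qlen={1}", received, queue.Count);
            }
        }

        await producer;
        return timeouts;
    }
}
```

Calling RunAsync(2000, TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(30)) approximates the rates described above. A non-zero result in isolation would point at the library; a zero result would suggest looking at the surrounding code.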

What can I do to get to the bottom of this? Are there any metrics that might help infer what's happening? If I can't create a reliable test case, what more information can I offer?
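One cheap diagnostic, sketched below under the assumption that logging to the console is acceptable, is to inspect the block at the moment the timeout fires: log Count and whether Completion has finished, then try a synchronous TryReceive. The helper name ReceiveWithDiagnosticsAsync is hypothetical. If TryReceive succeeds immediately after the timeout, an item was available even though the pending receive never completed.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ReceiveDiagnostics
{
    // Hypothetical helper: waits up to `timeout`; on timeout, logs the queue
    // state and attempts a synchronous TryReceive before rethrowing.
    public static async Task<object> ReceiveWithDiagnosticsAsync(
        BufferBlock<object> queue, TimeSpan timeout)
    {
        try
        {
            return await queue.ReceiveAsync(timeout);
        }
        catch (TimeoutException)
        {
            Console.WriteLine("timeout: qlen={0} completed={1}",
                queue.Count, queue.Completion.IsCompleted);

            object item;
            if (queue.TryReceive(out item))
            {
                // An item was sitting in the block although ReceiveAsync timed out.
                Console.WriteLine("TryReceive succeeded right after the timeout");
                return item;
            }

            // Nothing was available: the timeout was genuine.
            throw;
        }
    }
}
```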

Help!

Answer 1:

Stephen seems to think the following is the solution:

var m = await messageQueue.ReceiveAsync();

instead of:

var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));

Can you confirm or deny this?
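For anyone wanting to try the suggestion, here is a minimal sketch of how the question's class might look with the timeout-free overload. The class name MessageChannel is hypothetical and logging is omitted. Note the trade-off: this version waits indefinitely, so the 30s TimeoutException handling from the original no longer applies.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class MessageChannel
{
    private readonly BufferBlock<object> messageQueue = new BufferBlock<object>();

    // Same as Send() in the question, but returns whether the block accepted the item.
    public bool Send(object message)
    {
        return messageQueue.Post(message);
    }

    // Uses ReceiveAsync() without a timeout, as suggested above.
    // The returned task completes only once an item is available.
    public async Task<object> GetMessageAsync()
    {
        var m = await messageQueue.ReceiveAsync();
        return m;
    }
}
```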