跨发布到http://social.msdn.microsoft.com/Forums/en-US/tpldataflow/thread/89b3f71d-3777-4fad-9c11-50d8dc81a4a9
我知道......我真的不使用TplDataflow其最大潜力。 ATM我只是使用BufferBlock
为安全队列消息传递,在生产者和消费者都以不同的速率运行。 我看到一些奇怪的行为让我难倒就如何继续。
private BufferBlock<object> messageQueue = new BufferBlock<object>();
public void Send(object message)
{
var accepted=messageQueue.Post(message);
logger.Info("Send message was called qlen = {0} accepted={1}",
messageQueue.Count,accepted);
}
public async Task<object> GetMessageAsync()
{
try
{
var m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(30));
//despite messageQueue.Count>0 next line
//occasionally does not execute
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
在上面的代码(其是2000线的分布式解决方案的一部分), Send
被定期调用每100ms左右。 这意味着一个产品Post
编到messageQueue
在大约10次。 这验证。 但是,有时似乎ReceiveAsync
不超时(即内完成Post
不会导致ReceiveAsync
完成)和TimeoutException
为30秒后升起。 在这一点上, messageQueue.Count
是数百人。 这是出乎意料的。 这个问题已经在发布过的速度较慢被观察到(1篇帖子/秒),前1000个项目已经通过了通过通常发生BufferBlock
。
因此,要解决这个问题,我使用下面的代码,它的工作原理,但偶尔会导致接收时(由于上述发生的错误)1秒的延迟
public async Task<object> GetMessageAsync()
{
try
{
object m;
var attempts = 0;
for (; ; )
{
try
{
m = await messageQueue.ReceiveAsync(TimeSpan.FromSeconds(1));
}
catch (TimeoutException)
{
attempts++;
if (attempts >= 30) throw;
continue;
}
break;
}
logger.Info("message received");
//.......
}
catch(TimeoutException)
{
//do something
}
}
这看起来像在TDF竞争条件给我,但我不能得到的,为什么这不会在其他地方,我使用出现底部BufferBlock
以类似的方式。 从实验上改变ReceiveAsync
到Receive
于事无补。 我没有检查,但我想孤立地看,上面的代码完美的作品。 这是我所见记录“介绍TPL数据流”的模式tpldataflow.docx 。
我能做些什么去的这条底线? 是否有可能帮助推断出发生了什么任何的指标? 如果我不能创建一个可靠的测试情况下,有什么更多的信息可以提供我?
救命!