I'm trying to get my head around controlling dataflow in TPL Dataflow. I have a very fast producer, and a very slow consumer. (My real code is more complex, but none the less, this is a pretty good model and it reproduces the problem.)
When I run it, the code starts drinking memory like it's going out of style--and the output queue on the producer fills up as fast as it can. What I'd really prefer to see is the Producer stop running for a while, until the Consumer has a chance to ask for it. From my readings of the documentation, this is what is supposed to happen: that is, I thought that the producer waits until the consumer has space.
This isn't the case, clearly. How do I fix it so that the queue doesn't go crazy?
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using System.Threading;
namespace MemoryLeakTestCase
{
class Program
{
static void Main(string[] args)
{
var CreateData = new TransformManyBlock<int, string>(ignore =>
{
return Enumerable.Range(0, 1000 * 1000 * 1000).Select((s,i) => "Hello, World " + i);
});
var ParseFile = new TransformManyBlock<string, string>(fileContent =>
{
Thread.Sleep(1000);
return Enumerable.Range(0, 100).Select((sst, iii) => "Hello, " + iii);
}, new ExecutionDataflowBlockOptions() { BoundedCapacity = 1000 }
);
var EndOfTheLine = new ActionBlock<object>(f =>
{
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true, };
CreateData.LinkTo(ParseFile, linkOptions);
ParseFile.LinkTo(EndOfTheLine, linkOptions);
Task t = new Task(() =>
{
while (true)
{
Console.WriteLine("CreateData: " + Report(CreateData));
Console.WriteLine("ParseData: " + Report(ParseFile));
Console.WriteLine("NullTarget: " + EndOfTheLine.InputCount );
Thread.Sleep(1000);
}
});
t.Start();
CreateData.SendAsync(0);
CreateData.Complete();
EndOfTheLine.Completion.Wait();
}
public static string Report<T, U>(TransformManyBlock<T, U> block)
{
return String.Format("INPUT: {0} OUTPUT: {1} ", block.InputCount.ToString().PadLeft(10, ' '), block.OutputCount.ToString().PadLeft(10, ' '));
}
}
}