I just wrote a sample producer-consumer pattern using TPL Dataflow, and I have some basic questions.
The consumer becomes active only after all the items have been posted by the producer. Does asynchronous mean that the produce and consume tasks can run in parallel?
I added a sleep in the consumer to check whether it blocks the other data items. It seems to execute sequentially, without any parallelism.
Am I doing something wrong here?
class AscDataBlocks
{
    public Int64 start;
    public Int64 End;
    //public string ThreadName;

    // This is using TPL DataBlock producer consumer pattern.
    public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform)
    {
        for (var i = start; i < End; i++)
        {
            Console.WriteLine("Postingasc : {0}", i);
            targetAscTransform.Post(i);
        }
    }

    public void ProcessDataBuffer(Int64 ascDataSet)
    {
        if (ascDataSet == 5)
            // Testing if this will delay all the other data processing
            Thread.Sleep(5000);
        else
            Thread.Sleep(500);
        Console.WriteLine(ascDataSet);
    }

    // Demonstrates the consumption end of the producer and consumer pattern.
    public async Task<Int64> AscTransConsumerAsync(IReceivableSourceBlock<Int64> source)
    {
        // Initialize a counter to track the number of bytes that are processed.
        int status = 0;
        // Read from the source buffer until the source buffer has no
        // available output data.
        while (await source.OutputAvailableAsync())
        {
            Int64 data;
            source.TryReceive(out data);
            // This function processes the data buffer for ascollection and writes the data to the database.
            ProcessDataBuffer(data);
            // Increment the count of bytes received.
            status = 1;
        }
        return status;
    }
}
static void Main(string[] args)
{
    AscDataBlocks ascb;
    BufferBlock<Int64> ascbuffer;
    System.Threading.Tasks.Task<Int64> ascProcessingconsumer;
    CancellationToken ct = new CancellationToken();
    CancellationTokenSource cts = new CancellationTokenSource();
    ascb = new AscDataBlocks();
    ascb.start = 1;
    ascb.End = 100;
    try
    {
        // Initialize the buffer block.
        ascbuffer = new BufferBlock<Int64>(new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = 5,
            CancellationToken = ct
        });
        ascProcessingconsumer = ascb.AscTransConsumerAsync(ascbuffer); // Initialize the consumer.
        // Post source data to the dataflow block.
        ascb.AscBufferProducer(ascbuffer);
        ascProcessingconsumer.Wait();
    }
    catch (Exception ex)
    {
        //foreach (var v in ex.InnerExceptions)
        //    Console.WriteLine("msg: " + v.Message);
    }
}
This happens because you post all your items very quickly, before the consumer has a chance to start. If you added Thread.Sleep(100) to the posting loop, you would see that they actually do work in parallel.

TPL Dataflow is not magic: it won't modify your code to execute in parallel. It's you who calls AscTransConsumerAsync() once, so don't be surprised that it actually executes only once.

TDF does support processing in parallel, but you would need to actually let it execute the processing code. To do this, use one of the execution blocks. In your case, ActionBlock seems appropriate.

If you use that, you can then configure the block to execute in parallel by setting MaxDegreeOfParallelism. Of course, doing that means you need to ensure that the processing delegate is thread-safe.

With that, AscTransConsumerAsync() might now look something like:
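
A minimal sketch of that idea, assuming the System.Threading.Tasks.Dataflow package is referenced. The CreateAscConsumer name and the Program wrapper are hypothetical, introduced only for illustration, and the calls to Complete() and Completion.Wait() are an assumption about how you would want to shut the block down. The producer and ProcessDataBuffer are kept as in the question.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class AscDataBlocks
{
    public Int64 start;
    public Int64 End;

    // Unchanged producer: posts every item to whatever target it is given.
    public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform)
    {
        for (var i = start; i < End; i++)
        {
            Console.WriteLine("Postingasc : {0}", i);
            targetAscTransform.Post(i);
        }
    }

    // Unchanged processing step. With parallelism enabled it must be thread-safe.
    public void ProcessDataBuffer(Int64 ascDataSet)
    {
        if (ascDataSet == 5)
            Thread.Sleep(5000);
        else
            Thread.Sleep(500);
        Console.WriteLine(ascDataSet);
    }

    // Hypothetical replacement for AscTransConsumerAsync(): the ActionBlock
    // runs the delegate itself, up to 5 items at a time.
    public ActionBlock<Int64> CreateAscConsumer()
    {
        return new ActionBlock<Int64>(
            data => ProcessDataBuffer(data),
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
    }
}

static class Program
{
    static void Main()
    {
        var ascb = new AscDataBlocks { start = 1, End = 100 };
        var consumer = ascb.CreateAscConsumer();

        // The block buffers its own input, so the producer can post to it directly.
        ascb.AscBufferProducer(consumer);

        // Tell the block no more items are coming, then wait for it to drain.
        consumer.Complete();
        consumer.Completion.Wait();
    }
}
```

With this shape the slow item (5) should hold up only one of the five workers, and the printed numbers are no longer guaranteed to appear in order.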