Just wrote a sample producer consumer pattern using TPL DataFlow. I have some basic questions here.
The consumer is active only after all the items are posted from the producer. Does asynchronous means that both produce and consume tasks can run in parallel.
Given a sleep time in consumer to verify if its blocking other data items. It seems to be executing sequentially and not getting any parallelism.
Am I doing something wrong here?
class AscDataBlocks
{
public Int64 start;
public Int64 End;
//public string ThreadName;
public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform)
// This is using TPL DataBlock producer consumer pattern.
{
for (var i = start; i < End; i++)
{
Console.WriteLine("Postingasc : {0}", i);
targetAscTransform.Post(i);
}
}
public void ProcessDataBuffer(Int64 ascDataSet)
{
if (ascDataSet == 5)
// Testing if this will delay all the other data processing
Thread.Sleep(5000);
else
Thread.Sleep(500);
Console.WriteLine(ascDataSet);
}
// Demonstrates the consumption end of the producer and consumer pattern.
public async Task<Int64> AscTransConsumerAsync(IReceivableSourceBlock<Int64> source)
{
// Initialize a counter to track the number of bytes that are processed.
int status = 0;
// Read from the source buffer until the source buffer has no
// available output data.
while (await source.OutputAvailableAsync())
{
Int64 data;
source.TryReceive(out data);
ProcessDataBuffer(data);//This function processed the data buffer for ascollection and writes the data to the database.
// Increment the count of bytes received.
status = 1;
}
return status;
}
}
static void Main(string[] args)
{
AscDataBlocks ascb;
BufferBlock<Int64> ascbuffer;
System.Threading.Tasks.Task<Int64> ascProcessingconsumer;
CancellationToken ct = new CancellationToken();
CancellationTokenSource cts = new CancellationTokenSource();
ascb = new AscDataBlocks();
ascb.start = 1;
ascb.End = 100;
try
{
ascbuffer = new BufferBlock<Int64>(new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = 5,
CancellationToken = ct
});// Initiallize the buffer bloack
ascProcessingconsumer = ascb.AscTransConsumerAsync(ascbuffer); //Initialize the consumer.
//Post source data to the dataflow block.
ascb.AscBufferProducer(ascbuffer);
ascProcessingconsumer.Wait();
}
catch (Exception ex)
{
//foreach (var v in ex.InnerExceptions)
// Console.WriteLine("msg: " + v.Message);
}
}