我刚用 TPL Dataflow 写了一个生产者-消费者模式的示例,有几个基本问题想请教。
消费者只有在生产者发布完所有数据项之后才开始工作。异步难道不是意味着生产任务和消费任务可以并行运行吗?
我在消费者中加入了休眠时间,用来验证它是否会阻塞其他数据项的处理。结果看起来是按顺序执行的,没有获得任何并行性。
难道我做错了什么吗?
/// <summary>
/// Sample producer/consumer pair built on TPL Dataflow blocks.
/// The producer posts the integers in [<see cref="start"/>, <see cref="End"/>) to a target
/// block; the consumer drains a source block and hands each value to
/// <see cref="ProcessDataBuffer"/>.
/// </summary>
class AscDataBlocks
{
    /// <summary>First value posted by the producer (inclusive).</summary>
    public Int64 start;

    /// <summary>Upper bound of the posted range (exclusive).</summary>
    public Int64 End;

    /// <summary>
    /// Producer side of the pattern: posts every value in [start, End) to the target block
    /// and then marks the target as complete.
    /// </summary>
    /// <param name="targetAscTransform">Block that receives the posted values.</param>
    /// <exception cref="ArgumentNullException"><paramref name="targetAscTransform"/> is null.</exception>
    public void AscBufferProducer(ITargetBlock<Int64> targetAscTransform)
    {
        if (targetAscTransform == null)
        {
            throw new ArgumentNullException("targetAscTransform");
        }

        for (var i = start; i < End; i++)
        {
            Console.WriteLine("Postingasc : {0}", i);
            targetAscTransform.Post(i);
        }

        // Signal that no more data is coming. Without this the consumer's
        // OutputAvailableAsync() never returns false and it waits forever.
        targetAscTransform.Complete();
    }

    /// <summary>
    /// Simulates processing of a single value: sleeps 5000 ms for the value 5 and 500 ms
    /// for every other value, then writes the value to the console.
    /// </summary>
    /// <param name="ascDataSet">Value to process.</param>
    public void ProcessDataBuffer(Int64 ascDataSet)
    {
        if (ascDataSet == 5)
        {
            // Deliberately slow item: used to observe whether it delays the other items.
            Thread.Sleep(5000);
        }
        else
        {
            Thread.Sleep(500);
        }

        Console.WriteLine(ascDataSet);
    }

    /// <summary>
    /// Consumer side of the pattern: reads values from the source until it reports that no
    /// more output will be available, processing each one with <see cref="ProcessDataBuffer"/>.
    /// Values are processed one at a time inside this single loop, so a slow item delays
    /// every item behind it.
    /// </summary>
    /// <param name="source">Block to receive values from.</param>
    /// <returns>1 if at least one value was processed; otherwise 0.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    public async Task<Int64> AscTransConsumerAsync(IReceivableSourceBlock<Int64> source)
    {
        if (source == null)
        {
            throw new ArgumentNullException("source");
        }

        // Flag, not a counter: set to 1 once any value has been processed.
        Int64 status = 0;

        // OutputAvailableAsync completes with false once the source is completed and empty.
        while (await source.OutputAvailableAsync())
        {
            Int64 data;

            // Only process values that were actually received; TryReceive can fail if
            // another receiver took the item first. Drain everything currently buffered
            // before awaiting again.
            while (source.TryReceive(out data))
            {
                ProcessDataBuffer(data);
                status = 1;
            }
        }

        return status;
    }
}
/// <summary>
/// Entry point: wires a producer and a consumer together through a BufferBlock,
/// posts the values 1..99 and waits for the consumer to finish.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    AscDataBlocks ascb = new AscDataBlocks();
    ascb.start = 1;
    ascb.End = 100;

    // The token handed to the block comes from this source so the pipeline can
    // actually be cancelled through it; the source is disposed when Main exits.
    using (CancellationTokenSource cts = new CancellationTokenSource())
    {
        try
        {
            // BufferBlock only stores messages and takes plain DataflowBlockOptions;
            // MaxDegreeOfParallelism belongs to execution blocks and has no effect here.
            BufferBlock<Int64> ascbuffer = new BufferBlock<Int64>(new DataflowBlockOptions
            {
                CancellationToken = cts.Token
            });

            // Start the consumer first; it returns at its first await on the empty buffer.
            System.Threading.Tasks.Task<Int64> ascProcessingconsumer = ascb.AscTransConsumerAsync(ascbuffer);

            // Post source data to the dataflow block.
            ascb.AscBufferProducer(ascbuffer);

            // Tell the buffer no more data is coming so the consumer loop can end.
            // Calling Complete() on an already completed block is harmless.
            ascbuffer.Complete();

            ascProcessingconsumer.Wait();
        }
        catch (AggregateException ex)
        {
            // Task.Wait wraps failures in an AggregateException; report each cause.
            foreach (Exception inner in ex.Flatten().InnerExceptions)
            {
                Console.Error.WriteLine("msg: " + inner.Message);
            }
        }
        catch (Exception ex)
        {
            // Keep the sample from crashing, but never swallow the error silently.
            Console.Error.WriteLine("msg: " + ex.Message);
        }
    }
}