In the following code I'm using the CancellationToken to wake up the GetConsumingEnumerable() when the producer is not producing and I want to break out of the foreach and exit the Task. But I dont see IsCancellationRequested being logged and my Task.Wait(timeOut) waits for the full timeOut period. What am I doing wrong?
userToken.Task = Task.Factory.StartNew(state =>
{
userToken.CancelToken = new CancellationTokenSource();
foreach (var broadcast in userToken.BroadcastQueue.GetConsumingEnumerable(userToken.CancelToken.Token))
{
if (userToken.CancelToken.IsCancellationRequested)
{
Log.Write("BroadcastQueue IsCancellationRequested");
break;
...
}
}
return 0;
}, "TaskSubscribe", TaskCreationOptions.LongRunning);
later...
UserToken.CancelToken.Cancel();
try
{
task.Wait(timeOut);
}
catch (AggregateException ar)
{
Log.Write("AggregateException " + ar.InnerException, MsgType.InfoMsg);
}
catch (OperationCanceledException)
{
Log.Write("BroadcastQueue Cancelled", MsgType.InfoMsg);
}
You could use CompleteAdding()
which signifies that no more items will be added to the collection. If GetConsumingEnumerable
is used, the foreach will end gracefully as it will know there's no point in waiting for more items.
Basically once you have finished adding items to the BlockingCollection
just do:
myBlockingCollection.CompleteAdding()
Any threads which are doing a foreach loop with GetConsumingEnumerable will stop looping.
I've created quick prototype, and it seems work for me.
Note Thread.Sleep(1000) right before token cancel request.
It is possible that you are creating a race condition for Token variable, since you create and access item.CancelToken
variable in different threads.
E.g. the code that is intended to cancel task might invoke cancel on wrong (previous or null) cancellation token.
static void Main(string[] args)
{
CancellationTokenSource token = null;
BlockingCollection<string> coll = new BlockingCollection<string>();
var t = Task.Factory.StartNew(state =>
{
token = new CancellationTokenSource();
try
{
foreach (var broadcast in coll.GetConsumingEnumerable(token.Token))
{
if (token.IsCancellationRequested)
{
return;
}
}
}
catch (OperationCanceledException)
{
Console.WriteLine("Cancel");
return;
}
}, "TaskSubscribe", TaskCreationOptions.LongRunning);
Thread.Sleep(1000);
token.Cancel();
t.Wait();
}