Cancel Specific Task using CancellationToken

Posted 2019-08-09 02:45

I have the following loop, which may or may not create a series of tasks dynamically:

while (CreateNewTask)
{
    if (isWorkerFree() && isValidJob() && isExecutable())
    {
        // One CancellationTokenSource per task, so each task can be cancelled on its own.
        CancellationTokenSource cs = new CancellationTokenSource();
        var myTask = Task.Run(() => Execute(cs.Token));
    }
}

Since these tasks are created dynamically, how can I track them and cancel a specific task, or send a cancellation token to a specific task? There may be 6-7 tasks running at any time. I need to know which ones are running and be able to cancel a specific one.

Tags: c# .net task
2 answers
放我归山
#2 · 2019-08-09 03:23

You can track each such task using a DTO: class Item { Task Task; CancellationTokenSource CTS; }. Keep a list of those items. You can then cancel them at will.
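As a rough sketch of that idea (not the only way to do it): the Item class below mirrors the DTO above, while TaskRegistry, Start, RunningIds, Cancel and Get are hypothetical names made up for illustration. It uses a dictionary keyed by an integer id instead of a plain list, so a specific task can be looked up directly.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// DTO from the answer: one task plus the source that cancels it.
public sealed class Item
{
    public Item(Task task, CancellationTokenSource cts) { Task = task; CTS = cts; }
    public Task Task { get; }
    public CancellationTokenSource CTS { get; }
}

// Hypothetical registry (not a framework type) keyed by an integer id.
public sealed class TaskRegistry
{
    private readonly ConcurrentDictionary<int, Item> _items = new ConcurrentDictionary<int, Item>();
    private int _lastId;

    // Starts the work with its own token and returns the id used to refer to it later.
    public int Start(Func<CancellationToken, Task> work)
    {
        var cts = new CancellationTokenSource();
        int id = Interlocked.Increment(ref _lastId);
        _items[id] = new Item(Task.Run(() => work(cts.Token), cts.Token), cts);
        return id;
    }

    // Ids of tasks that have not finished yet (neither completed, faulted nor cancelled).
    public int[] RunningIds() =>
        _items.Where(p => !p.Value.Task.IsCompleted).Select(p => p.Key).OrderBy(k => k).ToArray();

    // Requests cancellation of one task; returns false if the id is unknown.
    public bool Cancel(int id)
    {
        if (!_items.TryGetValue(id, out var item)) return false;
        item.CTS.Cancel();
        return true;
    }

    public Task Get(int id) => _items[id].Task;
}

public static class Demo
{
    public static async Task Main()
    {
        var registry = new TaskRegistry();
        int a = registry.Start(token => Task.Delay(Timeout.Infinite, token));
        int b = registry.Start(token => Task.Delay(Timeout.Infinite, token));

        registry.Cancel(a);                        // cancel only the first task
        try { await registry.Get(a); }
        catch (OperationCanceledException) { }     // expected for the cancelled task

        Console.WriteLine("Still running: " + string.Join(",", registry.RunningIds()));
        registry.Cancel(b);                        // clean up the second task
    }
}
```

Finished entries are never removed in this sketch. A longer-lived version would probably drop them, and dispose the CancellationTokenSource, once the task completes.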

贪生不怕死
#3 · 2019-08-09 03:29

TL;DR
I think that TPL Dataflow (https://msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx) is a better choice here, but I will answer using the TPL.

Answer
To limit concurrency, you need a task scheduler that caps how many tasks run at once. I suggest that you look at https://msdn.microsoft.com/library/system.threading.tasks.taskscheduler.aspx and search for LimitedConcurrencyLevelTaskScheduler, which is a sample class given on that page rather than a built-in framework type.
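If copying a custom scheduler into the project feels like too much, a different and simpler technique is to gate the work with a SemaphoreSlim. The sketch below is not the scheduler approach described above; RunLimited, Work and Peak are hypothetical names, and the limit of 7 is taken from the question. It caps the number of asynchronous operations in flight and records the highest concurrency observed.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Demo
{
    private static int _running;   // work items currently inside the gate
    private static int _peak;      // highest value _running has reached

    public static int Peak => Volatile.Read(ref _peak);

    // Hypothetical helper: runs the work only while holding one of the gate's slots.
    public static async Task RunLimited(SemaphoreSlim gate, Func<Task> work)
    {
        await gate.WaitAsync();
        try { await work(); }
        finally { gate.Release(); }
    }

    // Stand-in for a real job: tracks concurrency, then waits briefly.
    public static async Task Work()
    {
        int now = Interlocked.Increment(ref _running);
        int seen;
        // Raise _peak to 'now' unless another thread already stored a higher value.
        while (now > (seen = Volatile.Read(ref _peak)) &&
               Interlocked.CompareExchange(ref _peak, now, seen) != seen) { }
        await Task.Delay(50);
        Interlocked.Decrement(ref _running);
    }

    public static async Task Main()
    {
        var gate = new SemaphoreSlim(7);   // at most 7 work items at once
        await Task.WhenAll(Enumerable.Range(0, 20).Select(i => RunLimited(gate, Work)));
        Console.WriteLine($"Peak concurrency: {Peak}");   // never above 7
    }
}
```

The trade-off is that the semaphore limits whole asynchronous operations, whereas a limiting scheduler limits the code segments that run between awaits.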

The test class below is a simple example of what you are trying to achieve, using that scheduler.

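using System;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;

// LimitedConcurrencyLevelTaskScheduler is the sample class from the TaskScheduler
// documentation; it has to be copied into the project.
[TestClass]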
public class UnitTest1
{
    [TestMethod]
    public async Task TestMethod1()
    {
        var factoryCancellation = new CancellationTokenSource();
        var scheduler = new LimitedConcurrencyLevelTaskScheduler(maxDegreeOfParallelism: 7);
        var taskFactory = new TaskFactory(factoryCancellation.Token, TaskCreationOptions.None, TaskContinuationOptions.None, scheduler);

        var taskCancellation1 = new CancellationTokenSource();
        var taskCancellation2 = new CancellationTokenSource();
        var token1 = taskCancellation1.Token;
        var token2 = taskCancellation2.Token;

        var myTask1 = taskFactory.StartNew(async () => await Execute(0, token1), token1).Unwrap();
        var myTask2 = taskFactory.StartNew(async () => await Execute(1, token2), token2).Unwrap();

        taskCancellation1.CancelAfter(500);

        try
        {
            await Task.WhenAll(myTask1, myTask2);
        }
        catch (OperationCanceledException)
        {
            // Raised by token.ThrowIfCancellationRequested() in the cancelled task.
        }
    }

    private async Task Execute(int i, CancellationToken token)
    {
        Console.WriteLine($"Running Task {i} : Before Delay 1");
        await Task.Delay(1000);
        token.ThrowIfCancellationRequested();
        Console.WriteLine($"Running Task {i} : Before Delay 2");
        await Task.Delay(1000);
        token.ThrowIfCancellationRequested();
        Console.WriteLine($"Running Task {i} : Before Delay 3");
        await Task.Delay(1000);
        token.ThrowIfCancellationRequested();
    }
}

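This generates the following log (the QueueTask and TryExecuteTask lines presumably come from logging added to the scheduler, which is not shown):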

QueueTask 1  WaitingToRun
QueueTask 2  WaitingToRun
TryExecuteTask Start 1  WaitingToRun CreationOptions None
TryExecuteTask Start 2  WaitingToRun CreationOptions None
Running Task 1 : Before Delay 1
Running Task 0 : Before Delay 1
TryExecuteTask End 2 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTask End 1 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 5 taskWasPreviouslyQueued False
TryExecuteTaskInline End 5 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 5 System.Action WaitingToRun
TryExecuteTask Start 5 System.Action WaitingToRun CreationOptions None
Running Task 1 : Before Delay 2
TryExecuteTask End 5 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 6 taskWasPreviouslyQueued False
TryExecuteTaskInline End 6 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 6 System.Action WaitingToRun
TryExecuteTask Start 6 System.Action WaitingToRun CreationOptions None
TryExecuteTaskInline Start 8 taskWasPreviouslyQueued False
TryExecuteTaskInline End 8 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTask End 6 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 12 taskWasPreviouslyQueued False
TryExecuteTaskInline End 12 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 12 System.Action WaitingToRun
TryExecuteTask Start 12 System.Action WaitingToRun CreationOptions None
Running Task 1 : Before Delay 3
TryExecuteTask End 12 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTaskInline Start 14 taskWasPreviouslyQueued False
TryExecuteTaskInline End 14 WaitingToRun IsCanceled False IsCompleted False IsFaulted False
QueueTask 14 System.Action WaitingToRun
TryExecuteTask Start 14 System.Action WaitingToRun CreationOptions None
TryExecuteTaskInline Start 16 taskWasPreviouslyQueued False
TryExecuteTaskInline End 16 RanToCompletion IsCanceled False IsCompleted True IsFaulted False
TryExecuteTask End 14 RanToCompletion IsCanceled False IsCompleted True IsFaulted False

You can see that Task 0 is cancelled at its first cancellation check after the 500 ms timeout, while Task 1 continues to run. Although this example does not show it, there will never be more than 7 concurrent tasks.
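In the test above, Execute only notices the cancellation once a full one-second delay has elapsed. A small variation, sketched here as a standalone program rather than as part of the test, passes the token to Task.Delay as well, so the wait itself is abandoned as soon as the token is cancelled. The 100 ms timeout and the Demo class are assumptions for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class Demo
{
    // Same shape as Execute above, but every delay also observes the token.
    public static async Task Execute(int i, CancellationToken token)
    {
        Console.WriteLine($"Running Task {i} : Before Delay 1");
        await Task.Delay(1000, token);   // ends early with TaskCanceledException on cancel
        Console.WriteLine($"Running Task {i} : Before Delay 2");
        await Task.Delay(1000, token);
        Console.WriteLine($"Running Task {i} : Before Delay 3");
        await Task.Delay(1000, token);
    }

    public static async Task Main()
    {
        var cts = new CancellationTokenSource();
        cts.CancelAfter(100);            // assumed timeout, shorter than one delay
        var watch = Stopwatch.StartNew();
        try
        {
            await Execute(0, cts.Token);
        }
        catch (OperationCanceledException)
        {
            // TaskCanceledException derives from OperationCanceledException.
            Console.WriteLine($"Cancelled after about {watch.ElapsedMilliseconds} ms");
        }
    }
}
```

Keeping the ThrowIfCancellationRequested calls is still useful between CPU-bound steps that have no awaitable to hand the token to.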

Unfortunately, the TAP pattern does not work with the TaskCreationOptions.AttachedToParent option, or this code could be even cleaner. See: "TaskCreationOptions.AttachedToParent is not waiting for child task".

To manage the CancellationTokens, you can create a specialized TaskFactory that allows something like this:

taskFactory.StartNew(0 , () => {...});
taskFactory.Cancel(0);

None of the TaskFactory methods are virtual, so you cannot override them. You will have to add your own overloaded methods, for example in a wrapper class.
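One possible shape for such a factory, sketched with composition rather than inheritance: KeyedTaskFactory and its members are hypothetical names, not part of the framework. Each key gets its own CancellationTokenSource, linked to the inner factory's token so that cancelling the factory still cancels everything.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper around TaskFactory; not a framework type.
public sealed class KeyedTaskFactory
{
    private readonly TaskFactory _inner;
    private readonly ConcurrentDictionary<int, CancellationTokenSource> _sources =
        new ConcurrentDictionary<int, CancellationTokenSource>();

    public KeyedTaskFactory(TaskFactory inner) { _inner = inner; }

    public Task StartNew(int key, Func<CancellationToken, Task> work)
    {
        // Linked source: fires on Cancel(key) or when the factory's own token fires.
        var cts = CancellationTokenSource.CreateLinkedTokenSource(_inner.CancellationToken);
        if (!_sources.TryAdd(key, cts))
        {
            cts.Dispose();
            throw new ArgumentException("Key is already in use.", nameof(key));
        }

        // Runs on the inner factory's scheduler; Unwrap exposes the inner async task.
        Task task = _inner.StartNew(() => work(cts.Token), cts.Token).Unwrap();

        // Forget the key once the work has finished, whatever the outcome.
        task.ContinueWith(t => { _sources.TryRemove(key, out _); }, TaskScheduler.Default);
        return task;
    }

    // Requests cancellation for one key; returns false if no such task is tracked.
    public bool Cancel(int key)
    {
        if (!_sources.TryGetValue(key, out var cts)) return false;
        cts.Cancel();
        return true;
    }
}

public static class Demo
{
    public static async Task Main()
    {
        var taskFactory = new KeyedTaskFactory(new TaskFactory());
        Task first = taskFactory.StartNew(0, token => Task.Delay(Timeout.Infinite, token));
        Task second = taskFactory.StartNew(1, token => Task.Delay(50, token));

        taskFactory.Cancel(0);                     // cancels only the task with key 0
        try { await first; }
        catch (OperationCanceledException) { }     // expected for the cancelled task
        await second;

        Console.WriteLine($"first: {first.Status}, second: {second.Status}");
    }
}
```

To combine this with the concurrency limit, the inner TaskFactory can be constructed with the limiting scheduler, as in the test class above.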
