Gracefully killing a TPL Task

2019-06-10 00:48发布

问题:

I'm attempting to build an API like thing against a set of REST services. There are basically different operations that report their progress and support cancelation. I opted to use the TPL to make async functions and managed load. Here is my most basic operation that is to be inherited accordingly:

using FileOnlineCore.Enums;
using FileOnlineCore.EventArguments;
using FileOnlineCore.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace FileOnlineCore.Operations {

    public abstract class BaseOperation {

        #region Store
        private readonly Task _InnerTask;
        private readonly CancellationTokenSource _InnerCancelTokenSource;
        private readonly CancellationToken _InnerCancelToken;

        public Task InnerTask {
            get { return _InnerTask; }
        }

        protected CancellationTokenSource InnerCancelTokenSource {
            get { return _InnerCancelTokenSource; }
        }

        protected CancellationToken InnerCancelToken {
            get { return _InnerCancelToken; }
        }

        public Int32 TaskId {
            get { return _InnerTask != null ? _InnerTask.Id : -1; }
        }

        public OperationRunningStatus Status { get; set; }
        public OperationType TaskType { get; set; }
        public OperationProgressReportEventHandler onProgressReport;
        public OperationCompletedEventHandler onOperationCompleted;
        #endregion

        public BaseOperation() {
            _InnerCancelTokenSource = new CancellationTokenSource();
            _InnerCancelToken = _InnerCancelTokenSource.Token;
            _InnerTask = new Task(() => PerformWork(), _InnerCancelToken, TaskCreationOptions.None);
            Status = OperationRunningStatus.Idle;
        }

        public void Start() {
            _InnerTask.Start();
            Status = OperationRunningStatus.Working;
        }

        public void Cancel() {
            _InnerCancelTokenSource.Cancel();
            Status = OperationRunningStatus.Completed;
        }

        protected abstract void PerformWork();

        protected void ReportProgress(int Progress) {
            if (onProgressReport == null)
                return;

            onProgressReport(new OperationProgressEventArg { 
                Progress = Progress, 
                TaskId = TaskId
            });
        }

        protected void TaskCompleted(RemoteOperationEventArg arg) {
            if (onOperationCompleted == null)
                return;

            onOperationCompleted(arg);
            Status = OperationRunningStatus.Completed;
        }

    }

}

As you can see, I'm using delegates to accomplish the requirements. Just like a Thread class, the Task class cannot be inherited so a Task object is actually composed within my BaseOperation. Here is an actual operation implemented:

using FileOnlineCore.EventArguments;
using FileOnlineCore.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FileOnlineCore.Objects;
using FileOnlineCore.Enums;

namespace FileOnlineCore.Operations {

    public class DownloadObjectOperation : BaseOperation {

        private readonly FileSystemObject _FileSystemObject;
        private readonly String _LocalFolder;

        public FileSystemObjectDownloadCompletedEventHandler onDownloadCompleted;

        public DownloadObjectOperation(FileSystemObject FileSystemObject, String LocalFolder){
            TaskType = OperationType.DownloadFile;

            _FileSystemObject = FileSystemObject;
            _LocalFolder = LocalFolder;
        }

        protected override void PerformWork() {
            try {
                ReportProgress(0);

                for (int i = 1; i <= 100; i+=5) {
                    // Check for the cancellation to be signaled
                    InnerCancelToken.ThrowIfCancellationRequested();

                    // Write out a little bit showing the progress of the task
                    //Console.WriteLine("Task {0}: {1}/100 In progress", Task.CurrentId, i + 1);
                    Thread.Sleep(50); // Simulate doing some work

                    // Report progress of the work.
                    ReportProgress(i);
                }

                // By getting here the task will RunToCompletion even if the token has been signaled.

                base.TaskCompleted(new RemoteOperationEventArg { 
                    Error = null, 
                    ResultSource = OperationResultSource.Unknown, 
                    Status = OperationExitStatus.Success, 
                    TaskId = TaskId 
                });

                this.DownloadTaskCompleted(new DownloadObjectOperationEventArg { 
                    Error = null, 
                    ResultSource = OperationResultSource.Unknown, 
                    Status = OperationExitStatus.Success, 
                    TaskId = TaskId, 
                    ObtainedFileSystemObject = null
                }, _LocalFolder, "TheFileName.Extension");
            }
            catch (OperationCanceledException exp_Canceled) {
                // Any clean up code goes here.

                base.TaskCompleted(new RemoteOperationEventArg {
                    Error = exp_Canceled,
                    ResultSource = OperationResultSource.Unknown,
                    Status = OperationExitStatus.Error,
                    TaskId = TaskId
                });

                this.DownloadTaskCompleted(new DownloadObjectOperationEventArg {
                    Error = exp_Canceled,
                    ResultSource = OperationResultSource.Unknown,
                    Status = OperationExitStatus.Error,
                    TaskId = TaskId,
                    ObtainedFileSystemObject = null
                }, _LocalFolder, "TheFileName.Extension");

                // To ensure that the calling code knows the task was cancelled
                //throw; 
            }
            catch (Exception exp) {
                // Clean up other stuff

                base.TaskCompleted(new RemoteOperationEventArg { 
                    Error = exp, 
                    ResultSource = OperationResultSource.Unknown, 
                    Status = OperationExitStatus.Error, 
                    TaskId = TaskId 
                });

                this.DownloadTaskCompleted( new DownloadObjectOperationEventArg { 
                    Error = exp, 
                    ResultSource = OperationResultSource.Unknown, 
                    Status = OperationExitStatus.Error, 
                    TaskId = TaskId 
                }, _LocalFolder, "TheFileName.Extension");

                // If the calling code also needs to know.
                //throw;
            }
        }

        protected void DownloadTaskCompleted(DownloadObjectOperationEventArg arg, String LocalFolder, String FileName) {
            if (onDownloadCompleted == null)
                return;

            onDownloadCompleted(arg, LocalFolder, FileName);
            Status = OperationRunningStatus.Completed;


        }

    }

}

Now, here is a main class that exposes the single method that, in turns, does all the plumbing to create the DownloadObjectOperation object and make it work:

using FileOnlineCore.Enums;
using FileOnlineCore.Events;
using FileOnlineCore.Objects;
using FileOnlineCore.Operations;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace FileOnlineCore {

    public sealed class FileOnline {

        // Construct a task scheduler from the current SynchronizationContext (UI thread)
        private TaskScheduler OperationTaskScheduler;

        // Construct a new TaskFactory using our UI scheduler
        private TaskFactory OperationFactory;

        //Something to manage my operations
        private List<BaseOperation> OperationPool;

        //Originally here to massively cancel all operations. Epic fail so far.
        private CancellationTokenSource CancelAllTokenSource;
        private CancellationToken CancelAllToken;

        public FileOnline() {
            InitializeStore();
        }

        private void InitializeStore() {
            OperationPool = new List<BaseOperation>();      

            CancelAllTokenSource = new CancellationTokenSource();
            CancelAllToken = CancelAllTokenSource.Token;

            OperationTaskScheduler = TaskScheduler.Default;
            OperationFactory = new TaskFactory(CancelAllToken, TaskCreationOptions.PreferFairness, TaskContinuationOptions.ExecuteSynchronously, OperationTaskScheduler);

            //Low leve thrading stuff for performence optimization
            //int workerThreads, complete;
            //ThreadPool.GetMinThreads(out workerThreads, out complete);
            //Console.WriteLine("Idle worker threads: " + workerThreads);

            // Comment out this line to see the difference. With this commented out, the second iteration will be immediate
            //ThreadPool.SetMinThreads(100, complete);
        }


        public Int32 DownloadFileSystemObject(FileSystemObject FileSystemObject, String LocalFolder, Boolean Overwrite, OperationProgressReportEventHandler onOperationProgressReport, FileSystemObjectDownloadCompletedEventHandler onOperationCompleted) {

            var DownloadOp = new DownloadObjectOperation(FileSystemObject, LocalFolder) {
                onProgressReport = onOperationProgressReport,
                onDownloadCompleted = onOperationCompleted
            };

            OperationPool.Add(DownloadOp);

            OperationFactory.StartNew(() => {
                DownloadOp.Start();
            }).ContinueWith(t => {
                DownloadOp.InnerTask.Dispose(); //Exception!
                t.Dispose();
                OperationPool.Remove(DownloadOp);
                DownloadOp = null;
            });

            return DownloadOp.TaskId;
        }

        ...

        public void CancelTask(Int32 TaskId) {
            var FoundOperation = OperationPool.SingleOrDefault(where => where.TaskId == TaskId);

            if (FoundOperation != null && FoundOperation.InnerTask.Status == TaskStatus.Running && FoundOperation.Status == OperationRunningStatus.Working) {
                FoundOperation.Cancel();
            }
        }
        public void CancelAllTasks() {
            OperationPool.ForEach(Item => {
                if (Item != null && Item.InnerTask.Status == TaskStatus.Running && Item.Status == OperationRunningStatus.Working) {
                    Item.Cancel();
                }
            });
        }

    }

}

Now I can simply skip the OperationFactory (my custom TaskFactory) and start the DownloadObjectOperation.Start() directly but from what I've seen on the web, The TaskFatory does load balancing internally depending upon the ThreadPool.

If I skip the OperationFactory, I'm able to chain the ContnueWith easily like so:

DownloadOp.InnerTask.ContinueWith(t => {
                t.Dispose();
                OperationPool.Remove(DownloadOp);
                DownloadOp = null;
            });

However, using the OperationFactory, I get an exception as the sequence doesn't wait for the process to complete and immediately jumps into the ContinueWith block. Now DownloadOp.InnerTask.Dispose(); cause an exception as the actual internal task is still running.

I need to gracefully end my tasks as this API is going to be used by a large number of people. If the Tasks or my BaseOperations are not properly disposed, I'm afraid the server will hang.