I'm attempting to build an API like thing against a set of REST services. There are basically different operations that report their progress and support cancelation. I opted to use the TPL to make async functions and managed load. Here is my most basic operation that is to be inherited accordingly:
using FileOnlineCore.Enums;
using FileOnlineCore.EventArguments;
using FileOnlineCore.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace FileOnlineCore.Operations {
public abstract class BaseOperation {
#region Store
private readonly Task _InnerTask;
private readonly CancellationTokenSource _InnerCancelTokenSource;
private readonly CancellationToken _InnerCancelToken;
public Task InnerTask {
get { return _InnerTask; }
}
protected CancellationTokenSource InnerCancelTokenSource {
get { return _InnerCancelTokenSource; }
}
protected CancellationToken InnerCancelToken {
get { return _InnerCancelToken; }
}
public Int32 TaskId {
get { return _InnerTask != null ? _InnerTask.Id : -1; }
}
public OperationRunningStatus Status { get; set; }
public OperationType TaskType { get; set; }
public OperationProgressReportEventHandler onProgressReport;
public OperationCompletedEventHandler onOperationCompleted;
#endregion
public BaseOperation() {
_InnerCancelTokenSource = new CancellationTokenSource();
_InnerCancelToken = _InnerCancelTokenSource.Token;
_InnerTask = new Task(() => PerformWork(), _InnerCancelToken, TaskCreationOptions.None);
Status = OperationRunningStatus.Idle;
}
public void Start() {
_InnerTask.Start();
Status = OperationRunningStatus.Working;
}
public void Cancel() {
_InnerCancelTokenSource.Cancel();
Status = OperationRunningStatus.Completed;
}
protected abstract void PerformWork();
protected void ReportProgress(int Progress) {
if (onProgressReport == null)
return;
onProgressReport(new OperationProgressEventArg {
Progress = Progress,
TaskId = TaskId
});
}
protected void TaskCompleted(RemoteOperationEventArg arg) {
if (onOperationCompleted == null)
return;
onOperationCompleted(arg);
Status = OperationRunningStatus.Completed;
}
}
}
As you can see, I'm using delegates
to accomplish the requirements. Just like a Thread
class, the Task
class cannot be inherited so a Task
object is actually composed within my BaseOperation
. Here is an actual operation implemented:
using FileOnlineCore.EventArguments;
using FileOnlineCore.Events;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using FileOnlineCore.Objects;
using FileOnlineCore.Enums;
namespace FileOnlineCore.Operations {
public class DownloadObjectOperation : BaseOperation {
private readonly FileSystemObject _FileSystemObject;
private readonly String _LocalFolder;
public FileSystemObjectDownloadCompletedEventHandler onDownloadCompleted;
public DownloadObjectOperation(FileSystemObject FileSystemObject, String LocalFolder){
TaskType = OperationType.DownloadFile;
_FileSystemObject = FileSystemObject;
_LocalFolder = LocalFolder;
}
protected override void PerformWork() {
try {
ReportProgress(0);
for (int i = 1; i <= 100; i+=5) {
// Check for the cancellation to be signaled
InnerCancelToken.ThrowIfCancellationRequested();
// Write out a little bit showing the progress of the task
//Console.WriteLine("Task {0}: {1}/100 In progress", Task.CurrentId, i + 1);
Thread.Sleep(50); // Simulate doing some work
// Report progress of the work.
ReportProgress(i);
}
// By getting here the task will RunToCompletion even if the token has been signaled.
base.TaskCompleted(new RemoteOperationEventArg {
Error = null,
ResultSource = OperationResultSource.Unknown,
Status = OperationExitStatus.Success,
TaskId = TaskId
});
this.DownloadTaskCompleted(new DownloadObjectOperationEventArg {
Error = null,
ResultSource = OperationResultSource.Unknown,
Status = OperationExitStatus.Success,
TaskId = TaskId,
ObtainedFileSystemObject = null
}, _LocalFolder, "TheFileName.Extension");
}
catch (OperationCanceledException exp_Canceled) {
// Any clean up code goes here.
base.TaskCompleted(new RemoteOperationEventArg {
Error = exp_Canceled,
ResultSource = OperationResultSource.Unknown,
Status = OperationExitStatus.Error,
TaskId = TaskId
});
this.DownloadTaskCompleted(new DownloadObjectOperationEventArg {
Error = exp_Canceled,
ResultSource = OperationResultSource.Unknown,
Status = OperationExitStatus.Error,
TaskId = TaskId,
ObtainedFileSystemObject = null
}, _LocalFolder, "TheFileName.Extension");
// To ensure that the calling code knows the task was cancelled
//throw;
}
catch (Exception exp) {
// Clean up other stuff
base.TaskCompleted(new RemoteOperationEventArg {
Error = exp,
ResultSource = OperationResultSource.Unknown,
Status = OperationExitStatus.Error,
TaskId = TaskId
});
this.DownloadTaskCompleted( new DownloadObjectOperationEventArg {
Error = exp,
ResultSource = OperationResultSource.Unknown,
Status = OperationExitStatus.Error,
TaskId = TaskId
}, _LocalFolder, "TheFileName.Extension");
// If the calling code also needs to know.
//throw;
}
}
protected void DownloadTaskCompleted(DownloadObjectOperationEventArg arg, String LocalFolder, String FileName) {
if (onDownloadCompleted == null)
return;
onDownloadCompleted(arg, LocalFolder, FileName);
Status = OperationRunningStatus.Completed;
}
}
}
Now, here is a main class that exposes the single method that, in turns, does all the plumbing to create the DownloadObjectOperation
object and make it work:
using FileOnlineCore.Enums;
using FileOnlineCore.Events;
using FileOnlineCore.Objects;
using FileOnlineCore.Operations;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace FileOnlineCore {
public sealed class FileOnline {
// Construct a task scheduler from the current SynchronizationContext (UI thread)
private TaskScheduler OperationTaskScheduler;
// Construct a new TaskFactory using our UI scheduler
private TaskFactory OperationFactory;
//Something to manage my operations
private List<BaseOperation> OperationPool;
//Originally here to massively cancel all operations. Epic fail so far.
private CancellationTokenSource CancelAllTokenSource;
private CancellationToken CancelAllToken;
public FileOnline() {
InitializeStore();
}
private void InitializeStore() {
OperationPool = new List<BaseOperation>();
CancelAllTokenSource = new CancellationTokenSource();
CancelAllToken = CancelAllTokenSource.Token;
OperationTaskScheduler = TaskScheduler.Default;
OperationFactory = new TaskFactory(CancelAllToken, TaskCreationOptions.PreferFairness, TaskContinuationOptions.ExecuteSynchronously, OperationTaskScheduler);
//Low leve thrading stuff for performence optimization
//int workerThreads, complete;
//ThreadPool.GetMinThreads(out workerThreads, out complete);
//Console.WriteLine("Idle worker threads: " + workerThreads);
// Comment out this line to see the difference. With this commented out, the second iteration will be immediate
//ThreadPool.SetMinThreads(100, complete);
}
public Int32 DownloadFileSystemObject(FileSystemObject FileSystemObject, String LocalFolder, Boolean Overwrite, OperationProgressReportEventHandler onOperationProgressReport, FileSystemObjectDownloadCompletedEventHandler onOperationCompleted) {
var DownloadOp = new DownloadObjectOperation(FileSystemObject, LocalFolder) {
onProgressReport = onOperationProgressReport,
onDownloadCompleted = onOperationCompleted
};
OperationPool.Add(DownloadOp);
OperationFactory.StartNew(() => {
DownloadOp.Start();
}).ContinueWith(t => {
DownloadOp.InnerTask.Dispose(); //Exception!
t.Dispose();
OperationPool.Remove(DownloadOp);
DownloadOp = null;
});
return DownloadOp.TaskId;
}
...
public void CancelTask(Int32 TaskId) {
var FoundOperation = OperationPool.SingleOrDefault(where => where.TaskId == TaskId);
if (FoundOperation != null && FoundOperation.InnerTask.Status == TaskStatus.Running && FoundOperation.Status == OperationRunningStatus.Working) {
FoundOperation.Cancel();
}
}
public void CancelAllTasks() {
OperationPool.ForEach(Item => {
if (Item != null && Item.InnerTask.Status == TaskStatus.Running && Item.Status == OperationRunningStatus.Working) {
Item.Cancel();
}
});
}
}
}
Now I can simply skip the OperationFactory
(my custom TaskFactory
) and start the DownloadObjectOperation.Start()
directly but from what I've seen on the web, The TaskFatory does load balancing internally depending upon the ThreadPool
.
If I skip the OperationFactory, I'm able to chain the ContnueWith easily like so:
DownloadOp.InnerTask.ContinueWith(t => {
t.Dispose();
OperationPool.Remove(DownloadOp);
DownloadOp = null;
});
However, using the OperationFactory
, I get an exception as the sequence doesn't wait for the process to complete and immediately jumps into the ContinueWith
block. Now DownloadOp.InnerTask.Dispose();
cause an exception as the actual internal task is still running.
I need to gracefully end my tasks as this API is going to be used by a large number of people. If the Tasks
or my BaseOperations
are not properly disposed, I'm afraid the server will hang.