Is there something in the framework that would allow me to asynchronously execute a queue of delegates?
What I mean by that is I want the delegates to execute one at a time, in the order they are queued, but I want the whole process to run asynchronously. The queue is not fixed either: additional delegates would be added periodically, and each should be processed as soon as it reaches the top of the queue.
I don't need to use a Queue in particular, it's just how I would describe the desired behavior.
I could write something myself to do it, but if there is something built in that I could use instead, that would be better.
I briefly looked at ThreadPool.QueueUserWorkItem, as it allows executing in order, but couldn't find a satisfactory way to prevent more than one execution at a time.
Is there something in the framework that would allow me to asynchronously execute a queue of delegates?
I'd implement this as a custom task scheduler. You could then queue and run your delegates as tasks, which would give you all the benefits of exception handling, cancellation, and async/await.
Implementing a task scheduler that executes your delegates in serial order is quite simple using BlockingCollection. The SerialTaskScheduler below is a simplified version of Stephen Toub's StaTaskScheduler:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21628490
{
    // Test
    class Program
    {
        static async Task DoWorkAsync()
        {
            using (var scheduler = new SerialTaskScheduler())
            {
                // Queue ten delegates; earlier ones sleep longer, so the output
                // order shows they run serially rather than in parallel.
                var tasks = Enumerable.Range(1, 10).Select(i =>
                    scheduler.Run(() =>
                    {
                        var sleep = 1000 / i;
                        Thread.Sleep(sleep);
                        Console.WriteLine("Task #" + i + ", sleep: " + sleep);
                    }, CancellationToken.None));

                await Task.WhenAll(tasks);
            }
        }

        static void Main(string[] args)
        {
            DoWorkAsync().Wait();
            Console.ReadLine();
        }
    }

    // SerialTaskScheduler
    public sealed class SerialTaskScheduler : TaskScheduler, IDisposable
    {
        Task _schedulerTask;
        BlockingCollection<Task> _tasks;
        Thread _schedulerThread;

        public SerialTaskScheduler()
        {
            _tasks = new BlockingCollection<Task>();

            // A single consumer loop executes the queued tasks one by one.
            _schedulerTask = Task.Run(() =>
            {
                _schedulerThread = Thread.CurrentThread;

                foreach (var task in _tasks.GetConsumingEnumerable())
                    TryExecuteTask(task);
            });
        }

        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }

        protected override bool TryExecuteTaskInline(
            Task task, bool taskWasPreviouslyQueued)
        {
            // Inline only when already running on the scheduler's own thread.
            return _schedulerThread == Thread.CurrentThread &&
                TryExecuteTask(task);
        }

        public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }

        public void Dispose()
        {
            if (_schedulerTask != null)
            {
                // Stop accepting work, then wait for the queue to drain.
                _tasks.CompleteAdding();
                _schedulerTask.Wait();
                _tasks.Dispose();
                _tasks = null;
                _schedulerTask = null;
            }
        }

        public Task Run(Action action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task Run(Func<Task> action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }

        public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }
    }
}
Output:
Task #1, sleep: 1000
Task #2, sleep: 500
Task #3, sleep: 333
Task #4, sleep: 250
Task #5, sleep: 200
Task #6, sleep: 166
Task #7, sleep: 142
Task #8, sleep: 125
Task #9, sleep: 111
Task #10, sleep: 100
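To illustrate the exception-handling and cancellation benefits mentioned above, here is a minimal sketch. So that it compiles on its own, it assumes the built-in ConcurrentExclusiveSchedulerPair's ExclusiveScheduler as a stand-in for SerialTaskScheduler (both run at most one task at a time). The SchedulerBenefitsDemo class and its RunAsync method are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerBenefitsDemo
{
    // Hypothetical helper: queues four delegates and returns each task's final status.
    public static async Task<List<TaskStatus>> RunAsync()
    {
        // Assumption: stand-in for SerialTaskScheduler, runs one task at a time.
        TaskScheduler scheduler = new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler;

        var cts = new CancellationTokenSource();
        cts.Cancel(); // cancelled before queuing, so that delegate never runs

        Action ok = () => Console.WriteLine("ran");
        Action failing = () => { throw new InvalidOperationException("boom"); };

        var tasks = new List<Task>
        {
            Task.Factory.StartNew(ok, CancellationToken.None, TaskCreationOptions.None, scheduler),
            Task.Factory.StartNew(failing, CancellationToken.None, TaskCreationOptions.None, scheduler),
            Task.Factory.StartNew(ok, CancellationToken.None, TaskCreationOptions.None, scheduler),
            Task.Factory.StartNew(ok, cts.Token, TaskCreationOptions.None, scheduler)
        };

        var statuses = new List<TaskStatus>();
        foreach (var task in tasks)
        {
            try { await task; }
            catch (InvalidOperationException) { /* the failing delegate surfaces here */ }
            catch (OperationCanceledException) { /* the cancelled delegate surfaces here */ }
            statuses.Add(task.Status);
        }
        return statuses;
    }
}
```

A failure in one delegate only faults its own task, so the delegates queued after it still run, and the caller decides per task how to react.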
You can use TPL Dataflow's ActionBlock and simply queue up instances of a class that holds a Delegate and a list of parameters. The ActionBlock will simply execute those delegates one at a time.
var block = new ActionBlock<Item>(item => item.Action.DynamicInvoke(item.Parameters));

// Pairs a delegate with the arguments it should be invoked with.
class Item
{
    public Delegate Action { get; private set; }
    public object[] Parameters { get; private set; }

    public Item(Delegate action, object[] parameters)
    {
        Action = action;
        Parameters = parameters;
    }
}
An even easier option would be to use an ActionBlock of Action, but that forces you to capture the parameters:
var block = new ActionBlock<Action>(action => action());
block.Post(() => Console.WriteLine(message));
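For completeness, here is a small self-contained sketch of the ActionBlock of Action approach, including how to wait for the queue to drain. ActionBlockQueueDemo and RunAsync are hypothetical names, and the sketch assumes System.Threading.Tasks.Dataflow is available (on .NET Framework it comes from a NuGet package of the same name).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ActionBlockQueueDemo
{
    // Hypothetical helper: posts five delegates and returns the order they ran in.
    public static async Task<List<int>> RunAsync()
    {
        var executed = new List<int>();

        // The default options use MaxDegreeOfParallelism = 1, so delegates run
        // one at a time, in the order they were posted.
        var block = new ActionBlock<Action>(action => action());

        for (int i = 1; i <= 5; i++)
        {
            int captured = i; // copy the loop variable so each delegate sees its own value
            block.Post(() => executed.Add(captured));
        }

        block.Complete();       // signal that no more delegates will be posted
        await block.Completion; // completes once every queued delegate has run
        return executed;
    }
}
```

In a long-running application you would keep the block alive and call Post whenever a new delegate arrives, calling Complete only at shutdown. Note that an unhandled exception inside a delegate faults the block, so wrap the call in try/catch if later delegates must still run.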