Queuing Actions/Delegates for Asynchronous Execution

Posted 2019-01-15 11:03

Is there something in the framework that would allow me to asynchronously execute a queue of delegates?

What I mean is that I want the delegates to execute one at a time, in the order they are queued, but I want the whole process to run asynchronously. The queue is not fixed either: additional delegates would be added periodically, and each should be processed as soon as it reaches the front of the queue.

I don't need to use a Queue in particular, it's just how I would describe the desired behavior.

I could write something myself to do it but if there is something built in I could use instead that would be better.

I briefly looked at ThreadPool.QueueUserWorkItem, as it allows executing in order, but could not find a satisfactory way to prevent more than one execution at a time.

2 Answers
老娘就宠你
Reply #2 · 2019-01-15 11:25

Is there something in the framework that would allow me to asynchronously execute a queue of delegates?

I'd implement this as a custom task scheduler. You could then queue and run your delegates as tasks, which would give you all the benefits of exception handling, cancellation, and async/await.

Implementing a task scheduler that executes your delegates in serial order is quite simple with BlockingCollection. The SerialTaskScheduler below is a simplified version of Stephen Toub's StaTaskScheduler:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Console_21628490
{
    // Test
    class Program
    {
        static async Task DoWorkAsync()
        {
            using (var scheduler = new SerialTaskScheduler())
            {
                var tasks = Enumerable.Range(1, 10).Select(i =>
                    scheduler.Run(() =>
                    {
                        var sleep = 1000 / i;
                        Thread.Sleep(sleep);
                        Console.WriteLine("Task #" + i + ", sleep: " + sleep);
                    }, CancellationToken.None));

                await Task.WhenAll(tasks);
            }
        }

        static void Main(string[] args)
        {
            DoWorkAsync().Wait();
            Console.ReadLine();
        }
    }

    // SerialTaskScheduler
    public sealed class SerialTaskScheduler : TaskScheduler, IDisposable
    {
        Task _schedulerTask;
        BlockingCollection<Task> _tasks;
        Thread _schedulerThread;

        public SerialTaskScheduler()
        {
            _tasks = new BlockingCollection<Task>();

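            // A single consumer loop drains the queue, so tasks run one at a time
            // in the order they were queued.
            _schedulerTask = Task.Run(() =>
            {
                // Remember the consumer thread so TryExecuteTaskInline can recognize it.
                _schedulerThread = Thread.CurrentThread;

                // Blocks until a task is available; ends after CompleteAdding() is called.
                foreach (var task in _tasks.GetConsumingEnumerable())
                    TryExecuteTask(task);
            });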
        }

        protected override void QueueTask(Task task)
        {
            _tasks.Add(task);
        }

        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return _tasks.ToArray();
        }

        protected override bool TryExecuteTaskInline(
            Task task, bool taskWasPreviouslyQueued)
        {
            return _schedulerThread == Thread.CurrentThread &&
                TryExecuteTask(task);
        }

        public override int MaximumConcurrencyLevel
        {
            get { return 1; }
        }

        public void Dispose()
        {
            if (_schedulerTask != null)
            {
                _tasks.CompleteAdding();
                _schedulerTask.Wait();
                _tasks.Dispose();
                _tasks = null;
                _schedulerTask = null;
            }
        }

        public Task Run(Action action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this);
        }

        public Task Run(Func<Task> action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }

        public Task<T> Run<T>(Func<Task<T>> action, CancellationToken token)
        {
            return Task.Factory.StartNew(action, token, TaskCreationOptions.None, this).Unwrap();
        }
    }
}

Output:

Task #1, sleep: 1000
Task #2, sleep: 500
Task #3, sleep: 333
Task #4, sleep: 250
Task #5, sleep: 200
Task #6, sleep: 166
Task #7, sleep: 142
Task #8, sleep: 125
Task #9, sleep: 111
Task #10, sleep: 100
淡お忘
Reply #3 · 2019-01-15 11:29

You can use TPL Dataflow's ActionBlock and simply queue up instances of a class that holds a Delegate and a list of parameters. The ActionBlock will execute those delegates one at a time.

var block = new ActionBlock<Item>(item => item.Action.DynamicInvoke(item.Parameters));

class Item
{
    public Delegate Action { get; private set; }
    public object[] Parameters { get; private set; }

    public Item(Delegate action, object[] parameters)
    {
        Action = action;
        Parameters = parameters;
    }
}

An even easier option would be to use an ActionBlock of Action, but that forces you to capture the parameters:

var block = new ActionBlock<Action>(action => action());

block.Post(() => Console.WriteLine(message));
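
For reference, here is a minimal end-to-end sketch of the ActionBlock<Action> approach, showing how the queue can be drained and awaited. The names SerialQueueDemo and RunAsync are made up for illustration, and depending on your target framework you may need to reference the System.Threading.Tasks.Dataflow NuGet package. It relies on ActionBlock's default MaxDegreeOfParallelism of 1 to keep execution serial.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class SerialQueueDemo
{
    // Hypothetical helper: posts `count` actions and returns the order in which they ran.
    public static async Task<List<int>> RunAsync(int count)
    {
        var order = new List<int>();

        // MaxDegreeOfParallelism defaults to 1, so posted actions run one at a time,
        // in the order they were posted.
        var block = new ActionBlock<Action>(action => action());

        for (var i = 1; i <= count; i++)
        {
            var n = i; // copy the loop variable so each lambda captures its own value
            block.Post(() => order.Add(n));
        }

        // Signal that nothing more will be posted, then wait for the queue to drain.
        block.Complete();
        await block.Completion;

        return order;
    }

    static async Task Main()
    {
        var order = await RunAsync(5);
        Console.WriteLine(string.Join(", ", order));
    }
}
```

In a long-lived application you would keep the block around and call Post whenever a new delegate arrives, calling Complete only at shutdown.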