TPL Queue Processing

Posted 2020-03-07 05:42

Question:

I'm currently working on a project and I need to queue some jobs for processing. Here are the requirements:

  1. Jobs must be processed one at a time
  2. A queued item must be able to be waited on

So I want something akin to:

Task<result> QueueJob(params here)
{
   /// Queue the job and somehow return a waitable task that will wait until the queued job has been executed and return the result.
}

I've tried having a background task that just pulls items off a queue and processes each job, but the difficulty is getting the result from that background task back to the caller of the method.

If need be I could go the route of just requesting a completion callback in the QueueJob method, but it'd be great if I could get a transparent Task back that allows you to wait on the job to be processed (even if there are jobs before it in the queue).

Answer 1:

Func<T> takes no parameters and returns a value of type T. The jobs are run one by one and you can wait on the returned task to get the result.

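using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public class TaskQueue
{
    // Queue<T> is not thread-safe, so every access is guarded by SyncRoot.
    private readonly Queue<Task> InnerTaskQueue = new Queue<Task>();
    private readonly object SyncRoot = new object();

    private volatile bool IsJobRunning;

    public void Start()
    {
        Task.Factory.StartNew(() =>
        {
            while (true)
            {
                Task task = null;
                lock (SyncRoot)
                {
                    if (InnerTaskQueue.Count > 0 && !IsJobRunning)
                    {
                        task = InnerTaskQueue.Dequeue();
                        // Set the flag before starting the task so the
                        // continuation below cannot clear it first.
                        IsJobRunning = true;
                    }
                }

                if (task != null)
                {
                    task.ContinueWith(t => IsJobRunning = false);
                    task.Start();
                }
                else
                {
                    Thread.Sleep(1000);
                }
            }
        }, TaskCreationOptions.LongRunning);
    }

    public Task<T> QueueJob<T>(Func<T> job)
    {
        // The task is created cold; the loop in Start() starts it when its turn comes.
        var task = new Task<T>(job);
        lock (SyncRoot)
        {
            InnerTaskQueue.Enqueue(task);
        }
        return task;
    }
}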


Answer 2:

You might find TaskCompletionSource<T> useful: it can be used to create a Task that completes exactly when you want it to. If you combine it with BlockingCollection<T>, you will get your queue:

class JobProcessor<TInput, TOutput> : IDisposable
{
    private readonly Func<TInput, TOutput> m_transform;

    // or a custom type instead of Tuple
    private readonly
        BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>
        m_queue =
        new BlockingCollection<Tuple<TInput, TaskCompletionSource<TOutput>>>();

    public JobProcessor(Func<TInput, TOutput> transform)
    {
        m_transform = transform;
        Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
    }

    private void ProcessQueue()
    {
        Tuple<TInput, TaskCompletionSource<TOutput>> tuple;
        while (m_queue.TryTake(out tuple, Timeout.Infinite))
        {
            var input = tuple.Item1;
            var tcs = tuple.Item2;

            try
            {
                tcs.SetResult(m_transform(input));
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        }
    }

    public Task<TOutput> QueueJob(TInput input)
    {
        var tcs = new TaskCompletionSource<TOutput>();
        m_queue.Add(Tuple.Create(input, tcs));
        return tcs.Task;
    }

    public void Dispose()
    {
        m_queue.CompleteAdding();
    }
}
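
The comment about using a custom type instead of Tuple can be sketched as below. This is only an illustration, not the answerer's code: the names Job, SerialJobQueue and Demo are hypothetical, and the RunContinuationsAsynchronously option (available from .NET Framework 4.6) is an addition that keeps callers' continuations from running inline on the worker thread. Main shows how a caller waits on queued jobs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical work item replacing Tuple<TInput, TaskCompletionSource<TOutput>>.
sealed class Job<TInput, TOutput>
{
    public Job(TInput input)
    {
        Input = input;
        // Assumption: .NET Framework 4.6 or later. Without this option,
        // continuations on the task may run inline on the worker thread.
        Completion = new TaskCompletionSource<TOutput>(
            TaskCreationOptions.RunContinuationsAsynchronously);
    }

    public TInput Input { get; }
    public TaskCompletionSource<TOutput> Completion { get; }
}

sealed class SerialJobQueue<TInput, TOutput> : IDisposable
{
    private readonly Func<TInput, TOutput> _transform;
    private readonly BlockingCollection<Job<TInput, TOutput>> _jobs =
        new BlockingCollection<Job<TInput, TOutput>>();

    public SerialJobQueue(Func<TInput, TOutput> transform)
    {
        _transform = transform;
        Task.Factory.StartNew(ProcessJobs, TaskCreationOptions.LongRunning);
    }

    private void ProcessJobs()
    {
        // Blocks while the queue is empty; the loop ends once CompleteAdding
        // has been called and the remaining jobs have been drained.
        foreach (var job in _jobs.GetConsumingEnumerable())
        {
            try
            {
                job.Completion.SetResult(_transform(job.Input));
            }
            catch (Exception ex)
            {
                // The failure surfaces on the task returned by QueueJob.
                job.Completion.SetException(ex);
            }
        }
    }

    public Task<TOutput> QueueJob(TInput input)
    {
        var job = new Job<TInput, TOutput>(input);
        _jobs.Add(job);
        return job.Completion.Task;
    }

    public void Dispose() => _jobs.CompleteAdding();
}

static class Demo
{
    static void Main()
    {
        using (var queue = new SerialJobQueue<int, int>(x => x * x))
        {
            Task<int> first = queue.QueueJob(3);
            Task<int> second = queue.QueueJob(4);
            // .Result blocks until the single worker has processed each job.
            Console.WriteLine(first.Result + second.Result); // prints 25
        }
    }
}
```

Because one worker drains the collection, jobs run one at a time in the order they were added, and each caller only sees its own Task<TOutput>.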


Answer 3:

I would go for something like this:

class TaskProcessor<TResult>
{
    // TODO: Error handling!

    readonly BlockingCollection<Task<TResult>> blockingCollection = new BlockingCollection<Task<TResult>>(new ConcurrentQueue<Task<TResult>>());

    public Task<TResult> AddTask(Func<TResult> work)
    {
        var task = new Task<TResult>(work);
        blockingCollection.Add(task);
        return task; // give the task back to the caller so they can wait on it
    }

    public void CompleteAddingTasks()
    {
        blockingCollection.CompleteAdding();
    }

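    public TaskProcessor()
    {
        // Run the consumer loop in the background so the constructor returns immediately.
        Task.Factory.StartNew(ProcessQueue, TaskCreationOptions.LongRunning);
    }

    void ProcessQueue()
    {
        // GetConsumingEnumerable() blocks while the collection is empty and ends
        // once CompleteAddingTasks() has been called and the queue is drained.
        // (TryTake(out task) with no timeout returns false immediately on an
        // empty collection, which would end the loop before any work arrives.)
        foreach (var task in blockingCollection.GetConsumingEnumerable())
        {
            task.Start();
            try
            {
                task.Wait(); // ensure this task finishes before we start a new one...
            }
            catch (AggregateException)
            {
                // The caller observes the failure through the returned task.
            }
        }
    }
}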

Depending on the type of app that is using it, you could swap the BlockingCollection/ConcurrentQueue for something simpler (e.g. just a plain queue). You can also adjust the signature of the AddTask method depending on what sort of methods and parameters you will be queueing up.
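
If "something simpler" is acceptable, one option that is not part of the answer above is to drop the consumer thread and the queue type entirely and chain each job onto the previous one with ContinueWith. The sketch below assumes .NET 4.5 or later (for Task.FromResult); SerialTaskChain, Enqueue and ChainDemo are hypothetical names used only for illustration.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical alternative: each job is chained onto the previous one,
// so jobs run one at a time in submission order without a worker loop.
sealed class SerialTaskChain
{
    private readonly object _gate = new object();
    private Task _tail = Task.FromResult(0); // already-completed starting point

    public Task<TResult> Enqueue<TResult>(Func<TResult> work)
    {
        lock (_gate)
        {
            // The continuation runs only after the previous job has finished,
            // whether that job succeeded or failed.
            Task<TResult> next = _tail.ContinueWith(
                _ => work(),
                CancellationToken.None,
                TaskContinuationOptions.None,
                TaskScheduler.Default);
            _tail = next;
            return next;
        }
    }
}

static class ChainDemo
{
    static void Main()
    {
        var chain = new SerialTaskChain();
        Task<string> a = chain.Enqueue(() => "first");
        Task<int> b = chain.Enqueue(() => 2);
        Console.WriteLine(a.Result + " " + b.Result); // prints "first 2"
    }
}
```

Since Enqueue is generic per call, jobs with different result types can share one chain. The trade-off is that there is no explicit queue to inspect or to mark as complete.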