C# queueing dependant tasks to be processed by a t

Posted 2019-03-19 20:25

Question:

I want to queue dependent tasks across several flows. Within each flow the tasks must be processed in order, but the flows themselves can be processed in parallel.

To be specific, let's say I need two queues and I want the tasks in each queue to be processed in order. Here is sample pseudocode to illustrate the desired behavior:

Queue1_WorkItem wi1a=...;

enqueue wi1a;

... time passes ...

Queue1_WorkItem wi1b=...;

enqueue wi1b; // This must be processed after processing of item wi1a is complete

... time passes ...

Queue2_WorkItem wi2a=...;

enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b

... time passes ...

Queue1_WorkItem wi1c=...;

enqueue wi1c; // This must be processed after processing of item wi1b is complete

[Diagram: arrows illustrating dependencies between work items]

The question is how to do this using C# 4.0/.NET 4.0. Right now I have two worker threads, one per queue, and I use a BlockingCollection<> for each queue. I would like to instead leverage the .NET thread pool and have worker threads process items concurrently across flows, but serially within a flow. For example, I would like to indicate that wi1b depends on completion of wi1a without having to track completion and remember wi1a when wi1b arrives. In other words, I just want to say, "I want to submit a work item for queue1, which is to be processed serially with other items I have already submitted for queue1, but possibly in parallel with work items submitted to other queues".

I hope this description made sense. If not please feel free to ask questions in the comments and I will update this question accordingly.

Thanks for reading.

Update:

To summarize "flawed" solutions so far, here are the solutions from the answers section that I cannot use and the reason(s) why I cannot use them:

TPL tasks require specifying the antecedent task for a ContinueWith(). I do not want to maintain knowledge of each queue's antecedent task when submitting a new task.

TDF ActionBlocks looked promising, but it would appear that items posted to an ActionBlock are processed in parallel. I need for the items for a particular queue to be processed serially.

Update 2:

RE: ActionBlocks

It would appear that setting the MaxDegreeOfParallelism option to one prevents parallel processing of work items submitted to a single ActionBlock. Having an ActionBlock per queue therefore seems to solve my problem. The only disadvantage is that it requires installing and deploying the TDF library from Microsoft, and I was hoping for a pure .NET 4.0 solution. So far this is the candidate accepted answer, unless someone can figure out a pure .NET 4.0 solution that doesn't degenerate to a worker thread per queue (which I am already using).

Answer 1:

I understand you have many queues and don't want to tie up threads. You could have an ActionBlock per queue. The ActionBlock automates most of what you need: It processes work items serially, and only starts a Task when work is pending. When no work is pending, no Task/Thread is blocked.
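To make the "ActionBlock per queue" idea concrete, here is a minimal sketch. It assumes the TPL Dataflow library (namespace System.Threading.Tasks.Dataflow) is referenced, which is separate from the .NET 4.0 base class library. The names SerialFlows, Enqueue, CompleteAll and SerialFlowsDemo are hypothetical helpers invented for this illustration, not part of any library.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow; // TPL Dataflow, assumed to be referenced

// Hypothetical helper: keeps one ActionBlock per named flow.
public sealed class SerialFlows
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, ActionBlock<Action>> _flows =
        new Dictionary<string, ActionBlock<Action>>();

    public void Enqueue(string flow, Action work)
    {
        ActionBlock<Action> block;
        lock (_gate)
        {
            if (!_flows.TryGetValue(flow, out block))
            {
                // MaxDegreeOfParallelism = 1 is stated explicitly so that
                // items posted to one block never run concurrently.
                block = new ActionBlock<Action>(
                    a => a(),
                    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 });
                _flows.Add(flow, block);
            }
        }
        block.Post(work); // items run in the order they were posted
    }

    // Signals that no more items will arrive and waits for pending ones.
    public void CompleteAll()
    {
        List<ActionBlock<Action>> blocks;
        lock (_gate) { blocks = new List<ActionBlock<Action>>(_flows.Values); }
        foreach (var b in blocks) b.Complete();
        foreach (var b in blocks) b.Completion.Wait();
    }
}

public static class SerialFlowsDemo
{
    // Returns the order in which queue1's items ran.
    public static string Run()
    {
        var flows = new SerialFlows();
        var seen = new List<int>(); // only touched by queue1 items, which never overlap
        for (int i = 1; i <= 3; i++)
        {
            int n = i; // copy the loop variable before capturing it
            flows.Enqueue("queue1", () => seen.Add(n));
        }
        flows.Enqueue("queue2", () => Console.WriteLine("queue2 item"));
        flows.CompleteAll();
        return string.Join(",", seen);
    }
}
```

The caller only names the flow. It never has to remember the previous work item, and no thread is held while a flow is idle.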



Answer 2:

The best way is to use the Task Parallel Library (TPL) and Continuations. A continuation not only allows you to create a flow of tasks but also handles your exceptions. This is a great introduction to the TPL. But to give you some idea...

You can start a TPL task using

Task task = Task.Factory.StartNew(() => 
{
    // Do some work here...
});

Now to start a second task when an antecedent task finishes (in error or successfully) you can use the ContinueWith method

Task task1 = Task.Factory.StartNew(() => Console.WriteLine("Antecedent Task"));
Task task2 = task1.ContinueWith(antTask => Console.WriteLine("Continuation..."));

So as soon as task1 completes, fails or is cancelled task2 'fires-up' and starts running. Note that if task1 had completed before reaching the second line of code task2 would be scheduled to execute immediately. The antTask argument passed to the second lambda is a reference to the antecedent task. See this link for more detailed examples...

You can also pass continuations results from the antecedent task

Task.Factory.StartNew<int>(() => 1)
    .ContinueWith(antTask => antTask.Result * 4)
    .ContinueWith(antTask => antTask.Result * 4)
    .ContinueWith(antTask => Console.WriteLine(antTask.Result * 4)); // Prints 64.

Note: be sure to read up on exception handling in the first link provided, as it is an area that can lead a newcomer to the TPL astray.
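As a minimal sketch of the exception-handling point: a continuation still runs when its antecedent faults, and the failure is exposed through the antecedent's Exception property as an AggregateException. Reading Result on the faulted antecedent would rethrow instead. The class name ContinuationErrors and the "boom" message are made up for this illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class ContinuationErrors
{
    public static string Run()
    {
        // An antecedent that always fails.
        Task<int> failing = Task.Factory.StartNew<int>(() =>
        {
            throw new InvalidOperationException("boom");
        });

        // The continuation inspects the antecedent instead of blindly
        // reading Result. Touching Exception also marks it as observed.
        Task<string> report = failing.ContinueWith(antTask =>
            antTask.IsFaulted
                ? "failed: " + antTask.Exception.InnerException.Message
                : "ok: " + antTask.Result);

        return report.Result; // "failed: boom"
    }
}
```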

One last thing to look at, in particular for what you want, is child tasks. Child tasks are those created with the AttachedToParent option. In this case the continuation will not run until all child tasks have completed:

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew(() =>
{
    Task.Factory.StartNew(() => { SomeMethod(); }, atp);
    Task.Factory.StartNew(() => { SomeOtherMethod(); }, atp);
}).ContinueWith(cont => { Console.WriteLine("Finished!"); });

I hope this helps.

Edit: Have you had a look at the concurrent collections, in particular BlockingCollection<T>? In your case you might use something like

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class TaskQueue : IDisposable
{
    BlockingCollection<Action> taskX = new BlockingCollection<Action>();

    public TaskQueue(int taskCount)
    {
        // Create and start a new Task for each consumer.
        // Use taskCount = 1 if items must run strictly one at a time.
        for (int i = 0; i < taskCount; i++)
            Task.Factory.StartNew(Consumer);
    }

    public void Dispose() { taskX.CompleteAdding(); }

    public void EnqueueTask(Action action) { taskX.Add(action); }

    void Consumer()
    {
        // This sequence BLOCKS while no elements are available
        // and ends once CompleteAdding has been called.
        foreach (Action action in taskX.GetConsumingEnumerable())
            action(); // Perform your task.
    }
}


Answer 3:

A .NET 4.0 solution based on TPL is possible, while hiding away the fact that it needs to store the parent task somewhere. For example:

using System;
using System.Threading.Tasks;

class QueuePool
{
    private readonly Task[] _queues;

    public QueuePool(int queueCount)
    { _queues = new Task[queueCount]; }

    public void Enqueue(int queueIndex, Action action)
    {
        lock (_queues)
        {
            // The last task scheduled for this queue, or null if none yet.
            var parent = _queues[queueIndex];
            if (parent == null)
                _queues[queueIndex] = Task.Factory.StartNew(action);
            else
                _queues[queueIndex] = parent.ContinueWith(_ => action());
        }
    }
}

This is using a single lock for all queues, to illustrate the idea. In production code, however, I would use a lock per queue to reduce contention.
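A lock-per-queue variant could look roughly like the sketch below. The names PerQueueLockPool and PerQueueLockDemo are hypothetical, and returning the scheduled Task from Enqueue is an addition made here so that callers can wait on it. Note that ContinueWith runs its continuation regardless of how the antecedent finished, so in this sketch a failed item does not stop later items in the same queue.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical variant of the pool with one lock object per queue.
class PerQueueLockPool
{
    private readonly Task[] _tails;   // last task scheduled for each queue
    private readonly object[] _locks; // one lock per queue

    public PerQueueLockPool(int queueCount)
    {
        _tails = new Task[queueCount];
        _locks = new object[queueCount];
        for (int i = 0; i < queueCount; i++)
            _locks[i] = new object();
    }

    public Task Enqueue(int queueIndex, Action action)
    {
        // Only callers targeting the same queue contend for this lock.
        lock (_locks[queueIndex])
        {
            Task tail = _tails[queueIndex];
            Task next = tail == null
                ? Task.Factory.StartNew(action)
                : tail.ContinueWith(_ => action());
            _tails[queueIndex] = next;
            return next;
        }
    }
}

static class PerQueueLockDemo
{
    // Returns the order in which queue 0's items ran.
    public static string Run()
    {
        var pool = new PerQueueLockPool(2);
        var seen = new List<string>(); // only touched by queue 0 items, which run one after another
        pool.Enqueue(0, () => seen.Add("wi1a"));
        pool.Enqueue(0, () => seen.Add("wi1b"));
        Task last = pool.Enqueue(0, () => seen.Add("wi1c"));
        Task other = pool.Enqueue(1, () => Console.WriteLine("wi2a")); // may overlap with queue 0
        Task.WaitAll(last, other);
        return string.Join(",", seen);
    }
}
```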



Answer 4:

It looks like the design you already have is good and working. Your worker threads (one per queue) are long-running, so if you want to use Tasks instead, specify TaskCreationOptions.LongRunning so you get a dedicated worker thread.

But there isn't really a need to use the ThreadPool here. It doesn't offer many benefits for long-running work.
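For reference, a worker-per-queue design expressed with a LongRunning task might look like this sketch. DedicatedWorkerQueue is a hypothetical name. LongRunning is a hint to the scheduler, and with the default scheduler it typically results in a dedicated thread rather than a pool thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical wrapper: one long-running consumer task per queue.
public sealed class DedicatedWorkerQueue : IDisposable
{
    private readonly BlockingCollection<Action> _items =
        new BlockingCollection<Action>();
    private readonly Task _worker;

    public DedicatedWorkerQueue()
    {
        // The consumer loop blocks while the queue is empty, so it is
        // flagged LongRunning to keep it off the regular pool threads.
        _worker = Task.Factory.StartNew(
            () =>
            {
                foreach (Action item in _items.GetConsumingEnumerable())
                    item(); // items run one at a time, in FIFO order
            },
            TaskCreationOptions.LongRunning);
    }

    public void Enqueue(Action item) { _items.Add(item); }

    public void Dispose()
    {
        _items.CompleteAdding(); // lets the consumer loop end
        _worker.Wait();          // drain the remaining items
        _items.Dispose();
    }
}
```

Creating one instance per queue reproduces the existing design: each queue runs its items in order, and different queues run in parallel.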