C#排队依赖任务由一个线程池来处理(C# queueing dependant tasks to b

2019-07-30 03:51发布

我希望在整个需要在顺序处理(在每个流动)若干流队列相关任务。 流可以并行地处理。

具体而言,假设我需要两个队列,我想每个队列的任务,以便进行处理。 下面是示例伪代码来说明所需的行为:

Queue1_WorkItem wi1a=...;

enqueue wi1a;

... time passes ...

Queue1_WorkItem wi1b=...;

enqueue wi1b; // This must be processed after processing of item wi1a is complete

... time passes ...

Queue2_WorkItem wi2a=...;

enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b

... time passes ...

Queue1_WorkItem wi1c=...;

enqueue wi1c; // This must be processed after processing of item wi1b is complete

这里是用箭头示出的工作项目之间的依赖关系的图:

现在的问题是我怎么做到这一点使用C#4.0 / .NET 4.0? 现在我有两个工作线程,每个队列之一,我使用BlockingCollection<>针对每个队列。 我想,而不是使用.NET线程池,并有工作线程处理项目同时(横跨流),但串行内的流动。 换句话说,我希望能够表明,例如wi1b取决于wi1a完成,而不必跟踪完成并记住wi1a,当wi1b到达。 换句话说,我只想说,“我想提出一个工作项队列1,这是与我已经提交了队列1,但可能与提交给其他队列工作项目并行的其他项目串行处理”。

我希望这说明是有意义的。 如果没有,请随时提问的意见,我会相应地更新这个问题。

谢谢阅读。

更新:

总结“硬伤”的解决方案,到目前为止,这里有从答案部分的解决方案,我不能用,为什么我不能使用它们的原因(S):

TPL任务需要指定任务前期的ContinueWith() 我不希望提交一个新的任务时保持每个队列的先行任务的知识。

TDF ActionBlocks看起来前途无量,但它会出现贴到ActionBlock项目进行并行处理。 我需要进行串行处理特定队列中的项目。

更新2:

RE:ActionBlocks

这样看来,在设置MaxDegreeOfParallelism选项一个防止提交一个工作项目并行处理ActionBlock 。 因此,它似乎有一个ActionBlock每个队列解决我与唯一的缺点是,这需要从微软的TDF库的安装和部署,我希望的是纯.NET 4.0解决方案的问题。 到目前为止,这是候选人接受的答案,除非有人能想出一个办法与不退化为每个队列工作者线程(我已经使用)纯.NET 4.0解决方案做到这一点。

Answer 1:

我知道你有很多队列,不想占用线程。 你可以有一个ActionBlock每个队列。 该ActionBlock自动化你最需要的东西:它处理的工作项顺序,只有开始时的工作正在等待任务。 当没有工作之前,没有任务/线程被阻塞。



Answer 2:

最好的办法是使用Task Parallel Library (TPL)Continuations 。 的延续,不仅可以让你创建任务的流程,而且处理您的例外。 这是一个伟大的介绍给TPL。 但是,为了给你一些想法...

您可以使用启动TPL任务

Task task = Task.Factory.StartNew(() => 
{
    // Do some work here...
});

现在启动第二个任务时的先行任务结束(错误或成功),你可以使用ContinueWith方法

Task task1 = Task.Factory.StartNew(() => Console.WriteLine("Antecedant Task"));
Task task2 = task1.ContinueWith(antTask => Console.WriteLine("Continuation..."));

因此,只要task1完成,失败或被取消task2 “火灾跟进及开始运行。 请注意,如果task1已经达到代码的第二条线之前完成task2将被安排为立即执行。 该antTask传递给第二拉姆达参数是对先行任务的参考。 见这个链接进行更详细的例子...

您也可以从先行任务通过延续的结果

Task.Factory.StartNew<int>(() => 1)
    .ContinueWith(antTask => antTask.Result * 4)
    .ContinueWith(antTask => antTask.Result * 4)
    .ContinueWith(antTask =>Console.WriteLine(antTask.Result * 4)); // Prints 64.

注意。 一定要在,因为这提供的第一个链接的异常处理读了会导致一个初来乍到TPL引入歧途。

看特别是你想要的最后一件事是孩子的任务。 儿童的任务是那些为创建AttachedToParent 。 在这种情况下,继续将不会运行,直到所有的子任务已完成

TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew(() =>
{
    Task.Factory.StartNew(() => { SomeMethod() }, atp);
    Task.Factory.StartNew(() => { SomeOtherMethod() }, atp); 
}).ContinueWith( cont => { Console.WriteLine("Finished!") });

我希望这有帮助。

编辑:你有看看ConcurrentCollections特别是BlockngCollection<T> 所以你的情况,你可能会使用类似

public class TaskQueue : IDisposable
{
    BlockingCollection<Action> taskX = new BlockingCollection<Action>();

    public TaskQueue(int taskCount)
    {
        // Create and start new Task for each consumer.
        for (int i = 0; i < taskCount; i++)
            Task.Factory.StartNew(Consumer);  
    }

    public void Dispose() { taskX.CompleteAdding(); }

    public void EnqueueTask (Action action) { taskX.Add(Action); }

    void Consumer()
    {
        // This seq. that we are enumerating will BLOCK when no elements
        // are avalible and will end when CompleteAdding is called.
        foreach (Action action in taskX.GetConsumingEnumerable())
            action(); // Perform your task.
    }
}


Answer 3:

基于TPL一个.NET 4.0的解决方案是可能的,而藏起来了,它需要存储父任务某处的事实。 例如:

class QueuePool
{
    private readonly Task[] _queues;

    public QueuePool(int queueCount)
    { _queues = new Task[queueCount]; }

    public void Enqueue(int queueIndex, Action action)
    {
        lock (_queues)
        {
           var parent = _queue[queueIndex];
           if (parent == null)
               _queues[queueIndex] = Task.Factory.StartNew(action);
           else
               _queues[queueIndex] = parent.ContinueWith(_ => action());
        }
    }
}

这是用一个锁的所有队列,来说明这个想法。 在生产代码,但是,我会用一个锁每个队列,以减少争。



Answer 4:

看起来你已经设计好和工作。 你的工作线程(每个队列一个)是长期运行的,所以如果你想使用任务的,而不是指定TaskCreationOptions.LongRunning所以你得到一个专用的工作线程。

但是,是不是真的有必要在这里使用线程池。 它不提供长期运行的工作带来许多好处。



文章来源: C# queueing dependant tasks to be processed by a thread pool