我希望在整个需要在顺序处理(在每个流动)若干流队列相关任务。 流可以并行地处理。
具体而言,假设我需要两个队列,我想每个队列的任务,以便进行处理。 下面是示例伪代码来说明所需的行为:
Queue1_WorkItem wi1a=...;
enqueue wi1a;
... time passes ...
Queue1_WorkItem wi1b=...;
enqueue wi1b; // This must be processed after processing of item wi1a is complete
... time passes ...
Queue2_WorkItem wi2a=...;
enqueue wi2a; // This can be processed concurrently with the wi1a/wi1b
... time passes ...
Queue1_WorkItem wi1c=...;
enqueue wi1c; // This must be processed after processing of item wi1b is complete
这里是用箭头示出的工作项目之间的依赖关系的图:
现在的问题是我怎么做到这一点使用C#4.0 / .NET 4.0? 现在我有两个工作线程,每个队列之一,我使用BlockingCollection<>
针对每个队列。 我想,而不是使用.NET线程池,并有工作线程处理项目同时(横跨流),但串行内的流动。 换句话说,我希望能够表明,例如wi1b取决于wi1a完成,而不必跟踪完成并记住wi1a,当wi1b到达。 换句话说,我只想说,“我想提出一个工作项队列1,这是与我已经提交了队列1,但可能与提交给其他队列工作项目并行的其他项目串行处理”。
我希望这说明是有意义的。 如果没有,请随时提问的意见,我会相应地更新这个问题。
谢谢阅读。
更新:
总结“硬伤”的解决方案,到目前为止,这里有从答案部分的解决方案,我不能用,为什么我不能使用它们的原因(S):
TPL任务需要指定任务前期的ContinueWith()
我不希望提交一个新的任务时保持每个队列的先行任务的知识。
TDF ActionBlocks看起来前途无量,但它会出现贴到ActionBlock项目进行并行处理。 我需要进行串行处理特定队列中的项目。
更新2:
RE:ActionBlocks
这样看来,在设置MaxDegreeOfParallelism
选项一个防止提交一个工作项目并行处理ActionBlock
。 因此,它似乎有一个ActionBlock
每个队列解决我与唯一的缺点是,这需要从微软的TDF库的安装和部署,我希望的是纯.NET 4.0解决方案的问题。 到目前为止,这是候选人接受的答案,除非有人能想出一个办法与不退化为每个队列工作者线程(我已经使用)纯.NET 4.0解决方案做到这一点。
我知道你有很多队列,不想占用线程。 你可以有一个ActionBlock每个队列。 该ActionBlock自动化你最需要的东西:它处理的工作项顺序,只有开始时的工作正在等待任务。 当没有工作之前,没有任务/线程被阻塞。
最好的办法是使用Task Parallel Library (TPL)
和Continuations
。 的延续,不仅可以让你创建任务的流程,而且处理您的例外。 这是一个伟大的介绍给TPL。 但是,为了给你一些想法...
您可以使用启动TPL任务
Task task = Task.Factory.StartNew(() =>
{
// Do some work here...
});
现在启动第二个任务时的先行任务结束(错误或成功),你可以使用ContinueWith
方法
Task task1 = Task.Factory.StartNew(() => Console.WriteLine("Antecedant Task"));
Task task2 = task1.ContinueWith(antTask => Console.WriteLine("Continuation..."));
因此,只要task1
完成,失败或被取消task2
“火灾跟进及开始运行。 请注意,如果task1
已经达到代码的第二条线之前完成task2
将被安排为立即执行。 该antTask
传递给第二拉姆达参数是对先行任务的参考。 见这个链接进行更详细的例子...
您也可以从先行任务通过延续的结果
Task.Factory.StartNew<int>(() => 1)
.ContinueWith(antTask => antTask.Result * 4)
.ContinueWith(antTask => antTask.Result * 4)
.ContinueWith(antTask =>Console.WriteLine(antTask.Result * 4)); // Prints 64.
注意。 一定要在,因为这提供的第一个链接的异常处理读了会导致一个初来乍到TPL引入歧途。
看特别是你想要的最后一件事是孩子的任务。 儿童的任务是那些为创建AttachedToParent
。 在这种情况下,继续将不会运行,直到所有的子任务已完成
TaskCreationOptions atp = TaskCreationOptions.AttachedToParent;
Task.Factory.StartNew(() =>
{
Task.Factory.StartNew(() => { SomeMethod() }, atp);
Task.Factory.StartNew(() => { SomeOtherMethod() }, atp);
}).ContinueWith( cont => { Console.WriteLine("Finished!") });
我希望这有帮助。
编辑:你有看看ConcurrentCollections
特别是BlockngCollection<T>
所以你的情况,你可能会使用类似
public class TaskQueue : IDisposable
{
BlockingCollection<Action> taskX = new BlockingCollection<Action>();
public TaskQueue(int taskCount)
{
// Create and start new Task for each consumer.
for (int i = 0; i < taskCount; i++)
Task.Factory.StartNew(Consumer);
}
public void Dispose() { taskX.CompleteAdding(); }
public void EnqueueTask (Action action) { taskX.Add(Action); }
void Consumer()
{
// This seq. that we are enumerating will BLOCK when no elements
// are avalible and will end when CompleteAdding is called.
foreach (Action action in taskX.GetConsumingEnumerable())
action(); // Perform your task.
}
}
基于TPL一个.NET 4.0的解决方案是可能的,而藏起来了,它需要存储父任务某处的事实。 例如:
class QueuePool
{
private readonly Task[] _queues;
public QueuePool(int queueCount)
{ _queues = new Task[queueCount]; }
public void Enqueue(int queueIndex, Action action)
{
lock (_queues)
{
var parent = _queue[queueIndex];
if (parent == null)
_queues[queueIndex] = Task.Factory.StartNew(action);
else
_queues[queueIndex] = parent.ContinueWith(_ => action());
}
}
}
这是用一个锁的所有队列,来说明这个想法。 在生产代码,但是,我会用一个锁每个队列,以减少争。
看起来你已经设计好和工作。 你的工作线程(每个队列一个)是长期运行的,所以如果你想使用任务的,而不是指定TaskCreationOptions.LongRunning
所以你得到一个专用的工作线程。
但是,是不是真的有必要在这里使用线程池。 它不提供长期运行的工作带来许多好处。