限制通过并行任务库运行的活动任务数的最佳方法限制通过并行任务库运行的活动任务数的最佳方法(Best

2019-06-02 16:20发布

考虑一个队列拿着很多需要处理的工作。 队列的限制是只能得到一次1个工作,无法知道有多少就业机会也有办法。 该职位需要10秒才能完成,涉及到很多等待来自Web服务的响应,这样是不是CPU的约束。

如果我用这样的

while (true)
{
   var job = Queue.PopJob();
   if (job == null)
      break;
   Task.Factory.StartNew(job.Execute); 
}

然后,它会从队列速度远远超过它能够完成他们疯狂地流行的工作,耗尽内存,并落在它的屁股。 > <

我不能使用(我不认为) ParallelOptions.MaxDegreeOfParallelism因为我不能使用Parallel.Invoke或Parallel.ForEach

3个替代我发现

  1. 更换Task.Factory.StartNew与

     Task task = new Task(job.Execute,TaskCreationOptions.LongRunning) task.Start(); 

    这似乎在一定程度上解决这个问题,但我并不清楚到底是什么,这是做 ,如果这是最好的方法。

  2. 创建自定义任务调度器限制并发度

  3. 使用类似BlockingCollection启动时的工作加至收集和成品限制可以运行数时删除。

随着#1我必须相信,正确的决策是自动进行的,#2 /#3我得工作了,可以运行自己的任务的最大数量。

有我正确地理解这一点 - 这是更好的办法,或有另一种方式?

编辑 -这是我想出从下面的答案,生产者-消费者模式。

除了总吞吐量的目的是不是快出队的饭碗,可以处理,但多线程轮询队列(这里没有显示,但是这就是非阻塞运算,如果从多个地方高频调查会导致巨大的交易成本) 。

// BlockingCollection<>(1) will block if try to add more than 1 job to queue (no
// point in being greedy!), or is empty on take.
var BlockingCollection<Job> jobs = new BlockingCollection<Job>(1);

// Setup a number of consumer threads.
// Determine MAX_CONSUMER_THREADS empirically, if 4 core CPU and 50% of time
// in job is blocked waiting IO then likely be 8.
for(int numConsumers = 0; numConsumers < MAX_CONSUMER_THREADS; numConsumers++)
{
   Thread consumer = new Thread(() =>
   {
      while (!jobs.IsCompleted)
      {
         var job = jobs.Take();
         job.Execute();
      }
   }
   consumer.Start();
}

// Producer to take items of queue and put in blocking collection ready for processing
while (true)
{
    var job = Queue.PopJob();
    if (job != null)
       jobs.Add(job);
    else
    {
       jobs.CompletedAdding()
       // May need to wait for running jobs to finish
       break;
    }
}

Answer 1:

我只是给了一个答案 ,这是非常适用于这个问题。

基本上,TPL Task类是由调度CPU限制的工作。 它不阻止工作取得。

等待服务答复:您正在使用的资源,是不是CPU工作。 这意味着,因为它假定CPU有界到一定程度的TPL将mismange你的资源。

自行管理的资源:启动线程或LongRunning任务的固定数量(这基本上是一样的)。 决定线程的经验数量。

你不能把不可靠的系统投入生产。 出于这个原因,我建议#1,但节流 。 因为有工作项目不创造尽可能多的线程。 创建所需要饱和远程服务为多个线程。 自己写的这滋生N个线程,并用它们来处理M工作项目的辅助功能。 你得到完全可预测的和可靠的结果的方式。



Answer 2:

潜在的流动分流和延续所造成await在你的代码或第三方库,以后,不会长时间运行的任务(或线程)发挥很好,因此使用长时间运行的任务,不要打扰。 在async/await的世界里,他们是无用的。 更多细节在这里 。

您可以拨打ThreadPool.SetMaxThreads但你拨打这个电话之前,请确保您设定的最小线程数ThreadPool.SetMinThreads ,使用低于或等于最高者值。 顺便说一句,MSDN文档是错误的。 你可以去下面的内核数量的机器上使用这些方法调用,至少在.NET 4.5和4.6,我用这种方法来减少内存的处理能力有限的32位服务。

然而,如果你不希望限制整个应用程序,但只是它的处理部分,自定义任务调度程序将做的工作。 很久以前,微软发布了样品与几个自定义任务调度器,包括一个LimitedConcurrencyLevelTaskScheduler 。 与手动生成的主要处理任务Task.Factory.StartNew ,提供定制任务调度器,并且通过它产生了每一个其他任务将使用它,包括async/await甚至Task.Yield ,用于在早期对实现asynchronousy async方法。

但是,对于您的特定情况下,这两种解决方案将不会停止在完成之前用尽你的工作队列。 这也许不是理想的,这取决于你的那个队列的实施和目的。 他们更喜欢“火一串任务,让调度找到执行它们的时间”解决方案的类型。 因此,也许更合适些这里一点可能是在通过作业的执行控制的更严格的方法semaphores 。 代码如下所示:

semaphore = new SemaphoreSlim(max_concurrent_jobs);

while(...){
 job = Queue.PopJob();
 semaphore.Wait();
 ProcessJobAsync(job);
}

async Task ProcessJobAsync(Job job){
 await Task.Yield();
 ... Process the job here...
 semaphore.Release();
}

还有对皮肤有猫的方法不止一种。 使用你认为什么是适当的。



Answer 3:

微软有一个叫做数据流哪个是你想要(和更多)正是非常酷库。 细节在这里 。

您应该使用ActionBlock类,并设置MaxDegreeOfParallelism的ExecutionDataflowBlockOptions对象。 ActionBlock与异步/ AWAIT,所以即使您的外线呼叫等待,没有新的就业机会将开始处理很好地发挥。

ExecutionDataflowBlockOptions actionBlockOptions = new ExecutionDataflowBlockOptions
{
     MaxDegreeOfParallelism = 10
};

this.sendToAzureActionBlock = new ActionBlock<List<Item>>(async items => await ProcessItems(items),
            actionBlockOptions);
...
this.sendToAzureActionBlock.Post(itemsToProcess)


Answer 4:

这里似乎问题并不太多运行 Task S,这是太多的计划 Task秒。 您的代码将尝试安排尽可能多的Task S作为就可以了,不管他们是执行的速度有多快。 如果你有太多的工作,这意味着你会得到OOM。

正因为如此,关你提出的解决方案实际上会解决你的问题。 如果它似乎是简单地指定LongRunning解决您的问题,那么这很可能是因为创建一个新的Thread (这是什么LongRunning一样)需要一定的时间,这可以有效的控制得到新的就业机会。 因此,这种解决方案只能由意外的作品,并很可能会导致其他问题以后。

至于解决办法,我大多与USR同意:那效果相当好最简单的解决方法是创建一个固定数量LongRunning任务,并有一个循环调用Queue.PopJob()由保护lock ,如果这个方法是不是线程安全的)和Execute() S中的工作。

更新:经过一些更多的思考,我意识到下面尝试将最有可能表现得可怕。 只使用它,如果你真的相信它会为你工作。


但TPL试图找出并行的最佳程度,甚至IO绑定Task秒。 所以,你可以尝试用它来你的优势。 龙Task旨意在这里工作,因为从第三方物流的角度来看,好像没有工作做,它会启动新的Task结束了遍。 你可以做的反而是开始一个新的Task ,在每年年底Task 。 这样一来,太平人寿就知道发生了什么事情和它的算法可以很好地工作。 此外,为了让TPL决定的并行度,在开始Task是先在其行,开始的另一行Task秒。

该算法可以很好地工作。 但它也有可能是TPL将就并行度一个错误的决定,我还没有真正尝试这样的事。

在代码中,它应该是这样的:

void ProcessJobs(bool isFirst)
{
    var job = Queue.PopJob(); // assumes PopJob() is thread-safe
    if (job == null)
        return;

    if (isFirst)
        Task.Factory.StartNew(() => ProcessJobs(true));

    job.Execute();

    Task.Factory.StartNew(() => ProcessJob(false));
}

并与启动

Task.Factory.StartNew(() => ProcessJobs(true));


Answer 5:

TaskCreationOptions.LongRunning是阻塞的任务,并在这里使用它是合法有效。 它所做的是它表明的调度奉献一个线程的任务。 调度器本身试图保持在相同的水平的线程数,以避免过多的上下文切换的CPU内核的数量。

大家都在描述线程在C#约瑟夫阿尔巴哈利



Answer 6:

我用一个消息队列/邮箱的机制来实现这一目标。 这是类似于角色模型。 我有了一个邮箱的类。 我把这个班我的“工人”。 它可以接收消息。 这些消息进行排队,他们本质上,定义了我想要的工人运行的任务。 工人将使用Task.Wait()为它的任务出队的下一条消息,并开始下一个任务之前完成。

通过限制我的工人人数,我能以限制正在运行的并发线程/任务的数量。

这是概述,源代码,在我的博客文章的分布式计算引擎。 如果你看一下IActor和WorkerNode的代码,我希望这是有道理的。

https://long2know.com/2016/08/creating-a-distributed-computing-engine-with-the-actor-model-and-net-core/



文章来源: Best way to limit the number of active Tasks running via the Parallel Task Library