Processing only n items at a time concurrently usi

2019-03-27 06:35发布

This is all happening in a windows service.

I have a Queue<T> (actually a ConcurrentQueue<T>) holding items waiting to be processed. But, I don't want to process only one at a time, I want to process n items concurrently, where n is a configurable integer.

How do I go about doing this using the Task Parallel Library?

I know that TPL will partition collections on behalf of the developer for concurrent processing, but not sure if that's the feature that I'm after. I'm new to multithreading and TPL.

3条回答
Ridiculous、
2楼-- · 2019-03-27 07:14

Here is one idea that involves creating an extension method for TaskFactory.

public static class TaskFactoryExtension
{
    public static Task StartNew(this TaskFactory target, Action action, int parallelism)
    {
        var tasks = new Task[parallelism];
        for (int i = 0; i < parallelism; i++)
        {
            tasks[i] = target.StartNew(action);
        }
        return target.StartNew(() => Task.WaitAll(tasks));
    }
}

Then your calling code would look like the following.

ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
var task = Task.Factory.StartNew(
  () =>
  {
    T item;
    while (queue.TryDequeue(out item))
    {
      ProcessItem(item);
    }
  }, n);
task.Wait(); // Optionally wait for everything to finish.

Here is another idea using Parallel.ForEach. The problem with this approach is that your degrees of parallelism might not necessarily be honored. You are only indicating the maximum amount allowed and not the absolute amount.

ConcurrentQueue<T> queue = GetQueue();
int n = GetDegreeOfParallelism();
Parallel.ForEach(queue, new ParallelOptions { MaxDegreeOfParallelism = n },
  (item) =>
  {
    ProcessItem(item);    
  });
查看更多
走好不送
3楼-- · 2019-03-27 07:16

Use BlockingCollection<T> instead of ConcurrentQueue<T>, then you can start any number of consumer threads and use Take method of the BlockingCollection. if the collection is empty, the Take method will automatically block in the caller thread waiting for items to be added, otherwise the threads will consume all the queue items in parallel. However as your question mentioned out the use of TPL it turns out that Parallel.ForEach have some issues when using with BlockingCollection check this post for more details. so you have to manage creation of your consumer threads your self. new Thread(/*consumer method*/) or new Task()...

查看更多
相关推荐>>
4楼-- · 2019-03-27 07:24

I'd also recommend using a BlockingCollection instead of directly using a ConcurrentQueue.

Here's an example:

public class QueuingRequestProcessor
{
  private BlockingCollection<MyRequestType> queue;

  public void QueuingRequestProcessor(int maxConcurrent)
  {
    this.queue = new BlockingCollection<MyRequestType>(maxConcurrent);

    Task[] consumers = new Task[maxConcurrent];

    for (int i = 0; i < maxConcurrent; i++)
    {
      consumers[i] = Task.Factory.StartNew(() =>
      {
        // Will wait when queue is empty, until CompleteAdding() is called
        foreach (var request in this.queue.GetConsumingEnumerable())
        {
          Process(request);
        }
      });
    }
  }

  public void Add(MyRequest request)
  {
    this.queue.Add(request);
  }

  public void Stop()
  {
    this.queue.CompleteAdding();
  }

  private void Process(MyRequestType request)
  {
    // Do your processing here
  }
}

Note that maxConcurrent in the constructor defines how many requests will be processed concurrently.

查看更多
登录 后发表回答