How to limit the maximum number of parallel tasks

Posted 2019-01-17 10:56

I have a collection of 1000 input messages to process. I loop over the collection and start a new task for each message.

// Assume this messages collection contains 1000 items
var messages = new List<string>();

foreach (var msg in messages)
{
    Task.Factory.StartNew(() =>
    {
        Process(msg);
    });
}

Can we estimate the maximum number of messages that are processed simultaneously (assuming a normal quad-core processor), or can we limit the maximum number of messages processed at one time?

How can I ensure these messages are processed in the same order as they appear in the collection?

7 answers
Luminary・发光体
#2 · 2019-01-17 11:42

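You can create your own TaskScheduler and override QueueTask there.

protected internal abstract void QueueTask(Task task)

Inside that override you decide when each queued task actually runs, so you can do anything you like, including capping how many run at once.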

One example here:

Limited concurrency level task scheduler (with task priority) handling wrapped tasks
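
If writing a full scheduler is more than you need, a minimal sketch of the same idea can lean on a scheduler that ships with .NET 4.5 and later: ConcurrentExclusiveSchedulerPair. This is a swapped-in technique, not the custom scheduler from the linked example. The class name SchedulerDemo, the RunLimited helper, and the body of Process are assumptions made for illustration only.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class SchedulerDemo
{
    private static readonly object Gate = new object();
    private static int _running; // tasks currently inside Process
    private static int _peak;    // highest value _running has reached

    // Hypothetical stand-in for the question's Process(msg).
    private static void Process(string msg)
    {
        lock (Gate)
        {
            _running++;
            if (_running > _peak) _peak = _running;
        }
        Thread.Sleep(50); // simulate work
        lock (Gate) { _running--; }
    }

    // Runs every message on a scheduler capped at maxConcurrency and
    // returns the peak number of messages that were in flight at once.
    public static int RunLimited(string[] messages, int maxConcurrency)
    {
        lock (Gate) { _running = 0; _peak = 0; }

        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrency);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        Task[] tasks = messages
            .Select(m => factory.StartNew(() => Process(m)))
            .ToArray();
        Task.WaitAll(tasks);

        lock (Gate) { return _peak; }
    }

    public static void Main()
    {
        string[] messages = Enumerable.Range(1, 20).Select(i => i.ToString()).ToArray();
        Console.WriteLine("Peak concurrency: " + RunLimited(messages, 4));
    }
}
```

All tasks are queued up front, but the scheduler only lets maxConcurrency of them execute at a time. Tasks are started in queue order, although they can still finish in any order.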

来,给爷笑一个
#3 · 2019-01-17 11:43

You could use Parallel.ForEach and rely on MaxDegreeOfParallelism instead.

Parallel.ForEach(messages, new ParallelOptions { MaxDegreeOfParallelism = 10 },
    msg =>
    {
        // logic
        Process(msg);
    });
我只想做你的唯一
#4 · 2019-01-17 11:45

I think it would be better to use a parallel loop (Parallel.ForEach):

Parallel.ForEach(messages,
    new ParallelOptions { MaxDegreeOfParallelism = 4 },
    x => Process(x));

where 4 is the maximum degree of parallelism and x is the message being processed.
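
For comparison, here is a rough sketch of what actual Parallel LINQ (PLINQ) could look like if each message produces a result and the results must come back in the original collection order. AsOrdered only preserves the order of the output sequence; the work itself still runs concurrently and may finish in any order. The names OrderedPlinqDemo, ProcessInOrder, and Transform are hypothetical and exist only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class OrderedPlinqDemo
{
    // Hypothetical per-message work that returns a result.
    private static string Transform(string msg)
    {
        return msg.ToUpperInvariant();
    }

    // Processes messages with at most maxDegree concurrent workers and
    // returns the results in the same order as the input sequence.
    public static List<string> ProcessInOrder(IEnumerable<string> messages, int maxDegree)
    {
        return messages
            .AsParallel()
            .AsOrdered()                        // keep source order in the output
            .WithDegreeOfParallelism(maxDegree) // upper bound on concurrent workers
            .Select(m => Transform(m))
            .ToList();
    }

    public static void Main()
    {
        var messages = new List<string> { "a", "b", "c", "d", "e" };
        Console.WriteLine(string.Join(",", ProcessInOrder(messages, 4))); // prints "A,B,C,D,E"
    }
}
```

If the messages must also be executed strictly one after another, parallelism does not help and a plain foreach loop is the simpler choice.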

【Aperson】
#5 · 2019-01-17 11:52
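// NamedTask is assumed to derive from Task; MaxCountImportThread and Log are defined elsewhere.
public static void RunTasks(List<NamedTask> importTaskList)
{
    List<NamedTask> runningTasks = new List<NamedTask>();

    try
    {
        foreach (NamedTask currentTask in importTaskList)
        {
            currentTask.Start();
            runningTasks.Add(currentTask);

            // Count every unfinished task (queued or running), not only those
            // whose Status is already Running, and wait only on those tasks.
            // Waiting on the full list would return immediately once any task
            // had completed and the limit would no longer hold.
            NamedTask[] active = runningTasks.Where(x => !x.IsCompleted).ToArray();
            if (active.Length >= MaxCountImportThread)
            {
                Task.WaitAny(active);
            }
        }

        // Wait for everything, which also surfaces exceptions from any task.
        Task.WaitAll(runningTasks.ToArray());
    }
    catch (Exception ex)
    {
        Log.Fatal("ERROR!", ex);
    }
}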
聊天终结者
#6 · 2019-01-17 11:58

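If you need in-order queuing (processing might still finish in any order), there is no need for a semaphore. Old-fashioned if statements work fine, as long as completed tasks are removed from the list being waited on:

        const int maxConcurrency = 5;
        List<Task> allTasks = new List<Task>();     // every task, for the final WaitAll
        List<Task> runningTasks = new List<Task>(); // only tasks still in flight
        foreach (var arg in args)
        {
            var t = Task.Run(() => { Process(arg); });

            allTasks.Add(t);
            runningTasks.Add(t);

            if (runningTasks.Count >= maxConcurrency)
            {
                Task.WaitAny(runningTasks.ToArray());
                // Drop finished tasks; otherwise WaitAny returns immediately
                // on later iterations and the limit is no longer enforced.
                runningTasks.RemoveAll(x => x.IsCompleted);
            }
        }

        Task.WaitAll(allTasks.ToArray());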
我命由我不由天
#7 · 2019-01-17 11:59

SemaphoreSlim is a very good solution in this case and I highly recommend that the OP try it, but @Manoj's answer has a flaw, as mentioned in the comments: the semaphore should be waited on before spawning the task, like this.

Updated answer: as @Vasyl pointed out, the semaphore may be disposed before the tasks complete, and Release() will then throw an exception. So before exiting the using block, you must wait for all created tasks to complete.

int maxConcurrency = 10;
var messages = new List<string>();
using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
    List<Task> tasks = new List<Task>();
    foreach (var msg in messages)
    {
        // Blocks here until one of the maxConcurrency slots is free.
        concurrencySemaphore.Wait();

        var t = Task.Factory.StartNew(() =>
        {
            try
            {
                Process(msg);
            }
            finally
            {
                concurrencySemaphore.Release();
            }
        });

        tasks.Add(t);
    }

    // Keep the semaphore alive until every task has released it.
    Task.WaitAll(tasks.ToArray());
}

Answer to comments: for those who want to see how the semaphore can be disposed too early when Task.WaitAll is omitted, run the code below in a console app and this exception will be raised.

System.ObjectDisposedException: 'The semaphore has been disposed.'

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        int maxConcurrency = 5;
        List<string> messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();

        using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
        {
            List<Task> tasks = new List<Task>();
            foreach (var msg in messages)
            {
                concurrencySemaphore.Wait();

                var t = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        Process(msg);
                    }
                    finally
                    {
                        // Fails for tasks that finish after the using block has exited.
                        concurrencySemaphore.Release();
                    }
                });

                tasks.Add(t);
            }

            // Deliberately commented out to reproduce the exception:
            // Task.WaitAll(tasks.ToArray());
        }
        Console.WriteLine("Exited using block");

        Console.ReadKey();
    }

    private static void Process(string msg)
    {
        Thread.Sleep(2000);
        Console.WriteLine(msg);
    }
}
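
If the calling code can be asynchronous, one possible variation on the approach in this answer is to await the semaphore with WaitAsync instead of blocking on Wait, and to await Task.WhenAll inside the using block so the semaphore still outlives every task. This is only a sketch under those assumptions: AsyncThrottleDemo, ProcessAllAsync, and the body of Process are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncThrottleDemo
{
    // Hypothetical stand-in for the question's Process(msg).
    private static void Process(string msg)
    {
        Thread.Sleep(50);
        Console.WriteLine(msg);
    }

    // Starts one task per message, never more than maxConcurrency at a time,
    // and returns the number of messages processed.
    public static async Task<int> ProcessAllAsync(IEnumerable<string> messages, int maxConcurrency)
    {
        var tasks = new List<Task>();
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            foreach (var msg in messages)
            {
                // Asynchronously wait for a free slot before starting the task.
                await gate.WaitAsync();
                tasks.Add(Task.Run(() =>
                {
                    try { Process(msg); }
                    finally { gate.Release(); }
                }));
            }

            // Stay inside the using block until every task has released the semaphore.
            await Task.WhenAll(tasks);
        }
        return tasks.Count;
    }

    public static void Main()
    {
        var messages = Enumerable.Range(1, 15).Select(i => i.ToString());
        int done = ProcessAllAsync(messages, 5).GetAwaiter().GetResult();
        Console.WriteLine("Processed " + done + " messages");
    }
}
```

Messages are still started in collection order, but as with the blocking version they can complete in any order.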