C# Parallel - Adding items to the collection being iterated over

Posted 2019-07-01 22:57

Right now, I've got a C# program that performs the following steps on a recurring basis:

  • Grab current list of tasks from the database
  • Using Parallel.ForEach(), do work for each task

However, some of these tasks are very long-running. This delays the processing of other pending tasks because we only look for new ones at the start of the program.
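
Roughly, the loop looks like this (names simplified; GrabCurrentListOfTasks and DoWorkForTask stand in for the real database query and per-task work):

    while (true)
    {
        // 1. grab the current list of pending tasks from the database
        List<MyTask> tasks = GrabCurrentListOfTasks();

        // 2. do work for each task; this call blocks until the whole batch
        //    is done, so tasks added to the database in the meantime are
        //    not picked up until the next pass
        Parallel.ForEach(tasks, task => DoWorkForTask(task));
    }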

Now, I know that modifying the collection being iterated over isn't possible (right?), but is there some equivalent functionality in the C# Parallel framework that would allow me to add work to the list while also processing items in the list?

2 Answers
chillily · 2019-07-01 23:21

Generally speaking, you're right that modifying a collection while iterating it is not allowed. But there are other approaches you could be using:

  • Use ActionBlock<T> from TPL Dataflow. The code could look something like:

    var actionBlock = new ActionBlock<MyTask>(
        task => DoWorkForTask(task),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
    
    while (true)
    {
        // post the current batch of tasks to the block; they start running immediately
        var tasks = GrabCurrentListOfTasks();
        foreach (var task in tasks)
        {
            actionBlock.Post(task);
        }
    
        // wait a bit before polling the database for new tasks again
        await Task.Delay(someShortDelay);
        // or use Thread.Sleep() if you don't want to use async
    }
    
  • Use BlockingCollection<T>, which can be modified while consuming items from it, along with GetConsumingPartitioner() from ParallelExtensionsExtras (sketched after this list) to make it work with Parallel.ForEach():

    var collection = new BlockingCollection<MyTask>();
    
    Task.Run(async () =>
    {
        while (true)
        {
            // producer: keep adding any newly found tasks to the collection
            var tasks = GrabCurrentListOfTasks();
            foreach (var task in tasks)
            {
                collection.Add(task);
            }
    
            // wait a bit before polling the database again
            await Task.Delay(someShortDelay);
        }
    });
    
    Parallel.ForEach(collection.GetConsumingPartitioner(), task => DoWorkForTask(task));
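
GetConsumingPartitioner() isn't part of the BCL; it ships with ParallelExtensionsExtras. If you'd rather not take that dependency, the extension is small. A rough sketch along the lines of the ParallelExtensionsExtras implementation (a Partitioner<T> whose dynamic partitions come from the collection's consuming enumerable):

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Linq;

    public static class BlockingCollectionExtensions
    {
        public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
        {
            return new BlockingCollectionPartitioner<T>(collection);
        }

        // A partitioner whose partitions all pull from the collection's
        // consuming enumerable, so Parallel.ForEach takes items as they
        // arrive instead of snapshotting the collection up front.
        private class BlockingCollectionPartitioner<T> : Partitioner<T>
        {
            private readonly BlockingCollection<T> _collection;

            internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
            {
                if (collection == null) throw new ArgumentNullException("collection");
                _collection = collection;
            }

            public override bool SupportsDynamicPartitions
            {
                get { return true; }
            }

            public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
            {
                var dynamicPartitioner = GetDynamicPartitions();
                return Enumerable.Range(0, partitionCount)
                    .Select(_ => dynamicPartitioner.GetEnumerator())
                    .ToList();
            }

            public override IEnumerable<T> GetDynamicPartitions()
            {
                return _collection.GetConsumingEnumerable();
            }
        }
    }

Note that Parallel.ForEach over the consuming partitioner only returns once collection.CompleteAdding() has been called, which suits the never-ending producer above.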
    
我想做一个坏孩纸 · 2019-07-01 23:32

Here is an example of an approach you could try. I think you want to get away from Parallel.ForEach and use asynchronous programming instead, because you need to retrieve results as they finish, rather than in discrete chunks that could conceivably contain both long-running tasks and tasks that finish very quickly.

This approach uses a simple sequential loop to retrieve results from a list of asynchronous tasks. In this case, you should be safe to use a simple non-thread-safe mutable list, because all mutation of the list happens sequentially in the awaiting method (never concurrently).

Note that this approach uses Task.WhenAny in a loop, which isn't very efficient for large task lists; consider an alternative approach in that case (a rough sketch of one option follows the example below). See this blog post: http://blogs.msdn.com/b/pfxteam/archive/2012/08/02/processing-tasks-as-they-complete.aspx

This example is based on: https://msdn.microsoft.com/en-GB/library/jj155756.aspx

private async Task<ProcessResult> processTask(ProcessTask task) 
{
    // do something intensive with data
}

private IEnumerable<ProcessTask> GetOutstandingTasks() 
{
    // retrieve some tasks from db
}

private async Task ProcessAllData()
{
    List<Task<ProcessResult>> taskQueue = 
        GetOutstandingTasks()
        .Select(tsk => processTask(tsk))
        .ToList(); // grab initial task queue

    while(taskQueue.Any()) // iterate while tasks need completing
    {
        Task<ProcessResult> firstFinishedTask = await Task.WhenAny(taskQueue); // get first to finish
        taskQueue.Remove(firstFinishedTask); // remove the one that finished
        ProcessResult result = await firstFinishedTask; // get the result
        // do something with task result
        taskQueue.AddRange(GetOutstandingTasks().Select(tsk => processTask(tsk))); // add more tasks that need performing
    }
}
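
For what it's worth, here is a rough sketch of one simpler alternative to the Task.WhenAny loop mentioned above. It only handles a single, fixed batch (it doesn't keep polling for new tasks), and HandleResult is a placeholder for whatever you do with each result: wrap each task so it processes its own result as soon as it completes, then await the whole batch once:

private async Task ProcessBatchAsync()
{
    // wrap each task so its result is handled as soon as it finishes,
    // instead of repeatedly scanning the list with Task.WhenAny
    List<Task> processing = GetOutstandingTasks()
        .Select(async tsk =>
        {
            ProcessResult result = await processTask(tsk);
            HandleResult(result); // placeholder for "do something with task result"
        })
        .ToList();

    await Task.WhenAll(processing);
}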