Make Parallel.ForEach wait to get work until a slot opens

Published 2019-05-23 16:03

Question:

I'm using Parallel.ForEach to work through a bunch of items. The problem is, I want to prioritize which items get worked depending on which workers (slots) are open. E.g. if I am working 8 things in parallel and a slot opens among slots 1-4, I want to assign easy work to that slot; the bottom half of the slots gets the hard work. This way, I won't get all 8 slots tied up doing hard/long-running work; easy/quick items will be run first. I've implemented this as follows:

The Code

const int workers = 8;
List<Thing> thingsToDo = ...; //Get the things that need to be done.
Thing[] currentlyWorkingThings = new Thing[workers]; //One slot for each worker.

void Run() {
    Parallel.ForEach(PrioritizeThings(thingsToDo), o => {
        int index = 0;

        //"PrioritizeTasks" added this thing to the list of currentlyWorkingThings.
        //Find my position in this list.
        lock (currentlyWorkingThings)
            index = currentlyWorkingThings.IndexOf(o);

        //Do work on this thing...

        //Then remove it from the list of currently working things, thereby
        //  opening a new slot when this worker returns/finishes.
        lock (currentlyWorkingThings)
            currentlyWorkingThings[index] = null;
    });
}

IEnumerable<Thing> PrioritizeThings(List<Thing> thingsToDo) {
    int slots = workers;
    int halfSlots = (int)Math.Ceiling(slots / 2f);

    //Sort thingsToDo by their difficulty, easiest first.

    //Loop until we've worked every Thing.
    while (thingsToDo.Count > 0) {
        int slotToFill = ...; //Find the first open slot.
        Thing nextThing = null;

        lock (currentlyWorkingThings) {
            //If the slot is in the "top half", get the next easy thing - otherwise
            //  get the next hard thing.
            if (slotToFill < halfSlots)
                nextThing = thingsToDo.First();
            else
                nextThing = thingsToDo.Last();

            //Add the nextThing to the list of currentlyWorkingThings and remove it from
            //  the list of thingsToDo.
            currentlyWorkingThings[slotToFill] = nextThing;
            thingsToDo.Remove(nextThing);
        }

        //Return the nextThing to work.
        yield return nextThing;
    }
}

The Problem

So the issue I'm seeing here is that Parallel is requesting the next thing to work on from PrioritizeThings before a slot has opened (before an existing thing has been completed). I assume that Parallel is looking ahead and queuing up things to work on in advance. I'd like it not to do this, and only fill a worker/slot when that worker is completely done. The only fix I've thought of is to add a sleep/wait loop in PrioritizeThings that won't return a thing until it sees a legitimate open slot. But I don't like that, and I was hoping there was some way to make Parallel wait longer before getting work. Any suggestions?
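(Aside: the look-ahead comes from Parallel.ForEach's default chunking partitioner. A minimal sketch of one way to curb it, assuming .NET 4.5+ where Partitioner.Create accepts EnumerablePartitionerOptions, is to wrap the enumerable in a no-buffering partitioner so items are only pulled when a worker is free; the slot bookkeeping above stays as-is.)

//Sketch only, assuming .NET 4.5+: NoBuffering makes Parallel.ForEach take items from
//  PrioritizeThings one at a time instead of buffering a chunk per worker in advance.
var oneAtATime = Partitioner.Create(PrioritizeThings(thingsToDo),
                                    EnumerablePartitionerOptions.NoBuffering);

Parallel.ForEach(oneAtATime,
                 new ParallelOptions { MaxDegreeOfParallelism = workers },
                 o => { /*work on this thing as before*/ });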

Answer 1:

There is a way built in (kinda) to support exactly the situation you are describing.

When you create the ForEach you will need to pass in a ParallelOptions with a non-standard TaskScheduler. The hard part is creating a TaskScheduler that does that priority system for you; fortunately, Microsoft released a pack of examples called "ParallelExtensionsExtras" that contains one such scheduler, QueuedTaskScheduler.

//Usings for the snippet; QueuedTaskScheduler and GetConsumingPartitioner ship with
//  ParallelExtensionsExtras (its scheduler namespace is typically System.Threading.Tasks.Schedulers).
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Schedulers;

private static void Main(string[] args)
{
    int totalMaxConcurrency = Environment.ProcessorCount;
    int highPriorityMaxConcurrency = totalMaxConcurrency / 2;

    if (highPriorityMaxConcurrency == 0)
        highPriorityMaxConcurrency = 1;

    QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default, totalMaxConcurrency);
    var highPriorityScheduler = qts.ActivateNewQueue(0); //Lower number = higher priority.
    var lowPriorityScheduler = qts.ActivateNewQueue(1);

    BlockingCollection<Foo> highPriorityWork = new BlockingCollection<Foo>();
    BlockingCollection<Foo> lowPriorityWork = new BlockingCollection<Foo>();

    List<Task> processors = new List<Task>(2);

    processors.Add(Task.Factory.StartNew(() =>
    {
        //.GetConsumingPartitioner() is also from ParallelExtensionsExtras; it gives better
        //  performance than .GetConsumingEnumerable() with Parallel.ForEach.
        Parallel.ForEach(highPriorityWork.GetConsumingPartitioner(),
                         new ParallelOptions() { TaskScheduler = highPriorityScheduler, MaxDegreeOfParallelism = highPriorityMaxConcurrency },
                         ProcessWork);
    }, TaskCreationOptions.LongRunning));

    processors.Add(Task.Factory.StartNew(() =>
    {
        Parallel.ForEach(lowPriorityWork.GetConsumingPartitioner(), 
                         new ParallelOptions() { TaskScheduler = lowPriorityScheduler}, 
                         ProcessWork);
    }, TaskCreationOptions.LongRunning));


    //Add some work to do here to the highPriorityWork or lowPriorityWork collections


    //Lets the blocking collections know we are no longer going to be adding new items,
    //  so each ForEach will break out once it has finished the pending work.
    highPriorityWork.CompleteAdding();
    lowPriorityWork.CompleteAdding();

    //Waits for the two collections to completely empty before continuing.
    Task.WaitAll(processors.ToArray());
}

private static void ProcessWork(Foo work)
{
    //...
}

Even though you have two instances of Parallel.ForEach running, the combined total of both of them will not use more than the max concurrency value you passed into the QueuedTaskScheduler constructor, and it will give preference to emptying the highPriorityWork collection first if there is work to do in both (up to a limit of half of all the available slots, so you don't choke the low-priority queue; you could easily adjust this ratio up or down depending on your performance needs).
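To tie this back to the question, filling the two collections from the asker's sorted list might look something like the sketch below (this is an assumption on my part, not part of the answer: Foo stands in for the question's Thing, and thingsToDo is the list already sorted easiest first).

//Sketch only (assumptions): Foo here is the question's Thing, and thingsToDo is the question's
//  list already sorted easiest first. Needs System.Linq for Take/Skip.
int half = (int)Math.Ceiling(thingsToDo.Count / 2f);

foreach (var thing in thingsToDo.Take(half))
    highPriorityWork.Add(thing);   //Easy half feeds the high-priority queue.

foreach (var thing in thingsToDo.Skip(half))
    lowPriorityWork.Add(thing);    //Hard half feeds the low-priority queue.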

If you don't want the high priority to always win, and you'd rather have a "round-robin" style scheduler that alternates between the two lists (so the quick items don't always win, they just get shuffled in with the slow items), you can assign the same priority level to two or more queues (or just use the RoundRobinTaskSchedulerQueue, which does the same thing).
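A minimal sketch of that variation, assuming the same QueuedTaskScheduler setup as above (the variable names are illustrative; only the queue priorities change):

//Sketch only: giving both queues the same priority level means QueuedTaskScheduler alternates
//  between them rather than always draining the high-priority queue first.
QueuedTaskScheduler qts = new QueuedTaskScheduler(TaskScheduler.Default, totalMaxConcurrency);
var easyScheduler = qts.ActivateNewQueue(0); //Same priority level for both queues...
var hardScheduler = qts.ActivateNewQueue(0); //...so easy and hard work are interleaved.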