Why does the localInit Func get called multiple ti

2019-04-26 09:09发布

问题:

I was writing some code to process a lot of data, and I thought it would be useful to have Parallel.ForEach create a file for each thread it creates so the output doesn't need to be synchronized (by me at least).

It looks something like this:

Parallel.ForEach(vals,
    new ParallelOptions { MaxDegreeOfParallelism = 8 },
    ()=>GetWriter(), // returns a new BinaryWriter backed by a file with a guid name
    (item, state, writer)=>
    {
        if(something)
        {
            state.Break();
            return writer;
        }
        List<Result> results = new List<Result>();

        foreach(var subItem in item.SubItems)
            results.Add(ProcessItem(subItem));

        if(results.Count > 0)
        {
            foreach(var result in results)
                result.Write(writer);
        }
        return writer;
    },
    (writer)=>writer.Dispose());

What I expected to happen was that up to 8 files would be created and would persist through the entire run time. Then each would be Disposed when the entire ForEach call finishes. What really happens is that the localInit seems to be called once for each item, so I end up with hundreds of files. The writers are also getting disposed at the end of each item that is processed.

This shows the same thing happening:

var vals = Enumerable.Range(0, 10000000).ToArray();
        long sum = 0;
        Parallel.ForEach(vals,
            new ParallelOptions { MaxDegreeOfParallelism = 8 },
            () => { Console.WriteLine("init " + Thread.CurrentThread.ManagedThreadId); return 0L; },
            (i, state, common) =>
            {
                Thread.Sleep(10);
                return common + i;
            },
                (common) => Interlocked.Add(ref sum, common));

I see:

init 10
init 14
init 11
init 13
init 12
init 14
init 11
init 12
init 13
init 11
... // hundreds of lines over < 30 seconds
init 14
init 11
init 18
init 17
init 10
init 11
init 14
init 11
init 14
init 11
init 18

Note: if I leave out the Thread.Sleep call, it sometimes seems to function "correctly". localInit only gets called once each for the 4 threads that it decides to use on my pc. Not every time, however.

Is this the desired behavior of the function? What's going on behind the scenes that causes it to do this? And lastly, what's a good way to get my desired functionality, ThreadLocal?

This is on .NET 4.5, by the way.

回答1:

Parallel.ForEach does not work as you think it does. It's important to note that the method is build on top of Task classes and that the relationship between Task and Thread is not 1:1. You can have, for example, 10 tasks that run on 2 managed threads.

Try using this line in your method body instead of the current one:

Console.WriteLine("ThreadId {0} -- TaskId {1} ",
                  Thread.CurrentThread.ManagedThreadId, Task.CurrentId);

You should see that the ThreadId will be reused across many different tasks, shown by their unique ids. You'll see this more if you left in, or increased, your call to Thread.Sleep.

The (very) basic idea of how the Parallel.ForEach method works, is that it takes your enumerable creates a series of tasks that will run process sections of the enumeration, the way this is done depends a lot on the input. There is also some special logic that checks for the case of a task exceeding a certain number of milliseconds without completing. If that case is true, then a new task may be spawned to help relieve the work.

If you looked at the documentation for the localinit function in Parallel.ForEach, you'll notice that it says that it returns the initial state of the local data for each _task_, not each thread.

You might ask why there are more than 8 tasks being spawned. That answer is similar to the last, found in the documentation for ParallelOptions.MaxDegreeOfParallelism.

Changing MaxDegreeOfParallelism from the default only limits how many concurrent tasks will be used.

This limit is only on the number of concurrent tasks, not a hard-limit on the number of tasks that will be created during the entire time it is processing. And as I mentioned above, there are times where a separate task will be spawned, which results in your localinit function being called multiple times and writing hundreds of files to disk.

Writing to disk is certainly a operation with a bit of latency, particularly if you're using synchronous I/O. When the disk operation happens, it blocks the entire thread; the same happens with Thread.Sleep. If a Task does this, it will block the thread it is currently running on, and no other tasks can run on it. Usually in these cases, the scheduler will spawn a new Task to help pick up the slack.

And lastly, what's a good way to get my desired functionality, ThreadLocal?

The bottom line is that thread locals don't make sense with Parallel.ForEach because you're not dealing with threads; you're dealing with tasks. A thread local could be shared between tasks because many tasks can use the same thread at the same time. Also, a task's thread local could change mid-execution, because the scheduler could preempt it from running and then continue its execution on a different thread, which would have a different thread local.

I'm not sure the best way to do it, but you could rely on the localinit function to pass in whatever resource you'd like, only allowing a resource to be used in one thread at a time. You can use the localfinally to mark it as no longer in use and thus available for another task to acquire. This is what those methods were designed for; each method is only called once per task that is spawned (see the remarks section of the Parallel.ForEach MSDN documentation).

You can also split the work yourself, and create your own set of threads and run your work. However, this is less idea, in my opinion, since the Parallel class already does this heavy lifting for you.



回答2:

What you're seeing is the implementation trying to get your work done as quickly as possible.

To do this, it tries using different numbers of tasks to maximize throughput. It grabs a certain number of threads from the thread pool and runs your work for a bit. It then tries adding and removing threads to see what happens. It continues doing this until all your work is done.

The algorithm is quite dumb in that it doesn't know if your work is using a lot of CPU, or a lot of IO, or even if there is a lot of synchronization and the threads are blocking each other. All it can do is add and remove threads and measure how fast each unit of work completes.

This means it is continually calling your localInit and localFinally functions as it injects and retires threads - which is what you have found.

Unfortunately, there is no easy way to control this algorithm. Parallel.ForEach is a high-level construct that intentionally hides much of the thread-management code.


Using a ThreadLocal might help a bit, but it relies on the fact that the thread pool will reuse the same threads when Parallel.ForEach asks for new ones. This is not guarenteed - in fact, it is unlikely that the thread pool will use exactly 8 threads for the whole call. This means you will again be creating more files than necessary.


One thing that is guaranteed is that Parallel.ForEach will never use more than MaxDegreeOfParallelism threads at any one time.

You can use this to your advantage by creating a fixed-size "pool" of files that can be re-used by whichever threads are running at a particular time. You know that only MaxDegreeOfParallelism threads can run at once, so you can create that number of files before calling ForEach. Then grab one in your localInit and release it in your localFinally.

Of course, you will have to write this pool yourself and it must be thread-safe as it will be called concurrently. A simple locking strategy should be good enough, though, because threads are not injected and retired very quickly compared to the cost of a lock.



回答3:

According to MSDN the localInit method is called once for each task, not for each thread:

The localInit delegate is invoked once for each task that participates in the loop's execution and returns the initial local state for each of those tasks.



回答4:

localInit called when thread created. if body takes so long it must create another thread and suspends current thread, and if it creates another thread, it calls localInit

also when Parallel.ForEach called it creates threads as much as MaxDegreeOfParallelism value for example:

var k = Enumerable.Range(0, 1);
Parallel.ForEach(k,new ParallelOptions(){MaxDegreeOfParallelism = 4}.....

it create 4 thread when first it called