why is more than one worker used in `multiprocessi

Posted 2019-05-29 02:07

Question:

Problem

From the multiprocessing.Pool docs:

apply_async(func ...): A variant of the apply() method which returns a result object. ...

Reading further ...

apply(func[, args[, kwds]]): Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.
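
The blocking behaviour described in the quoted docs can be made concrete with a minimal sketch. This is not part of the question's code: `slow_square`, `run_apply`, `run_apply_async` and `timed` are hypothetical helpers, and `processes=2` only mirrors the poster's two cores. The same four sleeping tasks are submitted once with apply() and once with apply_async(). apply() waits for each result before sending the next task, so the tasks run back to back. apply_async() queues them all first, so idle workers can pick them up concurrently.

```python
import time
import multiprocessing as mp


def slow_square(x, delay=0.2):
    """Sleep for `delay` seconds, then return x squared."""
    time.sleep(delay)
    return x * x


def run_apply(pool, xs):
    # apply() blocks: each call waits for its result before the next is sent,
    # so the tasks run one after another even though the pool has 2 workers.
    return [pool.apply(slow_square, (x,)) for x in xs]


def run_apply_async(pool, xs):
    # apply_async() returns an AsyncResult immediately, so all tasks are
    # queued first and the idle workers can run them concurrently.
    handles = [pool.apply_async(slow_square, (x,)) for x in xs]
    return [h.get() for h in handles]


def timed(fn, *args):
    """Return fn(*args) together with its wall-clock duration in seconds."""
    start = time.perf_counter()
    result = fn(*args)
    return result, time.perf_counter() - start


if __name__ == "__main__":
    xs = range(4)
    with mp.Pool(processes=2) as pool:
        res_sync, t_sync = timed(run_apply, pool, xs)
        res_async, t_async = timed(run_apply_async, pool, xs)
    print(f"apply      : {res_sync} in {t_sync:.2f}s")
    print(f"apply_async: {res_async} in {t_async:.2f}s")
```

Because each task sleeps 0.2 s, the apply() timing should be at least about 0.8 s. With two idle workers, the apply_async() timing should come out at roughly half that, though the exact numbers depend on the machine.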

The last sentence of the apply() docs suggests only one worker from a pool is used. I find this is only true under certain conditions.

Given

Here is code that executes Pool.apply_async() in three similar cases. In all cases, the process id is printed.

import os
import time
import multiprocessing as mp


def blocking_func(x, delay=0.1):
    """Return a squared argument after some delay."""
    time.sleep(delay)                                  # toggle comment here
    return x*x, os.getpid()


def apply_async():
    """Run the three apply_async() cases and print the worker PIDs."""
    pool = mp.Pool()
    # Case 1: From the docs
    results = [pool.apply_async(os.getpid, ()) for _ in range(10)]
    results = [res.get(timeout=1) for res in results]
    print("Docs    :", results)

    # Case 2: With delay
    results = [pool.apply_async(blocking_func, args=(i,)) for i in range(10)]
    results = [res.get(timeout=1)[1] for res in results]
    print("Delay   :", results)

    # Case 3: Without delay
    results = [pool.apply_async(blocking_func, args=(i, 0)) for i in range(10)]
    results = [res.get(timeout=1)[1] for res in results]
    print("No delay:", results)

    pool.close()
    pool.join()


if __name__ == '__main__':
    apply_async()

Results

The example from the docs (Case 1) confirms that only one worker is used. We extend this example in the next cases by applying blocking_func, which blocks with some delay.

Commenting out the time.sleep() line in blocking_func() brings all cases into agreement.

# Time commented
# 1. Docs    : [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]
# 2. Delay   : [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]
# 3. No delay: [8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208, 8208]

Each call to apply_async() creates a new process pool, which is why the process IDs below differ from those above.

# Time uncommented
# 1. Docs    : [6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780, 6780]
# 2. Delay   : [6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112]
# 3. No delay: [6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112, 6780, 2112]

However, when time.sleep() is uncommented, even with zero delay, more than one worker is used.

In short, with time.sleep() uncommented, we expect one worker as in Case 1, but we get multiple workers in Cases 2 and 3.

Question

Although I expect only one worker to be used by Pool().apply_async(), why is more than one used when time.sleep() is uncommented? Should blocking even affect the number of workers used by apply or apply_async?

Note: previous, related questions ask "why is only one worker used?" This question asks the opposite: "why isn't only one worker used?" I'm using 2 cores on a Windows machine.

Answer 1:

Your confusion seems to come from thinking that [pool.apply_async(...) for i in range(10)] is one call, when there are really ten independent calls. A call to any pool method is a "job". A job can generally lead to one or multiple tasks being distributed. The apply methods always produce only a single task under the hood. A task is an indivisible unit of work that will be received as a whole by a random pool worker.
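
A minimal sketch of the job/task distinction, assuming a 2-process pool; `square_with_pid`, `square_all_with_pid`, `ten_jobs` and `one_job` are made-up names for illustration. Ten apply_async() calls are ten jobs with one task each, so their PIDs may come from either worker. A single apply_async() call whose function loops over all ten inputs is one task, so it can only ever report one PID.

```python
import os
import time
import multiprocessing as mp


def square_with_pid(x, delay=0.05):
    """Return (x*x, pid) after a short sleep, so we can see which worker ran it."""
    time.sleep(delay)
    return x * x, os.getpid()


def square_all_with_pid(xs, delay=0.05):
    """Do all the work for xs inside ONE task, so only one worker can run it."""
    return [square_with_pid(x, delay) for x in xs]


def ten_jobs(pool):
    # Ten independent apply_async() calls -> ten jobs, each a single task.
    # Each task runs entirely in one worker, but different tasks may land
    # in different workers.
    handles = [pool.apply_async(square_with_pid, (i,)) for i in range(10)]
    return [h.get(timeout=10) for h in handles]


def one_job(pool):
    # One apply_async() call -> one job -> one task -> one worker.
    return pool.apply_async(square_all_with_pid, (range(10),)).get(timeout=10)


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        many = ten_jobs(pool)
        single = one_job(pool)
    print("ten jobs:", sorted({pid for _, pid in many}))
    print("one job :", sorted({pid for _, pid in single}))
```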

There is only one shared inqueue through which all workers are fed. Which idle worker gets woken up from waiting to get() a task from that queue is up to the OS. The low entropy of your Case 1 results (the same PID every time) is still somewhat surprising and probably very lucky, at least unless you confirm you have only two cores.
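
The shared-inqueue idea can be modelled with a toy pool. This is only an analogy, not how multiprocessing.Pool is implemented: threads stand in for worker processes, and `toy_worker` and `run_toy_pool` are hypothetical names. Every worker blocks on the same queue.Queue, each put() is one indivisible task, and which waiting worker receives a given task is left to the scheduler.

```python
import queue
import threading
from collections import Counter


def toy_worker(name, inqueue, outqueue):
    """Toy stand-in for a pool worker: wait on the shared queue, run each task whole."""
    while True:
        task = inqueue.get()            # every idle worker blocks here
        if task is None:                # sentinel: no more work
            break
        func, args = task
        outqueue.put((name, func(*args)))


def run_toy_pool(n_workers=2, n_tasks=10):
    inqueue = queue.Queue()             # ONE queue that feeds all workers
    outqueue = queue.Queue()
    workers = [
        threading.Thread(target=toy_worker, args=(f"w{i}", inqueue, outqueue))
        for i in range(n_workers)
    ]
    for w in workers:
        w.start()
    for i in range(n_tasks):
        inqueue.put((pow, (i, 2)))      # one put == one indivisible task
    for _ in workers:
        inqueue.put(None)               # one sentinel per worker, after all tasks
    for w in workers:
        w.join()
    return [outqueue.get() for _ in range(n_tasks)]


if __name__ == "__main__":
    done = run_toy_pool()
    # Which worker took which task is decided by the scheduler, not by us.
    print(Counter(name for name, _ in done))
```

Running it a few times typically shows the split between `w0` and `w1` changing, even though the code is identical.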

And yes, your observation for this run is also influenced by the computation time needed for a task, since threads (the scheduled unit of execution within a process) are usually scheduled with time-slicing policies (e.g. ~20 ms on Windows).
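
To see how task duration shifts the distribution, a sketch like the following counts how many of ten independent apply_async() tasks each worker PID ran, for a few task lengths. `busy` and `tally` are hypothetical helpers, and the pool size of 2 is an assumption matching the question. `busy` spins on the CPU rather than sleeping, to mimic compute-bound work. The counts vary between runs and machines, so treat them as an observation, not a guarantee.

```python
import os
import time
import multiprocessing as mp
from collections import Counter


def busy(seconds):
    """Spin (not sleep) for roughly `seconds`, then report which worker ran us."""
    end = time.perf_counter() + seconds
    while time.perf_counter() < end:
        pass
    return os.getpid()


def tally(pool, seconds, n_tasks=10):
    """Submit n_tasks independent apply_async() jobs and count tasks per worker PID."""
    handles = [pool.apply_async(busy, (seconds,)) for _ in range(n_tasks)]
    return Counter(h.get(timeout=30) for h in handles)


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        for seconds in (0.0, 0.001, 0.05):
            print(f"{seconds:>6}s per task:", dict(tally(pool, seconds)))
```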



Answer 2:

Only one worker is used for that call. A single apply_async cannot be executed in two workers. That doesn't prevent multiple apply_async calls from being executed in different workers. Such a restriction would run completely counter to the point of having a process pool at all.



Answer 3:

Spurred by @Darkonaut's comment, I inspected further and observed that the blocking function was too fast. I tested the code above, modified to use a new, more intensive blocking function.

Code

The new blocking function iteratively computes Fibonacci numbers. An optional offset argument can be passed in to broaden the range and compute larger numbers.

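import os
import time


def blocking_func(n, offset=0):
    """Return an intensive result via Fibonacci number (iterative)."""
    n += offset
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a, os.getpid()


def _fib(n):
    """Plain recursive Fibonacci; exponential time, so small n is already slow."""
    if n <= 1:
        return n
    return _fib(n - 1) + _fib(n - 2)


def blocking_func_recursive(n, offset=0):
    """Return an intensive result via recursive Fibonacci number.

    Renamed so it no longer shadows the iterative blocking_func above,
    and the recursion returns plain ints instead of adding (value, pid) tuples.
    """
    return _fib(n + offset), os.getpid()


if __name__ == '__main__':
    # apply_async() is the question's function, modified as described below
    start = time.time()
    apply_async()
    end = time.time()
    print(f"Duration : {end - start:.2f}s")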

Demo

Passing a large integer (100000) to the offset parameter, e.g. ...[pool.apply_async(blocking_func, args=(i, 100000)) ...], and running the code, we can more reliably trigger process switching.

# Results
Docs     : [10032, 10032, 10032, 10032, 10032, 10032, 10032, 10032, 10032, 10032]
Offset   : [10032, 8268, 10032, 8268, 10032, 8268, 10032, 8268, 10032, 8268]
Duration : 1.67s

It's intriguing to note that the iterative function steps through about 100k Fibonacci numbers per call, 10 times asynchronously, in less than 2 seconds. By contrast, a recursive Fibonacci implementation is already comparably intensive at n ≈ 30 (not shown).
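
The cost gap between the two Fibonacci variants can be checked with a quick timing sketch. `fib_iterative` mirrors the loop in blocking_func, `fib_recursive` is the naive recursive version, and `time_call` is a hypothetical helper. Timings depend on the machine, so none are quoted here.

```python
import time


def fib_iterative(n):
    """O(n) loop, the same recurrence as in blocking_func."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def fib_recursive(n):
    """Naive recursion: the number of calls grows exponentially with n."""
    if n <= 1:
        return n
    return fib_recursive(n - 1) + fib_recursive(n - 2)


def time_call(func, n):
    """Return func(n) and how long it took in seconds."""
    start = time.perf_counter()
    value = func(n)
    return value, time.perf_counter() - start


if __name__ == "__main__":
    _, t_iter = time_call(fib_iterative, 100_000)
    _, t_rec = time_call(fib_recursive, 30)
    print(f"iterative n=100000: {t_iter:.3f}s")
    print(f"recursive n=30    : {t_rec:.3f}s")
```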