Purpose of multiprocessing.Pool.apply and multiprocessing.Pool.apply_async

Published 2020-04-18 07:40

Question:

See example and execution result below:

#!/usr/bin/env python3.4
from multiprocessing import Pool
import time
import os

def initializer():
    print("In initializer pid is {} ppid is {}".format(os.getpid(),os.getppid()))

def f(x):
    print("In f pid is {} ppid is {}".format(os.getpid(),os.getppid()))
    return x*x

if __name__ == '__main__':
    print("In main pid is {} ppid is {}".format(os.getpid(), os.getppid()))
    with Pool(processes=4, initializer=initializer) as pool:  # start 4 worker processes
        result = pool.apply(f, (10,)) # evaluate "f(10)" in a single process
        print(result)

        #result = pool.apply_async(f, (10,)) # evaluate "f(10)" in a single process
        #print(result.get())

Gives:

$ ./pooleg.py
In main pid is 22783 ppid is 19542
In initializer pid is 22784 ppid is 22783
In initializer pid is 22785 ppid is 22783
In initializer pid is 22787 ppid is 22783
In f pid is 22784 ppid is 22783
In initializer pid is 22786 ppid is 22783
100

As is clear from the output: 4 processes were created but only one of them actually did the work (called f).

Question: Why would I create a pool of more than one worker and call apply() when the work f is done by only one process? The same goes for apply_async(), since in that case too the work is done by a single worker.

I don't understand the use cases in which these functions are useful.

Answer 1:

First off, both are meant to operate on argument tuples (single function calls), in contrast to the Pool.map variants, which operate on iterables. So it's not an error that you observe only one process being used when you call these functions only once.


You would use Pool.apply_async instead of one of the Pool.map versions when you need more fine-grained control over the individual tasks you want to distribute.

The Pool.map versions take an iterable and chunk it into tasks, where every task has the same (mapped) target function. Pool.apply_async typically isn't called only once with a pool of more than one worker. Since it's asynchronous, you can iterate over manually pre-bundled tasks and submit them to several worker processes before any of them has completed. Your task list can even consist of different target functions, as you can see in this answer here. It also allows you to register callbacks for results and errors, as in this example.

These properties make Pool.apply_async pretty versatile and a first-choice tool for unusual problem scenarios you can't solve with one of the Pool.map versions.


Pool.apply indeed is not widely useful at first sight (or second). You could use it to synchronize control flow in a scenario where you start multiple tasks with apply_async, then have a task that must complete before you fire off another round of tasks with apply_async.

Using Pool.apply can also spare you from creating a single extra Process for an in-between task when you already have a pool that is currently idling.



Answer 2:

This line in your code:

with Pool(processes=4, initializer=initializer) as pool:  # start 4 worker processes

does start 4 worker processes, but they sit idle until you give them work (as your output shows, the initializer runs in all four workers as soon as the pool is created). It's methods like apply() that actually submit tasks for those processes to run.

The difference between apply() and apply_async() is that the former blocks until the result is ready, while the latter returns an AsyncResult object right away. This doesn't make much difference unless you want to submit more than one task to the Pool at a time (which, of course, is the whole point of using the multiprocessing module).

Here are some modifications to your code showing how to actually do some concurrent processing with the Pool:

from multiprocessing import Pool
import time
import os

def initializer():
    print("In initializer pid is {} ppid is {}".format(os.getpid(),os.getppid()))

def f(x):
    print("In f pid is {} ppid is {}".format(os.getpid(),os.getppid()))
    return x*x

if __name__ == '__main__':
    print("In main pid is {} ppid is {}".format(os.getpid(), os.getppid()))
    with Pool(processes=4, initializer=initializer) as pool:  # Create 4 worker Pool.
#        result = pool.apply(f, (10,)) # evaluate "f(10)" in a single process
#        print(result)
        # Start multiple tasks.
        tasks = [pool.apply_async(f, (val,)) for val in range(10, 20)]
        pool.close()  # No more tasks.
        pool.join()  # Wait for all tasks to finish.
        results = [result.get() for result in tasks]  # Get the result of each.
        print(results)

map() (or its asynchronous counterpart map_async()) would be better suited for processing something like this (a sequence of values), as it handles some of the details shown in the above code automatically.