Why is multiprocessing running everything in the same process?

Asked 2019-05-15 11:33

I run the following solution from How can I recover the return value of a function passed to multiprocessing.Process?:

import multiprocessing
from os import getpid

def worker(procnum):
    # Report which process handles this task, then return its pid.
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    print(pool.map(worker, range(5)))

which is supposed to output something like:

I am number 0 in process 19139
I am number 1 in process 19138
I am number 2 in process 19140
I am number 3 in process 19139
I am number 4 in process 19140
[19139, 19138, 19140, 19139, 19140]

but instead I only get

[4212, 4212, 4212, 4212, 4212]

If I feed pool.map a range of 1,000,000 using more than 10 processes, I see at most two different pids.

Why is my copy of multiprocessing seemingly running everything in the same process?

1 Answer

爷的心禁止访问 · 2019-05-15 11:56

TL;DR: tasks are not deliberately distributed in any way; most likely your tasks are so short that they are all completed before the other worker processes get started.

From looking at the source of multiprocessing, it appears that tasks are simply put on a Queue, which the worker processes read from (the worker function reads from Pool._inqueue). There is no calculated distribution going on; the workers simply race to grab tasks as fast as possible.

The most likely explanation, then, is that your tasks are simply so short that one process finishes all of them before the others have a chance to get started. You can easily check whether this is the case by adding a two-second sleep to the task.
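Here's one way to run that check — a minimal sketch of the example from the question with an artificial delay added (the two seconds is arbitrary; anything longer than process startup should do):

import multiprocessing
import time
from os import getpid

def worker(procnum):
    # Artificial delay: no single process can drain the whole queue
    # before the other workers have finished starting up.
    time.sleep(2)
    print('I am number %d in process %d' % (procnum, getpid()))
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    print(pool.map(worker, range(5)))

With the sleep in place, the returned list should contain several distinct pids.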

I'll note that on my machine the tasks get spread over the processes fairly evenly (also for #processes > #cores), so there seems to be some system dependence here, even though all processes should have been .start()ed before any work is queued.
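If you want to measure this on your own system, one quick sketch is to tally how many tasks each pid ends up handling (collections.Counter is only used for counting here):

import multiprocessing
from collections import Counter
from os import getpid

def worker(procnum):
    return getpid()

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=4)
    # Map each worker pid to the number of tasks it completed.
    print(Counter(pool.map(worker, range(100000))))

On a machine where tasks spread evenly you'd see four pids with roughly equal counts; in the situation from the question, one or two pids would account for nearly all the work.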


Here's some trimmed source from worker, which shows that each process just reads tasks off the shared queue, so they are consumed in effectively arbitrary order:

def worker(inqueue, outqueue, ...):
    ...
    get = inqueue.get
    ...
    while maxtasks is None or (maxtasks and completed < maxtasks):
        try:
            task = get()  # blocks until a task is available on the shared queue
        ...

The SimpleQueue used for this communicates between processes using a Pipe, as seen in the SimpleQueue constructor:

self._reader, self._writer = Pipe(duplex=False)
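As a bare illustration of that mechanism (this is just Pipe on its own, not Pool's internals):

from multiprocessing import Pipe

# duplex=False creates a one-way channel: one read end, one write end.
reader, writer = Pipe(duplex=False)
writer.send('hello')
print(reader.recv())  # prints 'hello'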

EDIT: the part about the processes starting too slowly was possibly wrong, so I removed it. All processes are .start()ed before any work is queued (though this may be platform-dependent). I can't find whether a process is actually ready to consume tasks at the moment .start() returns.
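One way to peek at this is to inspect the worker processes right after the pool is created. Note that _pool is a private attribute of Pool, so this sketch relies on implementation details that may change between Python versions:

import multiprocessing

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    # _pool holds the worker Process objects (private, version-dependent).
    # is_alive() tells you the OS process exists, but not whether its
    # interpreter has finished booting and is ready to pull tasks.
    for p in pool._pool:
        print(p.pid, p.is_alive())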
