Sharing a counter with multiprocessing.Pool

2020-03-29 08:15发布

问题:

I'd like to use multiprocessing.Value + multiprocessing.Lock to share a counter between separate processes. For example:

import itertools as it
import multiprocessing

def func(x, val, lock):
    for i in range(x):
        i ** 2
    with lock:
        val.value += 1
        print('counter incremented to:', val.value)

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    with multiprocessing.Pool() as pool:
        pool.starmap(func, ((i, v, lock) for i in range(25)))
    print(counter.value())

This will throw the following exception:

RuntimeError: Synchronized objects should only be shared between processes through inheritance

What I am most confused by is that a related (albeit not completely analogous) pattern works with multiprocessing.Process():

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    procs = [multiprocessing.Process(target=func, args=(i, v, lock))
             for i in range(25)]
    for p in procs: p.start()
    for p in procs: p.join()

Now, I recognize that these are two different markedly things:

  • the first example uses a number of worker processes equal to cpu_count(), and splits an iterable range(25) between them
  • the second example creates 25 worker processes and tasks each with one input

That said: how can I share an instance with pool.starmap() (or pool.map()) in this manner?

I've seen similar questions here, here, and here, but those approaches doesn't seem to be suited to .map()/.starmap(), regarldess of whether Value uses ctypes.c_int.


I realize that this approach technically works:

def func(x):
    for i in range(x):
        i ** 2
    with lock:
        v.value += 1
        print('counter incremented to:', v.value)

v = None
lock = None

def set_global_counter_and_lock():
    """Egh ... """
    global v, lock
    if not any((v, lock)):
        v = multiprocessing.Value('i', 0)
        lock = multiprocessing.Lock()

if __name__ == '__main__':
    # Each worker process will call `initializer()` when it starts.
    with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
        pool.map(func, range(25))

Is this really the best-practices way of going about this?

回答1:

The RuntimeError you get when using Pool is because arguments for pool-methods are pickled before being send over a (pool-internal) queue to the worker processes. Which pool-method you are trying to use is irrelevant here. This doesn't happen when you just use Process because there is no queue involved. You can reproduce the error just with pickle.dumps(multiprocessing.Value('i', 0)).

Your last code snippet doesn't work how you think it works. You are not sharing a Value, you are recreating independent counters for every child process.

In case you were on Unix and used the default start-method "fork", you would be done with just not passing the shared objects as arguments into the pool-methods. Your child-processes would inherit the globals through forking. With process-start-methods "spawn" (default Windows and macOS with Python 3.8+) or "forkserver", you'll have to use the initializer during Pool instantiation, to let the child-processes inherit the shared objects.

Note, you don't need an extra multiprocessing.Lock here, because multiprocessing.Value comes by default with an internal one you can use.

import os
from multiprocessing import Pool, Value #, set_start_method


def func(x):
    for i in range(x):
        assert i == i
        with cnt.get_lock():
            cnt.value += 1
            print(f'{os.getpid()} | counter incremented to: {cnt.value}\n')


def init_globals(counter):
    global cnt
    cnt = counter


if __name__ == '__main__':

    # set_start_method('spawn')

    cnt = Value('i', 0)
    iterable = [10000 for _ in range(10)]

    with Pool(initializer=init_globals, initargs=(cnt,)) as pool:
        pool.map(func, iterable)

    assert cnt.value == 100000

Probably worth noting as well is that you don't need the counter to be shared in all cases. If you just need to keep track of how often something has happened in total, an option would be to keep separate worker-local counters during computation which you sum up at the end. This could result in a significant performance improvement for frequent counter updates for which you don't need synchronization during the parallel computation itself.