I'd like to use `multiprocessing.Value` + `multiprocessing.Lock` to share a counter between separate processes. For example:
```python
import multiprocessing

def func(x, val, lock):
    for i in range(x):
        i ** 2
    with lock:
        val.value += 1
        print('counter incremented to:', val.value)

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    with multiprocessing.Pool() as pool:
        pool.starmap(func, ((i, v, lock) for i in range(25)))

    print(v.value)
```
This will throw the following exception:
```
RuntimeError: Synchronized objects should only be shared between processes through inheritance
```
What I am most confused by is that a related (albeit not completely analogous) pattern works with `multiprocessing.Process()`:
```python
if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()

    procs = [multiprocessing.Process(target=func, args=(i, v, lock))
             for i in range(25)]
    for p in procs: p.start()
    for p in procs: p.join()
```
Now, I recognize that these are two markedly different things:
- the first example uses a number of worker processes equal to `cpu_count()`, and splits the iterable `range(25)` between them
- the second example creates 25 worker processes and tasks each with one input
That said: how can I share an instance with `pool.starmap()` (or `pool.map()`) in this manner?
I've seen similar questions here, here, and here, but those approaches don't seem to be suited to `.map()`/`.starmap()`, regardless of whether `Value` uses `ctypes.c_int`.
I realize that this approach technically works:
```python
import multiprocessing

def func(x):
    for i in range(x):
        i ** 2
    with lock:
        v.value += 1
        print('counter incremented to:', v.value)

v = None
lock = None

def set_global_counter_and_lock():
    """Egh ... """
    global v, lock
    if not any((v, lock)):
        v = multiprocessing.Value('i', 0)
        lock = multiprocessing.Lock()

if __name__ == '__main__':
    # Each worker process will call `initializer()` when it starts.
    with multiprocessing.Pool(initializer=set_global_counter_and_lock) as pool:
        pool.map(func, range(25))
```
Is this really the best-practice way of going about this?
The `RuntimeError` you get when using `Pool` occurs because arguments for pool methods are pickled before being sent over a (pool-internal) queue to the worker processes. Which pool method you are trying to use is irrelevant here. This doesn't happen when you just use `Process` because there is no queue involved. You can reproduce the error with just `pickle.dumps(multiprocessing.Value('i', 0))`.
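To illustrate, here is a minimal sketch of that reproduction (the exact wording of the error message may vary between Python versions):

```python
import pickle
import multiprocessing

# Pickling a synchronized object directly raises the same RuntimeError
# that Pool's argument serialization triggers.
v = multiprocessing.Value('i', 0)
try:
    pickle.dumps(v)
except RuntimeError as e:
    print(e)  # Synchronized objects should only be shared between processes through inheritance
```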
Your last code snippet doesn't work how you think it works. You are not sharing a `Value`; you are recreating independent counters for every child process.

In case you were on Unix and used the default start method "fork", you would be done with just not passing the shared objects as arguments into the pool methods. Your child processes would inherit the globals through forking. With the start methods "spawn" (default on Windows and on macOS with Python 3.8+) or "forkserver", you'll have to use the `initializer` during `Pool` instantiation to let the child processes inherit the shared objects.

Note, you don't need an extra `multiprocessing.Lock` here, because `multiprocessing.Value` comes by default with an internal one you can use.
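A minimal sketch of that pattern, using a hypothetical module-level helper named `init_worker` and a worker adapted from the question's `func`: the parent creates the `Value` once and hands it to every worker through the pool's `initializer`, and the counter's built-in lock is used via `get_lock()`:

```python
import multiprocessing

counter = None  # set in each worker by init_worker

def init_worker(shared_counter):
    # Runs once in every child process; stores the inherited Value globally.
    global counter
    counter = shared_counter

def func(x):
    for i in range(x):
        i ** 2
    # Use the lock built into Value instead of a separate multiprocessing.Lock.
    with counter.get_lock():
        counter.value += 1

if __name__ == '__main__':
    v = multiprocessing.Value('i', 0)
    with multiprocessing.Pool(initializer=init_worker, initargs=(v,)) as pool:
        pool.map(func, range(25))
    print(v.value)  # 25
```

Passing the `Value` through `initargs` works because that pickling happens as part of process start-up, where sharing through inheritance is allowed.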
Probably worth noting as well is that you don't need the counter to be shared in all cases. If you just need to keep track of how often something has happened in total, an option would be to keep separate worker-local counters during the computation and sum them up at the end. This can give a significant performance improvement for frequent counter updates that don't need synchronization during the parallel computation itself.
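As a sketch of that alternative (again with hypothetical names), each task returns its own local count and the parent sums the results:

```python
import multiprocessing

def func(x):
    # Count locally; no shared state, no locking needed.
    local_count = 0
    for i in range(x):
        i ** 2
        local_count += 1
    return local_count

if __name__ == '__main__':
    with multiprocessing.Pool() as pool:
        total = sum(pool.map(func, range(25)))
    print(total)  # sum(range(25)) == 300
```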