I am experiencing very strange issues while working with the data inside my function that gets called by pool.map. For example, the following code works as expected...
import csv
import multiprocessing
import itertools
from collections import deque

cur_best = 0
d_sol = deque(maxlen=9)
d_names = deque(maxlen=9)

**import CSV Data1**

def calculate(vals):
    #global cur_best
    sol = sum(int(x[2]) for x in vals)
    names = [x[0] for x in vals]
    print(", ".join(names) + " = " + str(sol))

def process():
    pool = multiprocessing.Pool(processes=4)
    prod = itertools.product(([x[2], x[4], x[10]] for x in Data1))
    result = pool.map_async(calculate, prod)
    pool.close()
    pool.join()
    return result

process()
Now when I add a simple if-statement to calculate(), I get no output.
def calculate(vals):
    #global cur_best
    sol = sum(int(x[2]) for x in vals)
    if sol > cur_best:
        cur_best = sol
        names = [x[0] for x in vals]
        print(", ".join(names) + " = " + str(cur_best))
        #would like to append cur_best and names to a deque
I have tried adjusting where I declare 'cur_best' to no avail.
I am trying to keep track of the 'current best' solution as I am running through the calculations. In my linear code, this logic resides in a nested for-loop and I append each new 'cur_best' to a deque.
Do my new issues relate to the way pool.map or pool.map_async work? Can I no longer treat my calculate() function as a linear loop?
There are several other conditional statements I need to address. Should I be handling this in a different part of the code? And if so, how exactly?
There are likely two things going on here. First, the reason you're not seeing anything printed from the worker function is probably that it's throwing an exception. Because you're using `map_async`, you won't actually see the exception until you call `result.get()`. However, since you're calling `close`/`join` on the pool right after using `map_async`, you should probably just use `map` instead, which will block until all the work is complete (or an exception is thrown). I'm not sure why the exception is happening (nothing jumps out from the code you provided), but my guess would be that you're pulling the wrong index from your list somewhere.
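To see the difference concretely, here's a minimal sketch (with a toy `worker` function standing in for your `calculate`, since I don't have your data) showing how `map_async` hides a worker exception until `.get()` is called, while `map` raises it directly:

```python
import multiprocessing

def worker(x):
    # Deliberately fail for one input to simulate a bug in calculate()
    if x == 2:
        raise ValueError("bad input")
    return x * 10

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    # With map_async, the exception is stored inside the AsyncResult;
    # close()/join() complete without any visible error...
    async_result = pool.map_async(worker, [1, 2, 3])
    pool.close()
    pool.join()
    # ...and it only surfaces when you call .get()
    try:
        async_result.get()
    except ValueError as e:
        print("map_async raised on .get():", e)

    # With map, the same exception propagates as soon as map returns
    pool2 = multiprocessing.Pool(processes=2)
    try:
        pool2.map(worker, [1, 2, 3])
    except ValueError as e:
        print("map raised directly:", e)
    finally:
        pool2.close()
        pool2.join()
```

If you never call `.get()` on the `AsyncResult`, the traceback is silently discarded, which matches the "no output at all" symptom you're describing.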
Second, as Armin Rigo pointed out, `cur_best` is not shared between the processes, so your logic won't work the way you're intending. I think the easiest option is to use a `multiprocessing.Value` to create an integer in shared memory, which will be accessible to all the processes.
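As a quick illustration (a standalone sketch, unrelated to your CSV data), each child process mutates the same shared integer rather than a private copy:

```python
import multiprocessing

def bump(shared):
    # Every process sees the same shared-memory object, not a copy,
    # so this increment is visible to the parent after join()
    with shared.get_lock():
        shared.value += 1

if __name__ == "__main__":
    counter = multiprocessing.Value("I", 0)  # unsigned int in shared memory
    procs = [multiprocessing.Process(target=bump, args=(counter,))
             for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 4 -- all four processes updated the same object
```

An ordinary global `int` here would stay `0` in the parent, because each child would only increment its own copy.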
To append the results you're getting to a `deque`, you would need to create shared deques using a `multiprocessing.Manager`. A `Manager` spawns a server process that can manage shared access to an object (like a `deque`). Each process in your pool (as well as the parent process) gets access to a `Proxy` object, which communicates with the `Manager`'s process to read/write the shared object.
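To see the proxy mechanism in isolation first, here's a minimal sketch using the built-in `list` proxy that `Manager` already provides (a custom `deque` proxy, as in the full example below, works the same way but needs registration):

```python
import multiprocessing

def record(args):
    shared_list, value = args
    # shared_list is a Proxy; this append is forwarded over a pipe
    # to the Manager's server process, which mutates the real list
    shared_list.append(value * value)

if __name__ == "__main__":
    with multiprocessing.Manager() as m:
        results = m.list()  # proxy to a list living in the Manager process
        pool = multiprocessing.Pool(processes=2)
        pool.map(record, [(results, i) for i in range(5)])
        pool.close()
        pool.join()
        print(sorted(results))  # [0, 1, 4, 9, 16]
```

The workers never touch the list directly; every method call goes through the proxy to the server process, which is what makes the mutations visible everywhere.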
Here's an example showing everything discussed above:
import itertools
import multiprocessing
from collections import deque
from multiprocessing.managers import BaseManager, MakeProxyType

class DequeManager(BaseManager):
    pass

BaseDequeProxy = MakeProxyType('BaseDequeProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'extendleft', 'index', 'insert', 'pop',
    'remove', 'reverse', 'sort', 'appendleft', 'popleft', 'rotate',
    '__imul__'
))

class DequeProxy(BaseDequeProxy):
    def __iadd__(self, value):
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self

DequeManager.register('deque', deque, DequeProxy)

cur_best = d_sol = d_names = None

def init_globals(best, sol, names):
    """ This will be called in each worker process.

    A global variable (cur_best) will be created in each worker.
    Because it is a multiprocessing.Value, it will be shared
    between all the workers, too.
    """
    global cur_best, d_sol, d_names
    cur_best = best
    d_sol = sol
    d_names = names

def calculate(vals):
    global cur_best
    sol = sum(int(x[2]) for x in vals)
    if sol > cur_best.value:
        cur_best.value = sol
        names = [x[0] for x in vals]
        print(", ".join(names) + " = " + str(cur_best.value))
        d_sol.append(cur_best.value)
        d_names.append(names)
    return sol

def process():
    global d_sol, d_names
    cur_best = multiprocessing.Value("I", 0)  # unsigned int

    m = DequeManager()
    m.start()
    d_sol = m.deque(maxlen=9)
    d_names = m.deque(maxlen=9)

    pool = multiprocessing.Pool(processes=4, initializer=init_globals,
                                initargs=(cur_best, d_sol, d_names))
    prod = itertools.product([x[2], x[4], x[10]] for x in Data1)
    result = pool.map(calculate, prod)  # map instead of map_async
    pool.close()
    pool.join()
    return result  # a list containing the value of `sol` returned from each worker call

if __name__ == "__main__":
    print(process())