python no output when using pool.map_async

Published 2019-04-15 12:15

Question:

I am experiencing very strange issues while working with the data inside my function that gets called by pool.map. For example, the following code works as expected...

import csv
import multiprocessing
import itertools
from collections import deque

cur_best = 0
d_sol = deque(maxlen=9)
d_names = deque(maxlen=9)

**import CSV Data1**

def calculate(vals):
    #global cur_best
    sol = sum(int(x[2]) for x in vals)
    names = [x[0] for x in vals]
    print(", ".join(names) + " = " + str(sol))

def process():
    pool = multiprocessing.Pool(processes=4)
    prod = itertools.product(([x[2], x[4], x[10]] for x in Data1))
    result = pool.map_async(calculate, prod)
    pool.close()
    pool.join()
    return result

process()

Now when I add a simple if-statement to calculate(), I get no output.

def calculate(vals):
    #global cur_best
    sol = sum(int(x[2]) for x in vals)
    if sol > cur_best:
        cur_best = sol
        names = [x[0] for x in vals]
        print(", ".join(names) + " = " + str(cur_best))
        #would like to append cur_best and names to a deque

I have tried adjusting where I declare 'cur_best' to no avail.

I am trying to keep track of the 'current best' solution as I am running through the calculations. In my linear code, this logic resides in a nested for-loop and I append each new 'cur_best' to a deque.

Do my new issues relate to the way pool.map or pool.map_async work? Can I no longer treat my calculate() function as a linear loop?

There are several other conditional statements I need to address. Should I be handling this in a different part of the code? And if so, how exactly?

Answer 1:

There are likely two things going on here. First, the reason you're not seeing anything printed from the worker function is probably that it's throwing an exception. Because you're using map_async, you won't actually see the exception until you call result.get(). However, since you're calling close/join on the pool right after map_async, you should probably just use map instead, which blocks until all the work is complete (or an exception is raised). I'm not sure why the exception is happening (nothing jumps out from the code you provided), but my guess is that you're pulling the wrong index from a list somewhere.
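To see why the failure is silent, here is a minimal sketch (the worker and inputs are made up for illustration): a worker that raises looks perfectly quiet after map_async/close/join, and the exception only surfaces when you call .get() on the AsyncResult.

```python
import multiprocessing

def worker(x):
    # Raises IndexError for short lists -- stands in for a bad index bug
    return x[2]

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        result = pool.map_async(worker, [[1, 2, 3], [1]])
        pool.close()
        pool.join()
        # Nothing has been printed so far, even though a worker crashed.
        # The exception is re-raised here, in the parent, on .get():
        try:
            result.get()
        except IndexError as e:
            print("worker failed:", e)
```

pool.map behaves like map_async followed by an immediate .get(), which is why switching to map makes the error visible right away.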

Second, as Armin Rigo pointed out, cur_best is not shared between all processes, so your logic won't work the way you're intending. I think the easiest option is to use a multiprocessing.Value to create an integer in shared memory, which will be accessible to all processes.

To append the results you're getting to a deque, you would need to create shared deques using a multiprocessing.Manager. A Manager spawns a server process that manages shared access to an object (like a deque). Each process in your pool (as well as the parent process) gets a proxy object, which communicates with the Manager's process to read/write the shared object.

Here's an example showing everything discussed above:

import itertools
import multiprocessing
from collections import deque
from multiprocessing.managers import BaseManager, MakeProxyType

class DequeManager(BaseManager):
    pass

BaseDequeProxy = MakeProxyType('BaseDequeProxy', (
    '__add__', '__contains__', '__delitem__', '__getitem__', '__len__',
    '__mul__', '__reversed__', '__rmul__', '__setitem__',
    'append', 'count', 'extend', 'extendleft', 'index', 'insert', 'pop', 
    'remove', 'reverse', 'sort', 'appendleft', 'popleft', 'rotate', 
    '__imul__'
    ))
class DequeProxy(BaseDequeProxy):
    def __iadd__(self, value):
        self._callmethod('extend', (value,))
        return self
    def __imul__(self, value):
        self._callmethod('__imul__', (value,))
        return self

DequeManager.register('deque', deque, DequeProxy)


cur_best = d_sol = d_names = None

def init_globals(best, sol, names):
    """ This will be called in each worker process. 

    A global variable (cur_best) will be created in each worker.
    Because it is a multiprocessing.Value, it will be shared
    between each worker, too.

    """
    global cur_best, d_sol, d_names
    cur_best = best
    d_sol = sol
    d_names = names

def calculate(vals):
    global cur_best
    sol = sum(int(x[2]) for x in vals)
    # Hold the Value's lock so two workers can't both pass the
    # comparison and overwrite each other's update.
    with cur_best.get_lock():
        if sol > cur_best.value:
            cur_best.value = sol
            names = [x[0] for x in vals]
            print(", ".join(names) + " = " + str(cur_best.value))
            d_sol.append(cur_best.value)
            d_names.append(names)
    return sol

def process():
    global d_sol, d_names
    cur_best = multiprocessing.Value("I", 0)  # unsigned int

    m = DequeManager()
    m.start()
    d_sol = m.deque(maxlen=9)
    d_names = m.deque(maxlen=9)  

    pool = multiprocessing.Pool(processes=4, initializer=init_globals, 
                                initargs=(cur_best, d_sol, d_names))
    prod = itertools.product([x[2], x[4], x[10]] for x in Data1)
    result = pool.map(calculate, prod)  # map instead of map_async
    pool.close()
    pool.join()
    # result is a list of the `sol` value returned from each call to calculate()
    return result

if __name__ == "__main__":    
    print(process())