How to run nested, hierarchical pathos multiproces

2019-02-25 23:50发布

问题:

Having build a significant part of my code on dill serialization/pickling, I'm also trying to use pathos multiprocessing to parallelize my calculations. Pathos it is a natural extension of dill.

When trying to run nested

from pathos.multiprocessing import ProcessingPool
ProcessingPool().map(fn, args)

inside an other ProcessingPool().map, then I receive:

AssertionError: daemonic processes are not allowed to have children

E.g.:

from pathos.multiprocessing import ProcessingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ProcessingPool().map(refork, xrange(3))

yields

AssertionError: daemonic processes are not allowed to have children

I tried using amap(...).get() without success. This is on pathos 0.2.0.

What is the best way to allow for nested parallelization?

Update

I have to be honest at this point, and confess that I have removed the assertion "daemonic processes are not allowed to have children" from pathos. I also built something which cascades KeyboardInterrupt to workers and workers of those... Parts of the solution below:

def run_parallel(exec_func, exec_args, num_workers_i)
    pool = ProcessingPool(num_workers_i)
    pool.restart(force=True)
    pid_is = pool.map(get_pid_i, xrange(num_workers_i))
    try:
        results = pool.amap(
            exec_func,
            exec_args,
        )
        counter_i = 0
        while not results.ready():
            sleep(2)
            if counter_i % 60 == 0:
                print('Waiting for children running in pool.amap() with PIDs: {}'.format(pid_is))
            counter_i += 1
        results = results.get()
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print('Ctrl+C received, attempting to terminate pool...')
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except:
        print('Attempting to close parallel after exception: {}'.format(sys.exc_info()[0]))
        cls.hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise


def hard_kill_pool(pid_is, pool):
    for pid_i in pid_is:
        os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
    pool.terminate()

Seems to work from console and IPython notebook (with stop button), but not sure it's 100% correct in all corner cases.

回答1:

I encountered exactly the same issue. In my case, The inner operation was the one that needed parallelism so I did a ThreadingPool of a ProcessingPool. Here it is with your example:

from pathos.multiprocessing import ProcessingPool, ThreadingPool

def triple(x):
    return 3*x

def refork(x):
    from pathos.multiprocessing import ProcessingPool
    return ProcessingPool().map(triple, xrange(5))

ThreadingPool().map(refork, xrange(3))

You can even have another layer with another outer threading pool. Depending on your case, you can invert the order of these pools. However, you cannot have processes of processes. If really needed, see: https://stackoverflow.com/a/8963618/6522112. I haven't try it yet myself so I can't elaborate on this.