Python pathos Process Pool non-daemonic?

2019-09-19 13:29发布

How can I implement non-daemonic processes with pathos in python3 instead of with the multiprocessing module?

To be more specific, I am referring to: Python Process Pool non-daemonic?

The answer to that post implements non-daemonic processes via the multiprocessing module. Unfortunately, multiprocessing cannot pickle lambda functions (among other objects), whereas pathos can. In Python 2, the following works with pathos:

#import multiprocessing
#import multiprocessing.pool
import pathos

#class NoDaemonProcess(multiprocessing.Process):
class NoDaemonProcess(pathos.multiprocessing.Pool.Process):
    """Worker process (Python 2 pathos) whose ``daemon`` flag is pinned to False.

    Reading ``daemon`` always yields False and any assignment is discarded,
    so a pool built on this class cannot mark its workers as daemonic.
    """

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        # Deliberately ignore the requested value.
        pass

#class NoDaemonPool(multiprocessing.pool.Pool):
class NoDaemonPool(pathos.multiprocessing.Pool):
    """pathos (Python 2) pool whose ``Process`` attribute is NoDaemonProcess."""
    # Replace the base pool's ``Process`` attribute so workers are created
    # from the non-daemonic process class above.
    Process = NoDaemonProcess

def myproc(args):
    """Outer task: run an identity lambda over ``range(i)`` in an inner pathos pool.

    ``args`` is an ``(i, max_workers)`` pair; the inner pool is created with
    ``max_workers`` processes. Prints the inputs and the mapped results and
    returns ``i``.
    """
    count, n_workers = args
    inner_pool = pathos.pools.ProcessPool(n_workers)
    values = list(range(count))
    # A lambda is fine here: pathos can pickle it, unlike plain multiprocessing.
    identity = lambda x : x
    print("myproc", values, inner_pool.map(identity, values))
    return count

# Outer pool of non-daemonic workers; each task (myproc) opens its own
# inner pathos pool.
max_workers = [2, 1]  # [outer pool size, inner pool size per task]
executor = NoDaemonPool(max_workers[0])
#executor = pathos.multiprocessing.Pool(max_workers[0])  # default pathos pool, for comparison
l_args = [(i, max_workers[1]) for i in range(10)]  # one (i, inner size) pair per task
print(executor.map(myproc, l_args))

output:

('myproc', [], [])
('myproc', [0, 1], [0, 1])
('myproc', [0], [0])
('myproc', [0, 1, 2], [0, 1, 2])
('myproc', [0, 1, 2, 3], [0, 1, 2, 3])
('myproc', [0, 1, 2, 3, 4, 5], [0, 1, 2, 3, 4, 5])
('myproc', [0, 1, 2, 3, 4], [0, 1, 2, 3, 4])
('myproc', [0, 1, 2, 3, 4, 5, 6], [0, 1, 2, 3, 4, 5, 6])
('myproc', [0, 1, 2, 3, 4, 5, 6, 7], [0, 1, 2, 3, 4, 5, 6, 7])
('myproc', [0, 1, 2, 3, 4, 5, 6, 7, 8], [0, 1, 2, 3, 4, 5, 6, 7, 8])
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In Python 3, the pathos module has changed with respect to Python 2: for example, pathos.multiprocessing.Pool.Process is no longer a class but a function, so it can no longer be used as a base class (see above). Is there any pathos documentation I am missing?

How can I make the above code work in pathos in Python 3?

As a work-around for the above particular example, one can simply fall back to the multiprocessing NoDaemonPool implementation and use pathos for the daemonic sub-processes:

import multiprocessing
import multiprocessing.pool
import pathos

class NoDaemonProcess(multiprocessing.Process):
    """A ``multiprocessing.Process`` whose ``daemon`` flag always reads False.

    Assignments to ``daemon`` are silently dropped, so a pool cannot mark these
    workers as daemonic; that is what allows each worker to open a nested pool.
    (The Python 2 pathos variant subclassed
    ``pathos.multiprocessing.Pool.Process`` instead.)
    """

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        # Ignore the pool's attempt to make the worker daemonic.
        pass

class NoDaemonPool(multiprocessing.pool.Pool):
    """``multiprocessing`` pool whose workers are NoDaemonProcess instances.

    The pool's ``Process`` factory is called differently across Python versions:
    up to 3.7 as ``self.Process(target=..., args=...)``, and from 3.8 as
    ``Process(ctx, target=..., args=...)`` with the pool's context first.
    A plain ``Process = NoDaemonProcess`` alias would receive that context as
    its ``group`` argument and fail on 3.8+, so this factory accepts both forms.
    (The Python 2 pathos variant subclassed ``pathos.multiprocessing.Pool``.)
    """

    @staticmethod
    def Process(*args, **kwds):
        # Drop the leading context argument that Python 3.8+ passes in.
        if args and isinstance(args[0], multiprocessing.context.BaseContext):
            args = args[1:]
        return NoDaemonProcess(*args, **kwds)

def myproc(args):
    """Outer-pool task that fans out to a nested pathos ProcessPool.

    ``args`` -- pair ``(i, max_workers)``.
    Prints ``"myproc"``, ``range(i)`` as a list and its identity-mapped copy,
    then returns ``i``.
    """
    (n, workers) = args
    sub_pool = pathos.pools.ProcessPool(workers)
    items = [k for k in range(n)]
    # Shipping a lambda to workers relies on pathos' pickling;
    # the multiprocessing module could not serialize it.
    echo = lambda x : x
    mapped = sub_pool.map(echo, items)
    print("myproc", items, mapped)
    return n

if __name__ == "__main__":
    # The guard stops child processes that re-import this module (the "spawn"
    # start method) from recursively creating pools of their own.
    max_workers = [2, 1]  # [outer pool size, inner pool size per task]
    executor = NoDaemonPool(max_workers[0])
    # executor = pathos.multiprocessing.Pool(max_workers[0])  # pathos variant
    l_args = [(i, max_workers[1]) for i in range(10)]
    print(executor.map(myproc, l_args))
    # map() has already returned every result; shut the workers down cleanly.
    executor.close()
    executor.join()

However, this work-around is not a real solution because (i) it imports both pathos and multiprocessing, and, more importantly, (ii) it cannot pickle the task function if, e.g., myproc is instead defined as

# A lambda bound to a name: the multiprocessing module cannot pickle it to send
# to worker processes, which is why the work-around above fails for this case.
myproc = lambda x : x

Thank you very much, Best, Sebastian

1条回答
一纸荒年 Trace。
2楼-- · 2019-09-19 13:44

I found an answer for Python 3 myself, based on the original idea from

Python Process Pool non-daemonic?

while keeping clean process error handling, as suggested in

How to run nested, hierarchical pathos multiprocessing maps?

import pathos
import signal
import sys
import os
import time

# redefine process pool via inheritance
import multiprocess.context as context
class NoDaemonProcess(context.Process):
    """Process class from ``multiprocess`` that always reports itself non-daemonic.

    Any value written to ``daemon`` is discarded, so a pool cannot turn these
    workers into daemons; that is what lets them start pools of their own.
    """

    @property
    def daemon(self):
        return False

    @daemon.setter
    def daemon(self, value):
        # Swallow the assignment on purpose.
        pass

class NoDaemonPool(pathos.multiprocessing.Pool):
    """pathos/multiprocess pool whose workers are NoDaemonProcess instances.

    The original ``def Process(self, *args, **kwds)`` only works when the pool
    calls ``self.Process(target=..., args=...)``. CPython 3.8+ instead calls
    ``Process(ctx, target=..., args=...)``; multiprocess releases tracking those
    versions presumably do the same (TODO confirm). In that form a bound method
    would forward ``ctx`` as the ``group`` argument. This static factory accepts
    both call forms.
    """

    @staticmethod
    def Process(*args, **kwds):
        # Drop a leading context argument if the pool passed one.
        if args and isinstance(args[0], context.BaseContext):
            args = args[1:]
        return NoDaemonProcess(*args, **kwds)

def get_pid_i(x):
    """Report the process id of whichever process executes this call.

    ``x`` is unused; it exists only so the function can be passed to ``pool.map``.
    """
    pid = os.getpid()
    return pid
def hard_kill_pool(pid_is, pool):
    """Send SIGINT (Ctrl+C) to each PID in ``pid_is``, then terminate ``pool``.

    A worker may already have exited by the time this runs. ``os.kill`` then
    raises ``ProcessLookupError``, which is ignored so the remaining workers
    are still signalled. ``pool.terminate()`` runs in a ``finally`` so the pool
    is torn down even if signalling fails for another reason.
    """
    try:
        for pid_i in pid_is:
            try:
                os.kill(pid_i, signal.SIGINT)  # sending Ctrl+C
            except ProcessLookupError:
                pass  # worker already gone; nothing to signal
    finally:
        pool.terminate()

def myproc(args):
    """Outer task: map an identity lambda over ``range(i)`` in an inner pathos pool.

    ``args`` is an ``(i, max_workers)`` pair. The inner pool's async result is
    polled once per second, and a progress line is printed every 30 polls. On
    Ctrl+C or any other exception, the inner workers are sent SIGINT and the
    pool is terminated (via ``hard_kill_pool``) before the exception is
    re-raised. Returns ``i``, like the question's version of ``myproc``.
    """
    i, max_workers = args
    l_args = list(range(i))
    mysubproc = lambda x: x
    pool = pathos.pools.ProcessPool(max_workers)
    # NOTE(review): presumably forces fresh workers in case pathos hands back a
    # previously closed pool of this size -- confirm against pathos docs.
    pool.restart(force=True)
    # Collect worker PIDs so hard_kill_pool can signal them on failure.
    # NOTE(review): map() may not place exactly one task per worker -- confirm.
    pid_is = pool.map(get_pid_i, range(max_workers))
    try:
        l_traj_df = pool.amap(mysubproc, l_args)
        counter_i = 0
        while not l_traj_df.ready():
            time.sleep(1)
            if counter_i % 30 == 0:
                print('Waiting for children running in pool.amap() in myproc( {} ) with PIDs: {}'.format(i, pid_is))
            counter_i += 1
        l_traj_df = l_traj_df.get()
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print('Ctrl+C received in myproc( {} ), attempting to terminate pool...'.format(i))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except Exception:
        print('Attempting to close parallel after exception: {} in myproc( {} )'.format(sys.exc_info()[0], i))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    return i

#myproc = lambda x : x

if __name__ == "__main__":
    # Guarded so that processes re-importing this module (spawn start method)
    # do not create pools recursively.
    max_workers = [2, 1]  # [outer pool size, inner pool size per task]
    # One (i, inner pool size) task per outer job.
    l_args = [(i, max_workers[1]) for i in range(10)]
    pool = NoDaemonPool(max_workers[0])
    #pool.restart(force=True)
    # Worker PIDs, used by hard_kill_pool if something goes wrong.
    pid_is = pool.map(get_pid_i, range(max_workers[0]))
    try:
        results = pool.map_async(myproc, l_args)
        counter_i = 0
        while not results.ready():
            if counter_i % 30 == 0:
                print('Waiting for children running in pool.map_async() in main() with PIDs: {}'.format(pid_is))
            time.sleep(2)
            counter_i += 1
        results = results.get()
        pool.close()
        pool.join()
    except KeyboardInterrupt:
        print('Ctrl+C received in main(), attempting to terminate pool...')
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise
    except Exception:
        print('Attempting to close parallel after exception: {} in main()'.format(sys.exc_info()[0]))
        hard_kill_pool(pid_is, pool)  # sending Ctrl+C
        raise

output:

Waiting for children running in pool.map_async() in main() with PIDs [15015, 15014]
Waiting for children running in pool.amap() in myproc( 2 ) with PIDs [15019]
Waiting for children running in pool.amap() in myproc( 1 ) with PIDs: [15020]
Waiting for children running in pool.amap() in myproc( 3 ) with PIDs [15021]
Waiting for children running in pool.amap() in myproc( 4 ) with PIDs [15022]
Waiting for children running in pool.amap() in myproc( 6 ) with PIDs [15024]
Waiting for children running in pool.amap() in myproc( 5 ) with PIDs [15023]
Waiting for children running in pool.amap() in myproc( 7 ) with PIDs [15025]
Waiting for children running in pool.amap() in myproc( 8 ) with PIDs [15026]
Waiting for children running in pool.amap() in myproc( 9 ) with PIDs [15028]

I used the following module versions:

python                    3.6.0                         0
pathos                    0.2.1                    py36_1    conda-forge
multiprocess              0.70.4                   py36_0    http://conda.binstar.org/omnia
dill                      0.2.7.1          py36h644ae93_0  
pox                       0.2.3                    py36_0    conda-forge
ppft                      1.6.4.7.1                py36_0    conda-forge
six                       1.10.0                   py36_0  
查看更多
登录 后发表回答