Error with multiprocessing, atexit and global data

Posted 2019-05-05 17:02

Question:

Sorry in advance, this is going to be long ...

Possibly related:

Python Multiprocessing atexit Error "Error in atexit._run_exitfuncs"

Definitely related:

python parallel map (multiprocessing.Pool.map) with global data

Keyboard Interrupts with python's multiprocessing Pool

Here's a "simple" script I hacked together to illustrate my problem...

import time
import multiprocessing as multi
import atexit

cleanup_stuff=multi.Manager().list([])

##################################################
# Some code to allow keyboard interrupts  
##################################################
was_interrupted=multi.Manager().list([])
class _interrupt(object):
    """
    Toy class to allow retrieval of the interrupt that triggered its execution
    """
    def __init__(self,interrupt):
        self.interrupt=interrupt

def interrupt():
    was_interrupted.append(1)

def interruptable(func):
    """
    decorator to allow functions to be "interruptable" by
    a keyboard interrupt when in python's multiprocessing.Pool.map
    **Note**: this won't actually interrupt the map itself; it merely
    causes subsequent function calls to be skipped.
    """
    def newfunc(*args,**kwargs):
        try:
            if(not was_interrupted):
                return func(*args,**kwargs)
            else:
                return False
        except KeyboardInterrupt as e:
            interrupt()
            return _interrupt(e)  #If we really want to know about the interrupt...
    return newfunc

@atexit.register
def cleanup():
    for i in cleanup_stuff:
        print(i)
    return

@interruptable
def func(i):
    print(i)
    cleanup_stuff.append(i)
    time.sleep(float(i)/10.)
    return i

#Must wrap func here, otherwise it won't be found in __main__'s dict
#Maybe because it was created dynamically using the decorator?
def wrapper(*args):
    return func(*args)


if __name__ == "__main__":

    #This is an attempt to use signals -- I also attempted something similar where
    #The signals were only caught in the child processes...Or only on the main process...
    #
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)

    #Try 2 with signals (only catch signal on main process)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,onSigInt)
    #def startup(): signal.signal(signal.SIGINT,signal.SIG_IGN)
    #p=multi.Pool(processes=4,initializer=startup)

    #Try 3 with signals (only catch signal on child processes)
    #import signal
    #def onSigInt(*args): interrupt()
    #signal.signal(signal.SIGINT,signal.SIG_IGN)
    #def startup(): signal.signal(signal.SIGINT,onSigInt)
    #p=multi.Pool(processes=4,initializer=startup)


    p=multi.Pool(4)
    try:
        out=p.map(wrapper,range(30))
        #out=p.map_async(wrapper,range(30)).get()  #This doesn't work either...

        #The following lines don't work either
        #Effectively trying to roll my own p.map() with p.apply_async 
        # results=[p.apply_async(wrapper,args=(i,)) for i in range(30)]
    # out = [ r.get() for r in results ]
    except KeyboardInterrupt:
        print ("Hello!")
        out=None
    finally:
        p.terminate()
        p.join()

    print (out)

This works just fine if no KeyboardInterrupt is raised. However, if I raise one, the following exception occurs:

10
7
9
12
^CHello!
None
Error in atexit._run_exitfuncs:
Traceback (most recent call last):
  File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "test.py", line 58, in cleanup
    for i in cleanup_stuff:
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
    c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
    s.connect(address)
  File "<string>", line 1, in connect
error: [Errno 2] No such file or directory
Error in sys.exitfunc:
Traceback (most recent call last):
  File "/usr/lib/python2.6/atexit.py", line 24, in _run_exitfuncs
    func(*targs, **kargs)
  File "test.py", line 58, in cleanup
    for i in cleanup_stuff:
  File "<string>", line 2, in __getitem__
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod
    self._connect()
  File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client
    c = SocketClient(address)
  File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient
    s.connect(address)
  File "<string>", line 1, in connect
socket.error: [Errno 2] No such file or directory

Interestingly, the code does exit the Pool.map call without executing any of the remaining function calls. The problem seems to be that the KeyboardInterrupt isn't handled properly at some point, but it is confusing where that happens and why it isn't caught in interruptable. Thanks.

Note, the same problem happens if I use out=p.map_async(wrapper,range(30)).get()
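The "[Errno 2] No such file or directory" in the traceback suggests that Ctrl+C is also delivered to the Manager's server process (it sits in the same foreground process group), so its listening socket is already gone by the time atexit runs. A hedged sketch of one possible workaround, using the initializer argument of BaseManager.start to make the server process ignore SIGINT (names match the question's script; I have not verified this on 2.6):

```python
import signal
from multiprocessing.managers import SyncManager

def _ignore_sigint():
    # Runs inside the manager's server process. Without this, Ctrl+C in
    # the terminal kills the server too, and the atexit handler's proxy
    # fails to reconnect ("[Errno 2] No such file or directory").
    signal.signal(signal.SIGINT, signal.SIG_IGN)

# BaseManager.start() accepts an initializer for the server process.
manager = SyncManager()
manager.start(_ignore_sigint)

cleanup_stuff = manager.list()
was_interrupted = manager.list()
```

As a side benefit, a single long-lived SyncManager backs both lists with one server process, instead of the two separate managers created by the two `multi.Manager().list([])` calls in the script above.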

EDIT 1

A little closer ... If I enclose the out=p.map(...) call in a try/except/finally clause, the first exception goes away; the others are still raised from atexit, however. The code and traceback above have been updated.

EDIT 2

Another approach that does not work has been added to the code above as a comment (same error). This attempt was inspired by:

http://jessenoller.com/2009/01/08/multiprocessingpool-and-keyboardinterrupt/

EDIT 3

Another failed attempt using signals added to the code above.
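For completeness, the pattern usually recommended (and discussed in the Jesse Noller post linked above in EDIT 2) combines a worker-side SIG_IGN initializer with map_async(...).get(timeout=...), because on Python 2 a bare Pool.map can block in a way that never sees the KeyboardInterrupt in the main process. A minimal sketch with a hypothetical square worker:

```python
import multiprocessing
import signal

def _init_worker():
    # Workers ignore SIGINT; only the main process handles Ctrl+C.
    signal.signal(signal.SIGINT, signal.SIG_IGN)

def square(x):
    return x * x

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=4, initializer=_init_worker)
    try:
        # .get() with a timeout keeps the main process interruptible,
        # unlike a bare map() on Python 2.
        result = pool.map_async(square, range(10)).get(timeout=60)
        print(result)
    except KeyboardInterrupt:
        pool.terminate()
    else:
        pool.close()
    pool.join()
```

This handles interrupting the map itself, but note it does nothing for the Manager-backed lists, which is the part that breaks in the atexit handler.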

EDIT 4

I have figured out how to restructure my code so that the above is no longer necessary. In the (unlikely) event that someone stumbles upon this thread with the same use-case that I had, I will describe my solution ...

Use Case

I have a function that generates temporary files using the tempfile module, and I would like those files to be cleaned up when the program exits. My initial attempt was to append each temporary file name to a list and delete every element of that list in a function registered via atexit.register. The problem is that a plain list is not shared across processes, which is what led me to multiprocessing.Manager to manage the list data. Unfortunately, that fails on a KeyboardInterrupt no matter what I tried, because the interrupt breaks the communication sockets between the processes. The solution is simple: before using multiprocessing, set the temporary file directory with something like tempfile.tempdir=tempfile.mkdtemp(), and register a function that deletes that directory. Every process then writes into the same temporary directory, so a single recursive delete cleans everything up. Of course, this solution only works when the shared data is a list of files that needs to be deleted at the end of the program's life.
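A sketch of that restructuring (the worker function and file naming are illustrative, not my actual code). It relies on child processes inheriting tempfile.tempdir, which holds under the fork start method used on the Linux/Python 2 setup above:

```python
import atexit
import multiprocessing
import os
import shutil
import tempfile

# All tempfile.* calls in this process (and in forked children) now
# land in one private directory; no shared list of names is needed.
tempfile.tempdir = tempfile.mkdtemp()
atexit.register(shutil.rmtree, tempfile.tempdir, ignore_errors=True)

def worker(i):
    # Hypothetical worker: writes a scratch file, returns its path.
    fd, path = tempfile.mkstemp(prefix="job%d_" % i)
    os.close(fd)
    return path

if __name__ == "__main__":
    pool = multiprocessing.Pool(4)
    try:
        paths = pool.map(worker, range(8))
        # Every file ends up under the one shared temporary directory,
        # so the single rmtree registered above cleans them all up.
        assert all(p.startswith(tempfile.tempdir) for p in paths)
    finally:
        pool.terminate()
        pool.join()
```

Because rmtree is registered with ignore_errors=True and touches no Manager proxy, the atexit handler cannot fail the way the proxied list did, even after a KeyboardInterrupt.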