Suppose you are using a multiprocessing.Pool object, and you are using the initializer argument of the constructor to pass an initializer function that then creates a resource in the global namespace. Assume the resource has a context manager. How would you handle the life-cycle of the context-managed resource, given that it has to live through the life of the process but be properly cleaned up at the end?
So far, I have something somewhat like this:
    resource_cm = None
    resource = None

    def _worker_init(args):
        # Declare both names global so the context manager is kept alive too
        global resource_cm, resource
        resource_cm = open_resource(args)
        resource = resource_cm.__enter__()
From here on, the pool processes can use the resource. So far so good. But handling clean up is a bit trickier, since the multiprocessing.Pool class does not provide a destructor or deinitializer argument.
One of my ideas is to use the atexit module, and register the clean up in the initializer. Something like this:
    def _worker_init(args):
        global resource
        resource_cm = open_resource(args)
        resource = resource_cm.__enter__()

        def _clean_up():
            # __exit__ expects the exception triple; pass None on a normal exit
            resource_cm.__exit__(None, None, None)

        import atexit
        atexit.register(_clean_up)
Is this a good approach? Is there an easier way of doing this?
EDIT: atexit does not seem to work. At least not in the way I am using it above, so as of right now I still do not have a solution for this problem.
You can subclass Process and override its run() method so that it performs cleanup before exit. Then you should subclass Pool so that it uses your subclassed process:

First, this is a really great question! After digging around a bit in the multiprocessing code, I think I've found a way to do this:

When you start a multiprocessing.Pool, internally the Pool object creates a multiprocessing.Process object for each member of the pool. When those sub-processes are starting up, they call a _bootstrap function, which looks like this:

The run method is what actually runs the target you gave the Process object. For a Pool process that's a method with a long-running while loop that waits for work items to come in over an internal queue. What's really interesting for us is what happens after self.run: util._exit_function() is called.

As it turns out, that function does some clean up that sounds a lot like what you're looking for:
Here's the docstring of _run_finalizers:

The method actually runs through a list of finalizer callbacks and executes them:
Perfect. So how do we get into the _finalizer_registry? There's an undocumented object called Finalize in multiprocessing.util that is responsible for adding a callback to the registry:

Ok, so putting it all together into an example:
Output:
As you can see, __exit__ gets called in all our workers when we join() the pool.