Question:
I need to debug a child process spawned by multiprocessing.Process(). The pdb debugger seems to be unaware of forking and unable to attach to already running processes.
Are there any smarter Python debuggers which can be attached to a subprocess?
Answer 1:
Winpdb is pretty much the definition of a smarter Python debugger. It explicitly supports following a fork. I am not sure it works nicely with multiprocessing.Process(), but it's worth a try.
For a list of candidates to check for support of your use case, see the list of Python Debuggers in the wiki.
Answer 2:
I've been searching for a simple solution to this problem and came up with this:
    import sys
    import pdb

    class ForkedPdb(pdb.Pdb):
        """A Pdb subclass that may be used
        from a forked multiprocessing child

        """
        def interaction(self, *args, **kwargs):
            _stdin = sys.stdin
            try:
                sys.stdin = open('/dev/stdin')
                pdb.Pdb.interaction(self, *args, **kwargs)
            finally:
                sys.stdin = _stdin
Use it the same way you might use the classic Pdb:
    ForkedPdb().set_trace()
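To show where the call goes, here is a minimal sketch of a child process that breaks into ForkedPdb. It assumes a Unix-like system that provides /dev/stdin and the "fork" start method; `worker`, `run_child` and the `debug` flag are hypothetical names used only for illustration.

```python
import multiprocessing
import pdb
import sys


class ForkedPdb(pdb.Pdb):
    """Pdb subclass from the answer above, repeated so the sketch is self-contained."""

    def interaction(self, *args, **kwargs):
        _stdin = sys.stdin
        try:
            # Re-open the terminal input for the duration of the prompt.
            sys.stdin = open('/dev/stdin')
            pdb.Pdb.interaction(self, *args, **kwargs)
        finally:
            sys.stdin = _stdin


def worker(value, debug=False):
    # Hypothetical worker: doubles its input and optionally stops in the debugger.
    doubled = value * 2
    if debug:
        ForkedPdb().set_trace()
    return doubled


def run_child(debug=False):
    # Request "fork" explicitly (Unix only) so the child inherits the terminal.
    ctx = multiprocessing.get_context("fork")
    child = ctx.Process(target=worker, args=(21, debug))
    child.start()
    child.join()
    return child.exitcode
```

Calling `run_child(debug=True)` from a script started in a terminal (under an `if __name__ == '__main__':` guard) should open the (Pdb) prompt inside the child. With `debug=False` the child simply runs to completion.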
Answer 3:
This is an elaboration of Romuald's answer which restores the original stdin using its file descriptor. This keeps readline working inside the debugger. In addition, pdb's special handling of KeyboardInterrupt is disabled so that it does not interfere with the multiprocessing SIGINT handler.
    import os
    import pdb
    import sys

    class ForkablePdb(pdb.Pdb):

        # File descriptor of stdin, captured when the class is defined
        _original_stdin_fd = sys.stdin.fileno()
        _original_stdin = None

        def __init__(self):
            # nosigint=True stops pdb from installing its own SIGINT handler
            pdb.Pdb.__init__(self, nosigint=True)

        def _cmdloop(self):
            current_stdin = sys.stdin
            try:
                if not self._original_stdin:
                    self._original_stdin = os.fdopen(self._original_stdin_fd)
                sys.stdin = self._original_stdin
                self.cmdloop()
            finally:
                sys.stdin = current_stdin
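Some background on why these Pdb subclasses re-open stdin: as far as I can tell from CPython's implementation, multiprocessing replaces sys.stdin in the child with the null device, so a plain pdb prompt there reads end-of-file straight away. The sketch below checks that behaviour. It assumes a Unix-like system with the "fork" start method, and `read_child_stdin` and `child_stdin_contents` are hypothetical helper names.

```python
import multiprocessing
import sys


def read_child_stdin(conn):
    # Runs in the child. If sys.stdin points at the null device,
    # read() returns an empty string immediately instead of blocking.
    conn.send(sys.stdin.read())
    conn.close()


def child_stdin_contents():
    # Request "fork" explicitly; this start method exists on Unix only.
    ctx = multiprocessing.get_context("fork")
    parent_conn, child_conn = ctx.Pipe()
    child = ctx.Process(target=read_child_stdin, args=(child_conn,))
    child.start()
    # Close the parent's copy so recv() fails fast if the child dies early.
    child_conn.close()
    data = parent_conn.recv()
    child.join()
    return data
```

An empty result from `child_stdin_contents()` means the child saw end-of-file on its sys.stdin, which is the situation both subclasses work around.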
Answer 4:
Building upon @memplex's idea, I had to modify it to get it to work with joblib by setting sys.stdin in the constructor as well as passing the file descriptor along directly via joblib.
<!-- language: lang-py -->
    import os
    import pdb
    import signal
    import sys
    import joblib

    # Set in each worker (see test below) to the parent's stdin file descriptor
    _original_stdin_fd = None

    class ForkablePdb(pdb.Pdb):
        _original_stdin = None
        _original_pid = os.getpid()

        def __init__(self):
            pdb.Pdb.__init__(self)
            # Always remember the current stdin so _cmdloop can restore it
            self.current_stdin = sys.stdin
            if self._original_pid != os.getpid():
                if _original_stdin_fd is None:
                    raise Exception("Must set _original_stdin_fd to stdin fileno")

                if not self._original_stdin:
                    self._original_stdin = os.fdopen(_original_stdin_fd)
                sys.stdin = self._original_stdin

        def _cmdloop(self):
            try:
                self.cmdloop()
            finally:
                sys.stdin = self.current_stdin

    def handle_pdb(sig, frame):
        # Drop into the debugger at the frame that was interrupted
        ForkablePdb().set_trace(frame)

    def test(i, fileno):
        global _original_stdin_fd
        _original_stdin_fd = fileno
        while True:
            pass

    if __name__ == '__main__':
        print("PID: %d" % os.getpid())
        signal.signal(signal.SIGUSR2, handle_pdb)
        ForkablePdb().set_trace()
        fileno = sys.stdin.fileno()
        joblib.Parallel(n_jobs=2)(joblib.delayed(test)(i, fileno) for i in range(10))
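The signal hook used above can be tried on its own. This is a minimal sketch, assuming a Unix-like system where SIGUSR2 exists; `report_frame` is a hypothetical stand-in for handle_pdb that records where the process was interrupted instead of opening a debugger, so it runs without a terminal.

```python
import os
import signal

interrupted_at = []


def report_frame(sig, frame):
    # Hypothetical stand-in for handle_pdb: note the signal number and
    # the name of the function that was executing when it arrived.
    name = frame.f_code.co_name if frame is not None else None
    interrupted_at.append((sig, name))


def trigger_once():
    # Install the handler, remembering whatever was registered before.
    previous = signal.signal(signal.SIGUSR2, report_frame)
    try:
        # Same effect as running `kill -USR2 <pid>` from another terminal.
        os.kill(os.getpid(), signal.SIGUSR2)
    finally:
        signal.signal(signal.SIGUSR2, previous)
    return interrupted_at[-1]
```

In the original script the equivalent step is to send SIGUSR2 to a process from a second shell, for example to the PID the script prints at startup.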
Answer 5:
An idea I had was to create "dummy" classes to fake the implementation of the methods you are using from multiprocessing:
    from multiprocessing import Pool

    class DummyPool():
        @staticmethod
        def apply_async(func, args, kwds):
            # Run the call synchronously in the current process
            return DummyApplyResult(func(*args, **kwds))

        def close(self): pass
        def join(self): pass

    class DummyApplyResult():
        def __init__(self, result):
            self.result = result

        def get(self):
            return self.result

    def foo(a, b, switch):
        # set trace when DummyPool is used
        # import ipdb; ipdb.set_trace()
        if switch:
            return b - a
        else:
            return a - b

    if __name__ == '__main__':
        pool = DummyPool()  # switch between Pool() and DummyPool() here
        results = []
        results.append(pool.apply_async(foo, args=(1, 100), kwds={'switch': True}))
        pool.close()
        pool.join()
        results[0].get()
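A related standard-library option, offered as an alternative sketch and not as part of the answer above: multiprocessing.dummy exposes a thread-backed Pool with the same interface, so the work stays in the original process, where a regular debugger prompt should still be able to read from the terminal. `run_in_thread_pool` is a hypothetical helper name.

```python
from multiprocessing.dummy import Pool as ThreadPool


def foo(a, b, switch):
    # A breakpoint such as pdb.set_trace() could be placed here while debugging.
    if switch:
        return b - a
    return a - b


def run_in_thread_pool():
    # A single worker thread keeps the execution order easy to follow.
    with ThreadPool(1) as pool:
        async_result = pool.apply_async(foo, args=(1, 100), kwds={"switch": True})
        return async_result.get()
```

Switching back to real processes is then a matter of importing Pool from multiprocessing instead of multiprocessing.dummy.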
Answer 6:
If you are on a supported platform, try DTrace. Most of the BSD / Solaris / OS X family support DTrace.
Here is an intro by the author. You can use DTrace to debug just about anything.
Here is a SO post on learning DTrace.