How to attach a debugger to a Python subprocess?

Posted 2019-01-30 19:48

Question:

I need to debug a child process spawned by multiprocessing.Process(). The pdb debugger seems to be unaware of forking and unable to attach to already-running processes.

Are there any smarter python debuggers which can be attached to a subprocess?

Answer 1:

Winpdb is pretty much the definition of a smarter Python debugger. It explicitly supports debugging across a fork; I'm not sure it works nicely with multiprocessing.Process(), but it's worth a try.

For a list of candidates to check for support of your use case, see the list of Python Debuggers in the wiki.



Answer 2:

I've been searching for a simple solution to this problem and came up with this:

import sys
import pdb

class ForkedPdb(pdb.Pdb):
    """A Pdb subclass that may be used
    from a forked multiprocessing child

    """
    def interaction(self, *args, **kwargs):
        _stdin = sys.stdin
        try:
            sys.stdin = open('/dev/stdin')
            pdb.Pdb.interaction(self, *args, **kwargs)
        finally:
            sys.stdin = _stdin

Use it the same way you might use the classic Pdb:

ForkedPdb().set_trace()


Answer 3:

This is an elaboration of Romuald's answer which restores the original stdin using its file descriptor; this keeps readline working inside the debugger. In addition, pdb's special handling of KeyboardInterrupt is disabled (nosigint=True) so that it does not interfere with multiprocessing's SIGINT handler.

import os
import pdb
import sys

class ForkablePdb(pdb.Pdb):

    _original_stdin_fd = sys.stdin.fileno()
    _original_stdin = None

    def __init__(self):
        pdb.Pdb.__init__(self, nosigint=True)

    def _cmdloop(self):
        current_stdin = sys.stdin
        try:
            if not self._original_stdin:
                self._original_stdin = os.fdopen(self._original_stdin_fd)
            sys.stdin = self._original_stdin
            self.cmdloop()
        finally:
            sys.stdin = current_stdin


Answer 4:

Building on @memplex's idea, I had to modify it to work with joblib by setting sys.stdin in the constructor and passing the stdin file descriptor along via joblib.

import os
import pdb
import signal
import sys
import joblib

_original_stdin_fd = None

class ForkablePdb(pdb.Pdb):
    _original_stdin = None
    _original_pid = os.getpid()

    def __init__(self):
        pdb.Pdb.__init__(self)
        # Remember the stdin in effect at construction time so _cmdloop can
        # always restore it (setting it only in children would leave the
        # attribute missing in the parent process).
        self.current_stdin = sys.stdin
        if self._original_pid != os.getpid():
            if _original_stdin_fd is None:
                raise Exception("Must set ForkablePdb._original_stdin_fd to stdin fileno")

            if not self._original_stdin:
                self._original_stdin = os.fdopen(_original_stdin_fd)
            sys.stdin = self._original_stdin

    def _cmdloop(self):
        try:
            self.cmdloop()
        finally:
            sys.stdin = self.current_stdin

def handle_pdb(sig, frame):
    ForkablePdb().set_trace(frame)

def test(i, fileno):
    global _original_stdin_fd
    _original_stdin_fd = fileno
    while True:
        pass    

if __name__ == '__main__':
    print("PID: %d" % os.getpid())
    # attach from another terminal with: kill -USR2 <pid>
    signal.signal(signal.SIGUSR2, handle_pdb)
    ForkablePdb().set_trace()
    fileno = sys.stdin.fileno()
    joblib.Parallel(n_jobs=2)(joblib.delayed(test)(i, fileno) for i in range(10))
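The signal-driven attach above can be sanity-checked without joblib or a second terminal; the handler below merely records the delivery where the real one would call ForkablePdb().set_trace(frame) (a stand-in for demonstration only):

```python
import os
import signal

delivered = []

def handle_pdb(sig, frame):
    # In the real version this would be: ForkablePdb().set_trace(frame)
    delivered.append(sig)

signal.signal(signal.SIGUSR2, handle_pdb)
# Sending ourselves SIGUSR2 is what `kill -USR2 <pid>` does from a shell.
os.kill(os.getpid(), signal.SIGUSR2)
```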


Answer 5:

An idea I had was to create "dummy" classes to fake the implementation of the methods you are using from multiprocessing:

from multiprocessing import Pool


class DummyPool():
    @staticmethod
    def apply_async(func, args, kwds):
        return DummyApplyResult(func(*args, **kwds))

    def close(self): pass
    def join(self): pass


class DummyApplyResult():
    def __init__(self, result):
        self.result = result

    def get(self):
        return self.result


def foo(a, b, switch):
    # set trace when DummyPool is used
    # import ipdb; ipdb.set_trace()
    if switch:
        return b - a
    else:
        return a - b


if __name__ == '__main__':
    pool = DummyPool()  # switch between Pool() and DummyPool() here
    results = []
    results.append(pool.apply_async(foo, args=(1, 100), kwds={'switch': True}))
    pool.close()
    pool.join()
    results[0].get()


Answer 6:

If you are on a supported platform, try DTrace. Most of the BSD / Solaris / OS X family supports DTrace.

Here is an intro by the author. You can use DTrace to debug just about anything.

Here is a SO post on learning DTrace.