Catch a thread's exception in the caller threa

2019-01-01 06:17发布

I'm very new to Python and multithreaded programming in general. Basically, I have a script that will copy files to another location. I would like this to be placed in another thread so I can output .... to indicate that the script is still running.

The problem that I am having is that if the files cannot be copied it will throw an exception. This is ok if running in the main thread; however, having the following code does not work:

try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"

In the thread class itself, I tried to re-throw the exception, but it does not work. I have seen people on here ask similar questions, but they all seem to be doing something more specific than what I am trying to do (and I don't quite understand the solutions offered). I have seen people mention the usage of sys.exc_info(), however I do not know where or how to use it.

All help is greatly appreciated!

EDIT: The code for the thread class is below:

class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
           shul.copytree(self.sourceFolder, self.destFolder)
        except:
           raise

13条回答
初与友歌
2楼-- · 2019-01-01 06:57

There are a lot of really weirdly complicated answers to this question. Am I oversimplifying this, because this seems sufficient for most things to me.

from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret

If you're certain you'll only ever be running on one or the other version of Python, you could reduce the the run() method down to just the mangled version (if you'll only be running on versions of Python before 3), or just the clean version (if you'll only be running on versions of Python starting with 3).

Example usage:

def f(*args, **kwargs)
    print(args)
    print(kwargs)
    raise Exception('I suck')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()

And you'll see the exception raised on the other thread when you join.

查看更多
柔情千种
3楼-- · 2019-01-01 06:59

concurrent.futures.as_completed

https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed

The following solution:

  • returns to the main thread immediately when an exception is called
  • requires no extra user defined classes because it does not need:
    • an explicit Queue
    • to add an except else around your work thread

Source:

#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))

Possible output:

0
0
1
1
2
2
0
Exception()
1
2
None

It is unfortunately not possible to kill futures to cancel the others as one fails:

If you do something like:

for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()

then the with catches it, and waits for the second thread to finish before continuing. The following behaves similarly:

for future in concurrent.futures.as_completed(futures):
    future.result()

since future.result() re-raises the exception if one occurred.

If you want to quit the entire Python process, you might get away with os._exit(0), but this likely means you need a refactor.

Tested on Python 3.6.7, Ubuntu 18.04.

查看更多
情到深处是孤独
4楼-- · 2019-01-01 06:59

A simple way of catching thread's exception and communicating back to the caller method could be by passing dictionary or a list to worker method.

Example (passing dictionary to worker method):

import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])
查看更多
弹指情弦暗扣
5楼-- · 2019-01-01 07:00

This was a nasty little problem, and I'd like to throw my solution in. Some other solutions I found (async.io for example) looked promising but also presented a bit of a black box. The queue / event loop approach sort of ties you to a certain implementation. The concurrent futures source code, however, is around only 1000 lines, and easy to comprehend. It allowed me to easily solve my problem: create ad-hoc worker threads without much setup, and to be able to catch exceptions in the main thread.

My solution uses the concurrent futures API and threading API. It allows you to create a worker which gives you both the thread and the future. That way, you can join the thread to wait for the result:

worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())

...or you can let the worker just send a callback when done:

worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))

...or you can loop until the event completes:

worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)           
        break
    time.sleep(0.25)

Here's the code:

from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        thread.daemon = True #this will continue thread execution after the main thread runs out of code - you can still ctrl + c or kill the process
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if(self._cb):
            self._cb(self.future.result())

...and the test function:

def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')
查看更多
有味是清欢
6楼-- · 2019-01-01 07:03

If an exception occurs in a thread, the best way is to re-raise it in the caller thread during join. You can get information about the exception currently being handled using the sys.exc_info() function. This information can simply be stored as a property of the thread object until join is called, at which point it can be re-raised.

Note that a Queue.Queue (as suggested in other answers) is not necessary in this simple case where the thread throws at most 1 exception and completes right after throwing an exception. We avoid race conditions by simply waiting for the thread to complete.

For example, extend ExcThread (below), overriding excRun (instead of run).

Python 2.x:

import threading

class ExcThread(threading.Thread):
  def excRun(self):
    pass

  def run(self):
    self.exc = None
    try:
      # Possibly throws an exception
      self.excRun()
    except:
      import sys
      self.exc = sys.exc_info()
      # Save details of the exception thrown but don't rethrow,
      # just complete the function

  def join(self):
    threading.Thread.join(self)
    if self.exc:
      msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
      new_exc = Exception(msg)
      raise new_exc.__class__, new_exc, self.exc[2]

Python 3.x:

The 3 argument form for raise is gone in Python 3, so change the last line to:

raise new_exc.with_traceback(self.exc[2])
查看更多
路过你的时光
7楼-- · 2019-01-01 07:05

Although it is not possible to directly catch an exception thrown in a different thread, here's a code to quite transparently obtain something very close to this functionality. Your child thread must subclass the ExThread class instead of threading.Thread and the parent thread must call the child_thread.join_with_exception() method instead of child_thread.join() when waiting for the thread to finish its job.

Technical details of this implementation: when the child thread throws an exception, it is passed to the parent through a Queue and thrown again in the parent thread. Notice that there's no busy waiting in this approach .

#!/usr/bin/env python

import sys
import threading
import Queue

class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overriden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overriden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]

class MyException(Exception):
    pass

class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))

def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)

if __name__ == '__main__':
    main()
查看更多
登录 后发表回答