Question:
I'm very new to Python and multithreaded programming in general. Basically, I have a script that will copy files to another location. I would like this to be placed in another thread so I can output .... to indicate that the script is still running.
The problem that I am having is that if the files cannot be copied it will throw an exception. This is OK if running in the main thread; however, the following code does not work:
try:
    threadClass = TheThread(param1, param2, etc.)
    threadClass.start()   ##### **Exception takes place here**
except:
    print "Caught an exception"
In the thread class itself, I tried to re-throw the exception, but it does not work. I have seen people on here ask similar questions, but they all seem to be doing something more specific than what I am trying to do (and I don't quite understand the solutions offered). I have seen people mention the usage of sys.exc_info(); however, I do not know where or how to use it.
All help is greatly appreciated!
EDIT: The code for the thread class is below:
class TheThread(threading.Thread):
    def __init__(self, sourceFolder, destFolder):
        threading.Thread.__init__(self)
        self.sourceFolder = sourceFolder
        self.destFolder = destFolder

    def run(self):
        try:
            shutil.copytree(self.sourceFolder, self.destFolder)
        except:
            raise
Answer 1:
The problem is that thread_obj.start() returns immediately. The child thread that you spawned executes in its own context, with its own stack. Any exception that occurs there is in the context of the child thread, and it is in its own stack. One way I can think of right now to communicate this information to the parent thread is by using some sort of message passing, so you might look into that.
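To make the point above concrete, here is a minimal Python 3 sketch (not part of the original answer) showing that a try/except around start() and join() never sees the worker's exception. The function names failing_task and start_and_try_to_catch are hypothetical, and failing_task only stands in for the file copy.

```python
import threading


def failing_task():
    # Hypothetical stand-in for the file copy: it always fails.
    raise OSError("copy failed")


def start_and_try_to_catch():
    """Return True only if the except block around start()/join() runs."""
    caught = False
    worker = threading.Thread(target=failing_task)
    try:
        worker.start()  # returns as soon as the thread has been launched
        worker.join()   # waits for the thread, but does not re-raise its error
    except OSError:
        caught = True
    return caught


print(start_and_try_to_catch())
```

The worker reports its own traceback on stderr, and the function returns False because the exception never reaches the calling thread's stack.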
Try this on for size:
import sys
import threading
import Queue


class ExcThread(threading.Thread):

    def __init__(self, bucket):
        threading.Thread.__init__(self)
        self.bucket = bucket

    def run(self):
        try:
            raise Exception('An error occurred here.')
        except Exception:
            self.bucket.put(sys.exc_info())


def main():
    bucket = Queue.Queue()
    thread_obj = ExcThread(bucket)
    thread_obj.start()

    while True:
        try:
            exc = bucket.get(block=False)
        except Queue.Empty:
            pass
        else:
            exc_type, exc_obj, exc_trace = exc
            # deal with the exception
            print exc_type, exc_obj
            print exc_trace

        thread_obj.join(0.1)
        if thread_obj.isAlive():
            continue
        else:
            break


if __name__ == '__main__':
    main()
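The code above targets Python 2. In Python 3 the module is named queue and the method is is_alive(), so a rough equivalent might look like the sketch below. The helper name wait_and_collect is hypothetical, and the sketch simplifies the polling loop: it waits for the thread to end and then checks the queue once, which also avoids missing an exception that arrives between a poll and the final join.

```python
import queue
import sys
import threading


class ExcThread(threading.Thread):
    def __init__(self, bucket):
        super().__init__()
        self.bucket = bucket

    def run(self):
        try:
            raise Exception("An error occurred here.")
        except Exception:
            # Hand the (type, value, traceback) tuple to the parent thread.
            self.bucket.put(sys.exc_info())


def wait_and_collect(thread_obj, bucket):
    """Wait for the thread to end, then return its exc_info tuple or None."""
    while thread_obj.is_alive():
        thread_obj.join(0.1)  # a progress dot could be printed here
    try:
        return bucket.get(block=False)
    except queue.Empty:
        return None


bucket = queue.Queue()
thread_obj = ExcThread(bucket)
thread_obj.start()
exc_info = wait_and_collect(thread_obj, bucket)
if exc_info is not None:
    exc_type, exc_obj, exc_trace = exc_info
    print(exc_type.__name__, exc_obj)
```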
Answer 2:
The concurrent.futures module makes it simple to do work in separate threads (or processes) and handle any resulting exceptions:
import concurrent.futures
import shutil

def copytree_with_dots(src_path, dst_path):
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        # Execute the copy on a separate thread,
        # creating a future object to track progress.
        future = executor.submit(shutil.copytree, src_path, dst_path)

        while future.running():
            # Print pretty dots here.
            pass

        # Return the value returned by shutil.copytree(), None.
        # Raise any exceptions raised during the copy process.
        return future.result()
concurrent.futures is included with Python 3.2 and later, and is available as the backported futures module for earlier versions.
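As a hedged illustration of how the caller might use this pattern, the sketch below prints dots while waiting and catches the worker's exception in the main thread. The names run_with_dots and failing_copy are hypothetical, and failing_copy is only a stand-in for shutil.copytree. It polls future.done() rather than future.running(), on the assumption that running() may still be False for a moment before the worker picks the task up.

```python
import concurrent.futures
import time


def run_with_dots(func, *args, poll_interval=0.05):
    """Run func(*args) on a worker thread, printing dots while waiting."""
    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
        future = executor.submit(func, *args)
        while not future.done():
            print(".", end="", flush=True)
            time.sleep(poll_interval)
        print()
        # result() returns func's return value or re-raises its exception here.
        return future.result()


def failing_copy(src, dst):
    # Hypothetical stand-in for shutil.copytree that always fails.
    time.sleep(0.2)
    raise OSError("cannot copy {} to {}".format(src, dst))


try:
    run_with_dots(failing_copy, "src_dir", "dst_dir")
except OSError as err:
    print("Caught in the main thread:", err)
```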
Answer 3:
Although it is not possible to directly catch an exception thrown in a different thread, here's code that quite transparently provides something very close to this functionality. Your child thread must subclass the ExThread class instead of threading.Thread, and the parent thread must call the child_thread.join_with_exception() method instead of child_thread.join() when waiting for the thread to finish its job.
Technical details of this implementation: when the child thread throws an exception, it is passed to the parent through a Queue and thrown again in the parent thread. Notice that there's no busy waiting in this approach.
#!/usr/bin/env python

import sys
import threading
import Queue


class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overridden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overridden."""
        try:
            self.run_with_exception()
        except BaseException:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]


class MyException(Exception):
    pass


class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    def run_with_exception(self):
        thread_name = threading.current_thread().name
        raise MyException("An error in thread '{}'.".format(thread_name))


def main():
    t = MyThread()
    t.start()
    try:
        t.join_with_exception()
    except MyException as ex:
        thread_name = threading.current_thread().name
        print "Caught a MyException in thread '{}': {}".format(thread_name, ex)


if __name__ == '__main__':
    main()
Answer 4:
If an exception occurs in a thread, the best way is to re-raise it in the caller thread during join. You can get information about the exception currently being handled using the sys.exc_info() function. This information can simply be stored as a property of the thread object until join is called, at which point it can be re-raised.
Note that a Queue.Queue (as suggested in other answers) is not necessary in this simple case where the thread throws at most one exception and completes right after throwing it. We avoid race conditions by simply waiting for the thread to complete.
For example, extend ExcThread (below), overriding excRun (instead of run).
Python 2.x:
import threading

class ExcThread(threading.Thread):
    def excRun(self):
        pass

    def run(self):
        self.exc = None
        try:
            # Possibly throws an exception
            self.excRun()
        except:
            import sys
            self.exc = sys.exc_info()
            # Save details of the exception thrown but don't rethrow,
            # just complete the function

    def join(self):
        threading.Thread.join(self)
        if self.exc:
            msg = "Thread '%s' threw an exception: %s" % (self.getName(), self.exc[1])
            new_exc = Exception(msg)
            raise new_exc.__class__, new_exc, self.exc[2]
Python 3.x:
The three-argument form of raise is gone in Python 3, so change the last line to:
raise new_exc.with_traceback(self.exc[2])
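For readers on Python 3 only, the whole class could be sketched as follows. This is a variation rather than the code above: it stores the exception object itself (which carries its own traceback) and chains it with raise ... from instead of calling with_traceback. The subclass FailingThread is a hypothetical example, and the class-level exc = None default is an added assumption so that join(timeout) is safe before run() has set anything.

```python
import threading


class ExcThread(threading.Thread):
    exc = None  # default, in case join() returns before run() has started

    def excRun(self):
        """Override this instead of run()."""

    def run(self):
        try:
            self.excRun()
        except BaseException as exc:
            # Keep the exception object; do not re-raise in the worker.
            self.exc = exc

    def join(self, timeout=None):
        super().join(timeout)
        if self.exc is not None:
            msg = "Thread '%s' threw an exception: %s" % (self.name, self.exc)
            # Chain the original exception so its traceback is preserved.
            raise Exception(msg) from self.exc


class FailingThread(ExcThread):  # hypothetical example subclass
    def excRun(self):
        raise ValueError("bad value")


t = FailingThread(name="copier")
t.start()
try:
    t.join()
except Exception as err:
    print(err)
    print("caused by:", repr(err.__cause__))
```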
Answer 5:
There are a lot of really weirdly complicated answers to this question. Am I oversimplifying this? This seems sufficient for most things to me.
from threading import Thread

class PropagatingThread(Thread):
    def run(self):
        self.exc = None
        try:
            if hasattr(self, '_Thread__target'):
                # Thread uses name mangling prior to Python 3.
                self.ret = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            else:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self):
        super(PropagatingThread, self).join()
        if self.exc:
            raise self.exc
        return self.ret
If you're certain you'll only ever be running on one or the other version of Python, you could reduce the run() method down to just the mangled version (if you'll only be running on versions of Python before 3), or just the clean version (if you'll only be running on versions of Python starting with 3).
Example usage:
def f(*args, **kwargs):
    print(args)
    print(kwargs)
    raise Exception('I suck')

t = PropagatingThread(target=f, args=(5,), kwargs={'hello':'world'})
t.start()
t.join()
And you'll see the exception raised on the other thread when you join.
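The Python 3-only reduction mentioned above might look like the following sketch. It assumes CPython's private Thread attributes _target, _args and _kwargs, adds class-level defaults for exc and ret, and lets join() accept a timeout; those last two details are additions for illustration. The divide function is a hypothetical example target.

```python
from threading import Thread


class PropagatingThread(Thread):
    exc = None
    ret = None

    def run(self):
        try:
            # _target, _args and _kwargs are private attributes of CPython's Thread.
            if self._target is not None:
                self.ret = self._target(*self._args, **self._kwargs)
        except BaseException as e:
            self.exc = e

    def join(self, timeout=None):
        super().join(timeout)
        if self.exc is not None:
            raise self.exc
        return self.ret


def divide(a, b):  # hypothetical example target
    return a / b


ok = PropagatingThread(target=divide, args=(6, 3))
ok.start()
print(ok.join())  # 2.0

bad = PropagatingThread(target=divide, args=(1, 0))
bad.start()
try:
    bad.join()
except ZeroDivisionError as err:
    print("Caught:", err)
```

Catching the specific exception type at join() keeps the calling code close to what it would look like without threads.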
Answer 6:
This was a nasty little problem, and I'd like to throw my solution in. Some other solutions I found (asyncio, for example) looked promising but also presented a bit of a black box. The queue / event loop approach sort of ties you to a certain implementation. The concurrent futures source code, however, is only around 1000 lines, and easy to comprehend. It allowed me to easily solve my problem: create ad-hoc worker threads without much setup, and be able to catch exceptions in the main thread.
My solution uses the concurrent futures API and the threading API. It allows you to create a worker which gives you both the thread and the future. That way, you can join the thread to wait for the result:
worker = Worker(test)
thread = worker.start()
thread.join()
print(worker.future.result())
...or you can let the worker just send a callback when done:
worker = Worker(test)
thread = worker.start(lambda x: print('callback', x))
...or you can loop until the event completes:
worker = Worker(test)
thread = worker.start()

while True:
    print("waiting")
    if worker.future.done():
        exc = worker.future.exception()
        print('exception?', exc)
        result = worker.future.result()
        print('result', result)
        break
    time.sleep(0.25)
Here's the code:
from concurrent.futures import Future
import threading
import time

class Worker(object):
    def __init__(self, fn, args=()):
        self.future = Future()
        self._fn = fn
        self._args = args

    def start(self, cb=None):
        self._cb = cb
        self.future.set_running_or_notify_cancel()
        thread = threading.Thread(target=self.run, args=())
        # Daemon thread: the process can exit without waiting for it,
        # and you can still Ctrl+C or kill the process.
        thread.daemon = True
        thread.start()
        return thread

    def run(self):
        try:
            self.future.set_result(self._fn(*self._args))
        except BaseException as e:
            self.future.set_exception(e)

        if self._cb:
            self._cb(self.future.result())
...and the test function:
def test(*args):
    print('args are', args)
    time.sleep(2)
    raise Exception('foo')
Answer 7:
As a newbie to threading, it took me a long time to understand how to implement Mateusz Kobos's code (above). Here's a clarified version to help understand how to use it.
#!/usr/bin/env python

import sys
import threading
import Queue


class ExThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.__status_queue = Queue.Queue()

    def run_with_exception(self):
        """This method should be overridden."""
        raise NotImplementedError

    def run(self):
        """This method should NOT be overridden."""
        try:
            self.run_with_exception()
        except Exception:
            self.__status_queue.put(sys.exc_info())
        self.__status_queue.put(None)

    def wait_for_exc_info(self):
        return self.__status_queue.get()

    def join_with_exception(self):
        ex_info = self.wait_for_exc_info()
        if ex_info is None:
            return
        else:
            raise ex_info[1]


class MyException(Exception):
    pass


class MyThread(ExThread):
    def __init__(self):
        ExThread.__init__(self)

    # This overrides the "run_with_exception" from class "ExThread"
    # Note, this is where the actual thread to be run lives. The thread
    # to be run could also call a method or be passed in as an object
    def run_with_exception(self):
        # Code will function until the int
        print "sleeping 5 seconds"
        import time
        for i in 1, 2, 3, 4, 5:
            print i
            time.sleep(1)
        # Thread should break here
        int("str")
# I'm honestly not sure why these appear here? So, I removed them.
# Perhaps Mateusz can clarify?
#         thread_name = threading.current_thread().name
#         raise MyException("An error in thread '{}'.".format(thread_name))


if __name__ == '__main__':
    # The code lives in MyThread in this example. So creating the MyThread
    # object sets the code to be run (but does not start it yet)
    t = MyThread()
    # This actually starts the thread
    t.start()
    print
    print ("Notice 't.start()' is considered to have completed, although"
           " the countdown continues in its new thread. So your code "
           "can continue into new processing.")
    # Now that the thread is running, the join allows for monitoring of it
    try:
        t.join_with_exception()
    # should be able to replace "Exception" with a specific error (untested)
    except Exception, e:
        print
        print "Exception was caught and control passed back to the main thread"
        print "Do some handling here...or raise a custom exception "
        thread_name = threading.current_thread().name
        e = ("Caught a MyException in thread: '" +
             str(thread_name) +
             "' [" + str(e) + "]")
        raise Exception(e)  # Or custom class of exception, such as MyException
Answer 8:
This is similar to RickardSjogren's approach, without Queue, sys, etc., but also without signal listeners: it directly executes an exception handler that corresponds to an except block.
#!/usr/bin/env python3

import threading

class ExceptionThread(threading.Thread):

    def __init__(self, callback=None, *args, **kwargs):
        """
        Redirect exceptions of thread to an exception handler.

        :param callback: function to handle occurred exception
        :type callback: function(thread, exception)
        :param args: arguments for threading.Thread()
        :type args: tuple
        :param kwargs: keyword arguments for threading.Thread()
        :type kwargs: dict
        """
        self._callback = callback
        super().__init__(*args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except BaseException as e:
            if self._callback is None:
                raise e
            else:
                self._callback(self, e)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self._target, self._args, self._kwargs, self._callback
Only self._callback and the except block in run() are additions to the normal threading.Thread.
Answer 9:
concurrent.futures.as_completed
https://docs.python.org/3.7/library/concurrent.futures.html#concurrent.futures.as_completed
The following solution:
- returns to the main thread immediately when an exception is raised
- requires no extra user-defined classes because it does not need:
  - an explicit Queue
  - to add an except else around your work thread
Source:
#!/usr/bin/env python3

import concurrent.futures
import time

def func_that_raises(do_raise):
    for i in range(3):
        print(i)
        time.sleep(0.1)
    if do_raise:
        raise Exception()
    for i in range(3):
        print(i)
        time.sleep(0.1)

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = []
    futures.append(executor.submit(func_that_raises, False))
    futures.append(executor.submit(func_that_raises, True))
    for future in concurrent.futures.as_completed(futures):
        print(repr(future.exception()))
Possible output:
0
0
1
1
2
2
0
Exception()
1
2
None
It is unfortunately not possible to kill futures to cancel the others as one fails:
- concurrent.futures: Python: concurrent.futures How to make it cancelable?
- threading: Is there any way to kill a Thread?
- C pthreads: Kill Thread in Pthread Library
If you do something like:
for future in concurrent.futures.as_completed(futures):
    if future.exception() is not None:
        raise future.exception()
then the with catches it, and waits for the second thread to finish before continuing. The following behaves similarly:
for future in concurrent.futures.as_completed(futures):
    future.result()
since future.result() re-raises the exception if one occurred.
If you want to quit the entire Python process, you might get away with os._exit(0), but this likely means you need a refactor.
Tested on Python 3.6.7, Ubuntu 18.04.
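If the goal is to keep going and record which tasks failed, rather than stopping at the first failure, one possible sketch wraps future.result() in a try/except inside the as_completed loop. The names work and run_all are hypothetical and exist only for this illustration.

```python
import concurrent.futures


def work(n):
    # Hypothetical task: fails for odd inputs.
    if n % 2:
        raise ValueError("odd input: {}".format(n))
    return n * n


def run_all(inputs):
    """Return (results, errors), both keyed by the input value."""
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        future_to_input = {executor.submit(work, n): n for n in inputs}
        for future in concurrent.futures.as_completed(future_to_input):
            n = future_to_input[future]
            try:
                # result() re-raises the worker's exception in this thread.
                results[n] = future.result()
            except ValueError as err:
                errors[n] = err
    return results, errors


results, errors = run_all([1, 2, 3, 4])
print(results)
print(sorted(errors))
```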
Answer 10:
One method I am fond of is based on the observer pattern. I define a signal class which my thread uses to emit exceptions to listeners. It can also be used to return values from threads. Example:
import threading

class Signal:
    def __init__(self):
        self._subscribers = list()

    def emit(self, *args, **kwargs):
        for func in self._subscribers:
            func(*args, **kwargs)

    def connect(self, func):
        self._subscribers.append(func)

    def disconnect(self, func):
        try:
            self._subscribers.remove(func)
        except ValueError:
            raise ValueError('Function {0} not removed from {1}'.format(func, self))


class WorkerThread(threading.Thread):

    def __init__(self, *args, **kwargs):
        super(WorkerThread, self).__init__(*args, **kwargs)
        self.Exception = Signal()
        self.Result = Signal()

    def run(self):
        if self._Thread__target is not None:
            try:
                self._return_value = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
            except Exception as e:
                self.Exception.emit(e)
            else:
                self.Result.emit(self._return_value)

if __name__ == '__main__':
    import time

    def handle_exception(exc):
        print exc.message

    def handle_result(res):
        print res

    def a():
        time.sleep(1)
        raise IOError('a failed')

    def b():
        time.sleep(2)
        return 'b returns'

    t = WorkerThread(target=a)
    t2 = WorkerThread(target=b)
    t.Exception.connect(handle_exception)
    t2.Result.connect(handle_result)
    t.start()
    t2.start()

    print 'Threads started'

    t.join()
    t2.join()
    print 'Done'
I do not have enough experience of working with threads to claim that this is a completely safe method. But it has worked for me and I like the flexibility.
Answer 11:
Using naked excepts is not a good practice because you usually catch more than you bargain for.
I would suggest modifying the except to catch ONLY the exception that you would like to handle. I don't think that raising it has the desired effect, because when you go to instantiate TheThread in the outer try, if it raises an exception, the assignment is never going to happen.
Instead you might want to just alert on it and move on, such as:
def run(self):
    try:
        shutil.copytree(self.sourceFolder, self.destFolder)
    except OSError, err:
        print err
Then when that exception is caught, you can handle it there. Then when the outer try catches an exception from TheThread, you know it won't be the one you already handled, and that will help you isolate your process flow.
Answer 12:
A simple way of catching a thread's exception and communicating it back to the caller could be to pass a dictionary or a list to the worker method.
Example (passing a dictionary to the worker method):
import threading

def my_method(throw_me):
    raise Exception(throw_me)

def worker(shared_obj, *args, **kwargs):
    try:
        shared_obj['target'](*args, **kwargs)
    except Exception as err:
        shared_obj['err'] = err

shared_obj = {'err':'', 'target': my_method}
throw_me = "Test"

th = threading.Thread(target=worker, args=(shared_obj, throw_me), kwargs={})
th.start()
th.join()

if shared_obj['err']:
    print(">>%s" % shared_obj['err'])
Answer 13:
I know I'm a bit late to the party here, but I was having a very similar problem that involved using tkinter as a GUI, and the mainloop made it impossible to use any of the solutions that depend on .join(). Therefore I adapted the solution given in the EDIT of the original question, but made it more general to make it easier for others to understand.
Here is the new thread class in action:
import threading
import traceback
import logging


class ExceptionThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        threading.Thread.__init__(self, *args, **kwargs)

    def run(self):
        try:
            if self._target:
                self._target(*self._args, **self._kwargs)
        except Exception:
            logging.error(traceback.format_exc())


def test_function_1(input):
    raise IndexError(input)


if __name__ == "__main__":
    input = 'useful'

    t1 = ExceptionThread(target=test_function_1, args=[input])
    t1.start()
Of course you can always have it handle the exception some other way from logging, such as printing it out, or having it output to the console.
This allows you to use the ExceptionThread class exactly like you would the Thread class, without any special modifications.