Detect failed tasks in concurrent.futures

Posted 2019-06-22 13:31

Question:

I've been using concurrent.futures because it has a simple interface and lets the user easily control the maximum number of threads/processes. However, concurrent.futures seems to hide failed tasks and continue the main thread after all tasks have finished or failed.

import concurrent.futures

def f(i):
    return i + 's'  # adding a str to an int raises TypeError, so every task fails

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    fs = [executor.submit(f, i) for i in range(10)]
    concurrent.futures.wait(fs)

Calling f on any integer leads to a TypeError. However, the whole script runs just fine and exits with code 0. Is there any way to make it raise an exception/error when any thread fails?

Or is there a better way to limit the number of threads/processes without using concurrent.futures?

Answer 1:

concurrent.futures.wait ensures all the tasks have completed, but it doesn't distinguish success (something was returned) from failure (an exception was raised and not caught in the worker function). To do that, you need to call .result() on each Future, which will either re-raise the exception from the task or produce the returned value. There are other methods that check for failure without actually raising in the main thread (e.g. .exception()), but .result() is the most straightforward.
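For instance, a minimal sketch (reusing f and fs from the question) that checks each future with .exception() after wait() returns, so failures can be inspected without raising in the main thread:

concurrent.futures.wait(fs)
for fut in fs:
    exc = fut.exception()  # the exception raised by the task, or None if it succeeded
    if exc is not None:
        print('task failed:', exc)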

If you want to make it re-raise, the simplest approach is just to replace the wait() call with:

for fut in concurrent.futures.as_completed(fs):
    fut.result()

which will process results as the Futures complete and promptly raise an exception if one occurred. Alternatively, you can continue to use wait so all tasks finish before you check any of them for exceptions, then iterate over fs directly and call .result() on each, as sketched below.
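
A minimal sketch of that alternative, again reusing fs from the question:

concurrent.futures.wait(fs)   # block until every task has finished or failed
for fut in fs:
    fut.result()              # re-raises the first exception encountered, if any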



Answer 2:

There is another way to do the same thing with multiprocessing.Pool (for processes) or multiprocessing.pool.ThreadPool (for threads). As far as I know, the pool catches any exception raised in a worker and re-raises it in the calling process/thread when you collect the result (e.g. via map() or AsyncResult.get()).
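
As a rough sketch (assuming the same deliberately failing f from the question), the TypeError raised in a worker propagates to the main thread when map() collects the results:

from multiprocessing.pool import ThreadPool

def f(i):
    return i + 's'  # adding a str to an int raises TypeError

with ThreadPool(processes=10) as pool:
    # the worker's TypeError is re-raised here, in the calling thread
    results = pool.map(f, range(10))

With apply_async, the exception is raised when you call .get() on the returned AsyncResult instead.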