Python ThreadPoolExecutor - is the callback guaran

2020-02-28 03:41发布

问题:

In the ThreadPoolExecutor (TPE), is the callback always guaranteed to run in the same thread as the submitted function?

For example, I tested this with the following code. I ran it many times and it seemed like func and callback always ran in the same thread.

import concurrent.futures 
import random 
import threading 
import time 

executor = concurrent.futures.ThreadPoolExecutor(max_workers=3) 

def func(x): 
    time.sleep(random.random()) 
    return threading.current_thread().name 

def callback(future): 
    time.sleep(random.random()) 
    x = future.result() 
    cur_thread = threading.current_thread().name 
    if (cur_thread != x): 
        print(cur_thread, x) 

print('main thread: %s' % threading.current_thread()) 
for i in range(10000): 
    future = executor.submit(func, i) 
    future.add_done_callback(callback) 

However, it seemed to fail when I removed the time.sleep(random.random()) statements, i.e. at least a few func functions and callbacks did not run in the same thread.

For a project that I am working on, the callback must always run on the same thread as the submitted function, so I wanted to be sure that this is guaranteed by TPE. (And also the results of the test without the random sleep seemed puzzling).

I looked at the source code for executors and it does not seem like we switch the thread to the main thread before we run the callback. But just wanted to be sure.

回答1:

The documentation does not guarantee which thread callbacks run in. The only documented guarantee is that callbacks will be run in a thread belonging to the process that added the callback, but that could be any thread, since you're using a ThreadPoolExecutor instead of a ProcessPoolExecutor:

Added callables are called in the order that they were added and are always called in a thread belonging to the process that added them.


In the current ThreadPoolExecutor implementation, the thread a callback executes in depends on the state of the Future at the time the callback is added, and whether or not the Future is cancelled. These are implementation details; you should not rely on them, as they may be different in different Python implementations or different versions, and they are subject to change without notice.

If you add the callback after the Future completes, the callback will execute in whatever thread you called add_done_callback in. You can see this by looking at the add_done_callback source:

def add_done_callback(self, fn):
    """Attaches a callable that will be called when the future finishes.

    Args:
        fn: A callable that will be called with this future as its only
            argument when the future completes or is cancelled. The callable
            will always be called by a thread in the same process in which
            it was added. If the future has already completed or been
            cancelled then the callable will be called immediately. These
            callables are called in the order that they were added.
    """
    with self._condition:
        if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
            self._done_callbacks.append(fn)
            return
    fn(self)

If the state of the Future indicates it's cancelled or finished, fn is just immediately called in the current thread of execution. Otherwise, it's added to an internal list of callbacks to run when the Future is complete.

For example:

>>> def func(*args):
...  time.sleep(5)
...  print("func {}".format(threading.current_thread()))
>>> def cb(a): print("cb {}".format(threading.current_thread()))
... 
>>> fut = ex.submit(func)
>>> func <Thread(Thread-1, started daemon 140084551563008)>
>>> fut = e.add_done_callback(cb)
cb <_MainThread(MainThread, started 140084622018368)>

If a future is cancelled by a successful cancel call, then the thread performing the cancellation immediately invokes all callbacks:

def cancel(self):
    """Cancel the future if possible.
    Returns True if the future was cancelled, False otherwise. A future
    cannot be cancelled if it is running or has already completed.
    """
    with self._condition:
        if self._state in [RUNNING, FINISHED]:
            return False

        if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
            return True

        self._state = CANCELLED
        self._condition.notify_all()

    self._invoke_callbacks()
    return True

Otherwise, callbacks are invoked by the thread that executes the future's task.