Python - Join Multiple Threads With Timeout

Posted 2020-03-25 01:04

Question:

I have multiple Process threads running and I'd like to join all of them together with a timeout parameter. I understand that if no timeout were necessary, I'd be able to write:

for thread in threads:
    thread.join()

One solution I thought of was to use a master thread that joined all the threads together and attempt to join that thread. However, I received the following error in Python:

AssertionError: can only join a child process

The code I have is below.

def join_all(threads):
    for thread in threads:
        thread.join()

if __name__ == '__main__':
    for thread in threads:
        thread.start()

    master = multiprocessing.Process(target=join_all, args=(threads,))
    master.start()
    master.join(timeout=60)
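
One way to keep the "master" idea is to make the master a thread inside the parent rather than a separate process. As far as I can tell, the assertion fires because the workers are children of the original process, not of `master`; a helper thread runs in the same process as the parent, so it is still allowed to join them. The following is only a sketch under that assumption: `join_all_via_master_thread` is a hypothetical name, and the demo uses `threading.Thread` workers as stand-ins to stay small.

```python
import threading
import time


def join_all(workers):
    """Block until every worker has finished."""
    for worker in workers:
        worker.join()


def join_all_via_master_thread(workers, timeout):
    """Return True if all workers finished within `timeout` seconds."""
    # daemon=True so a helper that is still blocked cannot keep the
    # interpreter alive after the main thread is done.
    master = threading.Thread(target=join_all, args=(workers,), daemon=True)
    master.start()
    master.join(timeout)
    return not master.is_alive()


if __name__ == "__main__":
    # Stand-in workers (an assumption for the demo, not the asker's code).
    workers = [threading.Thread(target=time.sleep, args=(0.2,)) for _ in range(3)]
    for worker in workers:
        worker.start()
    print(join_all_via_master_thread(workers, timeout=5))
```

The return value only says whether the timeout was hit; any workers that are still running are left untouched.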

Answer 1:

You could loop over each thread repeatedly, doing non-blocking checks to see if the thread is done:

import time

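def timed_join_all(threads, timeout):
    # Poll once per second until every thread is done or the timeout expires.
    start = cur_time = time.time()
    while cur_time <= (start + timeout):
        for thread in threads:
            if not thread.is_alive():
                thread.join()  # returns at once: the thread has already finished
        if not any(thread.is_alive() for thread in threads):
            return  # everything finished, no need to wait out the timeout
        time.sleep(1)
        cur_time = time.time()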

if __name__ == '__main__':
    for thread in threads:
        thread.start()

    timed_join_all(threads, 60)


Answer 2:

This answer is initially based on that by dano but has a number of changes.

join_all takes a list of threads and a timeout (in seconds) and attempts to join all of the threads. It does this by making a non-blocking call to Thread.join (by setting the timeout to 0, as join with no arguments will never time out).

Once all the threads have finished (determined by checking is_alive() on each of them), the loop exits early.

If some threads are still running by the time the timeout occurs, the function raises a RuntimeError with information about the remaining threads.

import time

def join_all(threads, timeout):
    """
    Args:
        threads: a list of thread objects to join
        timeout: the maximum time to wait for the threads to finish
    Raises:
        RuntimeError: if not all the threads have finished by the timeout
    """
    start = cur_time = time.time()
    while cur_time <= (start + timeout):
        for thread in threads:
            if thread.is_alive():
                thread.join(timeout=0)
        if all(not t.is_alive() for t in threads):
            break
        time.sleep(0.1)
        cur_time = time.time()
    else:
        still_running = [t for t in threads if t.is_alive()]
        num = len(still_running)
        names = [t.name for t in still_running]
        raise RuntimeError('Timeout on {0} threads: {1}'.format(num, names))

if __name__ == '__main__':
    for thread in threads:
        thread.start()

    join_all(threads, 60)

I used this inside a test suite where the threads were dæmonised versions of ExcThread, so it wouldn't matter if they never finished running.



Answer 3:

The following code joins each process in turn against a shared time budget. When a process finishes within the time remaining, the time spent waiting is subtracted from the budget before the next process is joined. If the budget runs out, an error message is shown and the program exits via sys.exit.

source

import multiprocessing, sys, time

if __name__ == '__main__':
    # start three procs that run for differing lengths of time
    procs = [
        multiprocessing.Process(
            target=time.sleep, args=[num], name='%d sec'%num,
            )
        for num in [1,2,5]
    ]
    for p in procs:
        p.start()
        print(p)

    timeleft = 3.0
    print('Join, timeout after {} seconds'.format(timeleft))
    for p in procs:
        orig = time.time()
        print('{}: join, {:.3f} sec left...'.format(p, timeleft))
        # wait for this proc, but no longer than the time remaining overall
        p.join(timeleft)
        timeleft -= time.time() - orig
        if timeleft <= 0.:
            sys.exit('timed out!')

example with timeout

We start three procs: one sleeps for 1 second, another for 2 seconds, the last for 5 seconds. Then we `join` them with a total budget of 3 seconds, so the wait on the last proc will be *interrupted*.

<Process(1 sec, started)>
<Process(2 sec, started)>
<Process(5 sec, started)>
Join, timeout after 3.0 seconds
<Process(1 sec, started)>: join, 3.000 sec left...
<Process(2 sec, started)>: join, 1.982 sec left...
<Process(5 sec, started)>: join, 0.965 sec left...
timed out!
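
The same shrinking-budget idea can be wrapped in a function that reports which workers are still running instead of exiting. This is a rough sketch, not the code from the answer above: `join_with_deadline` is a hypothetical name, it uses `time.monotonic()` rather than `time.time()` so clock adjustments do not affect the budget, and the demo uses threads only because they are lighter than processes (anything with `join(timeout)` and `is_alive()` should work the same way).

```python
import threading
import time


def join_with_deadline(workers, timeout):
    """Join workers against one shared deadline; return those still alive."""
    deadline = time.monotonic() + timeout
    for worker in workers:
        remaining = deadline - time.monotonic()
        # join(0) returns immediately, so workers that have already finished
        # are still joined even once the budget is spent.
        worker.join(max(0.0, remaining))
    return [worker for worker in workers if worker.is_alive()]


if __name__ == "__main__":
    release = threading.Event()
    workers = [
        threading.Thread(target=time.sleep, args=(0.1,), name="0.1 sec"),
        threading.Thread(target=release.wait, name="stuck", daemon=True),
    ]
    for worker in workers:
        worker.start()
    unfinished = join_with_deadline(workers, timeout=0.5)
    print("still running:", [worker.name for worker in unfinished])
    release.set()  # let the stuck worker finish
```

Returning the unfinished workers leaves the decision to the caller: log them, raise an error, or (for processes) call `terminate()` on them.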


Answer 4:

I'm writing this here just to make sure that I don't forget it. The principle is the same as in dano's answer, but the code snippet is a bit more Pythonic:

import threading
import time

threads = []
timeout = ...

# create and start the threads
for work in ...:
    thread = threading.Thread(target=worker)
    thread.daemon = True # without this the thread might outlive its parent
    thread.start()
    threads.append(thread)

# Wait for workers to finish or for timeout
stop_time = time.time() + timeout
# is_alive() must be called: a bare method reference is always truthy
while any(t.is_alive() for t in threads) and (time.time() < stop_time):
    time.sleep(0.1)