ThreadPoolExecutor: how to limit the queue maxsize

Published 2019-06-21 04:20

Question:

I am using the ThreadPoolExecutor class from the concurrent.futures package:

def some_func(arg):
    # does some heavy lifting
    # outputs some results

from concurrent.futures import ThreadPoolExecutor

with ThreadPoolExecutor(max_workers=1) as executor:
    for arg in range(10000000):
        future = executor.submit(some_func, arg)

but I need to limit the queue size somehow, as I don't want millions of futures to be created at once. Is there a simple way to do it, or should I stick to queue.Queue and the threading package to accomplish this?

Answer 1:

Python's ThreadPoolExecutor doesn't have the feature you're looking for, but the class can easily be subclassed as follows to provide it (note that this overrides the private _work_queue attribute, so it relies on implementation details):

import queue
from concurrent import futures

class ThreadPoolExecutorWithQueueSizeLimit(futures.ThreadPoolExecutor):
    def __init__(self, maxsize=50, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Replace the unbounded work queue with a bounded one;
        # submit() will then block once maxsize items are waiting.
        self._work_queue = queue.Queue(maxsize=maxsize)
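For reference, here is a self-contained sketch of this subclass with a small demonstration; the maxsize, worker count, and task used here are arbitrary choices for illustration:

```python
import queue
import time
from concurrent import futures

class ThreadPoolExecutorWithQueueSizeLimit(futures.ThreadPoolExecutor):
    """An executor whose submit() blocks once maxsize tasks are queued."""
    def __init__(self, maxsize=50, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # _work_queue is a private attribute, so this relies on
        # CPython implementation details.
        self._work_queue = queue.Queue(maxsize=maxsize)

# At most 2 tasks wait in the queue at any moment; further submit()
# calls block until a worker frees a slot, so only a handful of
# futures exist at a time instead of millions.
with ThreadPoolExecutorWithQueueSizeLimit(maxsize=2, max_workers=1) as ex:
    futs = [ex.submit(time.sleep, 0.001) for _ in range(10)]

results = [f.done() for f in futs]
```

Because the context manager calls shutdown(wait=True), every future is finished by the time the with-block exits.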


Answer 2:

I've been doing this by chunking the range. Here's a working example.

from time import time, strftime, sleep, gmtime
from random import randint
from itertools import islice
from concurrent.futures import ThreadPoolExecutor, as_completed

def nap(id, nap_length):
    sleep(nap_length)
    return nap_length


def chunked_iterable(iterable, chunk_size):
    it = iter(iterable)
    while True:
        chunk = tuple(islice(it, chunk_size))
        if not chunk:
            break
        yield chunk


if __name__ == '__main__':
    startTime = time()

    range_size = 10000000
    chunk_size = 10
    nap_time = 2

    # Iterate in chunks.
    # This consumes less memory and kicks back initial results sooner.
    for chunk in chunked_iterable(range(range_size), chunk_size):

        with ThreadPoolExecutor(max_workers=chunk_size) as pool_executor:
            pool = {}
            for i in chunk:
                function_call = pool_executor.submit(nap, i, nap_time)
                pool[function_call] = i

            for completed_function in as_completed(pool):
                result = completed_function.result()
                i = pool[completed_function]

                print('{} completed @ {} and slept for {}'.format(
                    str(i + 1).zfill(4),
                    strftime("%H:%M:%S", gmtime()),
                    result))

    print('==--- Script took {} seconds. ---=='.format(
        round(time() - startTime)))

The downside to this approach is that the chunks are synchronous: all threads in a chunk must complete before the next chunk is submitted to the pool.
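That chunk barrier can be avoided with a sliding window: submit an initial batch, then submit one new task for each one that completes. This is not part of the original answer; the function name, parameters, and sizes below are my own illustration:

```python
import itertools
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait

def bounded_map(fn, iterable, max_workers=4, max_pending=16):
    """Yield fn(x) for each x (unordered), keeping at most
    max_pending futures alive at any moment."""
    it = iter(iterable)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Prime the window with the first max_pending tasks.
        pending = {executor.submit(fn, x) for x in itertools.islice(it, max_pending)}
        while pending:
            # Wake as soon as any future finishes, not the whole batch.
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                yield future.result()
            # Refill: one new submission per completed future.
            for x in itertools.islice(it, len(done)):
                pending.add(executor.submit(fn, x))

results = sorted(bounded_map(lambda x: x * x, range(20), max_workers=2, max_pending=4))
```

Unlike the chunked version, workers never sit idle waiting for the slowest task in their chunk; the window refills continuously.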



Answer 3:

You should use a semaphore, as shown in https://www.bettercodebytes.com/theadpoolexecutor-with-a-bounded-queue-in-python/

There is a problem with andres.riancho's answer: if we set a maxsize on the work queue, then when we shut down the pool, the self._work_queue.put(None) call in shutdown() (quoted below from the Python 2 standard library) can itself block because the queue is full, so our pool can never exit:

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join(sys.maxint)
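A semaphore-based bound, along the lines of the linked post, avoids touching private executor internals entirely, so shutdown() works normally. This is a sketch; the class and parameter names are my own, not from the linked article:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

class BoundedExecutor:
    """ThreadPoolExecutor wrapper whose submit() blocks once the backlog is full."""
    def __init__(self, max_workers, bound):
        self._executor = ThreadPoolExecutor(max_workers=max_workers)
        # Allow `bound` queued tasks on top of the ones being executed.
        self._semaphore = threading.BoundedSemaphore(bound + max_workers)

    def submit(self, fn, *args, **kwargs):
        self._semaphore.acquire()          # blocks while the backlog is full
        try:
            future = self._executor.submit(fn, *args, **kwargs)
        except Exception:
            self._semaphore.release()      # don't leak a permit on failure
            raise
        # Free one permit whenever a task finishes.
        future.add_done_callback(lambda _: self._semaphore.release())
        return future

    def shutdown(self, wait=True):
        # The underlying work queue is unbounded, so shutdown's
        # _work_queue.put(None) can never block here.
        self._executor.shutdown(wait)

executor = BoundedExecutor(max_workers=2, bound=4)
futures_list = [executor.submit(lambda x: x + 1, i) for i in range(10)]
executor.shutdown()
results = sorted(f.result() for f in futures_list)
```

Because the bound lives outside the executor, at most bound + max_workers futures exist at any moment, and the deadlock described above cannot occur.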