How to spawn future only if free worker is available

Posted 2019-02-15 23:44

Question:

I am trying to send information extracted from lines of a big file to a process running on some server.

To speed this up, I would like to do this with some threads in parallel.

Using the Python 2.7 backport of concurrent.futures I tried this:

f = open("big_file")
with ThreadPoolExecutor(max_workers=4) as e:
    for line in f:
        e.submit(send_line_function, line)
f.close()

However, this is problematic: all futures are submitted at once, so the complete file ends up queued in memory and my machine runs out of memory.

Is there an easy way to submit a new future only when a free worker is available?

Answer 1:

You could iterate over chunks of the file using

for chunk in zip(*[f]*chunksize):

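(This is an application of the grouper recipe, which collects items from the iterator f into groups of size chunksize. Note: This does not consume the entire file at once, since zip returns an iterator in Python 3; in Python 2.7, zip builds the whole list up front, so use itertools.izip there. Also note that zip stops at the first exhausted argument, so a final group with fewer than chunksize lines is silently dropped.)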
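To see what the grouping idiom does, here is a minimal sketch on a small in-memory list rather than a file. The helper names full_groups and all_chunks are hypothetical, and the itertools.islice variant is an alternative that is not part of the answer; it is shown only because it keeps the last, shorter group.

```python
import itertools

def full_groups(iterable, size):
    """Grouper idiom from the answer: `size` references to ONE shared iterator."""
    it = iter(iterable)
    return zip(*[it] * size)

def all_chunks(iterable, size):
    """Hypothetical alternative: read up to `size` items per pass, stop on []."""
    it = iter(iterable)
    return iter(lambda: list(itertools.islice(it, size)), [])

lines = ["a\n", "b\n", "c\n", "d\n", "e\n"]

# zip only emits complete groups, so the fifth line is lost
zipped = list(full_groups(lines, 2))

# islice also returns the trailing partial group
sliced = list(all_chunks(lines, 2))

print(zipped)
print(sliced)
```

With five lines and a group size of two, the zip version yields two groups while the islice version yields three, the last holding the single leftover line.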


import concurrent.futures as CF
import itertools as IT
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

def worker(line):
    line = line.strip()
    logger.info(line)

chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
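    # islice reads at most chunksize lines per pass and, unlike
    # zip(*[f]*chunksize), keeps a final partial chunk; [] ends the loop at EOF
    for chunk in iter(lambda: list(IT.islice(f, chunksize)), []):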
        futures = [executor.submit(worker, line) for line in chunk]
        # wait for these futures to complete before processing another chunk
        CF.wait(futures)

Now, in the comments you rightly point out that this is not optimal. There could be some worker which takes a long time, and holds up a whole chunk of jobs.

Usually, if each call to worker takes roughly the same amount of time, then this is not a big deal. However, here is a way to advance the filehandle on demand. It uses a threading.Condition to notify the sprinkler (the thread that reads the file and feeds lines to the workers) to advance the filehandle.

import logging
import threading
try:
    import Queue           # Python 2.7
except ImportError:
    import queue as Queue  # Python 3

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')
SENTINEL = object()

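def worker(cond, queue):
    for line in iter(queue.get, SENTINEL):
        # notify as soon as the line is taken, so the sprinkler can hand the
        # next line to another worker while this one is still busy
        with cond:
            cond.notify()
            logger.info('notify')
        line = line.strip()
        logger.info(line)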

def sprinkler(cond, queue, num_workers):
    with open("big_file") as f:
        for line in f:
            logger.info('advancing filehandle') 
            with cond:
                queue.put(line)
                logger.info('waiting')
                cond.wait()
        for _ in range(num_workers):
            queue.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
queue = Queue.Queue()
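feeder = threading.Thread(target=sprinkler, args=[cond, queue, num_workers])
feeder.start()

# start one worker per sentinel that the sprinkler will enqueue
threads = [threading.Thread(target=worker, args=[cond, queue])
           for _ in range(num_workers)]
for t in threads:
    t.start()
for t in threads:
    t.join()
feeder.join()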
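
If you would rather stay with ThreadPoolExecutor, a different technique from the one above is to guard submit with a semaphore, so that the loop reading the file blocks until a job has finished. The following is only a sketch under assumptions: submit_bounded, process_lines and record are hypothetical names, and the in-memory list stands in for open("big_file").

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def submit_bounded(executor, slots, fn, *args):
    """Block until a slot is free, then submit fn(*args) to the executor."""
    slots.acquire()
    try:
        future = executor.submit(fn, *args)
    except Exception:
        slots.release()  # submission failed, so give the slot back
        raise
    # free the slot when the job finishes, whether it succeeded or raised
    future.add_done_callback(lambda _: slots.release())
    return future

def process_lines(lines, send_line, max_workers=4):
    """Feed lines to send_line with at most max_workers jobs outstanding."""
    slots = threading.BoundedSemaphore(max_workers)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for line in lines:
            submit_bounded(executor, slots, send_line, line)

# demonstration on an in-memory list (assumption: stands in for the big file)
seen = []
lock = threading.Lock()

def record(line):
    with lock:
        seen.append(line.strip())

process_lines(["one\n", "two\n", "three\n"], record, max_workers=2)
print(sorted(seen))
```

Because the semaphore starts at max_workers, no more than that many lines are held by pending futures at any time. The returned futures are discarded in this sketch, so an exception raised inside send_line would go unnoticed; keep the futures and call result() on them if you need to see failures.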