I am trying to read and process thousands of files, but unfortunately it takes about 3x as long to process each file as it does to read it in from disk, so I would like to process these files as they are read in (and while I am continuing to read in additional files).
In a perfect world, I have a generator which reads one file at a time, and I would like to pass this generator to a pool of workers which process items from the generator as they are (slowly) generated.
Here's an example:
```python
import os
from multiprocessing import Pool

def process_file(file_string):
    ...
    return processed_file

pool = Pool(processes=4)
path = 'some/path/'
results = pool.map(process_file, (open(path + part, 'rb').read() for part in os.listdir(path)))
```
The only issue with the code above is that all the files are read into memory before the pool begins working, which means that I need to wait for the disk to read everything in, and I also consume a large amount of memory.
`Pool.map` and `Pool.map_async` `list`ify the `iterable` passed to them, so your generator will always be realized fully before processing even begins.

The various `Pool.imap*` functions appear to process inputs as generators, so you might be able to change the `pool.map` call to `pool.imap`
and get the same results without slurping before processing, but AFAICT, they'll still try to fully populate the queues as fast as they can, which could lead to a lot of data outstanding and excessive memory usage; beyond that, you'll be reading all the data in one process, then sending all of it over IPC, which means you're still mostly bottlenecked on I/O.
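A minimal sketch of that change, reusing `process_file` and `path` from the question (illustrative, not a definitive implementation):

```python
import os
from multiprocessing import Pool

pool = Pool(processes=4)
path = 'some/path/'

# imap pulls items from the generator lazily instead of listifying it first,
# and yields results in the order the inputs were submitted.
results = pool.imap(
    process_file,
    (open(os.path.join(path, part), 'rb').read() for part in os.listdir(path)),
)

for processed in results:
    ...  # consume each result as it becomes available
```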
In your position, I'd move the read into the task itself (and, if you can, avoid reading in the whole file at once, processing it line by line or block by block instead). You'd get parallel reads, less IPC, and you won't risk slurping all the files before the first few are even processed; you'll never have more files open than you have workers. So the end result would look something like this:
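A sketch of that end result, with the read moved into the worker and only filenames sent over IPC (`do_something_with` is a hypothetical stand-in for your per-block processing):

```python
import os
from multiprocessing import Pool

def process_file(filename):
    # The read happens in the worker, so reads run in parallel and only
    # the (much smaller) filename crosses the IPC boundary on the way in.
    processed = []
    with open(filename, 'rb') as f:
        for block in iter(lambda: f.read(64 * 1024), b''):  # read in 64 KiB blocks
            processed.append(do_something_with(block))  # hypothetical per-block step
    return processed

if __name__ == '__main__':
    path = 'some/path/'
    with Pool(processes=4) as pool:
        results = pool.imap(process_file,
                            (os.path.join(path, part) for part in os.listdir(path)))
        for processed in results:
            ...  # handle each file's result as it completes
```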
You are reading the files into the parent's memory and then transferring the payload into the children. That's rather inefficient. Send just the filename and let the children do the I/O. If the result is a bunch of text that you plan to write to a file, do that in the child also.
`map` will normally issue large blocks of work in one shot to reduce communication overhead with its pool workers. That's probably why you get the big memory spike. Passing just the filename solves that problem, but setting a small `chunksize` is still beneficial when you have uneven processing time among the workers.
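A sketch along those lines, passing only filenames and an explicit small `chunksize` (`transform` and the `.processed` output naming are hypothetical placeholders):

```python
import os
from multiprocessing import Pool

def process_file(filename):
    # The child does its own I/O: read the input, process it, write the output.
    with open(filename, 'rb') as f:
        data = f.read()
    text = transform(data)  # hypothetical processing step producing text
    out_path = filename + '.processed'  # hypothetical output naming scheme
    with open(out_path, 'w') as out:
        out.write(text)
    return out_path

if __name__ == '__main__':
    path = 'some/path/'
    filenames = [os.path.join(path, part) for part in os.listdir(path)]
    with Pool(processes=4) as pool:
        # chunksize=1 hands files to workers one at a time, avoiding the big
        # up-front blocks of work and helping when per-file times are uneven.
        for out_path in pool.imap_unordered(process_file, filenames, chunksize=1):
            print(out_path)
```

`imap_unordered` is used here only so results stream back as workers finish; a plain `pool.map(process_file, filenames, chunksize=1)` gives the same chunking behavior if you prefer a list of results.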