I am using multiprocessing.Pool() to parallelize some heavy computations.

The target function returns a lot of data (a huge list). I'm running out of RAM.

Without multiprocessing, I'd just change the target function into a generator, by yielding the resulting elements one after another, as they are computed.

I understand multiprocessing does not support generators -- it waits for the entire output and returns it at once, right? No yielding. Is there a way to make the Pool workers yield data as soon as they become available, without constructing the entire result array in RAM?
Simple example:
from multiprocessing import Pool

def target_fnc(arg):
    result = []
    for i in xrange(1000000):
        result.append('dvsdbdfbngd') # <== would like to just use yield!
    return result

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        for element in result:
            yield element
This is Python 2.7.
This sounds like an ideal use case for a Queue: http://docs.python.org/2/library/multiprocessing.html#exchanging-objects-between-processes
Simply feed your results into the queue from the pooled workers and ingest them in the master.
Note that you may still run into memory pressure unless you drain the queue nearly as fast as the workers populate it. You could limit the queue size (the maximum number of objects that will fit in the queue), in which case the pooled workers would block on their queue.put calls until space is available. This puts a ceiling on memory usage. But if you're doing that, it may be time to reconsider whether you need pooling at all, and/or whether it makes sense to use fewer workers.
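As a rough sketch of that approach (the _init_worker helper, the _DONE sentinel, and the 10000-item bound are illustrative assumptions, not part of the original code): a bounded multiprocessing.Queue is handed to the workers through the Pool initializer, since it can't be passed as a task argument; each task puts elements as it produces them, plus a sentinel when it finishes, and the parent yields items as it drains the queue:

import multiprocessing

_DONE = None          # sentinel marking the end of one task's output
_result_queue = None  # set in each worker process by _init_worker

def _init_worker(queue):
    # Runs once in each worker process; stashes the shared queue in a global.
    global _result_queue
    _result_queue = queue

def target_fnc(arg):
    for i in xrange(1000000):
        # put() blocks once the queue holds `maxsize` items, capping memory use.
        _result_queue.put('dvsdbdfbngd')
    _result_queue.put(_DONE)  # tell the consumer this task is finished

def process_args(some_args):
    some_args = list(some_args)
    queue = multiprocessing.Queue(maxsize=10000)
    pool = multiprocessing.Pool(16, initializer=_init_worker, initargs=(queue,))
    pool.map_async(target_fnc, some_args)
    pool.close()
    remaining = len(some_args)  # number of end-of-task sentinels still expected
    while remaining:
        item = queue.get()
        if item is _DONE:
            remaining -= 1
        else:
            yield item
    pool.join()

Error handling is omitted for brevity; if a task raises, its sentinel never arrives and the consumer will block.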
If your tasks can return data in chunks… can they be broken up into smaller tasks, each of which returns a single chunk? Obviously, this isn't always possible. When it isn't, you have to use some other mechanism (like a Queue, as Loren Abrams suggests). But when it is, it's probably a better solution for other reasons, as well as solving this problem.
With your example, this is certainly doable. For example:
def target_fnc(task):
    arg, low, high = task  # each task is an [arg, low, high] list
    result = []
    for i in xrange(low, high):
        result.append('dvsdbdfbngd') # <== would like to just use yield!
    return result

def process_args(some_args):
    pool = Pool(16)
    pool_args = []
    for low in range(0, 1000000, 10000):
        pool_args.extend([arg, low, low + 10000] for arg in some_args)
    for result in pool.imap_unordered(target_fnc, pool_args):
        for element in result:
            yield element
(You could of course replace the loop with a nested comprehension, or a zip and flatten, if you prefer.)
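For instance, the nested-comprehension version might look like this (just a sketch, reusing the same illustrative 10000-element chunk size):

pool_args = [[arg, low, low + 10000]
             for low in range(0, 1000000, 10000)
             for arg in some_args]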
So, if some_args is [1, 2, 3], you'll get 300 tasks: [[1, 0, 10000], [2, 0, 10000], [3, 0, 10000], [1, 10000, 20000], …], each of which only returns 10000 elements instead of 1000000.
From your description, it sounds like you're not so much interested in processing the data as they come in, as in avoiding passing a million-element list back.
There's a simpler way of doing that: Just put the data into a file. For example:
import os
import tempfile
from multiprocessing import Pool

def target_fnc(arg):
    fd, path = tempfile.mkstemp(text=True)
    with os.fdopen(fd, 'w') as f:  # open the descriptor for writing
        for i in xrange(1000000):
            f.write('dvsdbdfbngd\n')
    return path

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        with open(result) as f:
            for element in f:
                yield element.rstrip('\n')  # strip the newline delimiter back off
        os.remove(result)  # clean up the temp file once it's been consumed
Obviously, if your results can contain newlines, or aren't strings, etc., you'll want to use a csv file, a numpy binary file, etc. instead of a simple text file, but the idea is the same.
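For instance, a csv-based variant might look something like this (only a sketch; the file handling mirrors the snippet above, and csv takes care of quoting any embedded newlines or commas):

import csv
import os
import tempfile
from multiprocessing import Pool

def target_fnc(arg):
    fd, path = tempfile.mkstemp()
    with os.fdopen(fd, 'wb') as f:  # the Python 2 csv module wants binary mode
        writer = csv.writer(f)
        for i in xrange(1000000):
            writer.writerow(['dvsdbdfbngd'])  # one row per result element
    return path

def process_args(some_args):
    pool = Pool(16)
    for result in pool.imap_unordered(target_fnc, some_args):
        with open(result, 'rb') as f:
            for row in csv.reader(f):
                yield row[0]
        os.remove(result)  # clean up the temp file once it's been consumed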
That being said, even if this is simpler, there are usually benefits to processing the data a chunk at a time, so breaking up your tasks or using a Queue (as the other two answers suggest) may be better, if the downsides (respectively, needing a way to break the tasks up, or having to be able to consume the data as fast as they're produced) are not deal-breakers.