So far whenever I needed to use multiprocessing
I have done so by manually creating a "process pool" and sharing a working Queue with all subprocesses.
For example:
import logging
from multiprocessing import Process, Queue

class MyClass:
    def __init__(self, num_processes):
        self._log = logging.getLogger()
        self.process_list = []
        self.work_queue = Queue()
        for i in range(num_processes):
            p_name = 'CPU_%02d' % (i + 1)
            self._log.info('Initializing process %s', p_name)
            p = Process(target=do_stuff,
                        args=(self.work_queue, 'arg1'),
                        name=p_name)
            self.process_list.append(p)
            p.start()
This way I could add stuff to the queue, which would then be consumed by the subprocesses, and I could monitor how far along the processing was by checking Queue.qsize():
while True:
    qsize = self.work_queue.qsize()
    if qsize == 0:
        self._log.info('Processing finished')
        break
    else:
        self._log.info('%d simulations still need to be calculated', qsize)
Now I figure that multiprocessing.Pool could simplify this code a lot.
What I couldn't find out is how I can monitor the amount of "work" still left to be done.
Take the following example:
from multiprocessing import Pool

class MyClass:
    def __init__(self, num_processes):
        self.process_pool = Pool(num_processes)
        # ...
        result_list = []
        for i in range(1000):
            result = self.process_pool.apply_async(do_stuff, ('arg1',))
            result_list.append(result)
        # ---> here: how do I monitor the Pool's processing progress?
        # ...?
Any ideas?
From the docs, it looks to me like what you want to do is collect your results in a list or other sequence, then iterate over the result list checking for ready() to build your output list. You could then calculate the processing status by comparing the number of result objects not yet in the ready state to the total number of jobs dispatched. See http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult

I came up with the solution below for async calls.
It's a trivial toy script example, but it should apply broadly, I think.
Basically, in an infinite loop, poll the ready() value of your result objects in a generator expression and sum them to count how many of your dispatched Pool tasks are still remaining.
Once none remain, break, then close() and join().
Add a sleep in the loop as desired.
Same principle as the solutions above, but without a queue. If you also keep track of how many tasks you initially sent to the Pool, you can calculate the percentage complete, etc.
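A minimal sketch of that approach (do_stuff here is just a stand-in for your own worker function and argument):

from multiprocessing import Pool
import time

def do_stuff(arg):
    time.sleep(0.1)  # stand-in for real work
    return arg

if __name__ == '__main__':
    pool = Pool(4)
    results = [pool.apply_async(do_stuff, ('arg1',)) for _ in range(1000)]
    total = len(results)

    while True:
        # count dispatched tasks that have not finished yet
        remaining = sum(1 for r in results if not r.ready())
        if remaining == 0:
            break
        print('%d of %d tasks remaining (%.0f%% done)'
              % (remaining, total, 100.0 * (total - remaining) / total))
        time.sleep(1)

    pool.close()
    pool.join()
    output = [r.get() for r in results]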
Use a Manager queue. This is a queue that is shared between the worker processes. If you use a normal queue it will get pickled and unpickled by each worker and hence copied, so that the queue can't be updated by each worker. You then have your workers add stuff to the queue and monitor the queue's state while the workers are working. You need to do this using map_async, as this lets you see when the entire result is ready, allowing you to break out of the monitoring loop. Example:
I've had the same problem and came up with a somewhat simple solution for MapResult objects (albeit using internal MapResult data).
Note that the remaining value is not always exact, since the chunk size is often rounded up depending on the number of items to process.
You can circumvent this by using pool.map_async(get_stuff, todo, chunksize=1).
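A sketch of that approach (get_stuff and todo are placeholders for your own function and work list; _number_left is an undocumented internal attribute, so it may change between Python versions):

from multiprocessing import Pool
import time

def get_stuff(item):
    time.sleep(0.1)  # stand-in for real work
    return item

if __name__ == '__main__':
    todo = range(1000)
    pool = Pool(4)
    # chunksize=1 so _number_left counts individual items instead of chunks
    result = pool.map_async(get_stuff, todo, chunksize=1)

    while not result.ready():
        # _number_left is internal MapResult state: chunks still outstanding
        print('%d items remaining' % result._number_left)
        time.sleep(1)

    pool.close()
    pool.join()
    output = result.get()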