So far, whenever I needed to use multiprocessing, I have done so by manually creating a "process pool" and sharing a work Queue with all subprocesses. For example:
import logging
from multiprocessing import Process, Queue

class MyClass:
    def __init__(self, num_processes):
        self._log = logging.getLogger()
        self.process_list = []
        self.work_queue = Queue()
        for i in range(num_processes):
            p_name = 'CPU_%02d' % (i + 1)
            self._log.info('Initializing process %s', p_name)
            p = Process(target=do_stuff,  # do_stuff consumes items from work_queue
                        args=(self.work_queue, 'arg1'),
                        name=p_name)
            p.start()
            self.process_list.append(p)
This way I could add stuff to the queue, which would be consumed by the subprocesses. I could then monitor how far along the processing was by checking Queue.qsize():
while True:
    qsize = self.work_queue.qsize()
    if qsize == 0:
        self._log.info('Processing finished')
        break
    else:
        self._log.info('%d simulations still need to be calculated', qsize)
Now I figure that multiprocessing.Pool could simplify this code a lot. What I couldn't find out is how to monitor the amount of "work" still left to be done.
Take the following example:
from multiprocessing import Pool

class MyClass:
    def __init__(self, num_processes):
        self.process_pool = Pool(num_processes)
        # ...
        result_list = []
        for i in range(1000):
            result = self.process_pool.apply_async(do_stuff, ('arg1',))
            result_list.append(result)
        # ---> here: how do I monitor the Pool's processing progress?
        # ...?
Any ideas?
Use a Manager queue. This is a queue that is shared between the worker processes. If you use a normal queue it will get pickled and unpickled by each worker and hence copied, so the queue can't be updated by each worker.

You then have your workers add stuff to the queue and monitor the queue's state while the workers are working. You need to do this using map_async, as this lets you see when the entire result is ready, allowing you to break out of the monitoring loop.
Example:
import time
from multiprocessing import Pool, Manager

def play_function(args):
    """Mock function that takes a single argument consisting
    of (input, queue). Alternatively, you could use another function
    as a wrapper.
    """
    i, q = args
    time.sleep(0.1)  # mock work
    q.put(i)
    return i

p = Pool()
m = Manager()
q = m.Queue()

inputs = range(20)
args = [(i, q) for i in inputs]
result = p.map_async(play_function, args)

# monitor loop
while True:
    if result.ready():
        break
    else:
        size = q.qsize()
        print(size)
        time.sleep(0.1)

outputs = result.get()
I've had the same problem and came up with a somewhat simple solution for MapResult objects (albeit relying on internal MapResult data):
import sys
from time import sleep
from multiprocessing import Pool

pool = Pool(POOL_SIZE)
result = pool.map_async(get_stuff, todo)
while not result.ready():
    # _number_left counts remaining chunks, so multiply by the chunk size
    remaining = result._number_left * result._chunksize
    sys.stderr.write('\r\033[2KRemaining: %d' % remaining)
    sys.stderr.flush()
    sleep(.1)
sys.stderr.write('\r\033[2KRemaining: 0\n')
Note that the remaining value is not always exact, since the chunk size is often rounded up depending on the number of items to process. You can circumvent this by using pool.map_async(get_stuff, todo, chunksize=1).
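A minimal variant of the same loop, assuming the same POOL_SIZE, get_stuff and todo placeholders, with chunksize=1 so that _number_left is an exact count of unfinished tasks:

import sys
from time import sleep
from multiprocessing import Pool

pool = Pool(POOL_SIZE)
# with chunksize=1 each chunk holds a single item,
# so _number_left equals the number of unfinished tasks
result = pool.map_async(get_stuff, todo, chunksize=1)
while not result.ready():
    sys.stderr.write('\r\033[2KRemaining: %d' % result._number_left)
    sys.stderr.flush()
    sleep(.1)
sys.stderr.write('\r\033[2KRemaining: 0\n')

Keep in mind that chunksize=1 adds per-task overhead, so it is best suited to a modest number of relatively long-running tasks.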
I came up with the solution below for apply_async. It's a trivial toy script example, but it should apply broadly, I think. Basically, in an infinite loop, poll the ready() status of your result objects with a generator expression and sum them to get a count of how many of your dispatched Pool tasks are remaining. Once none are remaining, break out of the loop and call close() and join(). Add a sleep in the loop as desired.

Same principle as the solutions above, but without a queue. If you also keep track of how many tasks you initially sent to the Pool, you can calculate the percentage complete, etc.
import multiprocessing
import os
import time
from random import randrange

def worker():
    print(os.getpid())
    # simulate work
    time.sleep(randrange(5))

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=8)
    result_objs = []
    print("Begin dispatching work")
    task_count = 10
    for x in range(task_count):
        result_objs.append(pool.apply_async(func=worker))
    print("Done dispatching work")

    while True:
        incomplete_count = sum(1 for x in result_objs if not x.ready())
        if incomplete_count == 0:
            print("All done")
            break
        print(str(incomplete_count) + " Tasks Remaining")
        print(str(float(task_count - incomplete_count) / task_count * 100) + "% Complete")
        time.sleep(.25)

    pool.close()
    pool.join()
From the docs, it looks to me like what you want to do is collect your results in a list or other sequence, then iterate over the result list checking for ready() to build your output list. You could then calculate the processing status by comparing the number of result objects that are not yet ready to the total number of jobs dispatched. See http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult
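A rough sketch of that approach, assuming a placeholder do_stuff worker similar to the one in the question:

import time
from multiprocessing import Pool

def do_stuff(arg):
    # placeholder worker; stands in for the real work function
    time.sleep(0.1)
    return arg

if __name__ == '__main__':
    pool = Pool()
    results = [pool.apply_async(do_stuff, (i,)) for i in range(100)]
    total = len(results)

    while True:
        remaining = sum(1 for r in results if not r.ready())
        print('%d of %d jobs remaining' % (remaining, total))
        if remaining == 0:
            break
        time.sleep(0.5)

    outputs = [r.get() for r in results]  # collect the finished outputs
    pool.close()
    pool.join()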