I have a small pool of workers (4) and a very large list of tasks (~5000). I'm using a pool and sending the tasks with map_async(). Because the task I'm running is fairly long, I'm forcing a chunksize of 1 so that one long process can't hold up some shorter ones.
What I'd like to do is periodically check how many tasks are left to be submitted. I know at most 4 will be active; I'm concerned with how many are left to process.
I've googled around and I can't find anybody doing this.
Some simple code to help:
import multiprocessing
import time

def mytask(num):
    print('Started task, sleeping %s' % num)
    time.sleep(num)

if __name__ == '__main__':
    pool = multiprocessing.Pool(4)
    jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
    pool.close()
    while True:
        if not jobs.ready():
            # <somethingtogettasks> is the number I don't know how to get
            print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
            jobs.wait(2)
        else:
            break
No airtight way that I know of, but if you use the Pool.imap_unordered() function instead of map_async, you can intercept the elements as they are processed. I'm subtracting process_count because you can pretty much assume that all processes will be busy, with two exceptions: 1) if you use an iterator, there may be no further items left to consume and process, and 2) you may have fewer than 4 items left. I didn't code for the first exception, but it should be pretty easy to do so if you need to. Anyway, your example uses a list, so you shouldn't have that problem.
Edit: I also realized you're using a while loop, which makes it look like you're trying to update something periodically, say, every half second. The code I gave as an example will not do it that way. I'm not sure if that's a problem.
I have similar requirements: track progress, perform interim work based on the results, and stop all processing cleanly at any arbitrary time. How I've dealt with it is to send tasks one at a time with apply_async. Note that I put the results on a Queue instead of returning them.

You can check the number of pending jobs by looking at the Pool._cache attribute, assuming that you are using apply_async. This is where each ApplyResult is stored until its result is available, so its length equals the number of ApplyResults still pending.

Looks like jobs._number_left is what you want. The leading _ indicates that it is an internal value which may change at the whim of the developers, but it seems to be the only way to get that info.
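A minimal sketch of the two private-attribute suggestions, written against the question's setup. It assumes CPython's current multiprocessing.pool internals: MapResult._number_left counts chunks whose results have not come back yet (so with chunksize=1 it counts tasks that are queued or still running), and Pool._cache holds one entry per unfinished apply_async call. Neither attribute is documented, so both may behave differently in other Python versions. The helper names poll_map_async and poll_apply_async and the interval parameter are hypothetical.

```python
import multiprocessing
import time

def mytask(num):
    # Stand-in for the question's long-running task.
    time.sleep(num)

def poll_map_async(tasks, interval=0.5):
    """map_async with chunksize=1: read the private MapResult._number_left."""
    seen = []
    with multiprocessing.Pool(4) as pool:
        jobs = pool.map_async(mytask, tasks, chunksize=1)
        while not jobs.ready():
            # Private attribute: chunks not yet completed (queued or running).
            # With chunksize=1, one chunk is one task.
            left = jobs._number_left
            seen.append(left)
            print("We're not done yet, %s tasks to go!" % left)
            jobs.wait(interval)
        jobs.get()  # re-raises any exception raised in a worker
    return seen

def poll_apply_async(tasks, interval=0.5):
    """One apply_async call per task: len(pool._cache) counts pending results."""
    seen = []
    with multiprocessing.Pool(4) as pool:
        results = [pool.apply_async(mytask, (t,)) for t in tasks]
        while not all(r.ready() for r in results):
            # Private attribute: one entry per apply_async call not yet finished.
            pending = len(pool._cache)
            seen.append(pending)
            print("%s tasks still pending" % pending)
            time.sleep(interval)
        for r in results:
            r.get()  # re-raises any exception raised in a worker
    return seen

if __name__ == '__main__':
    poll_map_async([0.5, 1, 0.5, 1, 0.5, 1], interval=0.25)
    poll_apply_async([0.5, 1, 0.5, 1, 0.5, 1], interval=0.25)
```

With map_async the whole map is a single entry in Pool._cache, which is why the _cache approach needs one apply_async call per task.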