I have a small pool of workers (4) and a very large list of tasks (5000~). I'm using a pool and sending the tasks with map_async(). Because the task I'm running is fairly long, I'm forcing a chunksize of 1 so that one long process can't hold up some shorter ones.
What I'd like to do is periodically check how many tasks are left to be submitted. I know at most 4 will be active, I'm concerned with how many are left to process.
I've googled around and I can't find anybody doing this.
Some simple code to help:
import multiprocessing
import time
def mytask(num):
print('Started task, sleeping %s' % num)
time.sleep(num)
pool = multiprocessing.Pool(4)
jobs = pool.map_async(mytask, [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4], chunksize=1)
pool.close()
while True:
if not jobs.ready():
print("We're not done yet, %s tasks to go!" % <somethingtogettasks>)
jobs.wait(2)
else:
break
Looks like jobs._number_left is what you want. _ indicates that it is an internal value which may change at the whim of the developers, but it seems to be the only way to get that info.
No airtight way that I know of, but if you use the Pool.imap_unordered()
function instead of map_async, you can intercept the elements that are processed.
import multiprocessing
import time
process_count = 4
def mytask(num):
print('Started task, sleeping %s' % num)
time.sleep(num)
# Actually, you should return the job you've created here.
return num
pool = multiprocess.Pool(process_count)
jobs = []
items = [1,2,3,4,5,3,2,3,4,5,2,3,2,3,4,5,6,4]
job_count = 0
for job in pool.imap_unordered(mytask, items):
jobs.append(job)
job_count += 1
incomplete = len(items) - job_count
unsubmitted = max(0, incomplete - process_count)
print "Jobs incomplete: %s. Unsubmitted: %s" % incomplete, unsubmitted
pool.close()
I'm subtracting process_count
, because you can pretty much assume that all processes will be processing with one of two exceptions: 1) if you use an iterator, there may not be further items left to consume and process, and 2) You may have fewer than 4 items left. I didn't code in for the first exception. But it should be pretty easy to do so if you need to. Anyway, your example uses a list so you shouldn't have that problem.
Edit: I also realized you're using a While loop, which makes it look like you're trying to update something periodically, say, every half second or something. The code I gave as an example will not do it that way. I'm not sure if that's a problem.
I have similar requirements: track progress, perform interim work based on the results, stop all processing cleanly at any arbitrary time. How I've dealt with it is to send tasks one at a time with apply_async
. A heavily simplified version of what I do:
maxProcesses = 4
q = multiprocessing.Queue()
pool = multiprocessing.Pool()
runlist = range(100000)
sendcounter = 0
donecounter = 0
while donecounter < len(runlist):
if stopNowBooleanFunc(): # if for whatever reason I want to stop processing early
if donecounter == sendcounter: # wait til already sent tasks finish running
break
else: # don't send new tasks if it's time to stop
while sendcounter < len(runlist) and sendcounter - donecounter < maxProcesses:
pool.apply_async(mytask, (runlist[sendcounter], q))
sendcounter += 1
while not q.empty(): # process completed results as they arrive
aresult = q.get()
processResults(aresult)
donecounter += 1
Note that I use a Queue
instead of return
ing the results.
You can check the number of pending jobs by seeing Pool._cache
attribute assuming that you are using apply_async
. This is where ApplyResult
is stored until they are available and equals to the number of ApplyResult
s pending.
import multiprocessing as mp
import random
import time
def job():
time.sleep(random.randint(1,10))
print("job finished")
if __name__ == '__main__':
pool = mp.Pool(5)
for _ in range(10):
pool.apply_async(job)
while pool._cache:
print("number of jobs pending: ", len(pool._cache))
time.sleep(2)
pool.close()
pool.join()