How to get the amount of “work” left to be done by a multiprocessing Pool

Published 2019-03-10 07:54

So far, whenever I needed to use multiprocessing I have done so by manually creating a "process pool" and sharing a work Queue with all subprocesses.

For example:

import logging
from multiprocessing import Process, Queue


class MyClass:

    def __init__(self, num_processes):
        self._log         = logging.getLogger()
        self.process_list = []
        self.work_queue   = Queue()
        for i in range(num_processes):
            p_name = 'CPU_%02d' % (i+1)
            self._log.info('Initializing process %s', p_name)
            p = Process(target = do_stuff,
                        args   = (self.work_queue, 'arg1'),
                        name   = p_name)
            p.start()
            self.process_list.append(p)

This way I could add items to the queue, which would be consumed by the subprocesses. I could then monitor how far along the processing was by checking Queue.qsize():

    while True:
        qsize = self.work_queue.qsize()
        if qsize == 0:
            self._log.info('Processing finished')
            break
        else:
            self._log.info('%d simulations still need to be calculated', qsize)

Now I figure that multiprocessing.Pool could simplify this code a lot.

What I couldn't find out is how to monitor the amount of "work" still left to be done.

Take the following example:

from multiprocessing import Pool


class MyClass:

    def __init__(self, num_processes):
        self.process_pool = Pool(num_processes)
        # ...
        result_list = []
        for i in range(1000):            
            result = self.process_pool.apply_async(do_stuff, ('arg1',))
            result_list.append(result)
        # ---> here: how do I monitor the Pool's processing progress?
        # ...?

Any ideas?

4 Answers
劫难 · 2019-03-10 08:00

From the docs, it looks to me like what you want to do is collect your results in a list or other sequence, then iterate over that list checking each result's ready() state to build your output list. You could then calculate the processing status by comparing the number of result objects not yet in the ready state to the total number of jobs dispatched. See http://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.AsyncResult
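
A minimal sketch of that approach, reusing the result_list of AsyncResult objects collected in the question's code (the names are assumed from there):

import time

# result_list: the AsyncResult objects returned by apply_async(),
# as collected in the question's example
total = len(result_list)
while True:
    n_ready = sum(1 for r in result_list if r.ready())
    if n_ready == total:
        break
    print('%d of %d jobs done (%.0f%%)' % (n_ready, total, 100.0 * n_ready / total))
    time.sleep(0.5)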

走好不送 · 2019-03-10 08:01

I came up with the solution below for apply_async.

It's a trivial toy script example, but it should apply broadly, I think.

Basically, in an infinite loop, poll the ready() value of your result objects in a generator expression and sum it to count how many of your dispatched Pool tasks are still remaining.

Once none are remaining, break, then close() and join() the pool.

Add a sleep in the loop as desired.

Same principle as the solutions above, but without a queue. If you also keep track of how many tasks you initially sent to the Pool, you can calculate the percentage complete, etc.

import multiprocessing
import os
import time
from random import randrange


def worker():
    print(os.getpid())

    # simulate work
    time.sleep(randrange(5))

if __name__ == '__main__':

    pool = multiprocessing.Pool(processes=8)
    result_objs = []

    print("Begin dispatching work")

    task_count = 10
    for x in range(task_count):
        result_objs.append(pool.apply_async(func=worker))

    print("Done dispatching work")

    while True:
        # count dispatched tasks whose results are not ready yet
        incomplete_count = sum(1 for x in result_objs if not x.ready())

        if incomplete_count == 0:
            print("All done")
            break

        print(str(incomplete_count) + " Tasks Remaining")
        print(str(float(task_count - incomplete_count) / task_count * 100) + "% Complete")
        time.sleep(.25)

    pool.close()
    pool.join()
我命由我不由天 · 2019-03-10 08:03

Use a Manager queue, i.e. a queue that is shared between worker processes. If you use a normal queue, it will get pickled and unpickled by each worker, and hence copied, so the same queue can't be updated by every worker.

You then have your workers add items to the queue and monitor the queue's state while the workers are working. You need to do this using map_async, as that lets you see when the entire result is ready, allowing you to break out of the monitoring loop.

Example:

import time
from multiprocessing import Pool, Manager


def play_function(args):
    """Mock function that takes a single argument consisting
    of (input, queue). Alternatively, you could use another
    function as a wrapper.
    """
    i, q = args
    time.sleep(0.1)  # mock work
    q.put(i)         # report progress via the shared queue
    return i

p = Pool()
m = Manager()
q = m.Queue()

inputs = range(20)
args = [(i, q) for i in inputs]
result = p.map_async(play_function, args)

# monitor loop
while True:
    if result.ready():
        break
    else:
        size = q.qsize()
        print(size)
        time.sleep(0.1)

outputs = result.get()
不美不萌又怎样 · 2019-03-10 08:03

I've had the same problem and came up with a somewhat simple solution for MapResult objects (albeit one that uses internal MapResult data).

import sys
from time import sleep
from multiprocessing import Pool

pool = Pool(POOL_SIZE)

result = pool.map_async(get_stuff, todo)
while not result.ready():
    # _number_left and _chunksize are undocumented internals of MapResult
    remaining = result._number_left * result._chunksize
    sys.stderr.write('\r\033[2KRemaining: %d' % remaining)
    sys.stderr.flush()
    sleep(.1)

sys.stderr.write('\r\033[2KRemaining: 0\n')

Note that the remaining value is not always exact, since the chunk size is often rounded up depending on the number of items to process.

You can circumvent this by using pool.map_async(get_stuff, todo, chunksize=1).
