Real-time output from engines in IPython parallel?

Posted 2019-02-20 00:32

Question:

I am running a bunch of long-running tasks with IPython's great parallelization functionality.

How can I get real-time output from the ipengines' stdout in my IPython client?

E.g., I'm running dview.map_async(fun, lots_of_args) and fun prints to stdout. I would like to see the outputs as they are happening.

I know about AsyncResult.display_output(), but it's only available after all tasks have finished.

Answer 1:

You can see stdout while the tasks are still running by accessing AsyncResult.stdout, which returns a list of strings: the stdout captured so far from each engine.

The simplest case is:

print(ar.stdout)

You can wrap this in a simple function that prints stdout while you wait for the AsyncResult to complete:

import sys
import time
from IPython.display import clear_output

def wait_watching_stdout(ar, dt=1, truncate=1000):
    """Print each engine's stdout every `dt` seconds until `ar` is done."""
    while not ar.ready():
        stdouts = ar.stdout
        if not any(stdouts):
            # nothing printed yet: wait instead of busy-looping
            time.sleep(dt)
            continue
        # clear_output doesn't do much in terminal environments
        clear_output()
        print('-' * 30)
        print("%.3fs elapsed" % ar.elapsed)
        print("")
        # ar._targets is a private attribute listing the target engine ids
        for eid, stdout in zip(ar._targets, stdouts):
            if stdout:
                print("[ stdout %2i ]\n%s" % (eid, stdout[-truncate:]))
        sys.stdout.flush()
        time.sleep(dt)
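
If you want to check the formatting without a running cluster, the printing step can be pulled out of the polling loop into a pure function. This is a minimal sketch, not part of IPython: the name render_stdout_report is hypothetical, and it takes plain lists in place of ar._targets and ar.stdout.

```python
def render_stdout_report(targets, stdouts, elapsed, truncate=1000):
    """Build the text that one refresh of wait_watching_stdout prints.

    targets: engine ids, in the same order as stdouts.
    stdouts: one string of captured output per engine.
    elapsed: seconds since the tasks were submitted.
    Only the last `truncate` characters of each engine's output are kept.
    """
    parts = ["-" * 30, "%.3fs elapsed" % elapsed, ""]
    for eid, stdout in zip(targets, stdouts):
        if stdout:  # skip engines that have not printed anything yet
            parts.append("[ stdout %2i ]\n%s" % (eid, stdout[-truncate:]))
    return "\n".join(parts)
```

Inside the loop, the three print calls and the for-loop could then become a single print(render_stdout_report(ar._targets, stdouts, ar.elapsed, truncate)), which keeps the polling logic and the formatting separately testable.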


If you are using an older IPython, you may run into an artificial restriction on accessing the stdout attribute ('Result not ready' errors). The information is still available in the client's metadata, so you can get at it while the task is not done:

rc.spin()  # rc is the Client; spin() processes pending results and output, updating rc.metadata
stdout = [rc.metadata[msg_id]['stdout'] for msg_id in ar.msg_ids]

This is essentially what accessing the ar.stdout attribute does.



Answer 2:

Just in case somebody is still struggling to get the ordinary print output of the individual kernels:

I adapted minrk's answer so that I get the output of each kernel as if it were running locally, by repeatedly checking whether each kernel's stdout has changed while the program is running.

import time

asdf = dview.map_async(function, arguments)

# initialize a stdout0 list for comparison
stdout0 = asdf.stdout

while not asdf.ready():
    stdout1 = asdf.stdout  # take one snapshot per pass
    # check if stdout changed for any kernel
    if stdout1 != stdout0:
        for i in range(len(stdout1)):
            if stdout1[i] != stdout0[i]:
                # print only the new part of the stdout, without the trailing '\n'
                print('kernel ' + str(i) + ': ' + stdout1[i][len(stdout0[i]):].rstrip('\n'))
        # remember the last output for the next comparison, after checking every kernel
        stdout0 = stdout1
    time.sleep(0.1)  # avoid a busy loop

asdf.get()

The output will then look something like:

kernel 0: message 1 from kernel 0
kernel 1: message 1 from kernel 1
kernel 0: message 2 from kernel 0
kernel 0: message 3 from kernel 0
kernel 1: message 2 from kernel 1
...
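
A variant of the same idea keeps a per-kernel count of characters already printed instead of comparing whole snapshots, and holds back a partial line until its newline has arrived. This is a sketch under assumptions, not part of ipyparallel: stream_new_output and FakeAsyncResult are hypothetical names, and ar is assumed to behave like the result of map_async (a ready() method and a stdout attribute returning a list with one string per task). FakeAsyncResult only replays canned snapshots so the loop can be tried without a cluster.

```python
import time


def stream_new_output(ar, dt=0.5, write=print):
    """Write each kernel's newly completed stdout lines until ar is done.

    Assumes ar looks like an ipyparallel AsyncResult from map_async:
    ar.ready() returns a bool and ar.stdout returns a list with one string
    per task, each string only ever growing.
    """
    printed = []  # number of characters already written, per stdout entry

    def emit(final):
        stdouts = ar.stdout
        # add a counter for any entries that appeared since the last pass
        printed.extend([0] * (len(stdouts) - len(printed)))
        for i, text in enumerate(stdouts):
            new = text[printed[i]:]
            if not final:
                # hold back a trailing partial line until its newline arrives
                new = new[:new.rfind("\n") + 1]
            for line in new.splitlines():
                write("kernel %d: %s" % (i, line))
            printed[i] += len(new)

    while not ar.ready():
        emit(final=False)
        time.sleep(dt)
    emit(final=True)  # flush output that arrived just before completion


class FakeAsyncResult:
    """Hypothetical stand-in that replays stdout snapshots, for testing."""

    def __init__(self, snapshots):
        self._snapshots = snapshots
        self._i = 0

    def ready(self):
        return self._i >= len(self._snapshots) - 1

    @property
    def stdout(self):
        current = self._snapshots[self._i]
        # advance to the next snapshot on every read, stopping at the last
        self._i = min(self._i + 1, len(self._snapshots) - 1)
        return current


if __name__ == "__main__":
    fake = FakeAsyncResult([
        ["", ""],
        ["a\n", ""],
        ["a\nb", "x\n"],
        ["a\nb\n", "x\ny"],
    ])
    stream_new_output(fake, dt=0)
```

Running the file directly replays the snapshots and prints kernel 0: a, kernel 1: x, kernel 0: b and kernel 1: y on separate lines. With a real cluster you would do ar = dview.map_async(function, arguments), then stream_new_output(ar), then ar.get(). Counting printed characters means each pass only slices the new text, and several kernels changing in the same pass are all reported.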