Asynchronously receive output from long running shell commands

Posted 2019-05-06 18:55

Question:

I'm trying to figure out how to start a number of long-running shell commands in a non-blocking way and handle their output asynchronously when they finish, in the order they finish (even if that differs from the order they were started), using the asyncio library available in Python 3.4 and later.

I couldn't find a simple example of this, even in the asyncio documentation itself, which seems quite low-level.

Answer 1:

Use the get_lines() coroutine to get each shell command's output asynchronously, and pass the coroutines to asyncio.as_completed() to get the results in the order they finish:

#!/usr/bin/env python3.5
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

async def get_lines(shell_command):
    # run the command and return its combined stdout/stderr as a list of lines
    p = await asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (await p.communicate())[0].splitlines()

async def main():
    # get commands output concurrently
    coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                       .format(i=i, e=sys.executable))
             for i in reversed(range(5))]
    for f in asyncio.as_completed(coros): # print in the order they finish
        print(await f)


if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
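
Note: on Python 3.8 and later the ProactorEventLoop is already the default on Windows, so the explicit loop handling at the bottom of the script can be replaced with a single call (a minimal sketch, assuming the same main() as above):

asyncio.run(main())  # sets up the event loop, runs main() to completion, then closes the loop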


Answer 2:

create_subprocess_shell is what you are looking for. It returns a Process instance, which you can wait() on or communicate() with.
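
A minimal sketch of that pattern (the echo command and the run() helper are just placeholders for illustration):

import asyncio
from asyncio.subprocess import PIPE

async def run(cmd):
    # create_subprocess_shell returns a Process instance
    process = await asyncio.create_subprocess_shell(cmd, stdout=PIPE, stderr=PIPE)
    stdout, stderr = await process.communicate()  # waits for the process to exit
    return process.returncode, stdout

loop = asyncio.get_event_loop()
print(loop.run_until_complete(run('echo hello')))
loop.close()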



Answer 3:

I have exactly the same situation as yours. In my case, I am running multiple git fetch commands in several repo directories.

On the first attempt, the code looked like this (where cmds is ['git', 'fetch']):

import asyncio
from typing import List

async def run_async(path: str, cmds: List[str]):
    process = await asyncio.create_subprocess_exec(*cmds, cwd=path)
    await process.wait()

This function handles one repo; the caller creates tasks for multiple repos and runs an event loop to complete them, as sketched below.
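
For illustration, a minimal sketch of such a caller (fetch_all and paths are made-up names, not taken from the project):

import asyncio

def fetch_all(paths):
    # one run_async() coroutine per repo, all executed concurrently on one event loop
    loop = asyncio.get_event_loop()
    tasks = [run_async(path, ['git', 'fetch']) for path in paths]
    loop.run_until_complete(asyncio.gather(*tasks))
    loop.close()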

Although the program runs and the outcome on disk is correct, the fetch output from different repos is interleaved. The reason is that await process.wait() can give control back to the caller (the event loop scheduler) any time I/O blocks (file, network, etc.).

A simple change fixes it:

async def run_async(path: str, cmds: List[str]):
    """
    Run `cmds` asynchronously in `path` directory
    """
    process = await asyncio.create_subprocess_exec(
        *cmds, stdout=asyncio.subprocess.PIPE, cwd=path)
    stdout, _ = await process.communicate()
    stdout and print(stdout.decode())

The rationale here is to redirect stdout so that each process's output ends up in one place. In my case, I simply print it out. If you need the output, you can return it at the end.

Also, the printing order may not be the same as the start order, which is fine in my case.
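
If you do need the results back (and in a deterministic order), a possible variant, sketched here rather than taken from the project, returns the output and lets the caller collect everything with asyncio.gather():

async def run_async(path: str, cmds: List[str]) -> str:
    """Run `cmds` in `path` and return the decoded output."""
    process = await asyncio.create_subprocess_exec(
        *cmds, stdout=asyncio.subprocess.PIPE, cwd=path)
    stdout, _ = await process.communicate()
    return stdout.decode() if stdout else ''

# in the caller, results come back in the same order as the input paths:
# outputs = loop.run_until_complete(
#     asyncio.gather(*(run_async(p, ['git', 'fetch']) for p in paths)))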

The source code is here on github. To give some context, that project is a command-line tool to manage multiple git repos, delegating git command execution from any working directory. It has fewer than 200 lines of code and should be an easy read.