I'm trying to figure out how to simply start a number of long running shell commands in a non-blocking way, and asynchronously handle their output when they finish, in the order they finish, even if that is another order than they started, using the asyncio python library available in Python 3.4 and forward.
I couldn't find a simple example of doing this, even in the asyncio documentation itself, which also seems to be quite low-level.
Use get_lines() coroutines to get the shell commands' output asynchronously, and pass the coroutines to asyncio.as_completed() to get the results in the order they finish:
#!/usr/bin/env python3.5
import asyncio
import sys
from asyncio.subprocess import PIPE, STDOUT

async def get_lines(shell_command):
    p = await asyncio.create_subprocess_shell(shell_command,
            stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    return (await p.communicate())[0].splitlines()

async def main():
    # get commands output concurrently
    coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"'
                       .format(i=i, e=sys.executable))
             for i in reversed(range(5))]
    for f in asyncio.as_completed(coros):  # print in the order they finish
        print(await f)

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
create_subprocess_shell is what you are looking for. It will return a Process instance, which you can wait() on, or communicate() with.
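A minimal sketch of that (the run() wrapper and the echo command are placeholder names for illustration, not part of the asyncio API):

import asyncio
from asyncio.subprocess import PIPE

async def run(shell_command):
    # create_subprocess_shell() returns an asyncio.subprocess.Process
    proc = await asyncio.create_subprocess_shell(shell_command,
                                                 stdout=PIPE, stderr=PIPE)
    # communicate() waits for the process to exit and collects its output
    stdout, stderr = await proc.communicate()
    print(shell_command, 'exited with code', proc.returncode)
    return stdout

asyncio.run(run('echo hello'))  # Python 3.7+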
I have exactly the same situation as yours. In my case, I am running multiple git fetch commands in several repo directories.
In the first trial, the code looked like this (cmds is ['git', 'fetch']):
import asyncio
from typing import List

async def run_async(path: str, cmds: List[str]):
    process = await asyncio.create_subprocess_exec(*cmds, cwd=path)
    await process.wait()
This function works on one repo, and the caller creates tasks for multiple repos and runs an event loop
to complete them.
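As a rough sketch of that caller (the fetch_all() name and the repo paths are made up for illustration; it reuses run_async() from above):

async def fetch_all(paths: List[str]):
    # one coroutine per repo; gather() runs them concurrently on the event loop
    await asyncio.gather(*(run_async(p, ['git', 'fetch']) for p in paths))

asyncio.run(fetch_all(['/path/to/repo1', '/path/to/repo2']))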
Although the program runs and the outcome on disk is correct, the fetch outputs from different repos are interleaved. The reason is that await process.wait() can give control back to the caller (the loop scheduler) any time I/O blocks (file, network, etc.).
A simple change fixes it:
async def run_async(path: str, cmds: List[str]):
    """
    Run `cmds` asynchronously in `path` directory
    """
    process = await asyncio.create_subprocess_exec(
        *cmds, stdout=asyncio.subprocess.PIPE, cwd=path)
    stdout, _ = await process.communicate()
    stdout and print(stdout.decode())
Here the rationale is to redirect stdout so that it all ends up in one place. In my case, I simply print it out. If you need the output, you can return it at the end.
Also, the printing order may not be the same as the start order, which is fine in my case.
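If you do want the output back, a small variation (just a sketch; the collect() name and the repo paths are made up) returns the decoded text and gathers the results; asyncio.gather() returns them in the order the coroutines were passed in, even though the fetches may finish in a different order:

import asyncio
from typing import List

async def run_async(path: str, cmds: List[str]) -> str:
    # capture stdout and return it instead of printing it
    process = await asyncio.create_subprocess_exec(
        *cmds, stdout=asyncio.subprocess.PIPE, cwd=path)
    stdout, _ = await process.communicate()
    return stdout.decode() if stdout else ''

async def collect(paths: List[str]) -> List[str]:
    return await asyncio.gather(
        *(run_async(p, ['git', 'fetch']) for p in paths))

outputs = asyncio.run(collect(['/path/to/repo1', '/path/to/repo2']))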
The source code is here on github. To give some context, that project is a command-line tool to manage multiple git repos, which delegates git command execution from any working directory. There are fewer than 200 lines of code and it should be an easy read.