Non-blocking read on a subprocess.PIPE in python

2018-12-31 00:15发布

I'm using the subprocess module to start a subprocess and connect to it's output stream (stdout). I want to be able to execute non-blocking reads on its stdout. Is there a way to make .readline non-blocking or to check if there is data on the stream before I invoke .readline? I'd like this to be portable or at least work under Windows and Linux.

here is how I do it for now (It's blocking on the .readline if no data is avaible):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

26条回答
裙下三千臣
2楼-- · 2018-12-31 00:34

Python 3.4 introduces new provisional API for asynchronous IO -- asyncio module.

The approach is similar to twisted-based answer by @Bryan Ward -- define a protocol and its methods are called as soon as data is ready:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

See "Subprocess" in the docs.

There is a high-level interface asyncio.create_subprocess_exec() that returns Process objects that allows to read a line asynchroniosly using StreamReader.readline() coroutine (with async/await Python 3.5+ syntax):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() performs the following tasks:

  • start subprocess, redirect its stdout to a pipe
  • read a line from subprocess' stdout asynchronously
  • kill subprocess
  • wait for it to exit

Each step could be limited by timeout seconds if necessary.

查看更多
回忆,回不去的记忆
3楼-- · 2018-12-31 00:35

Try the asyncproc module. For example:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

The module takes care of all the threading as suggested by S.Lott.

查看更多
浪荡孟婆
4楼-- · 2018-12-31 00:35

This solution uses the select module to "read any available data" from an IO stream. This function blocks initially until data is available, but then reads only the data that is available and doesn't block further.

Given the fact that it uses the select module, this only works on Unix.

The code is fully PEP8-compliant.

import select


def read_available(input_stream, max_bytes=None):
    """
    Blocks until any data is available, then all available data is then read and returned.
    This function returns an empty string when end of stream is reached.

    Args:
        input_stream: The stream to read from.
        max_bytes (int|None): The maximum number of bytes to read. This function may return fewer bytes than this.

    Returns:
        str
    """
    # Prepare local variables
    input_streams = [input_stream]
    empty_list = []
    read_buffer = ""

    # Initially block for input using 'select'
    if len(select.select(input_streams, empty_list, empty_list)[0]) > 0:

        # Poll read-readiness using 'select'
        def select_func():
            return len(select.select(input_streams, empty_list, empty_list, 0)[0]) > 0

        # Create while function based on parameters
        if max_bytes is not None:
            def while_func():
                return (len(read_buffer) < max_bytes) and select_func()
        else:
            while_func = select_func

        while True:
            # Read single byte at a time
            read_data = input_stream.read(1)
            if len(read_data) == 0:
                # End of stream
                break
            # Append byte to string buffer
            read_buffer += read_data
            # Check if more data is available
            if not while_func():
                break

    # Return read buffer
    return read_buffer
查看更多
倾城一夜雪
5楼-- · 2018-12-31 00:36

I have often had a similar problem; Python programs I write frequently need to have the ability to execute some primary functionality while simultaneously accepting user input from the command line (stdin). Simply putting the user input handling functionality in another thread doesn't solve the problem because readline() blocks and has no timeout. If the primary functionality is complete and there is no longer any need to wait for further user input I typically want my program to exit, but it can't because readline() is still blocking in the other thread waiting for a line. A solution I have found to this problem is to make stdin a non-blocking file using the fcntl module:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

In my opinion this is a bit cleaner than using the select or signal modules to solve this problem but then again it only works on UNIX...

查看更多
美炸的是我
6楼-- · 2018-12-31 00:36

I have the original questioner's problem, but did not wish to invoke threads. I mixed Jesse's solution with a direct read() from the pipe, and my own buffer-handler for line reads (however, my sub-process - ping - always wrote full lines < a system page size). I avoid busy-waiting by only reading in a gobject-registered io watch. These days I usually run code within a gobject MainLoop to avoid threads.

def set_up_ping(ip, w):
# run the sub-process
# watch the resultant pipe
p = subprocess.Popen(['/bin/ping', ip], stdout=subprocess.PIPE)
# make stdout a non-blocking file
fl = fcntl.fcntl(p.stdout, fcntl.F_GETFL)
fcntl.fcntl(p.stdout, fcntl.F_SETFL, fl | os.O_NONBLOCK)
stdout_gid = gobject.io_add_watch(p.stdout, gobject.IO_IN, w)
return stdout_gid # for shutting down

The watcher is

def watch(f, *other):
print 'reading',f.read()
return True

And the main program sets up a ping and then calls gobject mail loop.

def main():
set_up_ping('192.168.1.8', watch)
# discard gid as unused here
gobject.MainLoop().run()

Any other work is attached to callbacks in gobject.

查看更多
有味是清欢
7楼-- · 2018-12-31 00:38

fcntl, select, asyncproc won't help in this case.

A reliable way to read a stream without blocking regardless of operating system is to use Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from queue import Queue, Empty
except ImportError:
    from Queue import Queue, Empty  # python 2.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line
查看更多
登录 后发表回答