Live output from subprocess command

Published 2019-05-08 19:10

I am using a Python script as a driver for a hydrodynamics code. When it comes time to run the simulation, I use subprocess.Popen to run the code, collecting the output from stdout and stderr into a subprocess.PIPE; then I can print (and save to a log file) the output information and check for any errors. The problem is, I have no idea how the code is progressing. If I run it directly from the command line, it gives me output about what iteration it's at, what time, what the next time step is, etc.

Is there a way to both store the output (for logging and error checking) and also produce a live-streaming output?

The relevant section of my code:

ret_val = subprocess.Popen( run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True )
output, errors = ret_val.communicate()
log_file.write(output)
print output
if( ret_val.returncode ):
    print "RUN failed\n\n%s\n\n" % (errors)
    success = False

if( errors ): log_file.write("\n\n%s\n\n" % errors)

Originally I was piping the run_command through tee so that a copy went directly to the log file and the stream still output directly to the terminal, but that way I can't store any errors (to my knowledge).


Edit:

Temporary workaround:

ret_val = subprocess.Popen( run_command, stdout=log_file, stderr=subprocess.PIPE, shell=True )
while ret_val.poll() is None:
    log_file.flush()

Then, in another terminal, run tail -f log.txt (s.t. log_file = 'log.txt').

Answer 1:

You have two ways of doing this: either create an iterator from the read or readline functions and do:

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for c in iter(lambda: process.stdout.read(1), ''):  # replace '' with b'' for Python 3
        sys.stdout.write(c)
        f.write(c)

or

import subprocess
import sys
with open('test.log', 'w') as f:  # replace 'w' with 'wb' for Python 3
    process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
    for line in iter(process.stdout.readline, ''):  # replace '' with b'' for Python 3
        sys.stdout.write(line)
        f.write(line)

Or you can create a reader and a writer file. Pass the writer to Popen and read from the reader:

import io
import time
import subprocess
import sys

filename = 'test.log'
with io.open(filename, 'wb') as writer, io.open(filename, 'rb', 1) as reader:
    process = subprocess.Popen(command, stdout=writer)
    while process.poll() is None:
        sys.stdout.write(reader.read())
        time.sleep(0.5)
    # Read the remaining
    sys.stdout.write(reader.read())

This way, you will have the data written in test.log as well as on the standard output.

The only benefit of the file approach is that your code doesn't block. So you can do whatever you want in the meantime and read from the reader in a non-blocking way whenever you want. When you use PIPE, the read and readline functions will block until either one character is written to the pipe or a line is written to the pipe, respectively.
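For completeness, the same non-blocking idea can also be had without the temp file by multiplexing on the pipe itself with the standard selectors module (POSIX only, since select on Windows is limited to sockets). This is a minimal sketch; the inline Python one-liner stands in for your_command:

```python
import selectors
import subprocess
import sys

# Stand-in for your_command: a tiny script that prints one line.
proc = subprocess.Popen([sys.executable, "-c", "print('tick')"],
                        stdout=subprocess.PIPE)

sel = selectors.DefaultSelector()
sel.register(proc.stdout, selectors.EVENT_READ)

output = b""
while True:
    # select() with a timeout lets the main loop stay responsive
    # instead of blocking inside read()
    events = sel.select(timeout=1.0)
    if not events:
        continue  # nothing ready yet; do other work here if desired
    chunk = proc.stdout.read1(4096)  # read only what is currently available
    if not chunk:  # empty read after the pipe closes means EOF
        break
    output += chunk

sel.unregister(proc.stdout)
proc.wait()
sys.stdout.write(output.decode())
```

The same loop can write each chunk to a log file as it arrives, which gives the live-plus-logged behavior the question asks for.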



Answer 2:

Executive Summary (or "tl;dr" version): it's easy when there's at most one subprocess.PIPE, otherwise it's hard.

It may be time to explain a bit about how subprocess.Popen does its thing.

(Caveat: this is for Python 2.x, although 3.x is similar; and I'm quite fuzzy on the Windows variant. I understand the POSIX stuff much better.)

The Popen function needs to deal with zero-to-three I/O streams, somewhat simultaneously. These are denoted stdin, stdout, and stderr as usual.

You can provide:

  • None, indicating that you don't want to redirect the stream. It will inherit these as usual instead. Note that on POSIX systems, at least, this does not mean it will use Python's sys.stdout, just Python's actual stdout; see demo at end.
  • An int value. This is a "raw" file descriptor (in POSIX at least). (Side note: PIPE and STDOUT are actually ints internally, but are "impossible" descriptors, -1 and -2.)
  • A stream—really, any object with a fileno method. Popen will find the descriptor for that stream, using stream.fileno(), and then proceed as for an int value.
  • subprocess.PIPE, indicating that Python should create a pipe.
  • subprocess.STDOUT (for stderr only): tell Python to use the same descriptor as for stdout. This only makes sense if you provided a (non-None) value for stdout, and even then, it is only needed if you set stdout=subprocess.PIPE. (Otherwise you can just provide the same argument you provided for stdout, e.g., Popen(..., stdout=stream, stderr=stream).)

The easiest cases (no pipes)

If you redirect nothing (leave all three as the default None value or supply explicit None), Popen has it quite easy. It just needs to spin off the subprocess and let it run. Or, if you redirect to a non-PIPE—an int or a stream's fileno()—it's still easy, as the OS does all the work. Python just needs to spin off the subprocess, connecting its stdin, stdout, and/or stderr to the provided file descriptors.

The still-easy case: one pipe

If you redirect only one stream, Popen still has things pretty easy. Let's pick one stream at a time and watch.

Suppose you want to supply some stdin, but let stdout and stderr go un-redirected, or go to a file descriptor. As the parent process, your Python program simply needs to use write() to send data down the pipe. You can do this yourself, e.g.:

proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
proc.stdin.write('here, have some data\n') # etc

or you can pass the stdin data to proc.communicate(), which then does the stdin.write shown above. There is no output coming back so communicate() has only one other real job: it also closes the pipe for you. (If you don't call proc.communicate() you must call proc.stdin.close() to close the pipe, so that the subprocess knows there is no more data coming through.)
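To make that point concrete, here is a small self-contained illustration (the inline child script is just a stand-in for a real command): communicate() does the stdin.write, closes the pipe so the child's read() can finish, and collects the result in one call.

```python
import subprocess
import sys

# Child script: read all of stdin (which only returns once the write
# end of the pipe is closed), then echo it back upper-cased.
child = "import sys; sys.stdout.write(sys.stdin.read().upper())"

proc = subprocess.Popen([sys.executable, "-c", child],
                        stdin=subprocess.PIPE, stdout=subprocess.PIPE)

# communicate() writes the data, closes proc.stdin, and reads the output.
out, err = proc.communicate(b"here, have some data\n")
```

If you instead used proc.stdin.write() yourself and forgot proc.stdin.close(), the child above would hang forever in its read().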

Suppose you want to capture stdout but leave stdin and stderr alone. Again, it's easy: just call proc.stdout.read() (or equivalent) until there is no more output. Since proc.stdout is a normal Python I/O stream you can use all the normal constructs on it, like:

for line in proc.stdout:

or, again, you can use proc.communicate(), which simply does the read() for you.

If you want to capture only stderr, it works the same as with stdout.

There's one more trick before things get hard. Suppose you want to capture stdout, and also capture stderr but on the same pipe as stdout:

proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

In this case, subprocess "cheats"! Well, it has to do this, so it's not really cheating: it starts the subprocess with both its stdout and its stderr directed into the (single) pipe-descriptor that feeds back to its parent (Python) process. On the parent side, there's again only a single pipe-descriptor for reading the output. All the "stderr" output shows up in proc.stdout, and if you call proc.communicate(), the stderr result (second value in the tuple) will be None, not a string.
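A short demonstration of that merge (the inline child script is only an illustration):

```python
import subprocess
import sys

# Child writes one line to each stream.
child = ("import sys; sys.stdout.write('to stdout\\n'); "
         "sys.stderr.write('to stderr\\n')")

proc = subprocess.Popen([sys.executable, "-c", child],
                        stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out, err = proc.communicate()
# Both lines arrive on the single stdout pipe; the stderr slot is None.
```

Note that the interleaving order of the two lines is up to the child's buffering, but both always land on proc.stdout.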

The hard cases: two or more pipes

The problems all come about when you want to use at least two pipes. In fact, the subprocess code itself has this bit:

def communicate(self, input=None):
    ...
    # Optimization: If we are only using one pipe, or no pipe at
    # all, using select() or threads is unnecessary.
    if [self.stdin, self.stdout, self.stderr].count(None) >= 2:

But, alas, here we've made at least two, and maybe three, different pipes, so the count(None) returns either 1 or 0. We must do things the hard way.

On Windows, this uses threading.Thread to accumulate results for self.stdout and self.stderr, and has the parent thread deliver self.stdin input data (and then close the pipe).

On POSIX, this uses poll if available, otherwise select, to accumulate output and deliver stdin input. All of this runs in the (single) parent process/thread.

Threads or poll/select are needed here to avoid deadlock. Suppose, for instance, that we've redirected all three streams to three separate pipes. Suppose further that there's a small limit on how much data can be stuffed into a pipe before the writing process is suspended, waiting for the reading process to "clean out" the pipe from the other end. Let's set that small limit to a single byte, just for illustration. (This is in fact how things work, except that the limit is much bigger than one byte.)

If the parent (Python) process tries to write several bytes, say 'go\n', to proc.stdin, the first byte goes in and then the second causes the Python process to suspend, waiting for the subprocess to read the first byte, emptying the pipe.

Meanwhile, suppose the subprocess decides to print a friendly "Hello! Don't Panic!" greeting. The H goes into its stdout pipe, but the e causes it to suspend, waiting for its parent to read that H, emptying the stdout pipe.

Now we're stuck: the Python process is asleep, waiting to finish saying "go", and the subprocess is asleep too, waiting to finish saying "Hello! Don't Panic!".

The subprocess.Popen code avoids this problem with threading or with select/poll. When bytes can go over the pipes, they go. When they can't, only a thread (rather than the whole process) has to sleep; or, in the case of select/poll, the Python process waits simultaneously for "can write" or "data available", writes to the process's stdin only when there is room, and reads its stdout and/or stderr only when data are ready. The proc.communicate() code (actually _communicate, where the hairy cases are handled) returns once all stdin data (if any) have been sent and all stdout and/or stderr data have been accumulated.

If you want to read both stdout and stderr on two different pipes (regardless of any stdin redirection), you need to avoid deadlock too. The deadlock scenario here is different: it occurs when the subprocess writes something long to stderr while you're pulling data from stdout, or vice versa, but it's still there.
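If you do need both streams on separate pipes yourself, one reader thread per pipe is the usual way out of that deadlock. A minimal sketch (the inline child script stands in for a real command):

```python
import subprocess
import sys
import threading

def drain(stream, sink):
    # Runs in its own thread, so a full stderr pipe can never stall
    # our reading of stdout, and vice versa.
    for line in iter(stream.readline, b""):
        sink.append(line)
    stream.close()

child = ("import sys; sys.stdout.write('out line\\n'); "
         "sys.stderr.write('err line\\n')")
proc = subprocess.Popen([sys.executable, "-c", child],
                        stdout=subprocess.PIPE, stderr=subprocess.PIPE)

out_lines, err_lines = [], []
threads = [threading.Thread(target=drain, args=(proc.stdout, out_lines)),
           threading.Thread(target=drain, args=(proc.stderr, err_lines))]
for t in threads:
    t.start()
for t in threads:
    t.join()
proc.wait()
```

Each sink list here could just as well be replaced by a callable that prints and logs each line as it arrives.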


Demo

I promised to demonstrate that, un-redirected, Python subprocesses write to the underlying stdout, not sys.stdout. So, here is some code:

from cStringIO import StringIO
import os
import subprocess
import sys

def show1():
    print 'start show1'
    save = sys.stdout
    sys.stdout = StringIO()
    print 'sys.stdout being buffered'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    in_stdout = sys.stdout.getvalue()
    sys.stdout = save
    print 'in buffer:', in_stdout

def show2():
    print 'start show2'
    save = sys.stdout
    sys.stdout = open(os.devnull, 'w')
    print 'after redirect sys.stdout'
    proc = subprocess.Popen(['echo', 'hello'])
    proc.wait()
    sys.stdout = save

show1()
show2()

When run:

$ python out.py
start show1
hello
in buffer: sys.stdout being buffered

start show2
hello

Note that the first routine will fail if you add stdout=sys.stdout, as a StringIO object has no fileno. The second will omit the hello if you add stdout=sys.stdout, since sys.stdout has been redirected to os.devnull.

(If you redirect Python's file-descriptor-1, the subprocess will follow that redirection. The open(os.devnull, 'w') call produces a stream whose fileno() is greater than 2.)



Answer 3:

We can also use the default file iterator for reading stdout instead of using the iter construct with readline():

import subprocess
import sys
process = subprocess.Popen(your_command, stdout=subprocess.PIPE)
for line in process.stdout:
    sys.stdout.write(line)


Answer 4:

If you are able to use third-party libraries, you might be able to use something like sarge (disclosure: I'm its maintainer). This library allows non-blocking access to subprocess output streams; it's layered over the subprocess module.



Answer 5:

A good but "heavyweight" solution is to use Twisted; see the bottom.

If you're willing to live with stdout only, something along these lines should work:

import subprocess
import sys
popenobj = subprocess.Popen(["ls", "-Rl"], stdout=subprocess.PIPE)
while not popenobj.poll():
   stdoutdata = popenobj.stdout.readline()
   if stdoutdata:
      sys.stdout.write(stdoutdata)
   else:
      break
print "Return code", popenobj.returncode

(If you use read() it tries to read the entire "file", which isn't useful here; what we could really use is something that reads all the data that is in the pipe right now.)
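That "whatever is in the pipe right now" behavior is roughly what the low-level os.read() call gives you on the raw descriptor. A sketch (the inline script stands in for ls, and the wait() is only there so the demo has data sitting in the pipe):

```python
import os
import subprocess
import sys

proc = subprocess.Popen([sys.executable, "-c", "print('partial data')"],
                        stdout=subprocess.PIPE)
proc.wait()  # for the demo, let the output accumulate in the pipe first

# os.read() returns as soon as *any* data is available (up to 4096
# bytes here), rather than blocking until EOF the way a plain .read()
# on the file object does.
chunk = os.read(proc.stdout.fileno(), 4096)
```

In a real loop you would call os.read() repeatedly (an empty bytes result signals EOF), typically after select/poll says the descriptor is readable.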

One might try to approach this with threads, e.g.:

import subprocess
import sys
import threading

popenobj = subprocess.Popen("ls", stdout=subprocess.PIPE, shell=True)

def stdoutprocess(o):
   while True:
      stdoutdata = o.stdout.readline()
      if stdoutdata:
         sys.stdout.write(stdoutdata)
      else:
         break

t = threading.Thread(target=stdoutprocess, args=(popenobj,))
t.start()
popenobj.wait()
t.join()
print "Return code", popenobj.returncode

Now we could potentially add stderr as well by having two threads.

Note, however, that the subprocess documentation discourages using these files directly and recommends communicate() (mostly concerned with deadlocks, which I think isn't an issue above), and the solutions are a little clunky, so it really seems like the subprocess module isn't quite up to the job (see also: http://www.python.org/dev/peps/pep-3145/) and we need to look at something else.

A more involved solution is to use Twisted, as shown here: https://twistedmatrix.com/documents/11.1.0/core/howto/process.html

The way you do this with Twisted is to create your process using reactor.spawnProcess() and provide a ProcessProtocol that then processes output asynchronously. The Twisted sample Python code is here: https://twistedmatrix.com/documents/11.1.0/core/howto/listings/process/process.py



Answer 6:

It looks like line-buffered output will work for you, in which case something like the following might suit. (Caveat: untested.) This will only give the subprocess's stdout in real time. If you want both stderr and stdout in real time, you'll have to do something more complex with select.

proc = subprocess.Popen(run_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
while proc.poll() is None:
    line = proc.stdout.readline()  # already includes the trailing '\n'
    print line,
    log_file.write(line)
# Might still be data on stdout at this point.  Grab any remainder.
for line in proc.stdout.readlines():
    print line,
    log_file.write(line)
# Do whatever you want with proc.stderr here...


Answer 7:

Why not set stdout directly to sys.stdout? And if you need output to a log as well, you can simply override the write method of f.

import sys
import subprocess

class SuperFile(file):  # Python 2: subclass the built-in file type

    def write(self, data):
        sys.stdout.write(data)
        super(SuperFile, self).write(data)

f = SuperFile("log.txt", "w+")
process = subprocess.Popen(command, stdout=f, stderr=f)


Answer 8:

Here's a class I use in one of my projects. It redirects a subprocess's output to a log. At first I tried simply overwriting the write method, but that doesn't work, as the subprocess will never call it (redirection happens at the file-descriptor level). So I'm using my own pipe, similar to how it's done in the subprocess module. This has the advantage of encapsulating all the logging/printing logic in the adapter, and you can simply pass an instance of the logger to Popen: subprocess.Popen("/path/to/binary", stderr=LogAdapter("foo"))

import logging
import os
import threading


class LogAdapter(threading.Thread):

    def __init__(self, logname, level = logging.INFO):
        super().__init__()
        self.log = logging.getLogger(logname)
        self.readpipe, self.writepipe = os.pipe()

        logFunctions = {
            logging.DEBUG: self.log.debug,
            logging.INFO: self.log.info,
            logging.WARN: self.log.warn,
            logging.ERROR: self.log.warn,
        }

        try:
            self.logFunction = logFunctions[level]
        except KeyError:
            self.logFunction = self.log.info

    def fileno(self):
        #when fileno is called this indicates the subprocess is about to fork => start thread
        self.start()
        return self.writepipe

    def finished(self):
        """If the write-filedescriptor is not closed this thread will
        prevent the whole program from exiting. You can use this method
        to clean up after the subprocess has terminated."""
        os.close(self.writepipe)

    def run(self):
        inputFile = os.fdopen(self.readpipe)

        while True:
            line = inputFile.readline()

            if len(line) == 0:
                #no new data was added
                break

            self.logFunction(line.strip())

If you don't need logging and just want to use print(), you can obviously remove large parts of the code and keep the class shorter. You could also expand it with __enter__ and __exit__ methods, calling finished in __exit__, so you can easily use it as a context manager.



Answer 9:

All the solutions above that I tried failed either to separate stderr and stdout output (multiple pipes) or blocked forever when the OS pipe buffer was full, which happens when the command you are running outputs too fast (there's a warning about this in the Python manual for subprocess poll()). The only reliable way I found was via select, but that's a POSIX-only solution:

import subprocess
import sys
import os
import select
from errno import EINTR
# returns command exit status, stdout text, stderr text
# rtoutput: show realtime output while running
def run_script(cmd,rtoutput=0):
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    poller = select.poll()
    poller.register(p.stdout, select.POLLIN)
    poller.register(p.stderr, select.POLLIN)

    coutput=''
    cerror=''
    fdhup={}
    fdhup[p.stdout.fileno()]=0
    fdhup[p.stderr.fileno()]=0
    while sum(fdhup.values()) < len(fdhup):
        try:
            r = poller.poll(1)
        except select.error, err:
            if err.args[0] != EINTR:
                raise
            r=[]
        for fd, flags in r:
            if flags & (select.POLLIN | select.POLLPRI):
                c = os.read(fd, 1024)
                if rtoutput:
                    sys.stdout.write(c)
                    sys.stdout.flush()
                if fd == p.stderr.fileno():
                    cerror+=c
                else:
                    coutput+=c
            else:
                fdhup[fd]=1
    return p.poll(), coutput.strip(), cerror.strip()


Answer 10:

In addition to all these answers, one simple approach could also be as follows:

process = subprocess.Popen(your_command, stdout=subprocess.PIPE)

while process.stdout.readable():
    line = process.stdout.readline()

    if not line:
        break

    print(line.strip())

Loop over the stream for as long as it is readable, and stop when it yields an empty result.

The key here is that readline() returns a line (with the \n at the end) as long as there is output, and an empty string only when the stream has really ended.
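That behavior is easy to verify with a trivial child process (a sketch, using an inline script in place of your_command):

```python
import subprocess
import sys

proc = subprocess.Popen([sys.executable, "-c", "print('a'); print('b')"],
                        stdout=subprocess.PIPE, text=True)

lines = []
while True:
    line = proc.stdout.readline()
    if not line:  # '' only at real end-of-stream
        break
    lines.append(line)
proc.wait()
```

Each collected line keeps its trailing newline, and the loop terminates exactly once, at EOF.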

Hope this helps someone.



Answer 11:

Similar to the previous answers, but the following solution worked for me on Windows, using Python 3 to provide a generic method to print and log in real time (source: getting realtime output using python):

def print_and_log(command, logFile):
    with open(logFile, 'wb') as f:
        command = subprocess.Popen(command, stdout=subprocess.PIPE, shell=True)

        while True:
            output = command.stdout.readline()
            if not output and command.poll() is not None:
                f.close()
                break
            if output:
                f.write(output)
                print(str(output.strip(), 'utf-8'), flush=True)
        return command.poll()


Answer 12:

Based on all the above, I suggest a slightly modified version (python3):

  • the while loop calls readline (the suggested iter solution seemed to block forever for me: Python 3, Windows 7)
  • structured so that handling of the read data doesn't need to be duplicated after poll returns non-None
  • stderr is piped into stdout so both output streams are read
  • added code to get the exit value of cmd

Code:

import subprocess
import time
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT, universal_newlines=True)
while True:
    rd = proc.stdout.readline()
    print(rd, end='')  # and whatever you want to do...
    if not rd:  # EOF
        returncode = proc.poll()
        if returncode is not None:
            break
        time.sleep(0.1)  # cmd closed stdout, but not exited yet

# You may want to check on ReturnCode here


Answer 13:

I think the subprocess.communicate method is somewhat misleading: it actually fills the stdout and stderr that you specify in subprocess.Popen.

Yet, reading from the subprocess.PIPE that you can provide to subprocess.Popen's stdout and stderr parameters will eventually fill the OS pipe buffers and deadlock your application (especially if you have multiple processes/threads that must use subprocess).

My proposed solution is to provide stdout and stderr with files, and to read the files' content instead of reading from the deadlocking PIPE. These files can be tempfile.NamedTemporaryFile(), which can also be accessed for reading while they're being written into by subprocess.communicate.

Here is a sample usage:

        try:
            with ProcessRunner(('python', 'task.py'), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        except ProcessError as e:
            print(e.error_message)
            raise

And this is the source code, ready to use, with as many comments as I could provide to explain what it does:

If you're using Python 2, please make sure to first install the latest subprocess32 package from PyPI.


import os
import sys
import threading
import time
import tempfile
import logging

if os.name == 'posix' and sys.version_info[0] < 3:
    # Support python 2
    import subprocess32 as subprocess
else:
    # Get latest and greatest from python 3
    import subprocess

logger = logging.getLogger(__name__)


class ProcessError(Exception):
    """Base exception for errors related to running the process"""


class ProcessTimeout(ProcessError):
    """Error that will be raised when the process execution will exceed a timeout"""


class ProcessRunner(object):
    def __init__(self, args, env=None, timeout=None, bufsize=-1, seconds_to_wait=0.25, **kwargs):
        """
        Constructor facade to subprocess.Popen that receives parameters which are more specifically required for the
        Process Runner. This is a class that should be used as a context manager - and that provides an iterator
        for reading captured output from subprocess.communicate in near realtime.

        Example usage:


        try:
            with ProcessRunner(('python', task_file_path), env=os.environ.copy(), seconds_to_wait=0.01) as process_runner:
                for out in process_runner:
                    print(out)
        except ProcessError as e:
            print(e.error_message)
            raise

        :param args: same as subprocess.Popen
        :param env: same as subprocess.Popen
        :param timeout: same as subprocess.communicate
        :param bufsize: same as subprocess.Popen
        :param seconds_to_wait: time to wait between each readline from the temporary file
        :param kwargs: same as subprocess.Popen
        """
        self._seconds_to_wait = seconds_to_wait
        self._process_has_timed_out = False
        self._timeout = timeout
        self._process_done = False
        self._std_file_handle = tempfile.NamedTemporaryFile()
        self._process = subprocess.Popen(args, env=env, bufsize=bufsize,
                                         stdout=self._std_file_handle, stderr=self._std_file_handle, **kwargs)
        self._thread = threading.Thread(target=self._run_process)
        self._thread.daemon = True

    def __enter__(self):
        self._thread.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self._thread.join()
        self._std_file_handle.close()

    def __iter__(self):
        # read all output from stdout file that subprocess.communicate fills
        with open(self._std_file_handle.name, 'r') as stdout:
            # while process is alive, keep reading data
            while not self._process_done:
                out = stdout.readline()
                out_without_trailing_whitespaces = out.rstrip()
                if out_without_trailing_whitespaces:
                    # yield stdout data without trailing \n
                    yield out_without_trailing_whitespaces
                else:
                    # if there is nothing to read, then please wait a tiny little bit
                    time.sleep(self._seconds_to_wait)

            # this is a hack: terraform seems to write to buffer after process has finished
            out = stdout.read()
            if out:
                yield out

        if self._process_has_timed_out:
            raise ProcessTimeout('Process has timed out')

        if self._process.returncode != 0:
            raise ProcessError('Process has failed')

    def _run_process(self):
        try:
            # Start gathering information (stdout and stderr) from the opened process
            self._process.communicate(timeout=self._timeout)
            # Graceful termination of the opened process
            self._process.terminate()
        except subprocess.TimeoutExpired:
            self._process_has_timed_out = True
            # Force termination of the opened process
            self._process.kill()

        self._process_done = True

    @property
    def return_code(self):
        return self._process.returncode





Answer 14:

None of the Pythonic solutions worked for me. It turned out that proc.stdout.read() or similar may block forever.

Therefore, I use tee like this:

subprocess.run('./my_long_running_binary 2>&1 | tee -a my_log_file.txt && exit ${PIPESTATUS}', shell=True, check=True, executable='/bin/bash')

This solution is convenient if you're already using shell=True.

${PIPESTATUS} captures the success status of the entire command chain (only available in Bash). If I omitted the && exit ${PIPESTATUS}, this would always return zero since tee never fails.

unbuffer might be necessary to print each line to the terminal immediately, instead of waiting far too long until the "pipe buffer" fills up. However, unbuffer swallows the exit status of assert (SIGABRT)...

2>&1 also writes stderr to the file.
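The PIPESTATUS point can be checked from Python directly. In this sketch, false stands in for a failing binary and /dev/null for the log file (requires /bin/bash):

```python
import subprocess

# Without `&& exit ${PIPESTATUS}`, the pipeline's status would be
# tee's (always 0); with it, the first command's failure propagates.
proc = subprocess.run(
    'false 2>&1 | tee -a /dev/null && exit ${PIPESTATUS}',
    shell=True, executable='/bin/bash')
```

Here proc.returncode reflects false's failure rather than tee's success.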



Source: live output from subprocess command