检查,看看是否有更多的数据使用Python的选择模块文件进行读取,(Checking to see

2019-10-20 03:47发布

我有一个螺纹内创建子进程,以使得螺纹可以不断地检查(来自stdout或stderr)特定输出条件,并且调用相应的回调的程序,而该程序的其余部分继续。 下面是该代码的削减的版本:

import select
import subprocess
import threading

def run_task():
    command = ['python', 'a-script-that-outputs-lines.py']
    proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    while True:

        ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1)

        if proc.stdout in ready:
            next_line_to_process = proc.stdout.readline()
            # process the output

        if proc.stderr in ready:
            next_line_to_process = proc.stderr.readline()
            # process the output

        if not ready and proc.poll() is not None:
            break

thread = threading.Thread(target = run_task)
thread.run()

它的工作原理相当不错,但我想线程退出一次满足两个条件:正在运行的子进程已经完成,并全部输出和错误数据已被处理。

我有困难的是,如果我的最后一个条件是,因为它是以上( if not ready and proc.poll() is not None ),那么线程永远不会退出,因为一旦输出和错误的文件描述符被标记为准备好了,他们从来没有成为未就绪(甚至之后的所有数据已经从他们阅读, read()会挂起或readline()会返回一个空字符串)。

如果我改变这种状况,只是if proc.poll() is not None ,则循环存在程序退出的时候,我不能保证它看到所有需要进行处理的数据。

这仅仅是个错误的做法,或者是有什么办法可以可靠地确定,当你读过一切都不会被写入文件描述符的数据? 或者这是一个问题,具体到尝试从一个子进程的STDERR /标准输出读?

我已经在Python 2.5的尝试这种(在OS X上运行),也试过select.poll()select.epoll()上的Python 2.6为基础的变体(用2.6内核在Debian上运行)。

Answer 1:

select ,如果你想了解您是否可以从管道无阻塞读取模块是合适的。

为了确保您已经阅读所有的数据,使用一个简单的条件if proc.poll() is not None: break ,并呼吁rest = [pipe.read() for pipe in [p.stdout, p.stderr]]后循环。

这是不可能的一个子进程关闭其标准输出/标准错误之前其关闭,因此你可以跳过处理EOF的简单逻辑。


不要叫Thread.run()直接使用Thread.start()来代替。 你可能不会在这里需要单独的线程在所有。

不要叫p.stdout.readline()后, select()它可能会阻止,使用os.read(p.stdout.fileno(), limit)来代替。 空字节串表示用于相应的管EOF。


作为替代或补充你可以把使用管道非阻塞fcntl模块:

import os
from fcntl import fcntl, F_GETFL, F_SETFL

def make_nonblocking(fd):
    return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK)

而在阅读处理IO / OS错误。



Answer 2:

我的最终解决,正如我上面提到的,是下面的情况下,这是有帮助的人。 我认为这是正确的做法,因为我现在是97.2%肯定你不能做到这一点,只需select() / poll()read()

import select
import subprocess
import threading

def run_task():
    command = ['python', 'a-script-that-outputs-lines.py']
    proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
    while True:

        ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1)

        if proc.stdout in ready:
            next_line_to_process = proc.stdout.readline()
            if next_line_to_process:
                # process the output
            elif proc.returncode is not None:
                # The program has exited, and we have read everything written to stdout
                ready = filter(lambda x: x is not proc.stdout, ready)

        if proc.stderr in ready:
            next_line_to_process = proc.stderr.readline()
            if next_line_to_process:
                # process the output
            elif proc.returncode is not None:
                # The program has exited, and we have read everything written to stderr
                ready = filter(lambda x: x is not proc.stderr, ready)

        if proc.poll() is not None and not ready:
            break

thread = threading.Thread(target = run_task)
thread.run()


Answer 3:

你可以做一个原始os.read(fd, size)在管道的文件描述符,而不是使用readline() 这是一个非阻塞操作,也可以检测EOF(在这种情况下,它返回一个空字符串或字节对象)。 你必须执行行分裂和缓冲自己。 使用这样的事情:

class NonblockingReader():
  def __init__(self, pipe):
    self.fd = pipe.fileno()
    self.buffer = ""

  def readlines(self):
    data = os.read(self.fd, 2048)
    if not data:
      return None

    self.buffer += data
    if os.linesep in self.buffer:
      lines = self.buffer.split(os.linesep)
      self.buffer = lines[-1]
      return lines[:-1]
    else:
      return []


文章来源: Checking to see if there is more data to read from a file descriptor using Python's select module