我有一个螺纹内创建子进程,以使得螺纹可以不断地检查(来自stdout或stderr)特定输出条件,并且调用相应的回调的程序,而该程序的其余部分继续。 下面是该代码的削减的版本:
import select
import subprocess
import threading
def run_task():
command = ['python', 'a-script-that-outputs-lines.py']
proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
while True:
ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1)
if proc.stdout in ready:
next_line_to_process = proc.stdout.readline()
# process the output
if proc.stderr in ready:
next_line_to_process = proc.stderr.readline()
# process the output
if not ready and proc.poll() is not None:
break
thread = threading.Thread(target = run_task)
thread.run()
它的工作原理相当不错,但我想线程退出一次满足两个条件:正在运行的子进程已经完成,并全部输出和错误数据已被处理。
我有困难的是,如果我的最后一个条件是,因为它是以上( if not ready and proc.poll() is not None
),那么线程永远不会退出,因为一旦输出和错误的文件描述符被标记为准备好了,他们从来没有成为未就绪(甚至之后的所有数据已经从他们阅读, read()
会挂起或readline()
会返回一个空字符串)。
如果我改变这种状况,只是if proc.poll() is not None
,则循环存在程序退出的时候,我不能保证它看到所有需要进行处理的数据。
这仅仅是个错误的做法,或者是有什么办法可以可靠地确定,当你读过一切都不会被写入文件描述符的数据? 或者这是一个问题,具体到尝试从一个子进程的STDERR /标准输出读?
我已经在Python 2.5的尝试这种(在OS X上运行),也试过select.poll()
和select.epoll()
上的Python 2.6为基础的变体(用2.6内核在Debian上运行)。
select
,如果你想了解您是否可以从管道无阻塞读取模块是合适的。
为了确保您已经阅读所有的数据,使用一个简单的条件if proc.poll() is not None: break
,并呼吁rest = [pipe.read() for pipe in [p.stdout, p.stderr]]
后循环。
这是不可能的一个子进程关闭其标准输出/标准错误之前其关闭,因此你可以跳过处理EOF的简单逻辑。
不要叫Thread.run()
直接使用Thread.start()
来代替。 你可能不会在这里需要单独的线程在所有。
不要叫p.stdout.readline()
后, select()
它可能会阻止,使用os.read(p.stdout.fileno(), limit)
来代替。 空字节串表示用于相应的管EOF。
作为替代或补充你可以把使用管道非阻塞fcntl
模块:
import os
from fcntl import fcntl, F_GETFL, F_SETFL
def make_nonblocking(fd):
return fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | os.O_NONBLOCK)
而在阅读处理IO / OS错误。
我的最终解决,正如我上面提到的,是下面的情况下,这是有帮助的人。 我认为这是正确的做法,因为我现在是97.2%肯定你不能做到这一点,只需select()
/ poll()
和read()
import select
import subprocess
import threading
def run_task():
command = ['python', 'a-script-that-outputs-lines.py']
proc = subprocess.Popen(command, stdout = subprocess.PIPE, stderr = subprocess.PIPE)
while True:
ready, _, _ = select.select((proc.stdout, proc.stderr), (), (), .1)
if proc.stdout in ready:
next_line_to_process = proc.stdout.readline()
if next_line_to_process:
# process the output
elif proc.returncode is not None:
# The program has exited, and we have read everything written to stdout
ready = filter(lambda x: x is not proc.stdout, ready)
if proc.stderr in ready:
next_line_to_process = proc.stderr.readline()
if next_line_to_process:
# process the output
elif proc.returncode is not None:
# The program has exited, and we have read everything written to stderr
ready = filter(lambda x: x is not proc.stderr, ready)
if proc.poll() is not None and not ready:
break
thread = threading.Thread(target = run_task)
thread.run()
你可以做一个原始os.read(fd, size)
在管道的文件描述符,而不是使用readline()
这是一个非阻塞操作,也可以检测EOF(在这种情况下,它返回一个空字符串或字节对象)。 你必须执行行分裂和缓冲自己。 使用这样的事情:
class NonblockingReader():
def __init__(self, pipe):
self.fd = pipe.fileno()
self.buffer = ""
def readlines(self):
data = os.read(self.fd, 2048)
if not data:
return None
self.buffer += data
if os.linesep in self.buffer:
lines = self.buffer.split(os.linesep)
self.buffer = lines[-1]
return lines[:-1]
else:
return []
文章来源: Checking to see if there is more data to read from a file descriptor using Python's select module