我测试的子进程管道与蟒蛇。 我知道,我可以做什么节目下方直接做在Python中,但是这不是问题的关键。 我只是想测试管道,所以我知道如何使用它。
我的系统是Linux操作系统Ubuntu 9.04与默认的Python 2.6。
我开始与这个文档例子 。
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output
这样的作品,但由于p1
的stdin
没有被重定向,我在终端类型的东西喂管。 当我键入^D
关闭标准输入,我得到了我想要的输出。
不过,我想将数据发送到使用python的字符串变量的管道。 首先,我试着写在标准输入:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here
没有工作。 我试着用p2.stdout.read()
上,而不是最后一行,但它也阻止。 我加p1.stdin.flush()
和p1.stdin.close()
但它也不能工作。 我后来我搬到了交流:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0]
所以,这仍然不是。
我注意到,运行一个进程(如p1
以上,去除p2
)完美的作品。 和通过的文件句柄p1
( stdin=open(...)
也有效。 所以,问题是:
是否有可能将数据传递到的蟒蛇2或多个子管道,不阻塞? 为什么不?
我知道我可以运行shell并运行在shell管道,但是这不是我想要的。
更新1:按照下面我亚伦Digulla的提示我现在尝试使用线程,使其工作。
首先,我已经试过在一个线程中运行p1.communicate。
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here
好吧,没有工作。 试过其他组合等改变它.write()
并且还p2.read()
没有。 现在让我们尝试了相反的做法:
def get_output(subp):
output = subp.communicate()[0] # blocks on thread
print 'GOT:', output
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,))
t.start()
p1.communicate('data\n') # blocks here.
t.join()
代码最终阻挡某处。 无论是在产生的线程,或在主线程,或两者兼而有之。 因此,它没有工作。 如果你知道如何使它发挥作用,将能够更方便,如果你能提供工作代码。 我想在这里。
更新2
保罗·杜波依斯的一些信息下面回答,所以我做更多的测试。 我读过整个subprocess.py
模块,并得到了它是如何工作的。 所以,我想正是申请代码。
我在Linux上,但因为我是用线程测试,我的第一种方法是复制精确的Windows线程看到代码subprocess.py
的communicate()
方法,但对于两个过程,而不是一个。 下面是我的尝试整个上市:
import os
from subprocess import Popen, PIPE
import threading
def get_output(fobj, buffer):
while True:
chunk = fobj.read() # BLOCKS HERE
if not chunk:
break
buffer.append(chunk)
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread
for x in xrange(100000):
p1.stdin.write('hello world\n') # write data
p1.stdin.flush()
p1.stdin.close() # close input...
t.join()
好。 它没有工作。 即使在p1.stdin.close()
被调用, p2.stdout.read()
仍块。
然后我试图在POSIX代码subprocess.py
:
import os
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
while to_read or to_write:
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
data = os.read(p2.stdout.fileno(), 1024)
if not data:
p2.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
if numwrites > 0:
numwrites -= 1
p1.stdin.write('hello world!\n'); p1.stdin.flush()
else:
p1.stdin.close()
to_write = []
print b
上也块select.select()
通过散布print
S周围,我发现了这一点:
- 阅读工作。 代码执行时读了很多次。
- 写作也在努力。 数据写入
p1.stdin
。 - 在结束
numwrites
, p1.stdin.close()
被调用。 - 当
select()
开始堵,只有to_read
拥有的东西, p2.stdout
。 to_write
已经是空的。 -
os.read()
调用始终返回的东西,所以p2.stdout.close()
不会被调用。
从两个试验结论 :关闭stdin
所述第一进程的在管道( grep
在本例中)不使它其缓冲的输出转储到下一个和死亡。
没办法,使工作?
PS:我不想使用临时文件,我已经与文件测试,我知道它的工作原理。 而且我不希望使用Windows。
Answer 1:
我发现了如何做到这一点。
这是不是线程,不是选择()。
当我运行第一处理( grep
),它创建两个低级文件描述符,一个用于每个管。 让我们称那些a
和b
。
当我运行第二个进程, b
被传递到cut
sdtin
。 但在脑死亡的默认Popen
- close_fds=False
。
那效果是cut
也继承了a
。 所以, grep
,如果我收甚至不能死a
,因为标准输入仍然是开放的cut
的过程( cut
忽略它)。
下面的代码现在运行完美。
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read()
assert result == "Hello Worl\n"
close_fds=True
,应在UNIX系统上DEFAULT。 在Windows上,关闭所有产品安全,所以它可以防止管道。
编辑:
PS:对于有类似的问题,阅读这样的回答:作为pooryorick在评论说,这也可以阻止如果写入数据p1.stdin
比缓冲区大。 在这种情况下,你应该块的数据成小块,和使用select.select()
知道什么时候读/写。 在这个问题的代码应该给出如何实现的提示。
EDIT2:发现了另一个解决方案,从pooryorick更多的帮助-而不是使用close_fds=True
,并关闭所有 FDS,人们可以关闭fd
执行第二时属于第一过程中,S,它会工作。 收盘必须在孩子做这样的preexec_fn
从POPEN功能就非常方便的做到这一点。 在P2上执行,你可以这样做:
p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)
Answer 2:
与大型文件
两个原则需要均匀地施加在Python大文件时。
- 由于任何IO程序可以阻止,我们必须保持管道的每个阶段在不同的线程或进程。 我们使用的线程在这个例子中,但子进程将让你避免了GIL。
- 我们必须用增量的读取和写入,让我们不要等待
EOF
开始取得进展之前。
一个替代方案是使用非阻塞IO,虽然这是在标准Python繁琐。 见GEVENT对于非阻塞使用原语实现了同步IO API的轻量级线程库。
示例代码
我们将构建一个愚蠢的管道是大致
{cat /usr/share/dict/words} | grep -v not \
| {upcase, filtered tee to stderr} | cut -c 1-10 \
| {translate 'E' to '3'} | grep K | grep Z | {downcase}
其中在大括号每一级{}
在Python被实现,而其他使用标准的外部程序。 TL; DR: 看到这个要点 。
我们先从预期的进口。
#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading
管道的Python的阶段
但所有管道的最后一个Python的实现阶段需要在一个线程中去,这样它的IO不会阻止其他人。 这些可能不是在Python子进程运行,如果你想他们真正并行运行(避开GIL)。
def writer(output):
for line in open('/usr/share/dict/words'):
output.write(line)
output.close()
def filter(input, output):
for line in input:
if 'k' in line and 'z' in line: # Selective 'tee'
sys.stderr.write('### ' + line)
output.write(line.upper())
output.close()
def leeter(input, output):
for line in input:
output.write(line.replace('E', '3'))
output.close()
上述这些需求被投入在自己的线程,我们将不使用此功能方便。
def spawn(func, **kwargs):
t = threading.Thread(target=func, kwargs=kwargs)
t.start()
return t
创建管道
创建使用外部阶段Popen
和使用Python阶段spawn
。 这个论点bufsize=-1
表示使用系统的默认缓冲(通常为4 KIB)。 这通常是比默认值(无缓冲)或行缓冲速度更快,但你要行缓冲,如果你想直观地监视没有延迟输出。
grepv = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)
twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)
驾驶管道
如上组装,在管道中的所有缓冲区将填满,但因为没有人是从末端(阅读grepz.stdout
),他们将所有的块。 我们可以看整个事情在一个调用grepz.stdout.read()
但会使用大量内存大文件。 相反,我们不断的读取。
for line in grepz.stdout:
sys.stdout.write(line.lower())
该线程和进程清理,一旦达到EOF
。 我们可以明确地清理使用
for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()
Python的2.6和更早版本
在内部, subprocess.Popen
来电fork
,配置管文件描述符,并调用exec
。 从子进程fork
在父进程所有文件描述符的副本, 这两个副本都需要被关闭之前,相应的读者将获得EOF
。 这可以通过手动地闭合管道(无论是通过固定close_fds=True
或合适preexec_fn
参数subprocess.Popen
)或通过设置FD_CLOEXEC
标志具有exec
自动关闭的文件描述符。 这个标志在Python-2.7自动设置,后来,看到issue12786 。 我们可以通过调用得到早期版本的Python的Python 2.7的行为
p._set_cloexec_flags(p.stdin)
前传递p.stdin
作为参数传递给随后的subprocess.Popen
。
Answer 3:
有对制作烟斗工作三大招数预期
确保管的两端各在不同的线程/进程(一些靠近顶部的例子存在这个问题)使用。
明确地关闭管道的未使用端在每个处理
由要么禁用它(Python的-u选项)处理缓冲,使用的pty的,或者干脆填写了一些缓冲,不会影响数据,(也许“\ n”,但是任何适合)。
Python的“管道”模块(我的作者)中的示例适合您的方案完全相同,使低级别的步骤相当清晰。
http://pypi.python.org/pypi/pipeline/
最近,我使用的子模块作为生产者处理器的消费者 - 控制器模式的一部分:
http://www.darkarchive.org/w/Pub/PythonInteract
本实施例中用缓冲标准输入比不求助于使用PTY,并且还示出了管道端部应被关闭的位置。 我宁愿进程线程,但原理是一样的。 此外,它说明了队列同步到饲料生产者和消费者收集输出,以及如何将它们完全关闭(看出来插入到队列中的哨兵)。 该模式允许根据最近的输出将要生成新的输入,允许递归发现和处理。
Answer 4:
如果太多的数据写入到管道的接收端Nosklo的提供的解决方案将很快突破:
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read()
assert result == "Hello Worl\n"
如果此脚本不你的机器上挂,只是增加“20000”的东西,超过你的操作系统管缓冲区的大小。
这是因为操作系统正在缓冲输入的“grep”,但一旦缓冲区已满, p1.stdin.write
调用将阻塞,直至有从读取p2.stdout
。 在玩具的场景,你可以用的方式写入/从在同一过程中管道读取,但在正常使用的情况,有必要从一个线程/进程写入和从一个单独的线程/进程读取。 这是真实的subprocess.popen,os.pipe,os.popen *等。
另外一个奇怪的是,有时你想保持饲养与同一管道的早期输出生成的项目管道。 解决的办法是使双方的管式加料器和管道读者异步的人程序,并实现两个队列:主程序和管式加料器和主程序和管读取器之间的一个之间的一个。 PythonInteract是这样一个例子。
子是一个很好的方便的模式,但由于它隐藏os.popen的细节,并呼吁os.fork它的引擎盖下,它有时更难以处理比低级别的调用它利用。 出于这个原因,子进程并不了解进程间的管道到底是如何工作的好办法。
Answer 5:
你必须在多个线程做到这一点。 孩子P1不会读取你的输入,因为P2没有读P1的输出,因为你不读P2的输出:否则,你将在一个情况下,你不能发送数据结束。
所以你需要一个后台线程读取什么P2写出。 这将允许P2一些数据写入到管道后继续,因此它可以读取输入的从P1的下一行,再次让P1处理您发送给它的数据。
或者,您可以将数据发送到P1与后台线程和读取从P2的输出在主线程。 但是,任何一方必须是一个线程。
Answer 6:
在回答nosklo的说法(见其他意见对这个问题),它不能没有做到close_fds=True
:
close_fds=True
只是必要的,如果你离开了其他文件描述符打开。 当打开多个子进程,这是一件好事,持续跟踪可能会继承打开的文件,并明确地关闭不需要的任何:
from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read()
assert result == "Hello Worl\n"
close_fds
默认为False
,因为子倾向于信任调用程序知道它与打开的文件描述符做的,只是提供一个容易的选择主叫方所有关闭他们,如果这就是它想做的事。
但真正的问题是,管道缓冲区会咬你的所有,但玩具的例子。 正如我在其他的答案对这个问题说,经验法则是不是在同一个进程/线程打开你的读者和您的作家。 谁想要使用的子模块,用于双向通信将很好地服务于研究os.pipe和os.fork,第一。 他们其实并不难,如果你有一个使用很好的例子,来看看。
Answer 7:
我想你可能会检查错误的问题。 当然,亚伦说,如果你尝试既是生产者管道的开始,以及管道末端的消费者,很容易陷入死锁状态。 这是沟通()解决了这个问题。
沟通()是不完全正确的,因为你stdin和stdout在不同的子对象; 但如果你在subprocess.py看看落实,你会看到,它不正是亚伦建议。
一旦你看到沟通读取和写入,你会看到在你的第二个尝试沟通()与P2为P1的输出竞争:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
# ...
p1.communicate('data\n') # reads from p1.stdout, as does p2
我在Win32平台上,这肯定有不同的I / O和缓存办学特色,但是这对我的作品:
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,))
t.start()
p1.stdin.write('hello world\n' * 100000)
p1.stdin.close()
t.join()
我使用幼稚无螺纹p2.read时调谐的输入尺寸,以产生死锁()
您也可以尝试缓冲到一个文件中,如
fd, _ = tempfile.mkstemp()
os.write(fd, 'hello world\r\n' * 100000)
os.lseek(fd, 0, os.SEEK_SET)
p1 = Popen(["grep", "-v", "not"], stdin=fd, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.stdout.read()
这也为我工作没有死锁。
Answer 8:
在上述意见中的一个,我挑战nosklo要么张贴一些代码来支持自己的主张约select.select
或给予好评我回应他此前曾下投票。 他回答下面的代码:
from subprocess import Popen, PIPE
import select
p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0
while to_read or to_write:
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
data = p2.stdout.read(1024)
if not data:
p2.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
if written < len(data_to_write):
part = data_to_write[written:written+1024]
written += len(part)
p1.stdin.write(part); p1.stdin.flush()
else:
p1.stdin.close()
to_write = []
print b
这个脚本的一个问题是,它的第二猜测系统管道缓冲区的大小/性质。 该脚本会出现故障少,如果它可以去除像1024幻数。
最大的问题是,这个脚本代码只与数据输入和外部程序的正确组合都能正常工作。 grep和切割线都工作,所以他们的内部缓冲区的行为有点不同。 如果我们用一个更通用的命令,如“猫”,和写入数据的小钻头进入管道,致命的竞争条件会更频繁地弹出:
from subprocess import Popen, PIPE
import select
import time
p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0
while to_read or to_write:
time.sleep(1)
read_now, write_now, xlist = select.select(to_read, to_write, [])
if read_now:
print 'I am reading now!'
data = p2.stdout.read(1024)
if not data:
p1.stdout.close()
to_read = []
else:
b.append(data)
if write_now:
print 'I am writing now!'
if written < len(data_to_write):
part = data_to_write[written:written+1024]
written += len(part)
p1.stdin.write(part); p1.stdin.flush()
else:
print 'closing file'
p1.stdin.close()
to_write = []
print b
在这种情况下,两种不同的结果将体现:
write, write, close file, read -> success
write, read -> hang
所以,我再次nosklo挑战要么邮编展示使用select.select
处理任意输入和管道缓冲从单个线程,或给予好评我的反应。
底线:不要试图从一个单独的线程处理管道的两端。 这只是不值得。 见管道用于如何正确地做到这一点一个很好的低级别的例子。
Answer 9:
怎么样使用SpooledTemporaryFile? 这就免去了(但也许并没有解决)问题:
http://docs.python.org/library/tempfile.html#tempfile.SpooledTemporaryFile
您可以写信给它像一个文件,但它实际上是一个内存块。
还是我完全误解...
Answer 10:
下面是使用POPEN连同os.fork来完成同样的事情的一个例子。 而不是使用的close_fds
它只是关闭在正确的地方的管道。 远远高于试图用简单的select.select
,并充分利用系统管道的缓冲区。
from subprocess import Popen, PIPE
import os
import sys
p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
pid = os.fork()
if pid: #parent
p1.stdin.close()
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
data = p2.stdout.read()
sys.stdout.write(data)
p2.stdout.close()
else: #child
data_to_write = 'hello world\n' * 100000
p1.stdin.write(data_to_write)
p1.stdin.close()
Answer 11:
它比你想象的要简单得多!
import sys
from subprocess import Popen, PIPE
# Pipe the command here. It will read from stdin.
# So cat a file, to stdin, like (cat myfile | ./this.py),
# or type on terminal and hit control+d when done, etc
# No need to handle this yourself, that's why we have shell's!
p = Popen("grep -v not | cut -c 1-10", shell=True, stdout=PIPE)
nextData = None
while True:
nextData = p.stdout.read()
if nextData in (b'', ''):
break
sys.stdout.write ( nextData.decode('utf-8') )
p.wait()
此代码为Python 3.6编写,并与Python 2.7的作品。
使用它,如:
cat README.md | python ./example.py
要么
python example.py < README.md
要管“README.md”的这个节目的内容。
但..在这一点上,为什么不只是像你想使用“猫”直接和管道输出? 喜欢:
cat filename | grep -v not | cut -c 1-10
键入到控制台将做的工作也是如此。 我个人只使用代码的选择,如果我被进一步处理输出,否则shell脚本会更容易维护和保留。
你只要,使用壳里做管道适合你。 在一个出。 这就是她会是在干什么,管理流程和管理输入和输出的单宽链大。 有人会说它是一个shell的最好的非交互式功能..
文章来源: blocks - send input to python subprocess pipeline