块 - 输入发送到Python子管道(blocks - send input to python s

2019-08-22 00:06发布

我测试的子进程管道与蟒蛇。 我知道,我可以做什么节目下方直接做在Python中,但是这不是问题的关键。 我只是想测试管道,所以我知道如何使用它。

我的系统是Linux操作系统Ubuntu 9.04与默认的Python 2.6。

我开始与这个文档例子 。

from subprocess import Popen, PIPE
p1 = Popen(["grep", "-v", "not"], stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
output = p2.communicate()[0]
print output

这样的作品,但由于p1stdin没有被重定向,我在终端类型的东西喂管。 当我键入^D关闭标准输入,我得到了我想要的输出。

不过,我想将数据发送到使用python的字符串变量的管道。 首先,我试着写在标准输入:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.stdin.write('test\n')
output = p2.communicate()[0] # blocks forever here

没有工作。 我试着用p2.stdout.read()上,而不是最后一行,但它也阻止。 我加p1.stdin.flush()p1.stdin.close()但它也不能工作。 我后来我搬到了交流:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
p1.communicate('test\n') # blocks forever here
output = p2.communicate()[0] 

所以,这仍然不是。

我注意到,运行一个进程(如p1以上,去除p2 )完美的作品。 和通过的文件句柄p1stdin=open(...)也有效。 所以,问题是:

是否有可能将数据传递到的蟒蛇2或多个子管道,不阻塞? 为什么不?

我知道我可以运行shell并运行在shell管道,但是这不是我想要的。


更新1:按照下面我亚伦Digulla的提示我现在尝试使用线程,使其工作。

首先,我已经试过在一个线程中运行p1.communicate。

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=p1.communicate, args=('some data\n',))
t.start()
output = p2.communicate()[0] # blocks forever here

好吧,没有工作。 试过其他组合等改变它.write()并且还p2.read() 没有。 现在让我们尝试了相反的做法:

def get_output(subp):
    output = subp.communicate()[0] # blocks on thread
    print 'GOT:', output

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.communicate('data\n') # blocks here.
t.join()

代码最终阻挡某处。 无论是在产生的线程,或在主线程,或两者兼而有之。 因此,它没有工作。 如果你知道如何使它发挥作用,将能够更方便,如果你能提供工作代码。 我想在这里。


更新2

保罗·杜波依斯的一些信息下面回答,所以我做更多的测试。 我读过整个subprocess.py模块,并得到了它是如何工作的。 所以,我想正是申请代码。

我在Linux上,但因为我是用线程测试,我的第一种方法是复制精确的Windows线程看到代码subprocess.pycommunicate()方法,但对于两个过程,而不是一个。 下面是我的尝试整个上市:

import os
from subprocess import Popen, PIPE
import threading

def get_output(fobj, buffer):
    while True:
        chunk = fobj.read() # BLOCKS HERE
        if not chunk:
            break
        buffer.append(chunk)

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

b = [] # create a buffer
t = threading.Thread(target=get_output, args=(p2.stdout, b))
t.start() # start reading thread

for x in xrange(100000):
    p1.stdin.write('hello world\n') # write data
    p1.stdin.flush()
p1.stdin.close() # close input...
t.join()

好。 它没有工作。 即使在p1.stdin.close()被调用, p2.stdout.read()仍块。

然后我试图在POSIX代码subprocess.py

import os
from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)

numwrites = 100000
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer

while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = os.read(p2.stdout.fileno(), 1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if numwrites > 0:
            numwrites -= 1
            p1.stdin.write('hello world!\n'); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

上也块select.select() 通过散布print S周围,我发现了这一点:

  • 阅读工作。 代码执行时读了很多次。
  • 写作也在努力。 数据写入p1.stdin
  • 在结束numwritesp1.stdin.close()被调用。
  • select()开始堵,只有to_read拥有的东西, p2.stdoutto_write已经是空的。
  • os.read()调用始终返回的东西,所以p2.stdout.close()不会被调用。

从两个试验结论 :关闭stdin所述第一进程的在管道( grep在本例中)不使它其缓冲的输出转储到下一个和死亡。

没办法,使工作?

PS:我不想使用临时文件,我已经与文件测试,我知道它的工作原理。 而且我不希望使用Windows。

Answer 1:

我发现了如何做到这一点。

这是不是线程,不是选择()。

当我运行第一处理( grep ),它创建两个低级文件描述符,一个用于每个管。 让我们称那些ab

当我运行第二个进程, b被传递到cut sdtin 。 但在脑死亡的默认Popen - close_fds=False

那效果是cut也继承了a 。 所以, grep ,如果我收甚至不能死a ,因为标准输入仍然是开放的cut的过程( cut忽略它)。

下面的代码现在运行完美。

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n')
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds=True ,应在UNIX系统上DEFAULT。 在Windows上,关闭所有产品安全,所以它可以防止管道。

编辑:

PS:对于有类似的问题,阅读这样的回答:作为pooryorick在评论说,这也可以阻止如果写入数据p1.stdin比缓冲区大。 在这种情况下,你应该块的数据成小块,和使用select.select()知道什么时候读/写。 在这个问题的代码应该给出如何实现的提示。

EDIT2:发现了另一个解决方案,从pooryorick更多的帮助-而不是使用close_fds=True ,并关闭所有 FDS,人们可以关闭fd执行第二时属于第一过程中,S,它会工作。 收盘必须在孩子做这样的preexec_fn从POPEN功能就非常方便的做到这一点。 在P2上执行,你可以这样做:

p2 = Popen(cmd2, stdin=p1.stdout, stdout=PIPE, stderr=devnull, preexec_fn=p1.stdin.close)


Answer 2:

与大型文件

两个原则需要均匀地施加在Python大文件时。

  1. 由于任何IO程序可以阻止,我们必须保持管道的每个阶段在不同的线程或进程。 我们使用的线程在这个例子中,但子进程将让你避免了GIL。
  2. 我们必须用增量的读取和写入,让我们不要等待EOF开始取得进展之前。

一个替代方案是使用非阻塞IO,虽然这是在标准Python繁琐。 见GEVENT对于非阻塞使用原语实现了同步IO API的轻量级线程库。

示例代码

我们将构建一个愚蠢的管道是大致

{cat /usr/share/dict/words} | grep -v not              \
    | {upcase, filtered tee to stderr} | cut -c 1-10   \
    | {translate 'E' to '3'} | grep K | grep Z | {downcase}

其中在大括号每一级{}在Python被实现,而其他使用标准的外部程序。 TL; DR: 看到这个要点 。

我们先从预期的进口。

#!/usr/bin/env python
from subprocess import Popen, PIPE
import sys, threading

管道的Python的阶段

但所有管道的最后一个Python的实现阶段需要在一个线程中去,这样它的IO不会阻止其他人。 这些可能不是在Python子进程运行,如果你想他们真正并行运行(避开GIL)。

def writer(output):
    for line in open('/usr/share/dict/words'):
        output.write(line)
    output.close()
def filter(input, output):
    for line in input:
        if 'k' in line and 'z' in line: # Selective 'tee'
            sys.stderr.write('### ' + line)
        output.write(line.upper())
    output.close()
def leeter(input, output):
    for line in input:
        output.write(line.replace('E', '3'))
    output.close()

上述这些需求被投入在自己的线程,我们将不使用此功能方便。

def spawn(func, **kwargs):
    t = threading.Thread(target=func, kwargs=kwargs)
    t.start()
    return t

创建管道

创建使用外部阶段Popen和使用Python阶段spawn 。 这个论点bufsize=-1表示使用系统的默认缓冲(通常为4 KIB)。 这通常是比默认值(无缓冲)或行缓冲速度更快,但你要行缓冲,如果你想直观地监视没有延迟输出。

grepv   = Popen(['grep','-v','not'], stdin=PIPE, stdout=PIPE, bufsize=-1)
cut     = Popen(['cut','-c','1-10'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepk = Popen(['grep', 'K'], stdin=PIPE, stdout=PIPE, bufsize=-1)
grepz = Popen(['grep', 'Z'], stdin=grepk.stdout, stdout=PIPE, bufsize=-1)

twriter = spawn(writer, output=grepv.stdin)
tfilter = spawn(filter, input=grepv.stdout, output=cut.stdin)
tleeter = spawn(leeter, input=cut.stdout, output=grepk.stdin)

驾驶管道

如上组装,在管道中的所有缓冲区将填满,但因为没有人是从末端(阅读grepz.stdout ),他们将所有的块。 我们可以看整个事情在一个调用grepz.stdout.read()但会使用大量内存大文件。 相反,我们不断的读取。

for line in grepz.stdout:
    sys.stdout.write(line.lower())

该线程和进程清理,一旦达到EOF 。 我们可以明确地清理使用

for t in [twriter, tfilter, tleeter]: t.join()
for p in [grepv, cut, grepk, grepz]: p.wait()

Python的2.6和更早版本

在内部, subprocess.Popen来电fork ,配置管文件描述符,并调用exec 。 从子进程fork在父进程所有文件描述符的副本, 这两个副本都需要被关闭之前,相应的读者将获得EOF 。 这可以通过手动地闭合管道(无论是通过固定close_fds=True或合适preexec_fn参数subprocess.Popen )或通过设置FD_CLOEXEC标志具有exec自动关闭的文件描述符。 这个标志在Python-2.7自动设置,后来,看到issue12786 。 我们可以通过调用得到早期版本的Python的Python 2.7的行为

p._set_cloexec_flags(p.stdin)

前传递p.stdin作为参数传递给随后的subprocess.Popen



Answer 3:

有对制作烟斗工作三大招数预期

  1. 确保管的两端各在不同的线程/进程(一些靠近顶部的例子存在这个问题)使用。

  2. 明确地关闭管道的未使用端在每个处理

  3. 由要么禁用它(Python的-u选项)处理缓冲,使用的pty的,或者干脆填写了一些缓冲,不会影响数据,(也许“\ n”,但是任何适合)。

Python的“管道”模块(我的作者)中的示例适合您的方案完全相同,使低级别的步骤相当清晰。

http://pypi.python.org/pypi/pipeline/

最近,我使用的子模块作为生产者处理器的消费者 - 控制器模式的一部分:

http://www.darkarchive.org/w/Pub/PythonInteract

本实施例中用缓冲标准输入比不求助于使用PTY,并且还示出了管道端部应被关闭的位置。 我宁愿进程线程,但原理是一样的。 此外,它说明了队列同步到饲料生产者和消费者收集输出,以及如何将它们完全关闭(看出来插入到队列中的哨兵)。 该模式允许根据最近的输出将要生成新的输入,允许递归发现和处理。



Answer 4:

如果太多的数据写入到管道的接收端Nosklo的提供的解决方案将很快突破:


from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)
p1.stdin.write('Hello World\n' * 20000)
p1.stdin.close()
result = p2.stdout.read() 
assert result == "Hello Worl\n"

如果此脚本不你的机器上挂,只是增加“20000”的东西,超过你的操作系统管缓冲区的大小。

这是因为操作系统正在缓冲输入的“grep”,但一旦缓冲区已满, p1.stdin.write调用将阻塞,直至有从读取p2.stdout 。 在玩具的场景,你可以用的方式写入/从在同一过程中管道读取,但在正常使用的情况,有必要从一个线程/进程写入和从一个单独的线程/进程读取。 这是真实的subprocess.popen,os.pipe,os.popen *等。

另外一个奇怪的是,有时你想保持饲养与同一管道的早期输出生成的项目管道。 解决的办法是使双方的管式加料器和管道读者异步的人程序,并实现两个队列:主程序和管式加料器和主程序和管读取器之间的一个之间的一个。 PythonInteract是这样一个例子。

子是一个很好的方便的模式,但由于它隐藏os.popen的细节,并呼吁os.fork它的引擎盖下,它有时更难以处理比低级别的调用它利用。 出于这个原因,子进程并不了解进程间的管道到底是如何工作的好办法。



Answer 5:

你必须在多个线程做到这一点。 孩子P1不会读取你的输入,因为P2没有读P1的输出,因为你不读P2的输出:否则,你将在一个情况下,你不能发送数据结束。

所以你需要一个后台线程读取什么P2写出。 这将允许P2一些数据写入到管道后继续,因此它可以读取输入的从P1的下一行,再次让P1处理您发送给它的数据。

或者,您可以将数据发送到P1与后台线程和读取从P2的输出在主线程。 但是,任何一方必须是一个线程。



Answer 6:

在回答nosklo的说法(见其他意见对这个问题),它不能没有做到close_fds=True

close_fds=True只是必要的,如果你离开了其他文件描述符打开。 当打开多个子进程,这是一件好事,持续跟踪可能会继承打开的文件,并明确地关闭不需要的任何:

from subprocess import Popen, PIPE

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p1.stdin.write('Hello World\n')
p1.stdin.close()
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
result = p2.stdout.read() 
assert result == "Hello Worl\n"

close_fds默认为False ,因为子倾向于信任调用程序知道它与打开的文件描述符做的,只是提供一个容易的选择主叫方所有关闭他们,如果这就是它想做的事。

但真正的问题是,管道缓冲区会咬你的所有,但玩具的例子。 正如我在其他的答案对这个问题说,经验法则是不是在同一个进程/线程打开你的读者和您的作家。 谁想要使用的子模块,用于双向通信将很好地服务于研究os.pipe和os.fork,第一。 他们其实并不难,如果你有一个使用很好的例子,来看看。



Answer 7:

我想你可能会检查错误的问题。 当然,亚伦说,如果你尝试既是生产者管道的开始,以及管道末端的消费者,很容易陷入死锁状态。 这是沟通()解决了这个问题。

沟通()是不完全正确的,因为你stdin和stdout在不同的子对象; 但如果你在subprocess.py看看落实,你会看到,它不正是亚伦建议。

一旦你看到沟通读取和写入,你会看到在你的第二个尝试沟通()与P2为P1的输出竞争:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
# ...
p1.communicate('data\n')       # reads from p1.stdout, as does p2

我在Win32平台上,这肯定有不同的I / O和缓存办学特色,但是这对我的作品:

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
t = threading.Thread(target=get_output, args=(p2,)) 
t.start()
p1.stdin.write('hello world\n' * 100000)
p1.stdin.close()
t.join()

我使用幼稚无螺纹p2.read时调谐的输入尺寸,以产生死锁()

您也可以尝试缓冲到一个文件中,如

fd, _ = tempfile.mkstemp()
os.write(fd, 'hello world\r\n' * 100000)
os.lseek(fd, 0, os.SEEK_SET)
p1 = Popen(["grep", "-v", "not"], stdin=fd, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE)
print p2.stdout.read()

这也为我工作没有死锁。



Answer 8:

在上述意见中的一个,我挑战nosklo要么张贴一些代码来支持自己的主张约select.select或给予好评我回应他此前曾下投票。 他回答下面的代码:

from subprocess import Popen, PIPE
import select

p1 = Popen(["grep", "-v", "not"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cut", "-c", "1-10"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 100000 * 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        data = p2.stdout.read(1024)
        if not data:
            p2.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            p1.stdin.close()
            to_write = []

print b

这个脚本的一个问题是,它的第二猜测系统管道缓冲区的大小/性质。 该脚本会出现故障少,如果它可以去除像1024幻数。

最大的问题是,这个脚本代码只与数据输入和外部程序的正确组合都能正常工作。 grep和切割线都工作,所以他们的内部缓冲区的行为有点不同。 如果我们用一个更通用的命令,如“猫”,和写入数据的小钻头进入管道,致命的竞争条件会更频繁地弹出:

from subprocess import Popen, PIPE
import select
import time

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)
p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE, close_fds=True)

data_to_write = 'hello world\n'
to_read = [p2.stdout]
to_write = [p1.stdin]
b = [] # create buffer
written = 0


while to_read or to_write:
    time.sleep(1)
    read_now, write_now, xlist = select.select(to_read, to_write, [])
    if read_now:
        print 'I am reading now!'
        data = p2.stdout.read(1024)
        if not data:
            p1.stdout.close()
            to_read = []
        else:
            b.append(data)

    if write_now:
        print 'I am writing now!'
        if written < len(data_to_write):
            part = data_to_write[written:written+1024]
            written += len(part)
            p1.stdin.write(part); p1.stdin.flush()
        else:
            print 'closing file'
            p1.stdin.close()
            to_write = []

print b

在这种情况下,两种不同的结果将体现:

write, write, close file, read -> success
write, read -> hang

所以,我再次nosklo挑战要么邮编展示使用select.select处理任意输入和管道缓冲从单个线程,或给予好评我的反应。

底线:不要试图从一个单独的线程处理管道的两端。 这只是不值得。 见管道用于如何正确地做到这一点一个很好的低级别的例子。



Answer 9:

怎么样使用SpooledTemporaryFile? 这就免去了(但也许并没有解决)问题:

http://docs.python.org/library/tempfile.html#tempfile.SpooledTemporaryFile

您可以写信给它像一个文件,但它实际上是一个内存块。

还是我完全误解...



Answer 10:

下面是使用POPEN连同os.fork来完成同样的事情的一个例子。 而不是使用的close_fds它只是关闭在正确的地方的管道。 远远高于试图用简单的select.select ,并充分利用系统管道的缓冲区。

from subprocess import Popen, PIPE
import os
import sys

p1 = Popen(["cat"], stdin=PIPE, stdout=PIPE)

pid = os.fork()

if pid: #parent
    p1.stdin.close()
    p2 = Popen(["cat"], stdin=p1.stdout, stdout=PIPE)
    data = p2.stdout.read()
    sys.stdout.write(data)
    p2.stdout.close()

else: #child
    data_to_write = 'hello world\n' * 100000
    p1.stdin.write(data_to_write)
    p1.stdin.close()


Answer 11:

它比你想象的要简单得多!

import sys
from subprocess import Popen, PIPE

# Pipe the command here. It will read from stdin.
#   So cat a file, to stdin, like (cat myfile | ./this.py),
#     or type on terminal and hit control+d when done, etc
#   No need to handle this yourself, that's why we have shell's!
p = Popen("grep -v not | cut -c 1-10", shell=True, stdout=PIPE)

nextData = None
while True:
    nextData = p.stdout.read()
    if nextData in (b'', ''):
        break
    sys.stdout.write ( nextData.decode('utf-8') )


p.wait()

此代码为Python 3.6编写,并与Python 2.7的作品。

使用它,如:

cat README.md  | python ./example.py

要么

python example.py < README.md

要管“README.md”的这个节目的内容。

但..在这一点上,为什么不只是像你想使用“猫”直接和管道输出? 喜欢:

cat filename | grep -v not | cut -c 1-10

键入到控制台将做的工作也是如此。 我个人只使用代码的选择,如果我被进一步处理输出,否则shell脚本会更容易维护和保留。

你只要,使用壳里做管道适合你。 在一个出。 这就是她会是在干什么,管理流程和管理输入和输出的单宽链大。 有人会说它是一个shell的最好的非交互式功能..



文章来源: blocks - send input to python subprocess pipeline