streaming data into command with subprocess.Popen

Posted 2019-06-01 10:30

Question:

I frequently need to sort a collection of files that contain headers. Because sorting depends on the contents of the header, this use case is more complicated than similar questions (e.g., "Is there a way to ignore header lines in a UNIX sort?").

I was hoping to use Python to read files, output the header of the first file, then pipe the tails into sort. I've tried this as a proof of concept:

#!/usr/bin/env python

import io
import subprocess
import sys

header_printed = False

sorter = subprocess.Popen(['sort'], stdin=subprocess.PIPE)

for f in sys.argv[1:]:
    fd = io.open(f,'r')
    line = fd.readline()
    if not header_printed:
        print(line)
        header_printed = True
    sorter.communicate(line)

When called as header-sort fileA fileB, with fileA and fileB containing lines like

c   float   int
Y   0.557946     413
F   0.501935     852
F   0.768102     709

I get:

# sort file 1
Traceback (most recent call last):
  File "./archive/bin/pipetest", line 17, in <module>
    sorter.communicate(line)
  File "/usr/lib/python2.7/subprocess.py", line 785, in communicate
    self.stdin.write(input)
ValueError: I/O operation on closed file

The problem is that communicate takes a single string and closes the pipe after writing it, so the content must be read fully into memory first. communicate doesn't accept a generator (I tried).

An even simpler demonstration of this is:

>>> import subprocess
>>> p = subprocess.Popen(['tr', 'a-z', 'A-Z'], stdin=subprocess.PIPE)
>>> p.communicate('hello')
HELLO(None, None)
>>> p.communicate('world')
Traceback (most recent call last):
  File "<ipython-input-14-d6873fd0f66a>", line 1, in <module>
    p.communicate('world')
  File "/usr/lib/python2.7/subprocess.py", line 785, in communicate
    self.stdin.write(input)
ValueError: I/O operation on closed file

So, the question is, what's the right way (with Popen or otherwise) to stream data into a pipe in Python?

Answer 1:

Just write to the pipe directly:

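#!/usr/bin/env python
import contextlib
import fileinput
import subprocess
import sys

# universal_newlines=True puts the pipe in text mode, so this also runs on Python 3
process = subprocess.Popen(['sort'], stdin=subprocess.PIPE, universal_newlines=True)
# FileInput is a context manager only on Python 3.2+; closing() works on Python 2 as well
with process.stdin as pipe, contextlib.closing(fileinput.input()) as files:
    for line in files:
        if not files.isfirstline():  # pipe tails
            pipe.write(line)
        elif files.lineno() == 1:  # print the header of the first file only
            sys.stdout.write(line)
            sys.stdout.flush()  # so the header appears before sort's output
process.wait()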


Answer 2:

For your specific case, where you pass subprocess.PIPE for only a single standard handle (stdin), you can safely call sorter.stdin.write(line) over and over. When you have finished writing input, call sorter.stdin.close() so that sort knows the input is complete and can do the actual sorting and output work, then call sorter.wait() to let it finish. Calling sorter.communicate() with no argument instead of close() and wait() would probably work too.
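
A minimal sketch of that write/close/wait sequence, written for Python 3. The helper name stream_lines and its stdout parameter are illustrative assumptions, not part of subprocess. Only stdin is a pipe here, so the writes cannot deadlock against unread output.

```python
import subprocess


def stream_lines(cmd, lines, stdout=None):
    """Write an iterable of text lines to cmd's stdin, one line at a time.

    stdout may be None (inherit the parent's stdout) or an open file;
    it is never a pipe in this sketch. Returns the child's exit code.
    """
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=stdout,
                            universal_newlines=True)  # text-mode pipe
    try:
        for line in lines:  # lines may be a generator; it is consumed lazily
            proc.stdin.write(line)
    finally:
        proc.stdin.close()  # signals end-of-input to the child
    return proc.wait()  # let the child finish writing its output
```

For the header-sort script this could be called as stream_lines(['sort'], tails), where tails is a hypothetical generator yielding every non-header line of the input files.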

If you need to deal with more than one piped standard handle, the right way is one of the following: threading, with a dedicated thread for each pipe that must be handled beyond the first (relatively simple in concept, but heavyweight, and it introduces all the headaches of threading); or the select module (in Python 3.4+, the selectors module), which is quite tricky to get right but can, under some circumstances, be more efficient. A last option is to create temporary files for the output, so that you can write directly to the process's stdin while the process writes to a file (and therefore won't block); you can then read the file at your leisure. Note that the subprocess won't necessarily have flushed its own output buffers until it exits, so output may not arrive promptly in response to your input until further inputs and outputs have filled and flushed the buffer.
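
The thread-per-pipe option can be sketched as follows for the stdin-plus-stdout case, in Python 3. The name pipe_through is a hypothetical helper, not a subprocess API: a background thread feeds stdin while the calling thread drains stdout, so neither side blocks on a full pipe buffer.

```python
import subprocess
import threading


def pipe_through(cmd, lines):
    """Yield cmd's output lines while a helper thread feeds it `lines`."""
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                            stdout=subprocess.PIPE, universal_newlines=True)

    def feed():
        try:
            for line in lines:
                proc.stdin.write(line)
            proc.stdin.close()  # end-of-input for the child
        except BrokenPipeError:
            pass  # the child exited before reading all of its input

    writer = threading.Thread(target=feed)
    writer.start()
    for out_line in proc.stdout:  # read in this thread while feed() writes
        yield out_line
    writer.join()
    proc.stdout.close()
    proc.wait()
```

With a filter such as tr, output lines become available while input is still being written. With sort, nothing is yielded until stdin has been closed, because sort needs all of its input first.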

subprocess.Popen's .communicate() method itself uses either threads or select module primitives (depending on OS support; see the various _communicate methods in the subprocess module source) whenever you pass subprocess.PIPE for more than one of the standard handles; it's how you have to do it.



Answer 3:

You can write to stdin and read from stdout directly; however, depending on your subprocess, you need a "flushing mechanism" so that the subprocess actually processes your input. The code below works for the first part, but because it closes stdin, it also ends the subprocess (tr exits when it reads end-of-file). If you replace the close with flush(), or if you can add some trailing characters to push the data through your subprocess, then you can keep using it. Otherwise, I would recommend taking a look at multithreading in Python, especially pipes.

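import subprocess

# universal_newlines=True opens both pipes in text mode (str rather than bytes on Python 3)
p = subprocess.Popen(['tr', 'a-z', 'A-Z'], stdin=subprocess.PIPE,
                     stdout=subprocess.PIPE, universal_newlines=True)
p.stdin.write("hello\n")
p.stdin.close()  # end-of-file: tr flushes its output and exits
print(repr(p.stdout.readline()))  # 'HELLO\n'
p.wait()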
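
The flush() variant can be sketched as below, in Python 3. It only works when the child also flushes after every line, so this sketch uses a hypothetical Python child (CHILD_SOURCE) as a stand-in. tr may buffer its output when writing to a pipe, in which case flushing on the parent side alone would not be enough. The names start_child and ask are illustrative assumptions.

```python
import subprocess
import sys

# Stand-in child: upper-cases each input line and flushes immediately.
CHILD_SOURCE = (
    "import sys\n"
    "for line in iter(sys.stdin.readline, ''):\n"
    "    sys.stdout.write(line.upper())\n"
    "    sys.stdout.flush()\n"
)


def start_child():
    """Start the stand-in child with text-mode pipes on stdin and stdout."""
    return subprocess.Popen([sys.executable, '-c', CHILD_SOURCE],
                            stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                            universal_newlines=True)


def ask(proc, text):
    """Send one line, flush it to the child, and read one line back."""
    proc.stdin.write(text + "\n")
    proc.stdin.flush()  # push the line out of the parent's buffer
    return proc.stdout.readline()
```

Calling ask(p, 'hello') and then ask(p, 'world') on the same process keeps stdin open between requests; closing p.stdin at the end lets the child exit. This one-line-in, one-line-out exchange is safe only because each request produces exactly one reply line.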