Pipe a large amount of data to stdin while using subprocess.Popen

2020-02-08 05:50发布

问题:

I'm kind of struggling to understand what is the python way of solving this simple problem.

My problem is quite simple. If you use the following code, it will hang. This is well documented in the subprocess module docs.

import subprocess

# Start ``cat -`` with both its stdin and its stdout connected to pipes.
proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )
# Nothing reads proc.stdout while this loop runs, so once cat's stdout pipe
# buffer is full, cat stops consuming its stdin and these writes block: this
# is the hang the question is about.
for i in range(100000):
    proc.stdin.write('%d\n' % i)
# communicate() is the first call that would drain stdout, but for large
# inputs the loop above never gets this far.
output = proc.communicate()[0]
print output

Searching for a solution (there is a very insightful thread, but I've lost it now) I found this solution (among others) that uses an explicit fork:

import os
import sys
from subprocess import Popen, PIPE

def produce(to_sed):
    """Write the integers 0..99999 to *to_sed*, one per line, then close it.

    Every line is flushed as soon as it is written so the child process
    sees the data without waiting for a buffer to fill.
    """
    for number in range(100000):
        line = "%d\n" % number
        to_sed.write(line)
        to_sed.flush()
    # Closing is what signals EOF to the reader; it would also happen
    # implicitly, but is spelled out for the sake of the example.
    to_sed.close()

def consume(from_sed):
    while 1:
        res = from_sed.readline()
        if not res:
            sys.exit(0)
            #sys.exit(proc.poll())
        print 'received: ', [res]

def main():
    """Run ``cat -`` and fork: the child feeds it, the parent reads it back."""
    proc = Popen(['cat', '-'], stdin=PIPE, stdout=PIPE)
    to_sed, from_sed = proc.stdin, proc.stdout

    if os.fork() == 0:
        # Child process: it only writes, so drop its copy of the read end.
        from_sed.close()
        produce(to_sed)
        return

    # Parent process: it only reads. Closing its copy of the write end is
    # required, otherwise cat would never see EOF on its stdin.
    to_sed.close()
    consume(from_sed)


if __name__ == '__main__':
    main()

While this solution is conceptually very easy to understand, it uses one more process and strikes me as too low level compared to the subprocess module (which is there precisely to hide this kind of thing...).

I'm wondering: is there a simple and clean solution using the subprocess module that won't hang, or do I have to take a step back and implement this pattern with an old-style select loop or an explicit fork?

Thanks

回答1:

If you want a pure Python solution, you need to put either the reader or the writer in a separate thread. The threading package is a lightweight way to do this, with convenient access to common objects and no messy forking.

import subprocess
import threading
import sys

# Start ``cat -`` with pipes on both ends.
proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )
def writer():
    # Runs in a background thread: feed all the numbers to cat, then close
    # stdin so cat sees EOF and finishes.
    for i in range(100000):
        proc.stdin.write('%d\n' % i)
    proc.stdin.close()
# The writer must be running *before* the main thread starts reading, so
# that writing and reading proceed concurrently and neither pipe fills up.
thread = threading.Thread(target=writer)
thread.start()
# Main thread: drain cat's output line by line until EOF.
for line in proc.stdout:
    sys.stdout.write(line)
# Wait for the writer thread and then for the child process itself.
thread.join()
proc.wait()

It might be neat to see the subprocess module modernized to support streams and coroutines, which would allow pipelines that mix Python pieces and shell pieces to be constructed more elegantly.



回答2:

If you don't want to keep all the data in memory, you have to use select. E.g. something like:

import os
import subprocess
import sys
from select import select

# Python 2 example: interleave writing to and reading from ``cat`` with a
# select() loop, so that neither pipe can fill up and block us.
proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)

i = 0
while True:
    # Always watch cat's stdout; watch its stdin only while there are
    # numbers left to send (stdin is closed after the last one).
    rlist, wlist, xlist = [proc.stdout], [], []
    if i < 100000:
        wlist.append(proc.stdin)
    rlist, wlist, xlist = select(rlist, wlist, xlist)
    if proc.stdout in rlist:
        out = os.read(proc.stdout.fileno(), 10)
        if not out:
            # Empty read: cat closed its stdout, we are done.
            break
        # Use write() rather than ``print out,``: the print statement's
        # soft-space logic inserts a space between chunks that do not end
        # in a newline, corrupting the echoed data.
        sys.stdout.write(out)
    if proc.stdin in wlist:
        proc.stdin.write('%d\n' % i)
        i += 1
        if i >= 100000:
            # Signal EOF so that cat terminates once it has echoed it all.
            proc.stdin.close()

# Release the read end and reap the child so no zombie is left behind.
proc.stdout.close()
proc.wait()


回答3:

Here's something I used to load a 6 GB MySQL dump file via subprocess. Stay away from shell=True: it is not secure, and it starts an extra shell process, wasting resources.

import subprocess

# mysql_path, mysql_user, mysql_pass, host, database and dump_file are
# expected to be defined by the surrounding program.
# NOTE(review): "-p<password>" on the command line is visible to other local
# users in the process list; consider an option file if that matters.
cmd = [mysql_path,
       "-u", mysql_user, "-p" + mysql_pass,
       "-h", host, database]

# Hand the open dump file to mysql as its stdin: the child reads straight
# from the file descriptor, so the dump never passes through Python memory.
# The with-block closes the handle even if Popen or communicate() raises.
with open(dump_file, 'r') as fhandle:
    p = subprocess.Popen(cmd, stdin=fhandle, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    # communicate() drains stdout and stderr concurrently and waits for exit.
    (stdout, stderr) = p.communicate()


回答4:

For this kind of thing, the shell works a lot better than subprocess.

Write very simple Python apps which read from sys.stdin and write to sys.stdout.

Connect the simple apps together using a shell pipeline.

If you want, start the pipeline using subprocess or simply write a one-line shell script.

python part1.py | python part2.py

This is very, very efficient. It's also portable to all Linux (and Windows) as long as you keep it very simple.



回答5:

Using the aiofiles & asyncio in python 3.5:

A bit complicated, but you need only 1024 bytes of memory for writing to stdin!

import asyncio
import aiofiles
import sys
from os.path import dirname, join, abspath
import subprocess as sb


# Directory containing this script; used to resolve the sample input below.
THIS_DIR = abspath(dirname(__file__))
# Input file that main() streams to the child process' stdin.
SAMPLE_FILE = join(THIS_DIR, '../src/hazelnut/tests/stuff/sample.mp4')
# Destination path interpolated into the shell command built by main().
# NOTE(review): hard-coded to the original author's home directory -- adjust.
DEST_PATH = '/home/vahid/Desktop/sample.mp4'


async def async_file_reader(f, buffer):
    """Append every item produced by async-iterating *f* to *buffer*.

    Iteration stops when *f* is exhausted or as soon as it yields a falsy
    (empty) item. Prints ``reader done`` when finished.
    """
    async for chunk in f:
        if not chunk:
            break
        buffer.append(chunk)
    print('reader done')


async def async_file_writer(source_file, target_file):
    """Copy *source_file* to *target_file* in 1024-byte chunks.

    After each chunk the writer is drained, so at most one chunk is held
    in memory at a time. When the source is exhausted, EOF is written to
    the target and the total number of bytes copied is printed.
    """
    total = 0
    chunk = await source_file.read(1024)
    while chunk:
        total += len(chunk)
        target_file.write(chunk)
        # Wait until the transport has room again before reading more.
        await target_file.drain()
        chunk = await source_file.read(1024)
    target_file.write_eof()

    print('writer done: %s' % total)


async def main():
    """Stream SAMPLE_FILE into a shell command and report its output.

    The child's stdout and stderr are drained concurrently with the write
    to its stdin, so none of the three pipes can fill up and deadlock.
    Prints the exit status, then whatever the child wrote to stdout and
    stderr.
    """
    dir_name = dirname(DEST_PATH)
    remote_cmd = 'ssh localhost mkdir -p %s && cat - > %s' % (dir_name, DEST_PATH)

    stdout, stderr = [], []
    async with aiofiles.open(SAMPLE_FILE, mode='rb') as f:
        cmd = await asyncio.create_subprocess_shell(
            remote_cmd,
            stdin=sb.PIPE,
            stdout=sb.PIPE,
            stderr=sb.PIPE,
        )

        # Run both readers and the writer together until all three finish.
        await asyncio.gather(
            async_file_reader(cmd.stdout, stdout),
            async_file_reader(cmd.stderr, stderr),
            async_file_writer(f, cmd.stdin),
        )

        print('EXIT STATUS: %s' % await cmd.wait())

    # The subprocess streams yield *bytes* lines that already end with their
    # newline, so join them as bytes and decode once. Joining them with a
    # str separator raises TypeError as soon as the child prints anything.
    stdout = b''.join(stdout).decode(errors='replace')
    stderr = b''.join(stderr).decode(errors='replace')

    if stdout:
        print(stdout, end='')

    if stderr:
        print(stderr, end='', file=sys.stderr)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Result:

writer done: 383631
reader done
reader done
EXIT STATUS: 0


回答6:

Your code deadlocks as soon as cat's stdout OS pipe buffer is full. If you use stdout=PIPE, you have to consume the output in time; otherwise a deadlock like the one in your case may happen.

If you don't need the output while the process is running; you could redirect it to a temporary file:

#!/usr/bin/env python3
import subprocess
import tempfile

# cat's stdout goes straight to an anonymous temporary file instead of a
# pipe, so there is no pipe buffer to fill up while we are still writing.
with tempfile.TemporaryFile('r+') as output_file:
    with subprocess.Popen(
        ['cat'],
        stdin=subprocess.PIPE,
        stdout=output_file,
        universal_newlines=True,
    ) as process:
        for i in range(100000):
            process.stdin.write('%d\n' % i)
    # Leaving the inner block closed stdin and waited for cat to exit.
    output_file.seek(0)  # rewind (and sync with the disk)
    first_line = output_file.readline()
    print(first_line, end='')  # show only the first line of the output

If the input/output are small (fit in memory); you could pass the input all at once and get the output all at once using .communicate() that reads/writes concurrently for you:

#!/usr/bin/env python3
import subprocess

# Build the whole input up front; run() passes it through communicate(),
# which writes stdin and reads stdout concurrently.
numbers = '\n'.join(map(str, range(100000)))
cp = subprocess.run(
    ['cat'],
    input=numbers,
    stdout=subprocess.PIPE,
    universal_newlines=True,
)
last_line = cp.stdout.splitlines()[-1]
print(last_line)  # print the last line

To read/write concurrently manually, you could use threads, asyncio, fcntl, etc. @Jed provided a simple thread-based solution. Here's asyncio-based solution:

#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE

async def pump_input(writer):
    """Send the integers 0..99999 to *writer*, one per line, then close it.

    The writer is drained after every line so the transport buffer cannot
    grow without bound, and it is closed even if a write fails.
    """
    try:
        for number in range(100000):
            payload = b'%d\n' % number
            writer.write(payload)
            await writer.drain()
    finally:
        writer.close()

async def run():
    """Feed numbers to ``cat`` and print the square of each echoed line.

    Returns the child's exit status.
    """
    # start child process
    # NOTE: universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec('cat', stdin=PIPE, stdout=PIPE)
    # Keep a reference to the writer task: the event loop only holds weak
    # references to tasks, and an un-awaited task silently swallows errors.
    feeder = asyncio.ensure_future(pump_input(process.stdin))  # write input
    try:
        async for line in process.stdout:  # consume output
            print(int(line)**2)  # print squares
        # Output reached EOF, so the writer is done; awaiting it re-raises
        # anything that went wrong while writing.
        await feeder
    finally:
        # No-op if the writer already finished; otherwise stop it so it does
        # not outlive a failed reader.
        feeder.cancel()
    return await process.wait()  # wait for the child process to exit


# Pick an event loop that supports subprocess pipes on this platform, run
# the example to completion and release the loop.
if not sys.platform.startswith('win'):
    loop = asyncio.get_event_loop()
else:
    # for subprocess' pipes on Windows
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
loop.run_until_complete(run())
loop.close()

On Unix, you could use fcntl-based solution:

#!/usr/bin/env python3
import sys
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK
from shutil import copyfileobj
from subprocess import Popen, PIPE, _PIPE_BUF as PIPE_BUF

def make_blocking(pipe, blocking=True):
    """Switch the file descriptor behind *pipe* to blocking or non-blocking.

    With ``blocking`` true (the default) the O_NONBLOCK flag is cleared;
    otherwise it is set. All other file status flags are left untouched.
    """
    fd = pipe.fileno()
    flags = fcntl(fd, F_GETFL)
    if blocking:
        flags &= ~O_NONBLOCK  # clear it
    else:
        flags |= O_NONBLOCK  # set O_NONBLOCK
    fcntl(fd, F_SETFL, flags)


with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process:
    # Make reads from cat's stdout non-blocking so the loop below can poll
    # for output without stalling when nothing is available yet.
    make_blocking(process.stdout, blocking=False)
    with process.stdin:
        for i in range(100000):
            # NOTE: the mode is block-buffered (default) and therefore
            # `cat` won't see it immediately
            process.stdin.write(b'%d\n' % i)
            # a deadlock may happen here with a *blocking* pipe
            output = process.stdout.read(PIPE_BUF)
            # Only a non-None result is forwarded; None is skipped.
            if output is not None:
                sys.stdout.buffer.write(output)
    # stdin is closed at this point (end of the inner with-block).
    # read the rest
    make_blocking(process.stdout)
    copyfileobj(process.stdout, sys.stdout.buffer)


回答7:

Here is an example (Python 3) of reading one record at a time from gzip using a pipe:

from subprocess import Popen, PIPE

# Pass the command as an argument list: without shell=True a single string
# is taken as the *program name* on POSIX, so 'gzip -dc compressed_file.gz'
# would fail with "No such file or directory".
cmd = ['gzip', '-dc', 'compressed_file.gz']

# The with-block closes the pipe and waits for gzip, so no zombie is left.
with Popen(cmd, stdout=PIPE) as proc:
    # Iterating the pipe yields one bytes line at a time as gzip produces it.
    for line in proc.stdout:
        print(":", line.decode(), end="")

I know there is a standard module for that; it is just meant as an example. You can read the whole output in one go (like shell back-ticks) using the communicate method, but obviously you have to be careful of memory size.

Here is an example (Python 3 again) of writing records to the lp(1) program on Linux:

from subprocess import Popen, PIPE

# Argument list rather than a string: without shell=True the string 'lp -'
# would be looked up as a program literally named "lp -" on POSIX.
cmd = ['lp', '-']
proc = Popen(cmd, stdin=PIPE)
# some_data is the text to print, provided by the surrounding program.
# communicate() writes it, closes stdin and waits for lp to exit.
proc.communicate(some_data.encode())


回答8:

Now I know this is not going to satisfy the purist in you completely, as the input will have to fit in memory, and you have no option to work interactively with input-output, but at least this works fine on your example. The communicate method optionally takes the input as an argument, and if you feed your process its input this way, it will work.

import subprocess

proc = subprocess.Popen(['cat','-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )

input = "".join('{0:d}\n'.format(i) for i in range(100000))
output = proc.communicate(input)[0]
print output

As for the larger problem, you can subclass Popen, rewrite __init__ to accept stream-like objects as arguments to stdin, stdout, stderr, and rewrite the _communicate method (hairy for cross-platform, you need to do it twice, see the subprocess.py source) to call read() on the stdin stream and write() the output to the stdout and stderr streams. What bothers me about this approach is that as far as I know, it hasn't already been done. When obvious things have not been done before, there's usually a reason (it doesn't work as intended), but I can't see why it shouldn't, apart from the fact you need the streams to be thread-safe in Windows.



回答9:

The simplest solution I can think of:

from subprocess import Popen, PIPE
from threading import Thread

# Python 2: map() returns a list here, so ``s`` is a list of the decimal
# strings '0'..'9999' (not one large string).
s = map(str,xrange(10000))
# bufsize=1 is passed through to Popen for the pipe file objects.
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=1)
# Background writer: the generator writes every item of ``s``; write()
# returns None, so any(...) is False and the ``or`` then closes stdin.
Thread(target=lambda: any((p.stdin.write(b) for b in s)) or p.stdin.close()).start()
# Meanwhile the main thread reads cat's output until EOF.
print (p.stdout.read())

Buffered version:

from subprocess import Popen, PIPE
from threading import Thread

# Build one large string. The original left ``s`` as a *list* (Python 2
# map), so ``s[i:i+n]`` produced list slices and write() raised TypeError
# inside the thread, leaving stdin open and the reader below stuck forever.
s = ''.join(map(str, xrange(10000)))  # a large string
n = 1024  # buffer size
p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=n)


def feed():
    """Write ``s`` to the child's stdin in chunks of ``n`` characters."""
    try:
        for i in xrange(0, len(s), n):
            p.stdin.write(s[i:i+n])
    finally:
        # Always close stdin so the reader sees EOF even if a write fails.
        p.stdin.close()


Thread(target=feed).start()
print (p.stdout.read())


回答10:

I was looking for an example code to iterate over process output incrementally as this process consumes its input from provided iterator (incrementally as well). Basically:

import string
import random

# That's what I consider a very useful function, though I didn't
# find any existing implementations.
def process_line_reader(args, stdin_lines):
    """Interface sketch only -- the body is intentionally left empty.

    args        -- command to run, same as for subprocess.Popen
    stdin_lines -- iterable with lines to send to the process' stdin
    returns     -- iterable with lines received from the process' stdout
    """
    # args - command to run, same as subprocess.Popen
    # stdin_lines - iterable with lines to send to process stdin
    # returns - iterable with lines received from process stdout
    pass

def random_lines(n, M=8):
    """Yield *n* random strings of *M* letters each.

    A negative *n* means "never stop": the generator is then infinite. It
    serves as an example of a source with a potentially unlimited number
    of lines.
    """
    remaining = n
    while remaining != 0:
        letters = [random.choice(string.letters) for _ in range(M)]
        yield "".join(letters)
        # Only count down for positive n; negative n stays negative forever.
        if remaining > 0:
            remaining -= 1

# That's what I consider to be a very convenient use case for
# function proposed above.
def print_many_uniq_numbered_random_lines():
    i = 0
    for line in process_line_reader(["uniq", "-i"], random_lines(100500 * 9000)):
        # Key idea here is that `process_line_reader` will feed random lines into
        # `uniq` process stdin as lines are consumed from returned iterable.
        print "#%i: %s" % (i, line)
        i += 1

Some of the solutions suggested here allow you to do this with threads (but that's not always convenient) or with asyncio (which is not available in Python 2.x). Below is an example of a working implementation that does it.

import errno
import fcntl
import os
import select
import subprocess

class nonblocking_io(object):
    """Own a private, non-blocking duplicate of a file descriptor.

    The constructor takes either an integer descriptor or a file-like
    object exposing ``fileno()`` and ``close()``. The descriptor is
    duplicated, the original is closed, and O_NONBLOCK is set on the
    duplicate. Instances work as context managers and expose ``fileno()``,
    so they can be passed directly to ``select.select``.

    Raises TypeError for any other kind of argument.
    """
    def __init__(self, f):
        self._fd = -1
        # bool is a subclass of int but is never a sensible descriptor.
        if isinstance(f, int) and not isinstance(f, bool):
            self._fd = os.dup(f)
            os.close(f)
        elif hasattr(f, "fileno") and hasattr(f, "close"):
            # Duck-typed so that Python 2 ``file`` objects as well as
            # Python 3 io objects (e.g. Popen pipes) are accepted.
            self._fd = os.dup(f.fileno())
            f.close()
        else:
            raise TypeError("Only accept file objects or interger file descriptors")
        try:
            flag = fcntl.fcntl(self._fd, fcntl.F_GETFL)
            fcntl.fcntl(self._fd, fcntl.F_SETFL, flag | os.O_NONBLOCK)
        except Exception:
            # Do not leak the duplicated descriptor if it cannot be set up.
            self.close()
            raise
    def __enter__(self):
        return self
    def __exit__(self, type, value, traceback):
        self.close()
        # Never suppress an exception raised inside the with-block.
        return False
    def fileno(self):
        """Return the owned descriptor, or -1 once closed."""
        return self._fd
    def close(self):
        """Close the owned descriptor; calling it again is a no-op."""
        if 0 <= self._fd:
            os.close(self._fd)
            self._fd = -1

class nonblocking_line_writer(nonblocking_io):
    """Incrementally write lines from an iterable to a non-blocking fd.

    Lines are pulled lazily from *lines*, encoded, terminated with
    *linesep* and batched into a buffer of roughly *buffer_size* bytes.
    Call ``continue_writing()`` whenever the descriptor is writable.
    """
    def __init__(self, f, lines, autoclose=True, buffer_size=16*1024, encoding="utf-8", linesep=os.linesep):
        super(nonblocking_line_writer, self).__init__(f)
        self._lines = iter(lines)
        self._lines_ended = False
        self._autoclose = autoclose
        self._encoding = encoding
        self._linesep = bytearray(linesep, encoding)
        # Pending output and how much of it has already been written.
        self._buffer_size = buffer_size
        self._buffer = bytearray()
        self._buffer_offset = 0

    def _refill(self):
        # Replace the fully written buffer with the next batch of lines.
        del self._buffer[:]
        self._buffer_offset = 0
        while len(self._buffer) < self._buffer_size:
            line = next(self._lines, None)
            if line is None:
                self._lines_ended = True
                break
            self._buffer.extend(bytearray(line, self._encoding))
            self._buffer.extend(self._linesep)

    # Returns False when `lines` iterable is exhausted and all pending data is written
    def continue_writing(self):
        while True:
            pending = len(self._buffer) - self._buffer_offset
            if pending > 0:
                written = os.write(self._fd, self._buffer[self._buffer_offset:])
                self._buffer_offset += written
                if written < pending:
                    # Partial write: come back when the fd is writable again.
                    return True
            if self._lines_ended:
                if self._autoclose:
                    self.close()
                return False
            self._refill()

class nonblocking_line_reader(nonblocking_io):
    """Incrementally read complete lines from a non-blocking descriptor.

    Call ``continue_reading()`` whenever the descriptor is readable. An
    incomplete trailing line is kept between calls until its terminator
    (or EOF) arrives.
    """
    def __init__(self, f, autoclose=True, buffer_size=16*1024, encoding="utf-8"):
        super(nonblocking_line_reader, self).__init__(f)
        self._autoclose = autoclose
        self._buffer_size = buffer_size
        self._encoding = encoding
        self._file_ended = False
        # Raw data of the line currently being assembled.
        self._line_part = ""
    # Returns (lines, more) tuple, where lines is iterable with lines read and more will
    # be set to False after EOF.
    def continue_reading(self):
        lines = []
        while not self._file_ended:
            try:
                data = os.read(self._fd, self._buffer_size)
            except OSError as e:
                # After a full-size chunk the loop reads again; if that
                # emptied the pipe, the non-blocking read fails with EAGAIN.
                # That only means "no more data for now", not an error.
                if e.errno in (errno.EAGAIN, errno.EWOULDBLOCK):
                    break
                raise
            if 0 == len(data):
                # EOF: flush whatever partial line is left.
                self._file_ended = True
                if self._autoclose:
                    self.close()
                if 0 < len(self._line_part):
                    lines.append(self._line_part.decode(self._encoding))
                    self._line_part = ""
                break
            for line in data.splitlines(True):
                self._line_part += line
                if self._line_part.endswith(("\n", "\r")):
                    lines.append(self._line_part.decode(self._encoding).rstrip("\n\r"))
                    self._line_part = ""
            if len(data) < self._buffer_size:
                # Short read: the descriptor has been drained for now.
                break
        return (lines, not self._file_ended)

class process_line_reader(object):
    """Run a command, feeding it lines lazily and yielding its output lines.

    args        -- command to run, same as for subprocess.Popen
    stdin_lines -- iterable with the lines to send to the process' stdin

    Iterating the object (or the value bound by ``with ... as``) yields
    the lines the process writes to stdout. Input is pulled from
    *stdin_lines* only as fast as the process accepts it.
    """
    def __init__(self, args, stdin_lines):
        self._p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        self._reader = nonblocking_line_reader(self._p.stdout)
        self._writer = nonblocking_line_writer(self._p.stdin, stdin_lines)
        self._iterator = self._communicate()
    def __iter__(self):
        return self._iterator
    def __enter__(self):
        return self._iterator
    def __exit__(self, type, value, traceback):
        self.close()
        # Never suppress an exception raised inside the with-block.
        return False
    def _communicate(self):
        # Generator driving a select() loop over both pipe ends until the
        # output hits EOF and all input has been written.
        read_set = [self._reader]
        write_set = [self._writer]
        while read_set or write_set:
            try:
                rlist, wlist, xlist = select.select(read_set, write_set, [])
            except select.error as e:
                # Interrupted by a signal: simply retry the select() call.
                if e.args[0] == errno.EINTR:
                    continue
                raise
            if self._reader in rlist:
                stdout_lines, more = self._reader.continue_reading()
                for line in stdout_lines:
                    yield line
                if not more:
                    read_set.remove(self._reader)
            if self._writer in wlist:
                if not self._writer.continue_writing():
                    write_set.remove(self._writer)
        self.close()
    def lines(self):
        """Return the iterator over the process' output lines."""
        return self._iterator
    def close(self):
        """Close both pipe ends and wait for the process; idempotent."""
        if self._iterator is not None:
            self._reader.close()
            self._writer.close()
            self._p.wait()
            self._iterator = None