allowing multiple inputs to python subprocess

Published 2019-07-08 09:20

Question:

I have a near-identical problem to one asked several years ago: Python subprocess with two inputs, which received one answer but no implementation. I'm hoping this repost may help clear things up for me and others.

As in the above, I would like to use subprocess to wrap a command-line tool that takes multiple inputs. In particular, I want to avoid writing the input files to disk, but would rather use e.g. named pipes, as alluded to in the above. That should read "learn how to use named pipes," as I admittedly have never tried them before. I'll further state that the inputs I currently have are two pandas dataframes, and I'd like to get one back as output.

The generic command-line implementation:

/usr/local/bin/my_command inputfileA.csv inputfileB.csv -o outputfile

My current implementation, predictably, doesn't work. I don't see how/when the dataframes get sent to the command process through the named pipes, and I'd appreciate some help!

import os
import StringIO
import subprocess
import pandas as pd
dfA = pd.DataFrame([[1,2,3],[3,4,5]], columns=["A","B","C"])
dfB = pd.DataFrame([[5,6,7],[6,7,8]], columns=["A","B","C"]) 

# make two FIFOs to host the dataframes
fnA = 'inputA'; os.mkfifo(fnA); ffA = open(fnA,"w")
fnB = 'inputB'; os.mkfifo(fnB); ffB = open(fnB,"w")

# don't know if I need to make two subprocesses to pipe inputs 
ppA  = subprocess.Popen("echo", 
                    stdin =subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE)
ppB  = subprocess.Popen("echo", 
                    stdin =subprocess.PIPE,
                    stdout=subprocess.PIPE,
                    stderr=subprocess.PIPE)

ppA.communicate(input = dfA.to_csv(header=False,index=False,sep="\t"))
ppB.communicate(input = dfB.to_csv(header=False,index=False,sep="\t"))


pope = subprocess.Popen(["/usr/local/bin/my_command",
                        fnA,fnB,"stdout"],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
(out,err) = pope.communicate()

try:
    out = pd.read_csv(StringIO.StringIO(out), header=None,sep="\t")
except ValueError: # fail
    out = ""
    print("\n###command failed###\n")

os.unlink(fnA); os.remove(fnA)
os.unlink(fnB); os.remove(fnB)

Answer 1:

You don't need additional processes to pass data to a child process without writing it to disk:

#!/usr/bin/env python
import os
import shutil
import subprocess
import tempfile
import threading
from contextlib import contextmanager    
import pandas as pd

@contextmanager
def named_pipes(count):
    dirname = tempfile.mkdtemp()
    try:
        paths = []
        for i in range(count):
            paths.append(os.path.join(dirname, 'named_pipe' + str(i)))
            os.mkfifo(paths[-1])
        yield paths
    finally:
        shutil.rmtree(dirname)

def write_command_input(df, path):
    # to_csv(path) opens the FIFO for writing; this blocks until the child
    # process opens the read end, hence the writer threads below
    df.to_csv(path, header=False, index=False, sep="\t")

dfA = pd.DataFrame([[1,2,3],[3,4,5]], columns=["A","B","C"])
dfB = pd.DataFrame([[5,6,7],[6,7,8]], columns=["A","B","C"])

with named_pipes(2) as paths:
    p = subprocess.Popen(["cat"] + paths, stdout=subprocess.PIPE)
    with p.stdout:
        for df, path in zip([dfA, dfB], paths):
            t = threading.Thread(target=write_command_input, args=[df, path]) 
            t.daemon = True
            t.start()
        result = pd.read_csv(p.stdout, header=None, sep="\t")
p.wait()

cat is used for demonstration; you should use your command ("/usr/local/bin/my_command") instead. I assume that you can't pass the data via standard input and that you have to pass input via files. The result is read from the subprocess's standard output.
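As a side note on that last point: the child's standard output is an ordinary file object, so it can be parsed incrementally without buffering everything first. A minimal sketch, using printf as a hypothetical stand-in for a command that emits tab-separated rows:

```python
import csv
import subprocess

# printf stands in for the real command; it just emits two tab-separated rows
proc = subprocess.Popen(
    ["printf", "1\t2\t3\n4\t5\t6\n"],
    stdout=subprocess.PIPE,
    text=True,
)
# csv.reader consumes the pipe as it fills; no temporary file needed
rows = list(csv.reader(proc.stdout, delimiter="\t"))
proc.wait()
print(rows)  # [['1', '2', '3'], ['4', '5', '6']]
```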



Answer 2:

So there are a couple of things going on that might be tripping you up. The important point from the previous post is to think of these FIFOs as you would normal files, except that they block if you try to read from the pipe in one process without hooking up another process to write to it at the other end (and vice versa). Here is how I might approach the situation, and I'll do my best to describe my thinking.
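That blocking behavior is easy to see in isolation. A throwaway sketch (the file names and timing here are mine, purely for demonstration): opening a FIFO for writing stalls inside open() until a reader attaches at the other end.

```python
import os
import tempfile
import threading
import time

dirname = tempfile.mkdtemp()
path = os.path.join(dirname, "demo_fifo")
os.mkfifo(path)

received = []

def reader():
    time.sleep(0.2)           # make the writer reach open() first
    with open(path) as f:     # attaching a reader unblocks the writer's open()
        received.append(f.read())

t = threading.Thread(target=reader)
t.start()

start = time.time()
with open(path, "w") as f:    # blocks here until reader() opens the FIFO
    f.write("hello\n")
waited = time.time() - start

t.join()
os.unlink(path)
os.rmdir(dirname)
print(waited)                 # roughly 0.2s: open() waited for the reader
print(received)               # ['hello\n']
```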


First off, when you're in the main process and you call ffA = open(fnA, 'w'), you run into the issue described above -- there's no one on the other end of the pipe reading from it yet, so after issuing the call, the main process just blocks. To account for this, you'll want to remove the open() calls:

# make two FIFOs to host the dataframes
fnA = './inputA'
os.mkfifo(fnA)
fnB = './inputB'
os.mkfifo(fnB)

Okay, so we have the pipes 'inputA' and 'inputB' made and ready to be opened for reading/writing. To prevent the blocking described above, we need to start a couple of subprocesses to call open(). Since I'm not particularly familiar with the subprocess library, I'll resort to just forking a couple of child processes.

for x in xrange(2):
    pid = os.fork()
    if pid == 0:
        # child: opening the FIFO for writing blocks here until the
        # command attaches to the read end
        if x == 0:
            dfA.to_csv(open(fnA, 'w'), header=False, index=False, sep='\t')
        else:
            dfB.to_csv(open(fnB, 'w'), header=False, index=False, sep='\t')
        os._exit(0)  # leave the child without running the parent's cleanup handlers
    else:
        continue

Okay so now we'll have these two child processes blocking while waiting to write to their respective FIFOs. Now we can run our command to connect to the other end of the pipe and start reading.

pope = subprocess.Popen(["./my_cmd.sh",
                        fnA,fnB],
                        stdout=subprocess.PIPE,
                        stderr=subprocess.PIPE)
(out,err) = pope.communicate()

try:
    out = pd.read_csv(StringIO.StringIO(out), header=None,sep="\t")
except ValueError: # fail
    out = ""
    print("\n###command failed###\n")

One last note: unlinking the pipe deletes it (in Python, os.remove() is identical to os.unlink()), so there's no need to call both.
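A quick throwaway check of that claim before the final cleanup (the temp-dir bookkeeping is mine):

```python
import os
import tempfile

dirname = tempfile.mkdtemp()
path = os.path.join(dirname, "fifo")
os.mkfifo(path)

os.unlink(path)               # unlink removes the FIFO from the filesystem
gone = not os.path.exists(path)
print(gone)                   # True; a second os.remove(path) would raise OSError
os.rmdir(dirname)
```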

os.unlink(fnA)
os.unlink(fnB)
print "out: ", out

On my machine the print statement yields:

out:     0  1  2
0  1  2  3
1  3  4  5
2  5  6  7
3  6  7  8

My command, by the way, is just a couple of cat statements:

#!/bin/bash

cat "$1"
cat "$2"
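For completeness, the whole flow above condenses into one self-contained script. To keep the sketch runnable without pandas, I write plain tab-separated rows from threads instead of forked children, and cat stands in for my_command; the structure is otherwise the same (make the FIFOs, attach the reader, attach the writers, collect stdout):

```python
import os
import subprocess
import tempfile
import threading

def write_rows(rows, path):
    # opening the FIFO for writing blocks until the command opens the read end
    with open(path, "w") as f:
        for row in rows:
            f.write("\t".join(map(str, row)) + "\n")

tableA = [[1, 2, 3], [3, 4, 5]]
tableB = [[5, 6, 7], [6, 7, 8]]

dirname = tempfile.mkdtemp()
paths = [os.path.join(dirname, name) for name in ("inputA", "inputB")]
for path in paths:
    os.mkfifo(path)

# cat stands in for /usr/local/bin/my_command
proc = subprocess.Popen(["cat"] + paths, stdout=subprocess.PIPE)
for rows, path in zip((tableA, tableB), paths):
    threading.Thread(target=write_rows, args=(rows, path), daemon=True).start()

out, _ = proc.communicate()
for path in paths:
    os.unlink(path)
os.rmdir(dirname)
print(out.decode())
```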