Subprocess Popen invalid argument/broken pipe whil

Posted 2019-05-10 07:15

Question:

I have this code

All the needed libraries are imported

class VERTEX(Structure):
 _fields_ = [("index", c_int),
            ("x", c_float),
            ("y", c_float)]

Other stuff

This creates an array from a list of vertices:

def writelist_buf(size, nomeID): 
 Nvert_VERTEX_Array_Type = VERTEX * len(bpy.data.objects[nomeID].data.vertices)
 passarr = Nvert_VERTEX_Array_Type()
 for i in range(len(passarr)):
  vert = bpy.data.objects[nomeID].data.vertices[i]
  passarr[i] = VERTEX(vert.index, vert.co[0], vert.co[1])
 return passarr

bpy.data.objects[nomeID].data.vertices is a list of vertices.

Other stuff

This is inside a function and sends the previous array to a C program:

input = writelist_buf(size, nomeID)
c_program_and_args = "here is the program with his arguments(it works)"
cproc = Popen(c_program_and_args, stdin=PIPE, stdout=PIPE)
out, err = cproc.communicate(input)
# the program returns 2 integers separated by a space
return [int(i) for i in out.decode().split()]

size and nomeID are declared before the writelist call.

After a bit of "debugging" I found that the type returned by writelist_buf is "legal" (it's bytes, since it is an array created with ctypes), but I keep receiving Errno 32 Broken pipe or Errno 22 Invalid argument. The C program just does a read on stdin to retrieve all the vertices (like the C code below).

The strange thing is that before "integrating" this into the code I was working on, I tried a simpler version, shown here, and it works!

from subprocess import Popen, PIPE
from ctypes import *

class VERTEX(Structure):
 _fields_ = [("index", c_int),
            ("x", c_float),
            ("y", c_float)]

nverts = 5
vlist = [VERTEX(0,1,1), VERTEX(1,2,2), VERTEX(2,3,3), VERTEX(3,4,4), VERTEX(4,5,5)]
array = VERTEX * nverts
input = array()
for i in range(nverts):
 input[i] = vlist[i]
print(type(input))
cproc = Popen("pipeinout.exe random arg", stdin=PIPE, stdout=PIPE)
out, err = cproc.communicate(input)
print(out.decode())

And the C code

#include<stdio.h>
#include<stdlib.h>
typedef struct {
    int index;
    float x;
    float y;
} vertex;

int main(int argc, char* argv[]) {
    int n=5;
    int i;
    printf("%s",argv[1]);
    vertex* VV;
    VV=(vertex*)malloc(sizeof(vertex)*n);
    fread(VV,sizeof(vertex),n,stdin);
    //fread(&VV,sizeof(VV),1,stdin);// puts the passed address into the value of VV itself (not into what it points to) || sizeof(VV) is the size of a pointer
    for(i=0;i<n;i++)
        printf(" %i , %f , %f\n",VV[i].index,VV[i].x,VV[i].y);
}
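A ctypes array exposes its contents through the buffer protocol, so its raw bytes and size can be checked before anything is written to a pipe. The sketch below is only a diagnostic aid, not a confirmed fix for the errors described above; the two sample vertices are made up for illustration.

```python
from ctypes import Structure, c_int, c_float, sizeof

class VERTEX(Structure):
    _fields_ = [("index", c_int),
                ("x", c_float),
                ("y", c_float)]

# Hypothetical sample data standing in for the Blender vertices.
vlist = [VERTEX(0, 1, 1), VERTEX(1, 2, 2)]

# Build the array type and fill it in one step.
arr = (VERTEX * len(vlist))(*vlist)

# bytes() copies the array's memory into an ordinary bytes object,
# which is the kind of object communicate() documents as its input.
payload = bytes(arr)

print(type(arr).__name__, sizeof(arr), len(payload))
```

If len(payload) differs from the number of bytes the C side expects to fread (n * sizeof(vertex)), the two programs disagree about the record layout or the count.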

Answer 1:

From your comments I understand that you pass millions of items hundreds of times to a C program. The approach below (piping the input using subprocess) might be too slow in your case. Possible alternatives are to write a C extension (e.g., using Cython) or to use ctypes to call the C functions directly. You could ask a separate question that describes your use case in detail and asks which approach is preferable.

If you've chosen an approach, make sure that it works correctly before any optimization (write some tests, measure performance, and only then optimize if needed) -- Make it work, make it right, make it fast.

On the other hand, there is no point in investing too much time in approaches that are known to be thrown away later -- Fail fast.

If the output of the C program is bounded, the .communicate() method from your code works (source):

import struct, sys    
from subprocess import Popen, PIPE

vertex_struct = struct.Struct('i f f')

def pack(vertices, n):    
    yield struct.pack('i', n)
    for v in vertices:
        yield vertex_struct.pack(*v)

def main():
    try: n = int(sys.argv[1])
    except IndexError:
        n = 100
    vertices = ((i,i+1,i+2) for i in range(n))

    p = Popen(["./echo_vertices", "random", "arg"], stdin=PIPE, stdout=PIPE)
    out, _ = p.communicate(b''.join(pack(vertices, n)))

    index, x, y = vertex_struct.unpack(out)
    assert index == (n-1) and int(x) == n and int(y) == (n+1)

if __name__ == '__main__':
    main()
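The script above needs the ./echo_vertices binary, which is not shown here. To try the same .communicate() pattern without it, the sketch below swaps in a small Python child process as a hypothetical stand-in: it assumes the C program reads a count followed by that many records and echoes the last record back, which is what the assertion above implies but is not confirmed by the source.

```python
import struct
import sys
from subprocess import Popen, PIPE

vertex_struct = struct.Struct('i f f')

# Hypothetical stand-in for ./echo_vertices: read the count, then echo
# the last (index, x, y) record back unchanged.
CHILD = r"""
import struct, sys
rec = struct.Struct('i f f')
data = sys.stdin.buffer.read()
(n,) = struct.unpack_from('i', data, 0)
start = struct.calcsize('i') + (n - 1) * rec.size
sys.stdout.buffer.write(data[start:start + rec.size])
"""

def echo_last_vertex(n):
    vertices = ((i, i + 1, i + 2) for i in range(n))
    payload = struct.pack('i', n) + b''.join(
        vertex_struct.pack(*v) for v in vertices)
    p = Popen([sys.executable, '-c', CHILD], stdin=PIPE, stdout=PIPE)
    # communicate() writes all input, closes stdin, and reads all output.
    out, _ = p.communicate(payload)
    return vertex_struct.unpack(out)

result = echo_last_vertex(100)
print(result)
```

Because communicate() keeps the whole input and the whole output in memory, this only suits cases where both are of manageable size.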

Here's the code from the comments to the question. It works without errors for large n values on my machine:

import struct, sys
from subprocess import Popen, PIPE
from threading import Thread

def pack(vertices, n):
    yield struct.pack('i', n)
    s = struct.Struct('i f f')
    for v in vertices:
        yield s.pack(*v)

def write(output_file, chunks):
    for chunk in chunks:
        output_file.write(chunk)
    output_file.close()

def main():
    try: n = int(sys.argv[1])
    except IndexError:
        n = 100
    vertices = ((i,i+1,i+2) for i in range(n))

    p = Popen(["./echo_vertices", "random", "arg"], stdin=PIPE, stdout=PIPE)

    Thread(target=write, args=[p.stdin, pack(vertices, n)]).start()

    for line in iter(p.stdout.readline, b''):
        pass
    p.stdout.close()
    sys.stdout.buffer.write(line)
    p.wait()

if __name__ == '__main__':
    main()

Q&A

Q: I don't really understand the pack function (I know that yield returns an iterable object that can be iterated only once, but your code uses two yields, so I don't get what it returns).

pack() is a generator. Generators do not work how you've described them, e.g.:

>>> def f():
...     yield 1
...     yield 2
... 
>>> for i in f():
...     print(i)
...     
1
2

Note each yield produces a value.

>>> def g(n):
...     for i in range(n):
...         yield i
... 
>>> list(g(10))
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

Here the yield appears in the text only once, but it is executed 10 times, and each time it produces a value (an integer in this case). See Generators in the Python tutorial. "Generator Tricks for Systems Programmers" contains multiple examples of how to use generators, from simple to advanced usage.
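Applied to pack() itself, the first yield produces one chunk holding the count and the yield inside the loop produces one chunk per vertex. A small sketch that collects the chunks into a list to make this visible (the two vertices are made-up sample data):

```python
import struct

def pack(vertices, n):
    yield struct.pack('i', n)        # first chunk: the vertex count
    s = struct.Struct('i f f')
    for v in vertices:
        yield s.pack(*v)             # one chunk per (index, x, y) tuple

# Hypothetical sample data.
sample = [(0, 1, 2), (1, 2, 3)]

chunks = list(pack(sample, len(sample)))
sizes = [len(c) for c in chunks]
print(len(chunks), sizes)

# Joining the chunks gives the byte string passed to communicate().
payload = b''.join(chunks)
```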


Q: In addition, I don't know what (*v) means at line 10.

s.pack(*v) calls the pack method using argument unpacking:

>>> def h(a, b):
...     print(a, b)
... 
>>> h(*[1, 'a'])
1 a
>>> h(*range(2))
0 1
>>> h(0, 1)
0 1

Q: I don't get how the Thread in line 25 works,

Thread(target=write, args=[p.stdin, pack(vertices, n)]).start()

This line starts a new thread that calls the write() function with the arguments from the args keyword argument, i.e., output_file=p.stdin and chunks=pack(vertices, n). The write() function in this case is equivalent to:

p.stdin.write(struct.pack('i', n))
p.stdin.write(s.pack(0, 1, 2))
p.stdin.write(s.pack(1, 2, 3))
...
p.stdin.write(s.pack(n-1, n, n+1))
p.stdin.close()

After that the thread exits.


Q: ...and all the read output of the program.. It isn't stored in a variable, is it?

The whole output is not stored anywhere. The code:

for line in iter(p.stdout.readline, b''):
    pass

reads from p.stdout line by line until .readline() returns the empty bytes object b'', storing the current line in the line variable each time (see the iter() docs). So:

sys.stdout.buffer.write(line)

just prints the last line of the output.
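If the whole output is wanted rather than only the last line, the same iter() loop can collect the lines as it goes. A minimal sketch, using io.BytesIO as a stand-in for p.stdout (an assumption made so the example runs without a child process):

```python
import io

# Stand-in for p.stdout; a real pipe behaves the same way for readline().
stream = io.BytesIO(b"first\nsecond\nthird\n")

all_lines = []
last = None
# iter(callable, sentinel) calls readline() until it returns b''.
for line in iter(stream.readline, b''):
    all_lines.append(line)   # keep every line
    last = line              # or keep only the most recent one

print(len(all_lines), last)
```

Initializing last before the loop also avoids a NameError when the stream produces no lines at all.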


Q: 1) After starting the Thread, the Python script waits until it has finished, right?

No, the main thread does not wait; it carries on and eventually exits. The started thread is not a daemon thread, so it runs until it completes, i.e., the script (the program) doesn't exit until the thread has finished.
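This behaviour can be observed with a small sketch: start() returns immediately, and only an explicit join() makes the main thread wait. The worker function and the 0.2 second delay are arbitrary choices for illustration.

```python
import threading
import time

events = []

def worker():
    time.sleep(0.2)                 # simulate a slow write to a pipe
    events.append('worker done')

t = threading.Thread(target=worker)
t.start()                           # returns at once, does not wait
events.append('main continues')

is_daemon = t.daemon                # False by default for this thread
t.join()                            # explicit wait for the worker
events.append('after join')

print(events, is_daemon)
```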


Q: 2) I understood how you read from the stdout of the C program, but I don't get when you start it. As far as I understood, with the write function we write the data we want into a buffer (or something like a file in RAM), and when we run the C program it can read the data we wrote from it. But when do we start the C program in your code? :)

The C program is started by p = Popen(...).

p.stdin.write() writes to the stdin of the C program (there are a number of buffers in between, but we can forget about them for the moment). The process is the same as in:

$ echo abc | some_program

Q: 3) Last thing: why do you use a wait on p? There's a warning: http://docs.python.org/library/subprocess.html?#subprocess.Popen.wait

For the provided C code it is not necessary to write to p.stdin in a separate thread. I use the thread precisely to avoid the situation described in the warning, i.e., the C program producing enough output to fill the pipe before the script finishes writing to its stdin (your C code doesn't write anything before it finishes reading, so the thread is not necessary).

In other words p.wait() is safe in this case.

Without p.wait(), stderr output from the C program might be lost, though I can reproduce the stderr loss only on Jython with these scripts. Again, for the provided C code it doesn't matter because it does not write anything to stderr.
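If the child's stderr matters, one option is to capture it explicitly with stderr=PIPE and let .communicate() drain stdout and stderr together. The sketch below uses a small Python child as a hypothetical stand-in for the C program; the message it writes to stderr is made up.

```python
import sys
from subprocess import Popen, PIPE

# Hypothetical stand-in for the C program: report the number of bytes
# read on stdout and write a diagnostic message to stderr.
CHILD = r"""
import sys
data = sys.stdin.buffer.read()
sys.stdout.write(str(len(data)))
sys.stderr.write("note: read finished")
"""

p = Popen([sys.executable, '-c', CHILD],
          stdin=PIPE, stdout=PIPE, stderr=PIPE)

# communicate() feeds stdin and reads both output pipes until EOF,
# then waits for the process to exit.
out, err = p.communicate(b'x' * 100000)

print(out, err, p.returncode)
```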