I have three commands that would otherwise be easily chained together on the command-line like so:
$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput
In other words, the firstCommand
processes foo
from standard input and pipes the result to secondCommand
, which in turn processes that input and pipes its output to thirdCommand
, which does processing and redirects its output to the file finalOutput
.
I have been trying to recapitulate this in a Python script, using threading. I'd like to use Python in order to manipulate the output from firstCommand
before passing it to secondCommand
, and again between secondCommand
and thirdCommand
.
Here's an excerpt of code that does not seem to work:
first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)
first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))
first_thread.start()
second_thread.start()
third_thread.start()
first_thread.join()
second_thread.join()
third_thread.join()
first_process.communicate()
second_process.communicate()
third_process.communicate()
# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
chunk = from_stream.read(1024)
while chunk:
to_stream.write(chunk)
to_stream.flush()
chunk = from_stream.read(1024)
def consumeOutputFromFirstCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
break
processed_line = some_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()
def consumeOutputFromSecondCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
break
processed_line = a_different_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()
When I run this, the script hangs:
$ echo foo | ./myConversionScript.py
** hangs here... **
If I hit Ctrl-C
to terminate the script, the code is stuck on the line third_thread.join()
:
C-c C-c
Traceback (most recent call last):
File "./myConversionScript.py", line 786, in <module>
sys.exit(main(*sys.argv))
File "./myConversionScript.py", line 556, in main
third_thread.join()
File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join
self.__block.wait()
File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait
waiter.acquire()
KeyboardInterrupt
If I don't use a third_process
and third_thread
, instead only passing data from the output of the first thread to the input of the second thread, there is no hang.
Something about the third thread seems to cause things to break, but I don't know why.
I thought the point of communicate()
is that it will handle I/O for the three processes, so I'm not sure why there is an I/O hang.
How do I get three or more commands/processes working together, where one thread consumes the output of another thread/process?
UPDATE
Okay, I made some changes that seem to help, based on some comments here and on other sites. The processes are made to wait()
for completion, and within the thread methods, I close()
the pipes once the thread has processed all the data that it can. My concern is that memory usage will be very high for large datasets, but at least things are working:
first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout)
first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin))
second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin))
third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin))
first_thread.start()
second_thread.start()
third_thread.start()
first_thread.join()
second_thread.join()
third_thread.join()
first_process.wait()
second_process.wait()
third_process.wait()
# read 1K chunks from standard input
def consumeOutputFromStdin(from_stream, to_stream):
chunk = from_stream.read(1024)
while chunk:
to_stream.write(chunk)
to_stream.flush()
chunk = from_stream.read(1024)
def consumeOutputFromFirstCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
from_stream.close()
to_stream.close()
break
processed_line = some_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()
def consumeOutputFromSecondCommand(from_stream, to_stream):
while True:
unprocessed_line = from_stream.readline()
if not unprocessed_line:
from_stream.close()
to_stream.close()
break
processed_line = a_different_python_function_that_processes_line(unprocessed_line)
to_stream.write(processed_line)
to_stream.flush()