I want to execute a process, limit the execution-time by some timeout in seconds and grab the output produced by the process. And I want to do this on windows, linux and freebsd.
I have tried implementing this in three different ways:
cmd - No timeout; uses subprocess.PIPE for output capture.
BEHAVIOUR: Operates as expected but does not support timeout, i need timeout...
cmd_to - With timeout and subprocess.PIPE for output capture.
BEHAVIOUR: Blocks subprocess execution when output >= 2^16 bytes.
cmd_totf - With timeout and tempfile.NamedTemporaryFile for output capture.
BEHAVIOUR: Operates as expected but uses temporary files on disk.
These are available below for closer inspection.
As can be seen in the output below, the timeout code blocks the execution of the sub-process when using subprocess.PIPE and the output from the subprocess is >= 2^16 bytes.
The subprocess documentation states that this is expected when calling process.wait() and using subprocess.PIPE; however, no warnings are given about using process.poll(), so what is going wrong here?
I have a solution in cmd_totf which uses the tempfile module, but the tradeoff is that it writes the output to disk, something I would REALLY like to avoid.
So my questions are:
- What am I doing wrong in cmd_to?
- Is there a way to do what I want without using tempfiles, i.e. while keeping the output in memory?
Script to generate a bunch of output ('exp_gen.py'):
#!/usr/bin/env python
"""Write one line of N 'b' characters to stdout; N is the first argument."""
import sys

# Number of payload characters requested on the command line.
size = int(sys.argv[1])
output = "b" * size
# The parenthesised single-argument form is valid both as the Python 2 print
# statement and as the Python 3 print function, and writes the same text.
print(output)
Three different implementations (cmd, cmd_to, cmd_totf) of wrappers around subprocess.Popen:
#!/usr/bin/env python
import subprocess, time, tempfile
# Buffer size handed to subprocess.Popen by the wrappers in this file; per the
# subprocess documentation a negative value selects the system default.
bufsize = -1
def cmd(cmdline, timeout=60):
    """
    Run cmdline to completion and collect its output.

    Uses subprocess.Popen with subprocess.PIPE for stdin, stdout and stderr.
    The 'timeout' argument is accepted but never consulted.

    Returns the tuple (returncode, stderr data, stdout data).
    """
    popen_options = dict(
        bufsize=bufsize,
        shell=False,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )
    child = subprocess.Popen(cmdline, **popen_options)
    # communicate() reads both pipes until EOF and waits for the child.
    captured_out, captured_err = child.communicate()
    return (child.returncode, captured_err, captured_out)
def cmd_to(cmdline, timeout=60):
    """
    Run cmdline and watch the clock for up to 'timeout' seconds.

    Uses subprocess.Popen with subprocess.PIPE for stdin, stdout and stderr.
    The child is polled every 0.1 seconds until it exits or 'timeout'
    seconds have elapsed; its output is then gathered with communicate().

    NOTE: nothing reads the pipes while the polling loop runs, and the code
    that would close the pipes, terminate the child and raise at the
    deadline is disabled, so once the loop ends communicate() still waits
    for the child to finish.

    Returns the tuple (returncode, stderr data, stdout data).
    """
    child = subprocess.Popen(
        cmdline,
        bufsize=bufsize,
        shell=False,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )

    started = time.time()  # reference point for the elapsed-time check
    elapsed = 0
    while True:
        # Stop as soon as the child has exited or the time budget is used up.
        if child.poll() is not None or not elapsed < timeout:
            break
        elapsed = time.time() - started
        time.sleep(0.1)

    captured_out, captured_err = child.communicate()
    return (child.returncode, captured_err, captured_out)
def cmd_totf(cmdline, timeout=60):
    """
    Execute cmdline, capturing its output through temporary files.

    Uses subprocess.Popen with two tempfile.NamedTemporaryFile objects as the
    child's stdout and stderr instead of subprocess.PIPE. The child is polled
    every 0.1 seconds until it exits or 'timeout' seconds have elapsed.

    NOTE: the code that would terminate the child at the deadline is
    disabled, so after the polling loop this function still waits for the
    child to exit.

    The temporary files are closed and removed before returning, also when
    an exception is raised.

    Returns the tuple (returncode, stderr data, stdout data).
    """
    import os  # local import: only needed to remove the temporary files

    output = tempfile.NamedTemporaryFile(delete=False)
    error = tempfile.NamedTemporaryFile(delete=False)
    try:
        p = subprocess.Popen(
            cmdline,
            bufsize=0,
            shell=False,
            stdin=None,
            stdout=output,
            stderr=error
        )

        t_begin = time.time()  # Monitor execution time
        seconds_passed = 0
        while p.poll() is None and seconds_passed < timeout:
            seconds_passed = time.time() - t_begin
            time.sleep(0.1)

        p.wait()
        returncode = p.returncode

        # Re-open by name so reading starts at the beginning of each file.
        with open(output.name) as fd:
            out = fd.read()
        with open(error.name) as fd:
            err = fd.read()
    finally:
        error.close()
        output.close()
        # delete=False means nothing else removes these files for us.
        for name in (error.name, output.name):
            try:
                os.remove(name)
            except OSError:
                pass  # already gone; nothing left to clean up

    return (returncode, err, out)
if __name__ == "__main__":
    # Time every wrapper against child processes producing output just below,
    # exactly at, and far above 2^16 bytes.
    implementations = [cmd, cmd_to, cmd_totf]
    sizes = ['65535', '65536', str(1024*1024)]  # was 'bytes', which shadowed the builtin
    timeouts = [5]

    for timeout in timeouts:
        for size in sizes:
            for impl in implementations:
                # __name__ exists on Python 2 and 3; func_name is Python 2 only.
                name = impl.__name__
                t_begin = time.time()
                rc, err, output = impl(['exp_gen.py', size], timeout)
                seconds_passed = time.time() - t_begin
                filler = ' ' * (8 - len(name))  # pad names to a common width
                # Single-argument parenthesised print works on Python 2 and 3.
                print("[%s%s: timeout=%d, iosize=%s, seconds=%f]" % (repr(name), filler, timeout, size, seconds_passed))
Output from execution:
['cmd' : timeout=5, iosize=65535, seconds=0.016447]
['cmd_to' : timeout=5, iosize=65535, seconds=0.103022]
['cmd_totf': timeout=5, iosize=65535, seconds=0.107176]
['cmd' : timeout=5, iosize=65536, seconds=0.028105]
['cmd_to' : timeout=5, iosize=65536, seconds=5.116658]
['cmd_totf': timeout=5, iosize=65536, seconds=0.104905]
['cmd' : timeout=5, iosize=1048576, seconds=0.025964]
['cmd_to' : timeout=5, iosize=1048576, seconds=5.128062]
['cmd_totf': timeout=5, iosize=1048576, seconds=0.103183]
Despite all the warnings in the subprocess documentation, reading directly from process.stdout and process.stderr has provided a better solution.
By better I mean that I can read output from a process that exceeds 2^16 bytes without having to temporarily store the output on disk.
The code follows:
import fcntl
import os
import subprocess
import time
def nonBlockRead(output):
    """
    Read whatever is currently available from 'output' without blocking.

    Puts the underlying file descriptor into O_NONBLOCK mode (POSIX only,
    via fcntl) and then calls output.read().

    Returns the data read, or '' when no data is available right now.
    """
    fd = output.fileno()
    fl = fcntl.fcntl(fd, fcntl.F_GETFL)
    if not fl & os.O_NONBLOCK:
        fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
    try:
        data = output.read()
    except (IOError, OSError):
        # A non-blocking read with nothing to deliver surfaces as an I/O
        # error; only that is treated as "no data" so that unrelated
        # exceptions (e.g. KeyboardInterrupt) are no longer swallowed.
        return ''
    # Some file objects return None instead of raising when nothing is ready.
    return '' if data is None else data
def cmd(cmdline, timeout=60):
    """
    Execute cmdline, limit execution time to 'timeout' seconds.

    Uses the subprocess module and subprocess.PIPE. While the child runs,
    its stdout and stderr are read every 0.1 seconds with nonBlockRead() so
    the child never stalls on a full pipe.

    Returns the tuple (returncode, stdout data, stderr data).

    Raises TimeoutInterrupt if the child is still running after 'timeout'
    seconds; the child is terminated first and the output captured so far is
    discarded.

    NOTE(review): 'bufsize' and 'TimeoutInterrupt' are not defined in this
    snippet -- confirm the surrounding module provides them.
    """
    p = subprocess.Popen(
        cmdline,
        bufsize=bufsize,  # default value of 0 (unbuffered) is best
        shell=False,      # not really needed; it's disabled by default
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE
    )

    t_begin = time.time()  # Monitor execution time
    seconds_passed = 0

    stdout_chunks = []
    stderr_chunks = []

    while p.poll() is None and seconds_passed < timeout:  # Monitor process
        time.sleep(0.1)  # Wait a little
        seconds_passed = time.time() - t_begin

        # p.std* blocks on read(), which messes up the timeout timer.
        # To fix this, we use a nonblocking read()
        # Note: Not sure if this is Windows compatible
        stdout_chunks.append(nonBlockRead(p.stdout))
        stderr_chunks.append(nonBlockRead(p.stderr))

    # Decide on the child's actual state rather than on the clock, so a child
    # that finished during the last interval is not reported as timed out.
    if p.poll() is None:
        # Close the fds prior to terminating the process; if they are not
        # closed they hang around until the fd limit is exceeded.
        for pipe in (p.stdout, p.stderr):
            try:
                pipe.close()
            except (IOError, OSError, ValueError):
                pass
        try:
            p.terminate()
        except OSError:
            pass  # the child exited on its own in the meantime
        raise TimeoutInterrupt

    # The child may have written more between the last read and its exit;
    # drain the pipes once more so that trailing output is not lost.
    stdout_chunks.append(nonBlockRead(p.stdout))
    stderr_chunks.append(nonBlockRead(p.stderr))
    p.stdout.close()
    p.stderr.close()

    returncode = p.returncode
    return (returncode, ''.join(stdout_chunks), ''.join(stderr_chunks))
Disclaimer: This answer is not tested on windows, nor freebsd. But the used modules should work on these systems. I believe this should be a working answer to your question - it works for me.
Here's code I just hacked to solve the problem on linux. It is a combination of several Stackoverflow threads and my own research in the Python 3 documents.
Main characteristics of this code:
- Uses processes, not threads, for blocking I/O because they can be terminated more reliably (via p.terminate())
- Implements a retriggerable timeout watchdog that restarts counting whenever some output happens
- Implements a long-term timeout watchdog to limit overall runtime
- Can feed in stdin (although I only need to feed in one-time short strings)
- Can capture stdout/stderr in the usual Popen means (Only stdout is coded, and stderr redirected to stdout; but can easily be separated)
- It's almost realtime because it only checks every 0.2 seconds for output. But you could decrease this or remove the waiting interval easily
- Lots of debugging printouts still enabled to see whats happening when.
The only code dependency is enum as implemented here, but the code could easily be changed to work without. It's only used to distinguish the two timeouts - use separate exceptions if you like.
Here's the code - as usual - feedback is highly appreciated:
(Edit 29-Jun-2012 - the code is now actually working)
# Python module runcmd
# Implements a class to launch shell commands which
# are killed after a timeout. Timeouts can be reset
# after each line of output
#
# Use inside other script with:
#
# import runcmd
# (return_code, out) = runcmd.RunCmd(['ls', '-l', '/etc'],
# timeout_runtime,
# timeout_no_output,
# stdin_string).go()
#
import multiprocessing
import queue
import subprocess
import time
import enum
def timestamp():
    """Return the current time formatted as 'YYYYMMDD-HHMMSS'."""
    stamp_format = '%Y%m%d-%H%M%S'
    return time.strftime(stamp_format)
class ErrorRunCmd(Exception):
    """Base class for the errors defined by this module."""


class ErrorRunCmdTimeOut(ErrorRunCmd):
    """Error raised when a command exceeds one of its timeouts."""
class Enqueue_output(multiprocessing.Process):
    """Daemon process that forwards lines read from a stream into a queue."""

    def __init__(self, out, queue):
        """Remember the stream to read ('out') and the queue to fill."""
        super().__init__()
        self.out = out
        self.queue = queue
        self.daemon = True

    def run(self):
        """Put every line of the stream on the queue, then close the stream."""
        try:
            while True:
                chunk = self.out.readline()
                if chunk == b'':
                    # An empty bytes result ends the forwarding loop.
                    break
                self.queue.put(chunk)
        except ValueError:
            # Readline of closed file
            pass
        self.out.close()
class Enqueue_input(multiprocessing.Process):
    """Daemon process that writes strings from an iterable to a stream."""

    def __init__(self, inp, iterable):
        """Remember the stream to write ('inp') and the strings to send."""
        super().__init__()
        self.inp = inp
        self.iterable = iterable
        self.daemon = True

    def run(self):
        """Write each item UTF-8 encoded to the stream, then close it."""
        for text in self.iterable:
            encoded = bytes(text, 'utf-8')
            self.inp.write(encoded)
        self.inp.close()
class RunCmd():
"""RunCmd - class to launch shell commands
Captures and returns stdout (stderr is redirected into it). Terminates
the child after timeout_runtime wallclock seconds. Also terminates it
after timeout_retriggerable wallclock seconds without output; this
second timer is reset whenever the child produces output.
(return_code, out) = RunCmd(['ls', '-l', '/etc'],
timeout_runtime,
timeout_no_output,
stdin_string).go()
"""
# Which timeout (if any) ended the run.
# NOTE(review): this call shape does not match the standard-library
# enum.Enum functional API; presumably 'enum' here is a third-party
# recipe -- confirm which module is imported.
Timeout = enum.Enum('No','Retriggerable','Runtime')
# Store the command, both timeouts (seconds) and the optional stdin string.
def __init__(self, cmd, timeout_runtime, timeout_retriggerable, stdin=None):
self.dbg = False
self.cmd = cmd
self.timeout_retriggerable = timeout_retriggerable
self.timeout_runtime = timeout_runtime
self.timeout_hit = self.Timeout.No
# Placeholder text; go() replaces it with b'' before collecting output.
self.stdout = '--Cmd did not yield any output--'
self.stdin = stdin
# Take at most one item from q without blocking and append it to
# self.stdout. Returns the time of the read, or None if q was empty.
def read_queue(self, q):
time_last_output = None
try:
bstr = q.get(False) # non-blocking
if self.dbg: print('{} chars read'.format(len(bstr)))
time_last_output = time.time()
self.stdout += bstr
except queue.Empty:
#print('queue empty')
pass
return time_last_output
# Launch the command, collect its output through helper processes and
# enforce both timeouts. Returns (returncode, stdout).
def go(self):
# Only give the child a stdin pipe when there is something to send.
if self.stdin:
pstdin = subprocess.PIPE
else:
pstdin = None
p = subprocess.Popen(self.cmd, shell=False, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, stdin=pstdin)
pin = None
if (pstdin):
# Feed the stdin string (plus a trailing newline) from a helper process.
pin = Enqueue_input(p.stdin, [self.stdin + '\n'])
pin.start()
# A second helper process reads the child's stdout into this queue.
q = multiprocessing.Queue()
pout = Enqueue_output(p.stdout, q)
pout.start()
try:
if self.dbg: print('Beginning subprocess with timeout {}/{} s on {}'.format(self.timeout_retriggerable, self.timeout_runtime, time.asctime()))
time_begin = time.time()
time_last_output = time_begin
# Only used in the debug message; not updated afterwards in this method.
seconds_passed = 0
self.stdout = b''
once = True # ensure loop's executed at least once
# some child cmds may exit very fast, but still produce output
while once or p.poll() is None or not q.empty():
once = False
if self.dbg: print('a) {} of {}/{} secs passed and overall {} chars read'.format(seconds_passed, self.timeout_retriggerable, self.timeout_runtime, len(self.stdout)))
tlo = self.read_queue(q)
if tlo:
# Output arrived: restart the retriggerable timer.
time_last_output = tlo
now = time.time()
# Too long since the last output?
if now - time_last_output >= self.timeout_retriggerable:
self.timeout_hit = self.Timeout.Retriggerable
raise ErrorRunCmdTimeOut(self)
# Overall runtime limit exceeded?
if now - time_begin >= self.timeout_runtime:
self.timeout_hit = self.Timeout.Runtime
raise ErrorRunCmdTimeOut(self)
# Nothing queued right now: pause briefly.
if q.empty():
time.sleep(0.1)
# Final try to get "last-millisecond" output
self.read_queue(q)
finally:
self._close(p, [pout, pin])
# NOTE(review): indentation was lost in this copy; confirm whether this
# return belongs inside the 'finally' clause (which would swallow
# ErrorRunCmdTimeOut) or follows it.
return (self.returncode, self.stdout)
# Tear down: terminate the helper processes, close the child's pipes,
# terminate the child, decode the collected output and record the
# child's return code.
def _close(self, p, procs):
if self.dbg:
if self.timeout_hit != self.Timeout.No:
print('{} A TIMEOUT occured: {}'.format(timestamp(), self.timeout_hit))
else:
print('{} No timeout occured'.format(timestamp()))
# Skip helpers that were never created (pin is None without stdin).
for process in [proc for proc in procs if proc]:
try:
process.terminate()
except:
print('{} Process termination raised trouble'.format(timestamp()))
raise
# Closing is best effort: p.stdin is None when no stdin pipe was requested.
try:
p.stdin.close()
except: pass
if self.dbg: print('{} _closed stdin'.format(timestamp()))
try:
p.stdout.close() # If they are not closed the fds will hang around until
except: pass
if self.dbg: print('{} _closed stdout'.format(timestamp()))
#p.stderr.close() # os.fdlimit is exceeded and cause a nasty exception
try:
p.terminate() # Important to close the fds prior to terminating the process!
# NOTE: Are there any other "non-freed" resources?
except: pass
if self.dbg: print('{} _closed Popen'.format(timestamp()))
# Convert the collected bytes to text; left unchanged if decoding fails.
try:
self.stdout = self.stdout.decode('utf-8')
except: pass
# No wait() is called after terminate(), so this is whatever returncode
# the Popen object currently holds.
self.returncode = p.returncode
if self.dbg: print('{} _closed all'.format(timestamp()))
Use with:
# Example: run 'ls -l /etc' with a 40 s overall limit and a 2 s no-output limit.
import runcmd

cmd = ['ls', '-l', '/etc']

worker = runcmd.RunCmd(cmd,
                       40,  # limit runtime [wallclock seconds]
                       2,   # limit runtime after last output [wallclk secs]
                       ''   # stdin input string
                       )

(return_code, out) = worker.go()

if worker.timeout_hit != worker.Timeout.No:
    print('A TIMEOUT occured: {}'.format(worker.timeout_hit))
else:
    print('No timeout occured')

# cmd is a list, which rejects the '{:s}' format spec with a TypeError, so it
# is joined into one string first. The return code uses a plain '{}' because
# it can be None, which '{:d}' rejects.
print("Running '{}' returned {} and {:d} chars of output".format(' '.join(cmd), return_code, len(out)))
print('Output:')
print(out)
`command` - the first argument - should be a list of a command and its arguments. It is used for the `Popen(shell=False)` call. The timeouts are in seconds. There's currently no code to disable the timeouts. Set `timeout_no_output` to `timeout_runtime` to effectively disable the retriggerable `timeout_no_output`.
`stdin_string` can be any string which is to be sent to the command's standard input. Set it to `None` if your command does not need any input. If a string is provided, a final '\n' is appended.