I'm calling a function in Python which I know may stall and force me to restart the script.
How do I call the function or what do I wrap it in so that if it takes longer than 5 seconds the script cancels it and does something else?
I needed nestable timed interrupts (which SIGALRM can't do) that won't get blocked by time.sleep (which the thread-based approach can't do). I ended up copying and lightly modifying code from here: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
The code itself:
and a usage example:
There are a lot of suggestions, but none using concurrent.futures, which I think is the most legible way to handle this.
Super simple to read and maintain.
We make a pool, submit a single task, and then wait up to 5 seconds for its result. If the result is not ready by then, a TimeoutError is raised that you can catch and handle however you need.
Native to Python 3.2+ and backported to 2.7 (pip install futures).
Switching between threads and processes is as simple as replacing ProcessPoolExecutor with ThreadPoolExecutor.
If you want to terminate the process on timeout, I would suggest looking into Pebble.
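The pool-and-timeout flow described in this answer can be sketched roughly as follows. This is a minimal illustration, not the answer's original code: the names slow_call, call_with_timeout and demo are hypothetical, the delays are shortened so it runs quickly, and it uses the ThreadPoolExecutor variant (swap in ProcessPoolExecutor to run the call in a separate process).

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeoutError


def slow_call(delay):
    """Hypothetical stand-in for the function that may stall."""
    time.sleep(delay)
    return "finished"


def call_with_timeout(func, timeout, *args):
    """Return func(*args), or raise FutureTimeoutError after `timeout` seconds."""
    # Assumption: a thread pool is used here; ProcessPoolExecutor has the same API.
    executor = ThreadPoolExecutor(max_workers=1)
    try:
        future = executor.submit(func, *args)
        # result() blocks for at most `timeout` seconds, then raises.
        return future.result(timeout=timeout)
    finally:
        # wait=False so we do not block here until the stalled call finishes.
        executor.shutdown(wait=False)


def demo():
    """Catch the timeout and do something else instead."""
    try:
        return call_with_timeout(slow_call, 0.2, 1.0)
    except FutureTimeoutError:
        return "timed out, doing something else"
```

Note that the timeout only stops the caller from waiting. The submitted call keeps running in the background until it returns, since a worker thread cannot be killed from outside.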
I ran across this thread when searching for a timeout call on unit tests. I didn't find anything simple in the answers or 3rd party packages, so I wrote the decorator below, which you can drop right into your code:
Then it's as simple as this to time out a test or any function you like:
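A decorator of this kind could look roughly like the sketch below. It is an assumption about the general shape, not the author's code: the names timeout, quick_test and stalled_test are illustrative, and it runs the wrapped function in a daemon thread and stops waiting after the limit.

```python
import functools
import threading
import time


def timeout(seconds):
    """Decorator: raise TimeoutError if the call takes longer than `seconds`."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            outcome = {}

            def target():
                # Capture either the return value or the exception.
                try:
                    outcome["value"] = func(*args, **kwargs)
                except BaseException as exc:
                    outcome["error"] = exc

            # Daemon thread: a stalled call will not keep the interpreter alive.
            worker = threading.Thread(target=target, daemon=True)
            worker.start()
            worker.join(seconds)
            if worker.is_alive():
                raise TimeoutError(f"{func.__name__} exceeded {seconds} s")
            if "error" in outcome:
                raise outcome["error"]
            return outcome["value"]
        return wrapper
    return decorator


@timeout(0.2)
def quick_test():
    return "ok"


@timeout(0.2)
def stalled_test():
    time.sleep(1)  # simulates a test that hangs
```

Calling stalled_test() raises TimeoutError after about 0.2 seconds, while quick_test() returns normally. The stalled call itself is abandoned rather than stopped, which is usually acceptable in a test run but not for code holding locks or other resources.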
You can use multiprocessing.Process to do exactly that.
Code:
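A minimal sketch of the multiprocessing.Process approach, assuming the stalling function can run in a child process and its return value is not needed. The names stall and run_with_timeout are hypothetical.

```python
import multiprocessing
import time


def stall(seconds):
    """Hypothetical stand-in for the function that may hang."""
    time.sleep(seconds)


def run_with_timeout(target, args=(), timeout=5.0):
    """Run target(*args) in a child process.

    Returns True if it finished within `timeout` seconds; otherwise
    terminates the child and returns False.
    """
    proc = multiprocessing.Process(target=target, args=args)
    proc.start()
    proc.join(timeout)       # wait at most `timeout` seconds
    if proc.is_alive():
        proc.terminate()     # the call really is cancelled here
        proc.join()          # reap the terminated child
        return False
    return True
```

For example, run_with_timeout(stall, (10,), timeout=5) returns False after about 5 seconds, and the script can then do something else. On platforms that spawn new interpreters instead of forking (Windows, and macOS by default), make the call from under an if __name__ == "__main__": guard.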
We can use signals to do the same. I think the example below will be useful for you. It is very simple compared to threads.
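A signal-based timeout might be sketched as below. This is an illustration under assumptions, not the answer's original code: it works only on Unix and only in the main thread, the names CallTimedOut and time_limit are hypothetical, and it uses signal.setitimer rather than signal.alarm so that fractional seconds are allowed.

```python
import signal
import time
from contextlib import contextmanager


class CallTimedOut(Exception):
    """Raised when the guarded block runs past its time limit."""


@contextmanager
def time_limit(seconds):
    def handler(signum, frame):
        raise CallTimedOut(f"block exceeded {seconds} s")

    # Install our handler and arm a one-shot real-time timer.
    previous = signal.signal(signal.SIGALRM, handler)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        # Disarm the timer and restore whatever handler was there before.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, previous)


def guarded_sleep(limit, duration):
    """Return True if sleeping `duration` fit within `limit`, else False."""
    try:
        with time_limit(limit):
            time.sleep(duration)
        return True
    except CallTimedOut:
        return False
```

With the question's numbers, wrapping the call in with time_limit(5): and catching CallTimedOut lets the script move on after 5 seconds. Because there is only one SIGALRM timer per process, blocks like this cannot be nested safely.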