Stopping a function in Python using a timeout

Posted 2019-02-18 23:13

I'm writing a healthcheck endpoint for my web service.

The endpoint calls a series of functions, each of which returns True if its component is working correctly.

The system is considered to be working if all the components are working:

def is_health():
    healthy = all(r for r in (database(), cache(), worker(), storage()))
    return healthy

When things aren't working, the functions may take a long time to return. For example, if the database is bogged down with slow queries, database() could take more than 30 seconds to return.

The healthcheck endpoint runs in the context of a Django view, running inside a uWSGI container. If the request / response cycle takes longer than 30 seconds, the request is harakiri-ed!

This is a huge bummer, because I lose all contextual information that I could have logged about which component took a long time.

What I'd really like is for the component functions to run within a timeout or a deadline:

with timeout(seconds=30):
    database_result = database()
    cache_result = cache()
    worker_result = worker()
    storage_result = storage()

In my imagination, as the deadline / harakiri timeout approaches, I can abort the remaining health checks and just report the work I've completed.
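One way to picture the deadline idea is the rough sketch below. It is written for Python 3 and is not taken from the original post: `run_checks`, `TIMED_OUT` and the dictionary of named checks are hypothetical names chosen for illustration. Each check is submitted to a thread pool, and the caller waits for each result only as long as the shared time budget allows, so whatever finished in time can still be reported.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

TIMED_OUT = "timed out"  # hypothetical marker for checks that missed the deadline


def run_checks(checks, budget_seconds):
    """Run named checks concurrently and report what finished within the budget."""
    deadline = time.monotonic() + budget_seconds
    pool = ThreadPoolExecutor(max_workers=max(1, len(checks)))
    futures = {name: pool.submit(fn) for name, fn in checks.items()}

    results = {}
    for name, future in futures.items():
        # Wait only for whatever is left of the shared budget.
        remaining = max(0.0, deadline - time.monotonic())
        try:
            results[name] = future.result(timeout=remaining)
        except FutureTimeout:
            results[name] = TIMED_OUT
        except Exception as exc:
            # A check that raised is reported as a failure, not as a timeout.
            results[name] = "error: %r" % (exc,)

    # Do not wait for stragglers; their threads keep running in the background.
    pool.shutdown(wait=False)
    return results
```

The view could then log the returned dictionary before responding. Note that a timed-out check is only abandoned, not stopped: its thread continues until the underlying call returns.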

What's the right way to handle this sort of thing?

I've looked at threading.Thread and Queue.Queue - the idea being that I create a work and result queue, and then use a thread to consume the work queue while placing the results in result queue. Then I could use the thread's Thread.join function to stop processing the rest of the components.

The one challenge there is that I'm not sure how to hard exit the thread - I wouldn't want it hanging around forever if it didn't complete its run.
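As far as I know, Python's threading module has no supported way to kill a thread from outside. The usual workaround is to stop waiting for it and mark it as a daemon so it cannot keep the process alive. The sketch below (Python 3, with a hypothetical helper name `call_with_timeout`) shows that pattern. It abandons the worker rather than stopping it.

```python
import threading
import time


def call_with_timeout(fn, seconds):
    """Return (finished, value). A slow worker is abandoned, not killed."""
    box = {}

    def target():
        try:
            box["value"] = fn()
        except Exception as exc:
            box["error"] = exc

    t = threading.Thread(target=target)
    t.daemon = True  # do not block interpreter exit if fn never returns
    t.start()
    t.join(seconds)  # returns after `seconds` even if fn is still running

    if t.is_alive():
        return False, None
    if "error" in box:
        raise box["error"]
    return True, box["value"]
```

If the abandoned call holds a database connection or a lock, that resource stays held until the call returns on its own, which is the main cost of this approach.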

Here is the code I've got so far. Am I on the right track?

import Queue
import threading
import time

class WorkThread(threading.Thread):
    def __init__(self, work_queue, result_queue):
        super(WorkThread, self).__init__()
        self.work_queue = work_queue
        self.result_queue = result_queue

        self._timeout = threading.Event()

    def timeout(self):
        self._timeout.set()

    def timed_out(self):
        return self._timeout.is_set()

    def run(self):
        while not self.timed_out():
            try:
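                # get_nowait() raises Queue.Empty once the queue is drained;
                # a bare get() would block forever and never reach the except.
                work_fn, work_arg = self.work_queue.get_nowait()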
                retval = work_fn(work_arg)
                self.result_queue.put(retval)
            except (Queue.Empty):
                break

def work(retval, timeout=1):
    time.sleep(timeout)
    return retval

def main():
    # Two work items that will take at least two seconds to complete.
    work_queue = Queue.Queue()
    work_queue.put_nowait([work, 1])
    work_queue.put_nowait([work, 2])

    result_queue = Queue.Queue()

    # Run the `WorkThread`. It should complete one item from the work queue
    # before it times out.
    t = WorkThread(work_queue=work_queue, result_queue=result_queue)
    t.start()
    t.join(timeout=1.1)
    t.timeout()

    results = []
    while True:
        try:
            result = result_queue.get_nowait()
            results.append(result)
        except (Queue.Empty):
            break

    print results

if __name__ == "__main__":
    main()

Update

It seems like in Python you've got a few options for timeouts of this nature:

  1. Use SIGALRM, which works great if you have full control of the signals used by the process but is probably a mistake when you're running in a container like uWSGI.
  2. Threads, which give you limited timeout control. Depending on your container environment (like uWSGI) you might need to set options to enable them.
  3. Subprocesses, which give you full timeout control, but you need to be conscious of how they might change how your service consumes resources.
  4. Use existing network timeouts. For example, if part of your healthcheck is to use Celery workers, you could rely on AsyncResult's timeout parameter to bound execution.
  5. Do nothing! Log at regular intervals. Analyze later.
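To make the first option concrete, here is a minimal sketch of a signal-based `timeout` context manager in the shape the question asks for. It is written for Python 3 and rests on several assumptions: it only works on Unix, only in the main thread, and only if nothing else in the process (such as the uWSGI container) relies on SIGALRM. The `CheckTimeout` exception is a hypothetical name.

```python
import signal
import time
from contextlib import contextmanager


class CheckTimeout(Exception):
    """Raised inside the with-block when the alarm fires (hypothetical name)."""


@contextmanager
def timeout(seconds):
    def _raise(signum, frame):
        raise CheckTimeout()

    # Install our handler and remember the previous one so it can be restored.
    previous = signal.signal(signal.SIGALRM, _raise)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        yield
    finally:
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel any pending alarm
        signal.signal(signal.SIGALRM, previous)
```

Because the exception interrupts whatever the main thread is doing, results assigned before the alarm fires survive, so wrapping the four component calls from the question in `try: ... except CheckTimeout:` would leave the completed ones available for logging.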

I'm exploring the benefits of these different options more.


Update #2

I put together a GitHub repo with quite a bit more information on the topic:

I'll type it up into an answer one day, but the TL;DR is here:
