How can I cancel a hanging asyncronous task in tor

2019-05-26 21:53发布

问题:

My setup is python tornado server, which asynchronously processes tasks with a ThreadPoolExecutor. In some conditions, the task might turn into infinite loop. With the with_timeout decorator, I have managed to catch the timeout exception and return an error result to the client. The problem is that the task is still running in the background. How it is possible to stop the task from running in the ThreadPoolExecutor? Or is it possible to cancel the Future? Here is the code that reproduces the problem. Run the code with tornado 4 and concurrent.futures libraries and go to http://localhost:8888/test

from tornado.concurrent import run_on_executor
from tornado.gen import with_timeout
from tornado.ioloop import IOLoop
import tornado.web
from tornado import gen
from concurrent.futures import ThreadPoolExecutor
import datetime
MAX_WAIT_SECONDS = 10

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self):
        ...
        #infinite loop might be here
        ...

    @tornado.gen.coroutine
    def get(self):
        future = self.test_func()
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            #how to cancel the task here if it was timeout
            future.cancel() # <-- Does not work
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])
application.listen(8888)
IOLoop.instance().start()

回答1:

Future instances themselves can't be cancelled once they're actually executing, they can only be cancelled if they're in a pending state. This is noted in the docs:

cancel()

Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

So, the only way to abort the method you're running in the background is to actually insert logic into your potentially infinite loop so that it can be aborted when you tell it to. With your example, you could use a threading.Event:

class MainHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(2)

    @run_on_executor
    def test_func(self, event):
        i = 0
        while not event.is_set():
            print i
            i = i + 1

    @tornado.gen.coroutine
    def get(self):
        event = threading.Event()
        future = self.test_func(event)
        try:
            result_search_struct = yield with_timeout(datetime.timedelta(seconds=MAX_WAIT_SECONDS), future )
            self.write({'status' : 0})
            self.finish()
        except Exception, e:
            future.cancel() # Might not work, depending on how busy the Executor is
            event.set()
            self.write({'status' : 100})
            self.finish()

application = tornado.web.Application([
    (r"/test", MainHandler),
])