Queue and ProcessPoolExecutor in Tornado

Posted 2019-04-16 09:12

Question:

I'm attempting to use the new Tornado Queue object together with concurrent.futures so that my web server can hand off CPU-intensive tasks to other processes. I want access to the Future object returned by ProcessPoolExecutor (from the concurrent.futures module) so that I can query its state and show it on the front end (e.g. show that the process is currently running, or that it has finished).

I seem to have two hurdles with this method:

  1. How can I submit multiple q.get() objects to the ProcessPoolExecutor while also having access to the returned Future objects?
  2. How can I let the HomeHandler get access to the Future object returned by the ProcessPoolExecutor so that I may show the state information on the front-end?

Thanks for any help.

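import os
import time

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado import gen
from tornado.ioloop import IOLoop
from tornado.options import define, options
from tornado.queues import Queue

from concurrent.futures import ProcessPoolExecutor

define("port", default=8888, help="run on the given port", type=int)
q = Queue(maxsize=2)


def expensive_function(input_dict):
    # Runs in a worker process, so block with time.sleep; gen.sleep would
    # only return a Future that nothing waits on.
    time.sleep(1)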


@gen.coroutine
def consumer():
    while True:
        input_dict = yield q.get()
        try:
            with ProcessPoolExecutor(max_workers=4) as executor:
                future = executor.submit(expensive_function, input_dict)
        finally:
            q.task_done()


@gen.coroutine
def producer(input_dict):
    yield q.put(input_dict)


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/", HomeHandler),
        ]
        settings = dict(
            blog_title=u"test",
            template_path=os.path.join(os.path.dirname(__file__), "templates"),
            static_path=os.path.join(os.path.dirname(__file__), "static"),
            debug=True,
        )
        super(Application, self).__init__(handlers, **settings)


class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.html")

    def post(self, *args, **kwargs):
        input_dict = {'foo': 'bar'}

        producer(input_dict)

        self.redirect("/")


def main():
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


def start_consumer():
    tornado.ioloop.IOLoop.current().spawn_callback(consumer)


if __name__ == "__main__":
    tornado.ioloop.IOLoop.current().run_sync(start_consumer)
    main()

Answer 1:

What are you trying to accomplish by combining a Queue and a ProcessPoolExecutor? The executor already has its own internal queue. All you need to do is create the ProcessPoolExecutor once, for example as a global, and submit work to it directly from the handler. It doesn't have to be a global, but even if you keep the queue you'll want something similar, because it doesn't make sense to create a new ProcessPoolExecutor on every pass through consumer's loop.

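# Created once at module level and shared by all requests.
executor = ProcessPoolExecutor(max_workers=4)

@gen.coroutine
def post(self):
    input_dict = ...
    # Yielding the executor's Future suspends this coroutine, not the IOLoop.
    result = yield executor.submit(expensive_function, input_dict)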
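
For the second part of the question (showing a job's state on the front end), one possible approach is to keep each returned Future in a dict keyed by a job id, so that a later request can look it up. The following is only a minimal sketch under that assumption: JobTracker, its method names, and the state labels are hypothetical and not part of Tornado or concurrent.futures, and expensive_function here is a stand-in for the real work. It uses only the standard Future methods cancelled(), running(), done(), and exception().

```python
import uuid


def expensive_function(input_dict):
    # Stand-in for the CPU-bound work in the question.
    return sorted(input_dict.items())


class JobTracker:
    """Hypothetical helper: submits work and remembers each Future by id."""

    def __init__(self, executor):
        # Any concurrent.futures executor, e.g. one shared ProcessPoolExecutor.
        self._executor = executor
        self._futures = {}

    def submit(self, fn, *args):
        # Hand the call to the executor and keep the returned Future.
        job_id = uuid.uuid4().hex
        self._futures[job_id] = self._executor.submit(fn, *args)
        return job_id

    def future(self, job_id):
        return self._futures[job_id]

    def state(self, job_id):
        # Map the Future's status onto a label a template could display.
        future = self._futures[job_id]
        if future.cancelled():
            return "cancelled"
        if future.running():
            return "running"
        if future.done():
            return "failed" if future.exception() is not None else "finished"
        return "pending"
```

In the application above, a single ProcessPoolExecutor could be created at startup and passed to JobTracker. HomeHandler.post could then call tracker.submit(expensive_function, input_dict) and remember the job id, and HomeHandler.get could pass tracker.state(job_id) to the template.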