I'm attempting to use the new Tornado queue object along with concurrent.futures to allow my webserver to pass off CPU-intensive tasks to other processes. I want to have access to the Future object that's returned from the ProcessPoolExecutor from the concurrent.futures module so that I can query its state to show on the front-end (e.g. show the process is currently running; show that it has finished).

I seem to have two hurdles with this method:

- How can I submit multiple q.get() objects to the ProcessPoolExecutor while also having access to the returned Future objects?
- How can I let the HomeHandler get access to the Future object returned by the ProcessPoolExecutor so that I may show the state information on the front-end?

Thanks for any help.

import os
import time
from concurrent.futures import ProcessPoolExecutor

import tornado.httpserver
import tornado.ioloop
import tornado.options
import tornado.web
from tornado import gen
from tornado.options import define, options
from tornado.queues import Queue

define("port", default=8888, help="run on the given port", type=int)

q = Queue(maxsize=2)


def expensive_function(input_dict):
    # Runs in a worker process, so it blocks with time.sleep instead of gen.sleep.
    time.sleep(1)


@gen.coroutine
def consumer():
    while True:
        input_dict = yield q.get()
        try:
            # A new pool is created for every queue item; leaving the with
            # block waits for the submitted task to finish.
            with ProcessPoolExecutor(max_workers=4) as executor:
                future = executor.submit(expensive_function, input_dict)
        finally:
            q.task_done()


@gen.coroutine
def producer(input_dict):
    yield q.put(input_dict)


class Application(tornado.web.Application):
    def __init__(self):
        handlers = [
            (r"/", HomeHandler),
        ]
        settings = dict(
            blog_title=u"test",
            template_path=os.path.join(os.path.dirname(__file__), "templates"),
            static_path=os.path.join(os.path.dirname(__file__), "static"),
            debug=True,
        )
        super(Application, self).__init__(handlers, **settings)


class HomeHandler(tornado.web.RequestHandler):
    def get(self):
        self.render("home.html")

    def post(self, *args, **kwargs):
        input_dict = {'foo': 'bar'}
        producer(input_dict)
        self.redirect("/")


def main():
    tornado.options.parse_command_line()
    http_server = tornado.httpserver.HTTPServer(Application())
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()


def start_consumer():
    tornado.ioloop.IOLoop.current().spawn_callback(consumer)


if __name__ == "__main__":
    tornado.ioloop.IOLoop.current().run_sync(start_consumer)
    main()

What are you trying to accomplish by combining a Queue and a ProcessPoolExecutor? The executor already has its own internal queue. All you need to do is make the ProcessPoolExecutor a global and submit things to it directly from the handler. (It doesn't have to be a global, but you'll want to do something similar to a global even if you keep the queue; it doesn't make sense to create a new ProcessPoolExecutor each time through consumer's loop.)
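
One way to read this suggestion, as a rough sketch rather than a definitive implementation: create a single long-lived executor, submit to it directly, and keep each returned Future in a dictionary keyed by a job id so its state can be looked up later. The names JobRegistry, submit, get, state and make_registry are hypothetical helpers invented for this illustration, and expensive_function is only a stand-in for the real task. The sketch uses only the standard library so it stays independent of the Tornado version.

```python
import time
import uuid
from concurrent.futures import Executor, ProcessPoolExecutor


def expensive_function(input_dict):
    """Stand-in for the CPU-heavy task; top-level so it can be pickled."""
    time.sleep(0.1)
    return len(input_dict)


class JobRegistry:
    """Hypothetical helper: one shared executor plus a map of job id -> Future."""

    def __init__(self, executor: Executor):
        self._executor = executor
        self._futures = {}

    def submit(self, fn, *args):
        # executor.submit returns a concurrent.futures.Future immediately.
        job_id = uuid.uuid4().hex
        self._futures[job_id] = self._executor.submit(fn, *args)
        return job_id

    def get(self, job_id):
        # Returns the stored Future, or None for an unknown id.
        return self._futures.get(job_id)

    def state(self, job_id):
        # Translate the Future's status into a label the front-end can show.
        future = self._futures.get(job_id)
        if future is None:
            return "unknown"
        if future.cancelled():
            return "cancelled"
        if future.running():
            return "running"
        if future.done():
            return "failed" if future.exception() is not None else "finished"
        return "pending"


def make_registry(max_workers=4):
    # Assumption: called once at startup, so the pool outlives individual requests.
    return JobRegistry(ProcessPoolExecutor(max_workers=max_workers))
```

Under these assumptions, HomeHandler.post could call registry.submit(expensive_function, input_dict) and remember the returned job id, while HomeHandler.get could pass registry.state(job_id) to the template. Neither call blocks the IOLoop, because submit returns right away and the status checks do not wait for the task.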