-->

Simple async example with tornado python

2020-02-26 10:34发布

问题:

I want find simple async server example. I have got some function with lot of wait, database transactions ... etc:

def blocking_task(n):
    for i in xrange(n):
        print i
        sleep(1)
    return i

I need run it function in separated process without blocking. Is it possible?

回答1:

Tornado is designed to run all your operations in a single thread, but utilize asynchronous I/O to avoid blocking as much as possible. If the DB you're using has asychronous Python bindings (ideally ones geared for Tornado specifically, like Motor for MongoDB or momoko for Postgres), then you'll be able to run your DB queries without blocking the server; no separate processes or threads needed.

To address the exact example you gave, where time.sleep(1) is called, you could use this approach to do it asynchronously via tornado coroutines:

#!/usr/bin/python

import tornado.web
from tornado.ioloop import IOLoop
from tornado import gen 
import time

@gen.coroutine
def async_sleep(seconds):
    yield gen.Task(IOLoop.instance().add_timeout, time.time() + seconds)

class TestHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        for i in xrange(100):
            print i
            yield async_sleep(1)
        self.write(str(i))
        self.finish()


application = tornado.web.Application([
    (r"/test", TestHandler),
    ])  

application.listen(9999)
IOLoop.instance().start()

The interesting part is async_sleep. That method is creating an asynchronous Task, which is calling the ioloop.add_timeout method. add_timeout will run a specified callback after a given number of seconds, without blocking the ioloop while waiting for the timeout to expire. It expects two arguments:

add_timeout(deadline, callback) # deadline is the number of seconds to wait, callback is the method to call after deadline.

As you can see in the example above, we're only actually providing one parameter to add_timeout explicitly in the code, which means we end up this this:

add_timeout(time.time() + seconds, ???)

We're not providing the expected callback parameter. In fact, when gen.Task executes add_timeout, it appends a callback keyword argument to the end of the explicitly provided parameters. So this:

yield gen.Task(loop.add_timeout, time.time() + seconds)

Results in this being executed inside gen.Task():

loop.add_timeout(time.time() + seconds, callback=gen.Callback(some_unique_key))

When gen.Callback is executed after the timeout, it signals that the gen.Task is complete, and the program execution will continue on to the next line. This flow is kind of difficult to fully understand, at least at first (it certainly was for me when I first read about it). It'll probably be helpful to read over the Tornado gen module documentation a few times.



回答2:

import tornado.web
from tornado.ioloop import IOLoop
from tornado import gen

from tornado.concurrent import run_on_executor
from concurrent.futures import ThreadPoolExecutor   # `pip install futures` for python2

MAX_WORKERS = 16

class TestHandler(tornado.web.RequestHandler):
    executor = ThreadPoolExecutor(max_workers=MAX_WORKERS)

    """
    In below function goes your time consuming task
    """

    @run_on_executor
    def background_task(self):
        sm = 0
        for i in range(10 ** 8):
            sm = sm + 1

        return sm

    @tornado.gen.coroutine
    def get(self):
        """ Request that asynchronously calls background task. """
        res = yield self.background_task()
        self.write(str(res))

class TestHandler2(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        self.write('Response from server')
        self.finish()


application = tornado.web.Application([
    (r"/A", TestHandler),
    (r"/B", TestHandler2),
    ])

application.listen(5000)
IOLoop.instance().start()

When you run above code, you can run a computationally expensive operation at http://127.0.0.1:5000/A , which does not block execution, see by visiting http://127.0.0.1:5000/B immediately after you visit http://127.0.0.1:5000/A.



回答3:

Here I update the information about Tornado 5.0. Tornado 5.0 add a new method IOLoop.run_in_executor. In the "Calling blocking functions" of Coroutine patterns Chapter:

The simplest way to call a blocking function from a coroutine is to use IOLoop.run_in_executor, which returns Futures that are compatible with coroutines:

@gen.coroutine def call_blocking(): yield IOLoop.current().run_in_executor(blocking_func, args)

Also, in the documeng of run_on_executor, is says:

This decorator should not be confused with the similarly-named IOLoop.run_in_executor. In general, using run_in_executor when calling a blocking method is recommended instead of using this decorator when defining a method. If compatibility with older versions of Tornado is required, consider defining an executor and using executor.submit() at the call site.

In 5.0 version, IOLoop.run_in_executor is recommanded in use case of Calling blocking functions.