How to make SQLAlchemy async in Tornado?

Posted 2019-01-10 01:51

Question:

How can I make SQLAlchemy async in Tornado? I found an example for MongoDB (the async mongo example), but I couldn't find anything like motor for SQLAlchemy. Does anyone know how to make SQLAlchemy queries execute with tornado.gen? (I am using MySQL underneath SQLAlchemy. At the moment my handlers read from the database and return the result; I would like to make this async.)

Answer 1:

ORMs are poorly suited for explicit asynchronous programming, that is, where the programmer must produce explicit callbacks anytime something that uses network access occurs. A primary reason for this is that ORMs make extensive use of the lazy loading pattern, which is more or less incompatible with explicit async. Code that looks like this:

user = Session.query(User).first()
print(user.addresses)

will actually emit two separate queries - one when you say first() to load a row, and the next when you say user.addresses, in the case that the .addresses collection isn't already present, or has been expired. Essentially, nearly every line of code that deals with ORM constructs might block on IO, so you'd be in extensive callback spaghetti within seconds - and to make matters worse, the vast majority of those code lines won't actually block on IO, so all the overhead of connecting callbacks together for what would otherwise be simple attribute access operations will make your program vastly less efficient too.
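To make the two-query behaviour concrete, here is a minimal sketch that imitates lazy loading using only the standard library's sqlite3 module. It is not SQLAlchemy's implementation; the `User` class, the `run` helper and the `queries` log are hypothetical names invented for this illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE addresses (id INTEGER PRIMARY KEY, user_id INTEGER, email TEXT);
    INSERT INTO users VALUES (1, 'ed');
    INSERT INTO addresses VALUES (1, 1, 'ed@example.com');
""")

queries = []  # every SQL statement sent to the database is recorded here


def run(sql, params=()):
    """Hypothetical helper: log the statement, then execute it (blocking)."""
    queries.append(sql)
    return conn.execute(sql, params).fetchall()


class User:
    """Toy stand-in for an ORM-mapped class with a lazily loaded collection."""

    def __init__(self, id, name):
        self.id = id
        self.name = name
        self._addresses = None  # collection not loaded yet

    @property
    def addresses(self):
        # Plain attribute access: hits the database only the first time.
        if self._addresses is None:
            rows = run("SELECT email FROM addresses WHERE user_id = ?", (self.id,))
            self._addresses = [email for (email,) in rows]
        return self._addresses


def first_user():
    (row,) = run("SELECT id, name FROM users ORDER BY id LIMIT 1")
    return User(*row)


user = first_user()
queries_after_first = len(queries)           # one query so far
addresses = user.addresses                   # attribute access emits a second query
queries_after_access = len(queries)
addresses_again = user.addresses             # already loaded, no further IO
queries_after_second_access = len(queries)
```

In an explicit async style, every access to `user.addresses` would have to be written as if it might wait on IO, even though only the first one does.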

A major issue with explicit asynchronous models is that they add tremendous Python function call overhead to complex systems - not just on the user-facing side, as you get with lazy loading, but on the internal side as well, in how the system provides abstraction around the Python database API (DBAPI). For SQLAlchemy to have even basic async support would impose a severe performance penalty on the vast majority of programs that don't use async patterns, and even on those async programs that are not highly concurrent. Consider that SQLAlchemy, or any other ORM or abstraction layer, might have code like the following:

def execute(connection, statement):
    cursor = connection.cursor()
    cursor.execute(statement)
    results = cursor.fetchall()
    cursor.close()
    return results

The above code performs what seems to be a simple operation, executing a SQL statement on a connection. But using a fully async DBAPI like psycopg2's async extension, the above code blocks on IO at least three times. So to write the above code in explicit async style, even when there's no async engine in use and the callbacks aren't actually blocking, means the above outer function call becomes at least three function calls, instead of one, not including the overhead imposed by the explicit asynchronous system or the DBAPI calls themselves. So a simple application is automatically given a penalty of 3x the function call overhead surrounding a simple abstraction around statement execution. And in Python, function call overhead is everything.
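The call-count argument can be sketched with the standard library's sqlite3 module. This is only an illustration under stated assumptions: sqlite3 is not an async driver, the choice of `execute`, `fetchall` and `close` as the three potential IO waits is an assumption, and the continuation functions are called directly where a real event loop would invoke them once IO is ready. The names `execute_sync`, `execute_callback` and `calls` are hypothetical.

```python
import sqlite3

calls = {"sync": 0, "callback": 0}  # how many of our own functions were entered


def execute_sync(connection, statement):
    """The original one-function version."""
    calls["sync"] += 1
    cursor = connection.cursor()
    cursor.execute(statement)
    results = cursor.fetchall()
    cursor.close()
    return results


def execute_callback(connection, statement, on_done):
    """Same work, split at each point assumed to wait on IO."""
    calls["callback"] += 1
    cursor = connection.cursor()

    def after_execute():
        calls["callback"] += 1
        results = cursor.fetchall()
        after_fetch(results)  # an event loop would schedule this continuation

    def after_fetch(results):
        calls["callback"] += 1
        cursor.close()
        on_done(results)  # hand the rows to the caller's callback

    cursor.execute(statement)
    after_execute()  # an event loop would schedule this continuation


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t VALUES (1), (2)")

sync_results = execute_sync(conn, "SELECT x FROM t ORDER BY x")

collected = []
execute_callback(conn, "SELECT x FROM t ORDER BY x", collected.extend)
```

Both versions return the same rows, but the callback version enters three of its own functions where the synchronous one enters a single function, before counting the caller's `on_done` callback or any scheduling work.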

For these reasons, I continue to be less than excited about the hype surrounding explicit async systems, at least to the degree that some folks seem to want to go all async for everything, like delivering web pages (see node.js). I'd recommend using implicit async systems instead, most notably gevent, where you get all the non-blocking IO benefits of an asynchronous model and none of the structural verbosity/downsides of explicit callbacks. I continue to try to understand use cases for these two approaches, so I'm puzzled by the appeal of the explicit async approach as a solution to all problems, i.e. as you see with node.js - we're using scripting languages in the first place to cut down on verbosity and code complexity, and explicit async for simple things like delivering web pages seems to do nothing but add boilerplate that can just as well be automated by gevent or similar, if blocking IO is even such a problem in a case like that (plenty of high volume websites do fine with a synchronous IO model). Gevent-based systems are production proven and their popularity is growing, so if you like the code automation that ORMs provide, you might also want to embrace the async-IO-scheduling automation that a system like gevent provides.

Update: Nick Coghlan pointed out his great article on the subject of explicit vs. implicit async, which is also a must-read. I've also learned that PEP 3156 now welcomes interoperability with gevent, reversing its previously stated disinterest in gevent, largely thanks to Nick's article. So in the future I would recommend a hybrid of Tornado using gevent for the database logic, once the system for integrating these approaches is available.
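As a side note, a different technique from the gevent approach recommended above is to leave the database code synchronous and push it onto a thread pool, so the event loop is not blocked while a query runs. The sketch below uses only asyncio and sqlite3 from the standard library; `blocking_query` and `handler` are hypothetical names, and a real application would call its SQLAlchemy session code inside the worker function instead. Tornado's `IOLoop.run_in_executor` offers the same idea, but it is not used here.

```python
import asyncio
import sqlite3
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)


def blocking_query(n):
    """Ordinary synchronous database code; runs in a worker thread."""
    # Each call opens its own connection, so no connection crosses threads.
    conn = sqlite3.connect(":memory:")
    try:
        return conn.execute("SELECT ? * 2", (n,)).fetchone()[0]
    finally:
        conn.close()


async def handler(n):
    """Stand-in for a request handler: awaits the query without blocking the loop."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(executor, blocking_query, n)


async def main():
    # Three "requests" are served concurrently by the thread pool.
    return await asyncio.gather(handler(1), handler(2), handler(3))


results = asyncio.run(main())
executor.shutdown()
```

This keeps lazy loading and the rest of the ORM usable as-is, at the cost of one thread per in-flight query.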



Answer 2:

I had this same issue in the past and I couldn't find a reliable Async-MySQL library. However there is a cool solution using Asyncio + Postgres. You just need to use the aiopg library, which comes with SQLAlchemy support out of the box:

import asyncio
from aiopg.sa import create_engine
import sqlalchemy as sa


metadata = sa.MetaData()

tbl = sa.Table('tbl', metadata,
           sa.Column('id', sa.Integer, primary_key=True),
           sa.Column('val', sa.String(255)))

@asyncio.coroutine
def go():
    engine = yield from create_engine(user='aiopg',
                                      database='aiopg',
                                      host='127.0.0.1',
                                      password='passwd')

    with (yield from engine) as conn:
        yield from conn.execute(tbl.insert().values(val='abc'))

        res = yield from conn.execute(tbl.select().where(tbl.c.val=='abc'))
        for row in res:
            print(row.id, row.val)


loop = asyncio.get_event_loop()
loop.run_until_complete(go())


Answer 3:

I use Tornado with SQLAlchemy in the following way:


from tornado_mysql import pools
from sqlalchemy.sql import table, column, select, join
from sqlalchemy.dialects import postgresql, mysql

# from models import M, M2

t = table(...)
t2 = table(...)

xxx_id = 10

j = join(t, t2, t.c.t_id == t2.c.id)
s = select([t]).select_from(j).where(t.c.xxx == xxx_id)

# compile() returns a Compiled object; str() renders it to the SQL text
sql_str = str(s.compile(dialect=mysql.dialect(), compile_kwargs={"literal_binds": True}))


pool = pools.Pool(conn_data...)
cur = yield pool.execute(sql_str)
data = cur.fetchone()

This way we can still use SQLAlchemy models and SQLAlchemy's tools for constructing queries.



Answer 4:

Not tornado, but we sort of made SQLAlchemy async in asyncio in the GINO project:

import asyncio
from gino import Gino, enable_task_local
from sqlalchemy import Column, Integer, Unicode, cast

db = Gino()


class User(db.Model):
    __tablename__ = 'users'

    id = Column(Integer(), primary_key=True)
    nickname = Column(Unicode(), default='noname')


async def main():
    await db.create_pool('postgresql://localhost/gino')

    # Create object, `id` is assigned by database
    u1 = await User.create(nickname='fantix')
    print(u1.id, u1.nickname)  # 1 fantix

    # Retrieve the same row, as a different object
    u2 = await User.get(u1.id)
    print(u2.nickname)  # fantix

    # Update affects only database row and the operating object
    await u2.update(nickname='daisy')
    print(u2.nickname)  # daisy
    print(u1.nickname)  # fantix

    # Returns all user objects with "d" in their nicknames
    users = await User.query.where(User.nickname.contains('d')).gino.all()

    # Find one user object, None if not found
    user = await User.query.where(User.nickname == 'daisy').gino.first()

    # Execute complex statement and return command status
    status = await User.update.values(
        nickname='No.' + cast(User.id, Unicode),
    ).where(
        User.id > 10,
    ).gino.status()

    # Iterate over the results of a large query in a transaction as required
    async with db.transaction():
        async for u in User.query.order_by(User.id).gino.iterate():
            print(u.id, u.nickname)


loop = asyncio.get_event_loop()
enable_task_local(loop)
loop.run_until_complete(main())

It looks a bit like the SQLAlchemy ORM but is actually quite different, because we used only a part of SQLAlchemy core and built a simple ORM on top of it. It uses asyncpg underneath, so it is for PostgreSQL only.

Update: GINO supports Tornado now, thanks to the contribution of Vladimir Goncharov. See the docs here.