I'm trying to set up a system that elegantly defers database operations to a seperate thread in order to avoid blocking during Twisted callbacks.
So far, here is my approach:
from contextlib import contextmanager
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
from twisted.internet.threads import deferToThread
_engine = create_engine(initialization_string)
Session = scoped_session(sessionmaker(bind=_engine))
@contextmanager
def transaction_context():
session = Session()
try:
yield session
session.commit()
except:
# No need to do session.rollback(). session.remove will do it.
raise
finally:
session.remove()
def threaded(fn):
@wraps(fn)
def wrapper(*args, **kwargs):
return deferToThread(fn, *args, **kwargs)
return wrapper
This should allow me to wrap a function with the threaded
decorator and then use the transaction_context
context manager in said function's body. Below is an example:
from __future__ import print_function
from my_lib.orm import User, transaction_context, threaded
from twisted.internet import reactor
@threaded
def get_n_users(n):
with transaction_context() as session:
return session.query(User).limit(n).all()
if __name__ == '__main__':
get_n_users(n).addBoth(len)
reactor.run()
However, when I run the above script, I get a failure containing the following traceback:
Unhandled error in Deferred:
Unhandled Error
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 781, in __bootstrap
self.__bootstrap_inner()
File "/usr/lib/python2.7/threading.py", line 808, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 761, in run
self.__target(*self.__args, **self.__kwargs)
--- <exception caught here> ---
File "/usr/local/lib/python2.7/dist-packages/twisted/python/threadpool.py", line 191, in _worker
result = context.call(ctx, function, *args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 118, in callWithContext
return self.currentContext().callWithContext(ctx, func, *args, **kw)
File "/usr/local/lib/python2.7/dist-packages/twisted/python/context.py", line 81, in callWithContext
return func(*args,**kw)
File "testaccess.py", line 9, in get_n_users
return session.query(User).limit(n).all()
File "/usr/lib/python2.7/contextlib.py", line 24, in __exit__
self.gen.next()
File "/home/louis/Documents/Python/knacki/knacki/db.py", line 36, in transaction_context
session.remove()
exceptions.AttributeError: 'Session' object has no attribute 'remove'
I was not expecting this at all. What am I missing? Did I not instantiate my scoped_session
properly?
Edit: Here is a related question about integrating this setup with Twisted. It might help clarify what I'm trying to achieve.
Short answer
Call
.remove()
onSession
, notsession
.Long answer:
scoped_session
doesn't really return aSession
class. Instead, it creates an object that pays attention to which thread it's called in. Calling it will either return the existingSession
instance associated with that thread or associate a new one and return that. A thread local is what associates a thread with a session.The
remove
method on ascoped_session
object removes the session object currently associated with the thread in which it's called. That means it's the opposite ofscoped_session.__call__
, which is kind of a confusing API.Here's a short Python script to illustrate the behavior.
Its output: