SQLAlchemy session issues with Celery

Posted 2019-03-16 03:10

I have scheduled a few recurring tasks with Celery beat for our web app.

The app itself is built with the Pyramid web framework and uses the ZopeTransactionExtension to manage the session.

In Celery, I use the app as a library and replace the session in models with a function.

It works well, but once in a while it raises InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction

I am not sure what is wrong or why these exceptions occur.

Sample code:

In tasks.py

def initialize_async_session():
    import sqlalchemy
    from webapp.models import Base, set_dbsession, engine

    Session = sqlalchemy.orm.scoped_session(
                sqlalchemy.orm.sessionmaker(autocommit=True, autoflush=True)
                            )
    Session.configure(bind=engine)
    session = Session()

    set_dbsession(session)
    Base.metadata.bind = engine
    return session


@celery.task
def rerun_scheduler():
    log.info("Starting pipeline scheduler")
    session = initialize_async_session()
    webapp.sheduledtask.service.check_for_updates(session)
    log.info("Ending pipeline scheduler")

In models.py in webapp

DBSession = scoped_session(sessionmaker(bind=engine, expire_on_commit=False,
                    extension=ZopeTransactionExtension()))

def set_dbsession(db_session=None):
    """
    This function sets the db session
    """
    global DBSession
    if db_session:
        DBSession = db_session
        log.info("session changed to {0}".format(db_session))

UPDATE:

traceback:

Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 54, in new_function
    result = f(*args, **kwargs)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 100, in new_function
    result = f(*args, **kwargs)
  File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/data/mongo_service.py", line 1274, in run
    self.table_params.set_task_status_as_finished()
  File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/mem_objects.py", line 33, in set_task_status_as_finished
    task = Task.get_by_id(self.task_id)
  File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/models.py", line 162, in get_by_id
    return DBSession.query(cls).filter(cls.id == obj_id).first()
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2156, in first
    ret = list(self[0:1])
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2023, in __getitem__
    return list(res)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2227, in __iter__
    return self._execute_and_instances(context)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2240, in _execute_and_instances
    close_with_result=True)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2231, in _connection_from_session
    **kw)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 777, in connection
    close_with_result=close_with_result)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 781, in _connection_for_bind
    return self.transaction._connection_for_bind(engine)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 289, in _connection_for_bind
    self._assert_is_active()
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 217, in _assert_is_active
    "This Session's transaction has been rolled back "
InvalidRequestError: This Session's transaction has been rolled back by a nested rollback() call.  To begin a new transaction, issue Session.rollback() first.

#########################################################################

[2013-05-30 14:32:57,782: WARNING/PoolWorker-3] Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 54, in new_function
    result = f(*args, **kwargs)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 100, in new_function
    result = f(*args, **kwargs)
  File "/home/ranjith/wksp/mvc-service/webapp/webapp/data/mongo_service.py", line 1274, in run
    self.table_params.set_task_status_as_finished()
  File "/home/ranjith/wksp/mvc-service/webapp/webapp/mem_objects.py", line 33, in set_task_status_as_finished
    task = Task.get_by_id(self.task_id)
  File "/home/ranjith/wksp/mvc-service/webapp/webapp/models.py", line 166, in get_by_id
    return DBSession.query(cls).filter(cls.id == obj_id).first()
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2145, in first
    ret = list(self[0:1])
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2012, in __getitem__
    return list(res)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2216, in __iter__
    return self._execute_and_instances(context)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2229, in _execute_and_instances
    close_with_result=True)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2220, in _connection_from_session
    **kw)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 798, in connection
    close_with_result=close_with_result)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 802, in _connection_for_bind
    return self.transaction._connection_for_bind(engine)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 281, in _connection_for_bind
    self._assert_active()
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 181, in _assert_active
    "This session is in 'prepared' state; no further "
InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction.

1 answer
beautiful°
Reply #2 · 2019-03-16 03:39

I believe the problem is that your Celery task is using the web application's SQLAlchemy session.

The first thing I recommend is creating two separate scoped sessions, one for your Celery application and one for your web application. Next, make sure the Celery database session is configured only once, during Celery initialization. You can connect a handler to Celery's worker_init signal so that the session is bound to its engine when the worker starts (http://hynek.me/articles/using-celery-with-pyramid/).

It is very important that your web application does not use the same database session as your Celery application.
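
One reason this matters: a scoped_session registry hands each thread its own session, whereas a concrete session obtained once with Session() and stored in a module-level global is shared by every thread that touches it. The traceback in the question shows the failing query running in a separate thread (Thread-4), so this may be relevant here. The sketch below only illustrates the registry behaviour; WebSession, TaskSession and sessions_seen_by_threads are illustrative names that do not appear in the original code, and it assumes the default thread-local scoping.

```python
import threading

from sqlalchemy.orm import scoped_session, sessionmaker

# Two independent registries: one for the web app, one for Celery tasks.
# Both names are illustrative; no engine is bound because no SQL is emitted.
WebSession = scoped_session(sessionmaker())
TaskSession = scoped_session(sessionmaker())


def sessions_seen_by_threads(registry, n_threads=3):
    """Collect the session each thread receives from the registry.

    Returns a list of (session, same_again) tuples, where same_again tells
    whether a second call in the same thread returned the same session.
    """
    seen = []
    lock = threading.Lock()

    def worker():
        session = registry()  # created on first use in this thread
        same_again = registry() is session
        with lock:
            seen.append((session, same_again))

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return seen
```

Within one thread the registry keeps returning the same session, while different threads, and different registries, get different ones. Passing the registry itself around, rather than the result of calling it once, preserves that isolation.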

Something like this for your tasks.py file:

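import logging

from celery import Celery
from celery.signals import worker_init
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

import webapp.sheduledtask.service

log = logging.getLogger(__name__)
celery = Celery('tasks')  # placeholder app name; configure the broker for your setup

# Session registry used only by Celery tasks, never by the web application.
# autocommit=True matches the SQLAlchemy 0.7/0.8 code in the question.
Session = scoped_session(sessionmaker(autocommit=True, autoflush=True))


@worker_init.connect
def initialize_session(**kwargs):
    # Signal handlers must accept **kwargs; this runs once at worker startup.
    some_engine = create_engine('database_url')  # replace with the real URL
    Session.configure(bind=some_engine)


@celery.task
def rerun_scheduler():
    log.info("Starting pipeline scheduler")
    webapp.sheduledtask.service.check_for_updates(Session)
    log.info("Ending pipeline scheduler")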
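
Since worker threads are reused between tasks, it may also help to discard the thread's session when each task finishes, so that a failed transaction cannot leak into the next task. Below is a minimal sketch of that idea, not part of the original code: task_session and ping are hypothetical helpers, an in-memory SQLite engine stands in for the real database URL, and the sessionmaker uses the default transactional mode rather than autocommit=True.

```python
from contextlib import contextmanager

from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# In-memory SQLite stands in for the real database URL (an assumption).
engine = create_engine("sqlite://")
TaskSession = scoped_session(sessionmaker(bind=engine))


@contextmanager
def task_session(registry=TaskSession):
    """Yield this thread's session; commit on success, roll back on error."""
    session = registry()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # Close the session and drop it from the registry so the next
        # task in this thread starts with a fresh one.
        registry.remove()


def ping():
    """Hypothetical task body: run a trivial query in a managed session."""
    with task_session() as session:
        return session.execute(text("SELECT 1")).scalar()
```

A task body would then wrap its database work in `with task_session() as session:` and pass that session to the service code.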