how to setup sqlalchemy session in celery tasks wi

2019-02-09 19:22发布

Summary: I want to use a sqlalchemy session in celery tasks without having a global variable containing that session.

I am using SQLAlchemy in a project with celery tasks, and I'm having

Currently, I have a global variable 'session' defined along with my celery app setup (celery.py), with a worker signal to set it up.

session = scoped_session(sessionmaker())

@celeryd_init.connect
def configure_workers(sender=None, conf=None, **kwargs):
    # load the application configuration
    # db_uri = conf['db_uri']
    engine = create_engine(db_uri)
    session.configure(bind=engine)

In the module defining the tasks, I simply import 'session' and use it. Tasks are defined with a custom class that closes the session after returning:

class DBTask(Task):
    def after_return(self, *args, **kwargs):
        session.remove()

That works well, however: when unit testing with CELERY_ALWAYS_EAGER=True, the session won't be configured. The only solution I've found so far is to mock that 'session' variable when running a task in a unit test:

with mock.patch('celerymodule.tasks.session', self.session):
    do_something.delay(...)

While it works, I don't want to do that.

Is there any way to setup a session that will no be a global variable, that will work both for normal asynchronous behavior and without workers with CELERY_ALWAYS_EAGER=True?

1条回答
可以哭但决不认输i
2楼-- · 2019-02-09 20:08

The answer was right under my nose in the official documentation about custom task classes.

I modified the custom task class that I use for tasks accessing the database:

class DBTask(Task):
    _session = None

    def after_return(self, *args, **kwargs):
        if self._session is not None:
            self._session.remove()

    @property
    def session(self):
        if self._session is None:
            _, self._session = _get_engine_session(self.conf['db_uri'],
                                                   verbose=False)

        return self._session

I define my tasks this way:

@app.task(base=DBTask, bind=True)
def do_stuff_with_db(self, conf, some_arg):
    self.conf = conf
    thing = self.session.query(Thing).filter_by(arg=some_arg).first()

That way, the SQLAlchemy session will only be created once for each celery worker process, and I don't need any global variable.

This solves the problem with my unit tests, since the SQLAlchemy session setup is now independant from the celery workers.

查看更多
登录 后发表回答