How to access the orm with celery tasks?

2019-04-09 11:47发布

问题:

I'm trying to flip a boolean flag for particular types of objects in my database using sqlalchemy+celery beats. But how do I access my orm from the tasks.py file?

from models import Book
from celery.decorators import periodic_task
from application import create_celery_app

celery = create_celery_app()
# Create celery: http://flask.pocoo.org/docs/0.10/patterns/celery/

# This task works fine
@celery.task
def celery_send_email(to,subject,template):
    with current_app.app_context():
        msg = Message(
            subject,
            recipients=[to],
            html=template,
            sender=current_app.config['MAIL_DEFAULT_SENDER']
        )
        return mail.send(msg)

#This fails
@periodic_task(name='release_flag',run_every=timedelta(seconds=10))
def release_flag():
    with current_app.app_context(): <<< #Fails on this line
        books = Book.query.all() <<<< #Fails here too
        for book in books:
          book.read = True
          book.save()

I'm using celery beat command to run this:

celery -A tasks worker -l INFO --beat

But I'm getting the following error:

raise RuntimeError('working outside of application context')
RuntimeError: working outside of application context

Which points back to the with current_app.app_context() line

If I remove the current_app.app_context() line I will get the following error:

RuntimeError: application not registered on db instance and no application bound to current context

Is there a particular way to access the flask-sqlalchemy orm for celery tasks? Or would there be a better approach to what I'm trying to do?

So far the only workaround which works was to add the following line after db.init_app(app) in my application factory pattern:

db.app = app

I was following this repo to create my celery app https://github.com/mattupstate/overholt/blob/master/overholt/factory.py

回答1:

You're getting that error because current_app requires an app context to work, but you're trying to use it to set up an app context. You need to use the actual app to set up the context, then you can use current_app.

with app.app_context():
    # do stuff that requires the app context

Or you can use a pattern such as the one described in the Flask docs to subclass celery.Task so it knows about the app context by default.

from celery import Celery

def make_celery(app):
     celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'])
     celery.conf.update(app.config)
     TaskBase = celery.Task

     class ContextTask(TaskBase):
         abstract = True

         def __call__(self, *args, **kwargs):
             with app.app_context():
                 return TaskBase.__call__(self, *args, **kwargs)

     celery.Task = ContextTask
     return celery

 celery = make_celery(app)