Correct setup of django redis celery and celery be

Posted 2019-04-19 13:40

Question:

I have been trying to set up django + celery + redis + celery_beats, but it is giving me trouble. The documentation is quite straightforward, but when I run the django server, redis, celery and celery beats, nothing gets printed or logged (all my test task does is log something).

This is my folder structure:

- aenima 
 - aenima
   - __init__.py
   - celery.py

 - criptoball
   - tasks.py

celery.py looks like this:

from __future__ import absolute_import, unicode_literals
import os
from django.conf import settings
from celery import Celery


# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'aenima.settings')

app = Celery("criptoball")
app.conf.broker_url = 'redis://localhost:6379/0'

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
app.conf.timezone = 'UTC'

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

app.conf.beat_schedule = {
    'test-every-30-seconds': {
        'task': 'tasks.test_celery',
        'schedule': 30.0,
        'args': (16, 16)
    }, }

and tasks.py looks like this:

from __future__ import absolute_import, unicode_literals
from datetime import datetime, timedelta
from celery import shared_task
import logging

from django_celery_beat.models import PeriodicTask, IntervalSchedule

cada_10_seg = IntervalSchedule.objects.create(every=10, period=IntervalSchedule.SECONDS)

test_celery_periodic = PeriodicTask.objects.create(interval=cada_10_seg, name='test_celery', task='criptoball.tasks.test_celery',
expires=datetime.utcnow()+timedelta(seconds=30))

@shared_task
def test_celery(x, y):
    logger = logging.getLogger("AENIMA")
    print("EUREKA")
    logger.debug("EUREKA")

This is the django_celery_beat docs

Not sure what I am missing. When I run

celery -A aenima beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

celery -A aenima worker -l debug

redis-cli ping (which answers PONG)

together with django runserver and the redis server, I get nothing printed.

settings.py

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = TIME_ZONE
CELERY_IMPORTS = ('criptoball.tasks',)
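
Because celery.py passes namespace='CELERY' to config_from_object, the settings above are picked up by their CELERY_ prefix. A rough, simplified model of that naming convention follows; the helper strip_namespace is hypothetical and is not part of Celery, it only illustrates which keys are considered and what they are called without the prefix:

```python
def strip_namespace(settings, namespace="CELERY"):
    """Return the settings that carry the namespace prefix, without the prefix.

    Simplified model only: it illustrates the naming convention and is not
    how Celery itself loads configuration.
    """
    prefix = namespace + "_"
    return {
        key[len(prefix):].lower(): value
        for key, value in settings.items()
        if key.startswith(prefix)
    }


django_settings = {
    "CELERY_BROKER_URL": "redis://localhost:6379",
    "CELERY_TASK_SERIALIZER": "json",
    "TIME_ZONE": "UTC",  # no CELERY_ prefix, so it is ignored
}
celery_conf = strip_namespace(django_settings)
print(celery_conf)
```

A setting that lacks the prefix, or that is misspelled, is silently skipped in this model, which is one reason a configuration can appear to have no effect.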

Haven't found any authoritative answer to this topic on SO so far.

I would like to solve it all; this error may be just one of many. Thanks a lot for your help!

Edit:

Added settings for redis, declared the task differently and increased the debug level. Now the error is:

Received unregistered task of type u'tasks.test_celery'. The message has been ignored and discarded.

Did you remember to import the module containing this task? Or maybe you're using relative imports?

KeyError: u'aenima.criptoball.tasks.test_celery'

I believe Celery's documentation is poor.

EDIT 2: After trying everything, it worked when I put all the tasks inside the same celery.py file. @shared_task doesn't work; I had to use @app.task.

Answer 1:

I had those issues before. It's not your code. It's usually a problem with the environment. You should run everything under virtualenv, adding a requirements.txt file with the specific package versions.

There is a known issue regarding celery 4.x and django 1.x, so you should check which package versions you are using.

This tutorial will explain in detail how to build virtualenv with celery.

If you can tell me your package versions, I might try to help in a different way.

Edit:

I think it's something about the way you run celery. If the first problem is fixed, try playing with this:

celery -A aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

or

celery -A aenima.aenima.celery:app beat -l debug --scheduler django_celery_beat.schedulers:DatabaseScheduler

The latest error you are getting has something to do with module discovery. Try this first.
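
Which of the two -A values is right depends on the directory the command is started from, since the module part (the text before the colon) has to be importable from there. A small standard-library check, offered as a sketch; the helper is_importable is hypothetical, and it assumes you run it from the same directory as the celery command:

```python
import importlib.util


def is_importable(dotted_path):
    """Return True if the dotted module path can be located on sys.path."""
    try:
        return importlib.util.find_spec(dotted_path) is not None
    except (ImportError, ValueError):
        # Raised when a parent package in the path does not exist.
        return False


# The two module paths suggested above, without the ':app' suffix.
for candidate in ("aenima.celery", "aenima.aenima.celery"):
    print(candidate, is_importable(candidate))
```

Note that find_spec imports the parent packages of a dotted path, so running this inside the project executes aenima/__init__.py.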



Answer 2:

Using virtualenv for this would be handy.

First, as @Gal said, you need to make sure you have celery 4.x.

You can install it through pip:

pip install celery

Of course you can also pin the 4.x version by adding it to your requirements.txt like so:

celery==4.1.0

Or a higher version if one becomes available.

Then you could reinstall all your packages using:

pip install -r requirements.txt

This makes sure you have that specific celery version installed.
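
To confirm which versions actually ended up in the active environment, something like the following can be run inside the virtualenv. It is a sketch that assumes Python 3.8 or newer (for importlib.metadata); the helper installed_version is hypothetical, and the package names listed are simply the ones discussed in this thread:

```python
from importlib import metadata


def installed_version(package):
    """Return the installed version string of a package, or None if it is not installed."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


for name in ("celery", "django", "redis", "django-celery-beat"):
    print(name, installed_version(name))
```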

Now the Celery part. Your code might not be wrong, but I will show how I got my Celery app to work.

__init__.py:

from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery_conf import app as celery_app

__all__ = ['celery_app']

celery_conf.py:

from __future__ import absolute_import, unicode_literals

import os

from celery import Celery
from datetime import timedelta

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<PATH.TO.YOUR.SETTINGS>')

app = Celery('tasks')

# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

# Set a beat schedule to update every hour.
app.conf.beat_schedule = {
    'update-every-hour': {
        'task': 'tasks.update',
        'schedule': timedelta(minutes=60),
        'args': (16, 16),
    },
}

# The default task that Celery runs.
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

tasks.py:

# -*- coding: utf-8 -*-
from __future__ import unicode_literals

import requests

from django.conf import settings
from django.http import HttpResponse

from celery.task import Task
from celery.five import python_2_unicode_compatible
from celery import Celery
app = Celery()


@python_2_unicode_compatible
class Update(Task):
    name = 'tasks.update'

    def run(self, *args, **kwargs):
        # Run the task you want to do.
        pass  # placeholder so the method body is valid Python

""" For me the regular TaskRegistry didn't work to register classes, 
so I found this handy TaskRegistry demo and made use of it to 
register tasks as classes."""
class TaskRegistry(Task):

    def NotRegistered_str(self):
        self.assertTrue(repr(TaskRegistry.NotRegistered('tasks.add')))

    def assertRegisterUnregisterCls(self, r, task):
        with self.assertRaises(r.NotRegistered):
            r.unregister(task)
        r.register(task)
        self.assertIn(task.name, r)

    def assertRegisterUnregisterFunc(self, r, task, task_name):
        with self.assertRaises(r.NotRegistered):
            r.unregister(task_name)
        r.register(task, task_name)
        self.assertIn(task_name, r)

    def task_registry(self):
        r = TaskRegistry()
        self.assertIsInstance(r, dict, 'TaskRegistry is mapping')

        self.assertRegisterUnregisterCls(r, Update)

        r.register(Update)
        r.unregister(Update.name)
        self.assertNotIn(Update, r)
        r.register(Update)

        tasks = dict(r)
        self.assertIsInstance(
            tasks.get(Update.name), Update)

        self.assertIsInstance(
            r[Update.name], Update)

        r.unregister(Update)
        self.assertNotIn(Update.name, r)

        self.assertTrue(Update().run())

    def compat(self):
        r = TaskRegistry()
        r.regular()
        r.periodic()

As I explained in the code as well, the regular task registry that's built into Celery 4.x did not work for me, so I made use of the demo task registry. You can of course define tasks without classes, but I preferred to use a class.

settings.py:

# Broker settings for redis
CELERY_BROKER_HOST = '<YOUR_HOST>'
CELERY_BROKER_PORT = 6379
CELERY_BROKER_URL = 'redis://'
CELERY_DEFAULT_QUEUE = 'default'

# Celery routes
CELERY_IMPORTS = (
    'PATH.TO.tasks',  # The path to your tasks.py (the trailing comma makes this a tuple)
)

CELERY_DATABASE_URL = {
    'default': '<CELERY_DATABASE>', # You can also reuse your existing database here
}

INSTALLED_APPS = [
    ...
    'PATH.TO.TASKS' # But exclude the tasks.py from this path
]

LOGGING = {
    ...
    'loggers': {
        'celery': {
            'level': 'DEBUG',
            'handlers': ['console'],
            'propagate': True,
        },
    }
}
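
The LOGGING fragment above elides the handler definitions. Below is a self-contained sketch of a configuration that would make the debug message from the question's task visible, assuming a plain console handler. The AENIMA logger name comes from the question; the handler setup is an illustrative assumption, not the answerer's actual settings:

```python
import logging
import logging.config

LOGGING = {
    "version": 1,
    "disable_existing_loggers": False,
    "handlers": {
        # Writes to stderr, so output shows up in a foreground worker's console.
        "console": {"class": "logging.StreamHandler"},
    },
    "loggers": {
        "celery": {"level": "DEBUG", "handlers": ["console"], "propagate": True},
        # "AENIMA" is the logger name used by the task in the question.
        "AENIMA": {"level": "DEBUG", "handlers": ["console"], "propagate": False},
    },
}

# Django applies settings.LOGGING with dictConfig at startup; here it is done by hand.
logging.config.dictConfig(LOGGING)
logging.getLogger("AENIMA").debug("EUREKA")
```

Without a logger entry at DEBUG level (or a root logger configured that low), a logger.debug call like the one in the question is dropped before it reaches any handler.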

I start my worker with the following commands:

redis-server --daemonize yes

celery multi start worker -A PATH.TO.TASKS -l info --beat # But exclude tasks.py from the path

I hope this information helps you or anyone else who is struggling with Celery.

EDIT:

Note that I start the worker as a daemon, so you won't actually be able to see the logs in the console. For me it's logged in a .txt file.

Also note which paths to use. In some cases you need to give the path to your app, like so:

project.apps.app

In other cases you need to include tasks (the tasks.py file without the .py extension) as well. I noted in the comments above when to exclude this file and when not to.

EDIT 2:

The @shared_task decorator returns a proxy that always uses the task instance in the current_app. This makes the @shared_task decorator useful for libraries and reusable apps, since they will not have access to the app of the user.

Notice that @shared_task does not have access to the user's app. The task you're currently trying to register therefore isn't bound to your app. The way you actually want to register a task is:

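import logging

from celery import Celery

# Note: the task must be registered on the same app instance that the worker
# is started with; Celery() here stands in for that instance.
app = Celery()

@app.task
def test_celery(x, y):
    logger = logging.getLogger("AENIMA")
    print("EUREKA")
    logger.debug("EUREKA")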
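
The proxy behaviour of @shared_task described above can be pictured with a toy model. This is not Celery's implementation; App, set_current and this shared_task are all hypothetical stand-ins, written only to show that the task resolves against whichever app is current when it is called:

```python
class App:
    """Stand-in for a Celery app: a name plus a registry of task functions."""

    def __init__(self, name):
        self.name = name
        self.tasks = {}


# Fallback app, used until a project app is made current.
_current_app = App("default")


def set_current(app):
    """Make the given app the one that shared tasks resolve against."""
    global _current_app
    _current_app = app


def shared_task(func):
    """Return a proxy that resolves the task in the current app at call time."""

    def proxy(*args, **kwargs):
        app = _current_app
        task = app.tasks.setdefault(func.__name__, func)
        return app.name, task(*args, **kwargs)

    return proxy


@shared_task
def add(x, y):
    return x + y


before = add(16, 16)        # project app not loaded yet
set_current(App("aenima"))  # what importing the project app in __init__.py achieves
after = add(16, 16)

print(before)
print(after)
```

In this model the first call lands on the fallback app and the second on the project app, which is why the __init__.py import shown earlier in this answer matters when @shared_task is used.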


Answer 3:

Received unregistered task of type u'tasks.test_celery'. The message has been ignored and discarded.

Did you remember to import the module containing this task? Or maybe you're using relative imports?

Maybe your task path is incorrect. It should be:

app.conf.beat_schedule = {
    'test-every-30-seconds': {
        'task': 'criptoball.tasks.test_celery',
        'schedule': 30.0,
        'args': (16, 16)
    }, 
}

tasks.test_celery should be the full path: criptoball.tasks.test_celery
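
Why the short name is rejected can be shown with a simplified model: a task without an explicit name gets one built from its module path and function name, and the worker looks scheduled tasks up by that exact string. The helper default_task_name below is hypothetical and only mirrors that rule; it is not Celery's own code:

```python
def default_task_name(module, function):
    """Build a name of the form '<module>.<function>'.

    Simplified model of the default naming rule; not Celery's own code.
    """
    return "{}.{}".format(module, function)


# The function test_celery lives in criptoball/tasks.py in the question.
registered = {default_task_name("criptoball.tasks", "test_celery")}

for scheduled in ("tasks.test_celery", "criptoball.tasks.test_celery"):
    status = "registered" if scheduled in registered else "unregistered"
    print(scheduled, "->", status)
```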



Answer 4:

There is one thing you should fix. If you're using Celery 3.x, use:

app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

to tell Celery which apps' tasks you want it to discover.
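
As a rough picture of what autodiscovery does with that list: for every package it tries to import a submodule named tasks. The sketch below is a simplified model and not Celery's own code; discover_task_modules is a hypothetical helper, and standard-library packages stand in for Django apps so that it runs anywhere:

```python
import importlib


def discover_task_modules(packages, related_name="tasks"):
    """Try to import '<package>.<related_name>' for each package.

    Simplified model of task autodiscovery; not Celery's own code.
    Returns the dotted names of the modules that were imported.
    """
    found = []
    for package in packages:
        module_name = "{}.{}".format(package, related_name)
        try:
            importlib.import_module(module_name)
        except ImportError:
            continue  # this package has no importable tasks module
        found.append(module_name)
    return found


# asyncio ships a 'tasks' submodule, json does not.
print(discover_task_modules(["asyncio", "json"]))
```

In the question's project the equivalent lookup would be criptoball.tasks, so criptoball has to be in the list that is passed in.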