I have a class that extends celerys Task
. It runs just fine with the old style API, but I am having problems converting it to the new API.
# In app/tasks.py
from celery import Celery, Task
celery = Celery()
@celery.task
class CustomTask(Task):
def run(self, x):
try:
# do something
except Exception, e:
self.retry(args=[x], exc=e)
And then I run the task like so -
CustomTask().apply_async(args=[x], queue='q1')
And I get the error -
TypeError: run() takes exactly 2 arguments (1 given)
This SO answer seems to do the same thing and it was accepted so presumably it works. Can anyone help me out and explain to me why my code isn't working?
EDIT
This works if I name the task, different from the class name -
name = 'app.tasks.CustomTask2'
But if I keep the name of the task the same as the full class name, it doesn't work
name = 'app.tasks.CustomTask'
But the problem with having a different name is that celery has an extra task, with the same name as the task class name.