I'm trying to use the methods of a class as django-celery tasks, marking them with the @task decorator. The same situation is described here, in a question asked by Anand Jeyahar.
It's something like this:
class A:
    @task
    def foo(self, bar):
        ...

def main():
    a = A()
    ...
    # what I need
    a.foo.delay(bar)  # executes as celery task
    a.foo(bar)  # executes locally
The problem is that even if I call it on a class instance, like a.foo.delay(bar), it says that foo needs at least two arguments, which means that the self pointer is missing.
More information:
- I can't convert the class to a module because of inheritance.
- The methods depend heavily on class members, so I can't make them static.
- Marking the class as a task with the @task decorator makes the class a task itself. It would be possible to execute the methods from the run() method, using some argument as a key for method selection, but that's not exactly what I want.
- Creating an instance of the class and passing it as the self argument to the methods changes how I call them when they run as ordinary methods rather than as Celery tasks (e.g. while testing).
- I've tried to find out how to register the task dynamically, from the constructor for example, but Celery shares the code between the workers, so that seems to be impossible.
Thanks for your help!
Celery has had experimental support for using methods as tasks since version 3.0.
The documentation for this is in celery.contrib.methods, and it also mentions some caveats you should be aware of:
http://docs.celeryproject.org/en/latest/reference/celery.contrib.methods.html
Be aware: support for contrib.methods was removed in Celery 4.0.
Jeremy Satterfield has a clean and straightforward tutorial on writing class-based tasks, if that's what you want to accomplish. You can check it here.
The magic is basically extending the celery.Task class and including a run() method, something like this:
from celery import Task

class CustomTask(Task):
    ignore_result = True

    def run(self, arg):
        # Arguments given to delay() are passed on to run()
        do_something_with_arg(arg)
and then run the task like this:
your_arg = 3
custom_task = CustomTask()
custom_task.delay(your_arg)
I am not sure whether the ignore_result = True part is necessary, BTW.
When you have:
a = A()
you can do:
A.foo.delay(a, param0, .., paramN)
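Why passing the instance explicitly works can be shown with a minimal pure-Python sketch. It assumes nothing about Celery's internals: SimpleTask is a hypothetical stand-in for a decorated task, not Celery's class, and its delay() just calls the function instead of queueing it. Because the decorator replaces the function with a plain object, looking it up through an instance does not bind self, so the instance has to be supplied as the first argument.

```python
class SimpleTask:
    """Hypothetical stand-in for a task object; not Celery's implementation."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        # Local, synchronous call.
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # A real task queue would serialize the arguments and hand them to a
        # worker; this stand-in simply calls the function directly.
        return self.func(*args, **kwargs)


class A:
    def __init__(self, factor):
        self.factor = factor

    @SimpleTask
    def foo(self, bar):
        return self.factor * bar


a = A(3)
# The instance is passed explicitly and ends up as `self` inside foo().
result = A.foo.delay(a, 5)
```

With a real broker the instance would also have to be serializable by whatever serializer is configured, which this sketch does not model.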
Cheers
For me, the only approach that works is celery.current_app, because only this passes self to the method.
So it should look like this:
from celery import current_app
from celery.contrib.methods import task_method

class A:
    @current_app.task(filter=task_method, name='A.foo')
    def foo(self, bar):
        ...
The name must be given explicitly if you have methods with the same name in different classes.
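How an instance can end up being passed as self may be easier to see with Python's descriptor protocol. The sketch below illustrates that general technique under stated assumptions. It is not the code of celery.contrib.methods: MethodTask and BoundTask are hypothetical names, and delay() calls the function directly instead of sending it to a worker.

```python
class BoundTask:
    """Hypothetical task bound to one instance; prepends it to every call."""

    def __init__(self, func, instance):
        self.func = func
        self.instance = instance

    def __call__(self, *args, **kwargs):
        # Local call: behaves like an ordinary bound method.
        return self.func(self.instance, *args, **kwargs)

    def delay(self, *args, **kwargs):
        # Stand-in for queueing: a real task queue would send the call to a
        # worker; here the function is simply called with the instance first.
        return self.func(self.instance, *args, **kwargs)


class MethodTask:
    """Hypothetical decorator that binds the instance on attribute access."""

    def __init__(self, func):
        self.func = func

    def __get__(self, instance, owner=None):
        if instance is None:
            # Accessed on the class itself: nothing to bind.
            return self
        return BoundTask(self.func, instance)


class A:
    def __init__(self, factor):
        self.factor = factor

    @MethodTask
    def foo(self, bar):
        return self.factor * bar


a = A(3)
queued = a.foo.delay(5)  # self is supplied by the descriptor
local = a.foo(5)         # plain local call, same signature
```

Both call styles from the question keep the same signature here, because the binding happens when a.foo is looked up rather than when it is called.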