Using class methods as Celery tasks

Posted 2019-01-13 15:44

Question:

I'm trying to use the methods of a class as django-celery tasks, marking them with the @task decorator. The same situation is described here, asked by Anand Jeyahar. It's something like this:

class A:
    @task
    def foo(self, bar):
        ...

def main():
    a = A()
    ...
    # what I need
    a.foo.delay(bar) # executes as celery task 
    a.foo(bar) # executes locally

The problem is that even if I call it through a class instance, as in a.foo.delay(bar), it complains that foo needs at least two arguments, which means the self reference is missing.
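The missing self can be reproduced without Celery. In the sketch below, FakeTask and fake_task are hypothetical stand-ins for a task object and the @task decorator (they are not Celery code). They only mimic one thing: the decorator replaces the function with an object that is not a descriptor, so looking it up on an instance never binds self.

```python
class FakeTask:
    """Hypothetical stand-in for a task object (not Celery code)."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # A real task would send a message to a worker; here we just call.
        return self.func(*args, **kwargs)


def fake_task(func):
    # Stand-in for the @task decorator: swaps the function for an object.
    return FakeTask(func)


class A:
    def __init__(self, factor):
        self.factor = factor

    @fake_task
    def foo(self, bar):
        return self.factor * bar


a = A(2)

# a.foo is the FakeTask stored on the class; no instance is bound to it,
# so 21 lands in the "self" slot and "bar" is left unfilled.
try:
    a.foo.delay(21)
    error = None
except TypeError as exc:
    error = exc
```

A plain function would be bound automatically because functions implement __get__; the wrapper object in this sketch does not, which is why the call fails with a TypeError about a missing argument.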

More information:

  • I can't convert the class to a module because of inheritance.
  • The methods depend heavily on class members, so I can't make them static.
  • Marking the class itself with the @task decorator makes the class a task. It would then be possible to execute the methods from the run() method, using some argument as a key for method selection, but that's not exactly what I want.
  • Creating an instance of the class and passing it as the self argument changes how I call the methods when they are not run as Celery tasks but as ordinary methods (e.g. while testing).
  • I've tried to find out how to register the task dynamically, from the constructor for example, but Celery shares the code between the workers, so that seems to be impossible.

Thanks for your help!

Answer 1:

Celery has had experimental support for using methods as tasks since version 3.0.

The documentation for this is in celery.contrib.methods, and also mentions some caveats you should be aware of:

http://docs.celeryproject.org/en/latest/reference/celery.contrib.methods.html

Be aware: support for contrib.methods was removed in Celery 4.0.
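For readers on a version without that module, the general idea behind binding a method to a task can be sketched in plain Python with a descriptor. This is only an illustration under stated assumptions: MethodTask and BoundMethodTask are hypothetical names, nothing here is Celery's implementation, and delay() simply runs the function in-process instead of contacting a worker.

```python
class BoundMethodTask:
    """Hypothetical task-like object that remembers its instance."""

    def __init__(self, func, instance):
        self.func = func
        self.instance = instance

    def __call__(self, *args, **kwargs):
        # Local execution: supply the remembered instance as self.
        return self.func(self.instance, *args, **kwargs)

    def delay(self, *args, **kwargs):
        # Stand-in for queuing the call; here it just runs locally.
        return self.func(self.instance, *args, **kwargs)


class MethodTask:
    """Hypothetical descriptor that binds the instance on attribute access."""

    def __init__(self, func):
        self.func = func

    def __get__(self, obj, objtype=None):
        if obj is None:
            return self  # accessed on the class: nothing to bind
        return BoundMethodTask(self.func, obj)


class A:
    def __init__(self, factor):
        self.factor = factor

    @MethodTask
    def foo(self, bar):
        return self.factor * bar


a = A(2)
local_result = a.foo(21)         # called like an ordinary method
queued_result = a.foo.delay(21)  # called through the task-style API
```

Because __get__ runs on every a.foo lookup, both call styles receive the instance without it being passed explicitly.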



Answer 2:

Jeremy Satterfield has a clean and straightforward tutorial on writing class-based tasks, if that's what you want to accomplish. You can check it here.

The trick is to subclass celery.Task and implement a run() method, something like this:

from celery import Task

class CustomTask(Task):
    ignore_result = True

    def run(self, arg):
        # Arguments given to delay() arrive here, not in __init__
        do_something_with_arg(arg)

and then run the task like this:

your_arg = 3

custom_task = CustomTask()
custom_task.delay(your_arg)

The ignore_result = True part is optional, by the way; it only tells Celery not to store the task's return value.



Answer 3:

When you have:

    a = A()

you can do:

    A.foo.delay(a, param0, .., paramN)
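One consequence of passing the instance explicitly is that it travels as an ordinary task argument, so the worker operates on a deserialized copy rather than on the caller's object. The sketch below imitates that with copy.deepcopy; FakeTask is a hypothetical stand-in (not Celery code), and the calls counter exists only to make the effect visible.

```python
import copy


class FakeTask:
    """Hypothetical stand-in for a task object (not Celery code)."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        # Local execution works directly on the objects passed in.
        return self.func(*args, **kwargs)

    def delay(self, *args, **kwargs):
        # Imitate a broker round trip: the worker only sees copies.
        args = copy.deepcopy(args)
        kwargs = copy.deepcopy(kwargs)
        return self.func(*args, **kwargs)


class A:
    def __init__(self):
        self.calls = 0

    @FakeTask
    def foo(self, bar):
        self.calls += 1
        return bar * 2


a = A()
local_result = A.foo(a, 21)         # runs in-process and mutates a
remote_result = A.foo.delay(a, 21)  # runs on a copy of a
```

After both calls a.calls is 1, not 2: the change made during the delay() call happened on the copy. With a real broker the instance would additionally have to be serializable by whatever serializer the app is configured to use.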

Cheers



Answer 4:

For me, the only approach that works is celery.current_app, because only it passes self to the method.

So it should look like this:

from celery import current_app
from celery.contrib.methods import task_method

class A:
    @current_app.task(filter=task_method, name='A.foo')
    def foo(self, bar):
        ...

The name argument is required if you have methods with the same name in different classes.