Celery Task the difference between these two tasks

2020-07-17 07:48发布

问题:

What's the difference between these two tasks below?

The first one gives an error, the second one runs just fine. Both are the same, they accept extra arguments and they are both called in the same way.

ProcessRequests.delay(batch) **error object.__new__() takes no parameters**


SendMessage.delay(message.pk, self.pk) **works!!!!**   

Now, I have been made aware of what the error means, but my confusion is why one works and not the other.

Tasks...

1)

class ProcessRequests(Task):
    name = "Request to Process"
    max_retries = 1
    default_retry_delay = 3

    def run(self, batch):
       #do something

2)

class SendMessage(Task):
    name = "Sending SMS"
    max_retries = 10
    default_retry_delay = 3

    def run(self, message_id, gateway_id=None, **kwargs):
        #do something

Full Task Code....

from celery.task import Task
from celery.decorators import task

import logging

from sms.models import Message, Gateway, Batch
from contacts.models import Contact
from accounts.models import Transaction, Account


class SendMessage(Task):
    name = "Sending SMS"
    max_retries = 10
    default_retry_delay = 3

    def run(self, message_id, gateway_id=None, **kwargs):
        logging.debug("About to send a message.")

        # Because we don't always have control over transactions
        # in our calling code, we will retry up to 10 times, every 3
        # seconds, in order to try to allow for the commit to the database
        # to finish. That gives the server 30 seconds to write all of
        # the data to the database, and finish the view.
        try:
            message = Message.objects.get(pk=message_id)
        except Exception as exc:
            raise SendMessage.retry(exc=exc)


        if not gateway_id:
            if hasattr(message.billee, 'sms_gateway'):
                gateway = message.billee.sms_gateway
            else:
                gateway = Gateway.objects.all()[0]
        else:
            gateway = Gateway.objects.get(pk=gateway_id)

        # Check we have a credits to sent me message
        account = Account.objects.get(user=message.sender)
        # I'm getting the non-cathed version here, check performance!!!!!
        if account._balance() >= message.length:
            response = gateway._send(message)

            if response.status == 'Sent':
                # Take credit from users account.
                transaction = Transaction(
                    account=account,
                    amount=- message.charge,
                    description="Debit: SMS Sent",

                    )
                transaction.save()
                message.billed = True
                message.save()
        else:
            pass


        logging.debug("Done sending message.")


class ProcessRequests(Task):
    name = "Request to Process"
    max_retries = 1
    default_retry_delay = 3

    def run(self, message_batch):
        for e in Contact.objects.filter(contact_owner=message_batch.user, group=message_batch.group):
            msg = Message.objects.create(
                recipient_number=e.mobile,
                content=message_batch.content,
                sender=e.contact_owner,
                billee=message_batch.user,
                sender_name=message_batch.sender_name
            )
            gateway = Gateway.objects.get(pk=2)
            msg.send(gateway)
            #replace('[FIRSTNAME]', e.first_name)

tried:

 ProcessRequests.delay(batch) should work gives error error object.__new__() takes no parameters     
 ProcessRequests().delay(batch) also gives error error object.__new__() takes no parameters

回答1:

I was able to reproduce your issue:

import celery
from celery.task import Task

@celery.task
class Foo(celery.Task):
    name = "foo"
    def run(self, batch):
       print 'Foo'

class Bar(celery.Task):
    name = "bar"
    def run(self, batch):
       print 'Bar'

# subclass deprecated base Task class
class Bar2(Task):
    name = "bar2"
    def run(self, batch):
       print 'Bar2'

@celery.task(name='def-foo')
def foo(batch):
    print 'foo'

Output:

In [2]: foo.delay('x')
[WARNING/PoolWorker-4] foo

In [3]: Foo().delay('x')
[WARNING/PoolWorker-2] Foo

In [4]: Bar().delay('x')
[WARNING/PoolWorker-3] Bar

In [5]: Foo.delay('x')
TypeError: object.__new__() takes no parameters

In [6]: Bar.delay('x')
TypeError: unbound method delay() must be called with Bar instance as first argument (got str instance instead)

In [7]: Bar2.delay('x')
[WARNING/PoolWorker-1] Bar2

I see you use deprecated celery.task.Task base class, this is why you don't get unbound method errors:

Definition: Task(self, *args, **kwargs)
Docstring:
Deprecated Task base class.

Modern applications should use :class:`celery.Task` instead.

I don't know why ProcessRequests doesn't work though. Maybe it is some caching issues, you may have tried to apply the decorator to your class before and it got cached, and this is exactly the error that you get when you try to apply this decorator to a Task class.

Delete all .pyc file, restart celery workers and try again.

Don't use classes directly

  1. Tasks are instantiated only once per (worker) process, so creating objects of task classes (on client-side) every time doesn't make sense, i.e. Bar() is wrong.
  2. Foo.delay() or Foo().delay() might or might not work, depends on combination of decorator name argument and class name attribute.

Get the task object from celery.registry.tasks dictionary or just use @celery.task decorator on functions (foo in my example) instead.