Python Celery: Retrieve tasks arguments if there…

Posted 2019-02-12 16:32

Question:

I am getting started with Celery and Python, and I have a question that is probably very simple, but I don't seem to be able to find any suitable answer around...

If I have a bunch of tasks and one of them throws an exception, is there a way of retrieving the arguments that were passed to said task?

For instance, if I want to get the IPs some hostnames resolve to, and I create a task...

@tasks_app.task
def resolve_hostname(hostname):
    return (hostname, {hst.address for hst in dns.resolver.query(hostname)})

... which can throw an exception, is there a way of getting the value of that hostname argument outside the call when that Exception happens?

Let's say I group the tasks like:

ip_subtasks = group(
    resolve_hostname.s(hostname) for hostname in ['google.com',
                                                  'yahoo.com',
                                                  'failure.kommm']
)()

The last one (the one that tries to resolve failure.kommm) will raise an exception. I'd like to put the get() call on the Celery task inside a try/except and show a message saying "Something went wrong when trying to resolve failure.kommm" (something like what is shown below):

for ip_subtask in ip_subtasks:
    try:
        hostname, ips = ip_subtask.get(timeout=45)
    except dns.exception.DNSException as e:
        # I WISHED THIS WORKED:
        logger.exception("Something happened when trying"
                         " to resolve %s" % ip_subtask.args[0])

So, that's the question... Is there a way of retrieving the arguments a task was executed with if I have the task instance itself?

Thank you in advance.

Answer 1:

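To do this you can define an abstract base task class that implements an on_failure handler. Celery calls this handler in the worker when the task fails and passes it the arguments the task was originally called with.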

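import logging

from celery import Task

logger = logging.getLogger(__name__)

class DebugTask(Task):
    abstract = True

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        # args holds the positional arguments the failed task was called with
        logger.exception("Something happened when trying"
                         " to resolve %s" % args[0])

# tasks_app and dns are the same objects used in the question
@tasks_app.task(base=DebugTask)
def resolve_hostname(hostname):
    return (hostname, {hst.address for hst in dns.resolver.query(hostname)})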

From the docs:

on_failure(self, exc, task_id, args, kwargs, einfo)

Parameters: 
  exc     – The exception raised by the task.
  task_id – Unique id of the failed task.
  args    – Original arguments for the task that failed.
  kwargs  – Original keyword arguments for the task that failed.
  einfo   – ExceptionInfo instance, containing the traceback.
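
If the message needs to be shown on the calling side rather than in the worker log, one option is to keep the hostnames next to the result objects and iterate over both together. The sketch below is only an illustration, not part of the original answer. collect_results is a hypothetical helper, and it assumes that the result objects come back in the same order as the hostnames used to build the group and that each one exposes get(timeout=...), as Celery's AsyncResult does. It catches Exception for brevity; in the question's setting that would be narrowed to dns.exception.DNSException.

```python
import logging

logger = logging.getLogger(__name__)


def collect_results(hostnames, results, timeout=45):
    """Pair each hostname with its result object and gather the outcomes.

    Assumption: `results` is an iterable of objects exposing
    get(timeout=...), in the same order as `hostnames`.
    Returns two dicts: hostname -> ips, and hostname -> exception.
    """
    resolved, failed = {}, {}
    for hostname, result in zip(hostnames, results):
        try:
            # Each task returns a (hostname, ips) tuple, as in the question
            _, ips = result.get(timeout=timeout)
        except Exception as exc:  # narrow this to the expected exception type
            logger.error("Something happened when trying to resolve %s: %r",
                         hostname, exc)
            failed[hostname] = exc
        else:
            resolved[hostname] = ips
    return resolved, failed
```

With the group from the question, this could be called as collect_results(hostnames, ip_subtasks), where hostnames is the same list that was used to build the group.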