Recover from task failed beyond max_retries


Question:

I am attempting to asynchronously consume a web service because it takes up to 45 seconds to return. Unfortunately, this web service is also somewhat unreliable and can throw errors. I have set up django-celery and have my tasks executing, which works fine until the task fails beyond max_retries.

Here is what I have so far:

from celery.exceptions import MaxRetriesExceededError
from celery.task import task
from django.conf import settings
from suds.client import Client  # the SOAP client; the question's code matches suds' API

from .models import Result


@task(default_retry_delay=5, max_retries=10)
def request(xml):
    try:
        server = Client('https://www.whatever.net/RealTimeService.asmx?wsdl')
        xml = server.service.RunRealTimeXML(
            username=settings.WS_USERNAME,
            password=settings.WS_PASSWORD,
            xml=xml
        )
    except Exception as e:
        result = Result(celery_id=request.request.id, details=e.reason, status="i")
        result.save()
        try:
            return request.retry(exc=e)
        except MaxRetriesExceededError:
            result = Result(celery_id=request.request.id, details="Max Retries Exceeded", status="f")
            result.save()
            raise
    result = Result(celery_id=request.request.id, details=xml, status="s")
    result.save()
    return result

Unfortunately, MaxRetriesExceededError is not being thrown by retry(), so I'm not sure how to handle the failure of this task. Django has already returned HTML to the client, and I am checking the contents of Result via AJAX, which never reaches the full-failure "f" status.
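
For context, the AJAX check amounts to a view that returns the latest Result row as JSON; a minimal sketch, assuming the Result fields shown above (the view name and response shape are just placeholders):

import json

from django.http import HttpResponse

from .models import Result


def task_status(request, celery_id):
    # Return the current status ("i", "s", or "f") so the page's AJAX poll
    # can decide whether to keep waiting, show the result, or report failure.
    try:
        result = Result.objects.filter(celery_id=celery_id).latest('id')
        payload = {'status': result.status, 'details': result.details}
    except Result.DoesNotExist:
        payload = {'status': 'pending'}
    return HttpResponse(json.dumps(payload), content_type='application/json')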

So the question is: How can I update my database when the Celery task has exceeded max_retries?

Answer 1:

You can override the after_return method of the Celery task class; this method is called after the task has executed, whatever the return status (SUCCESS, FAILURE, RETRY).

class MyTask(celery.task.Task):

    def run(self, xml, **kwargs):
        # Your stuff here
        pass

    def after_return(self, status, retval, task_id, args, kwargs, einfo=None):
        if self.max_retries == int(kwargs['task_retries']):
            # If max retries equals task retries, do something
            pass
        if status == "FAILURE":
            # You can also do something if the task fails, instead of checking the retries
            pass

http://readthedocs.org/docs/celery/en/latest/reference/celery.task.base.html#celery.task.base.BaseTask.after_return

http://celery.readthedocs.org/en/latest/reference/celery.app.task.html?highlight=after_return#celery.app.task.Task.after_return



Answer 2:

With Celery version 2.3.2 this approach has worked well for me:

class MyTask(celery.task.Task):
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        if self.max_retries == self.request.retries:
            # If max retries is equal to task retries, do something
            pass

@task(base=MyTask, default_retry_delay=5, max_retries=10)
def request(xml):
    # Your stuff here
    pass
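
Applied to the original question, after_return is the natural place to write the final-failure row. A minimal sketch, assuming the Result model and status codes from the question (the class name RequestTask and the import paths are illustrative):

import celery
from celery.task import task

from .models import Result  # the Result model from the question


class RequestTask(celery.task.Task):
    abstract = True

    def after_return(self, status, retval, task_id, args, kwargs, einfo):
        # Record the terminal failure once the retry budget is exhausted.
        if status == "FAILURE" and self.request.retries >= self.max_retries:
            Result(celery_id=task_id, details="Max Retries Exceeded", status="f").save()


@task(base=RequestTask, default_retry_delay=5, max_retries=10)
def request(xml):
    # ... call the web service and retry on errors, as in the question ...
    pass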


Answer 3:

The issue is that Celery tries to re-raise the exception you passed in when it hits the retry limit. The code that does this re-raising is here: https://github.com/celery/celery/blob/v3.1.20/celery/app/task.py#L673-L681

The simplest way around this is not to have Celery manage your exceptions at all:

import logging

from celery import task
from celery.exceptions import MaxRetriesExceededError

logger = logging.getLogger(__name__)


@task(max_retries=10)
def mytask():
    try:
        do_the_thing()
    except Exception as e:
        try:
            mytask.retry()
        except MaxRetriesExceededError:
            do_something_to_handle_the_error()
            logger.exception(e)
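
In the original poster's setup, do_something_to_handle_the_error() would be the place to save a Result row with status "f", so that the AJAX check can finally see the terminal failure.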


Answer 4:

I'm just going with this for now; it spares me the work of subclassing Task and is easily understood.

import logging

import celery
import requests

log = logging.getLogger(__name__)


# auto-retry with delay as defined below. After that, hook is disabled.
@celery.shared_task(bind=True, max_retries=5, default_retry_delay=300)
def post_data(self, hook_object_id, url, event, payload):
    headers = {'Content-type': 'application/json'}
    try:
        r = requests.post(url, data=payload, headers=headers)
        r.raise_for_status()
    except requests.exceptions.RequestException as e:
        if self.request.retries >= self.max_retries:
            log.warning("Auto-deactivating webhook %s for event %s", hook_object_id, event)
            # Webhook is a Django model defined elsewhere in the answerer's project
            Webhook.objects.filter(object_id=hook_object_id).update(active=False)
            return False
        raise self.retry(exc=e)
    return True