How can you catch a custom exception from Celery w

2019-03-29 00:24发布

问题:

My Celery task raises a custom exception NonTransientProcessingError, which is then caught by AsyncResult.get(). Tasks.py:

class NonTransientProcessingError(Exception):
    pass

@shared_task()
def throw_exception():
    raise NonTransientProcessingError('Error raised by POC model for test purposes')

In the Python console:

from my_app.tasks import *
r = throw_exception.apply_async()
try:
    r.get()
except NonTransientProcessingError as e:
    print('caught NonTrans in type specific except clause')

But my custom exception is my_app.tasks.NonTransientProcessingError, whereas the exception raised by AsyncResult.get() is celery.backends.base.NonTransientProcessingError, so my except clause fails.

Traceback (most recent call last):
  File "<input>", line 4, in <module>
  File "/...venv/lib/python3.5/site-packages/celery/result.py", line 175, in get
    raise meta['result']
celery.backends.base.NonTransientProcessingError: Error raised by POC model for test purposes

If I catch the exception within the task, it works fine. It is only when the exception is raised to the .get() call that it is renamed.

How can I raise a custom exception and catch it correctly?

I have confirmed that the same happens when I define a Task class and raise the custom exception in its on_failure method. The following does work:

try:
    r.get()
except Exception as e:
    if type(e).__name__ == 'NonTransientProcessingError':
        print('specific exception identified')
    else:
        print('caught generic but not identified')

Outputs:

specific exception identified

But this can't be the best way of doing this? Ideally I'd like to catch exception superclasses for categories of behaviour.

I'm using Django 1.8.6, Python 3.5 and Celery 3.1.18, with a Redis 3.1.18, Python redis lib 2.10.3 backend.

回答1:

import celery
from celery import shared_task


class NonTransientProcessingError(Exception):
    pass


class CeleryTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        if isinstance(exc, NonTransientProcessingError):
            """
            deal with NonTransientProcessingError
            """
            pass

    def run(self, *args, **kwargs):
        pass


@shared_task(base=CeleryTask)
def add(x, y):
    raise NonTransientProcessingError

Use a base Task with on_failure callback to catch custom exception.