Get Exception details on Airflow on_failure_callba

Posted 2020-08-16 03:22

Question:

Is there any way to get the exception details in the Airflow on_failure_callback?

I've noticed it's not part of the context. I'd like to create a generic exception-handling mechanism that posts information about errors to Slack, including details about the exception. I've managed to trigger the callback and post to Slack, but I can't post the exception details.

Thanks.

Answer 1:

The error is added to the context here. So you can actually get that simply by doing:

context.get("exception")

Unfortunately it doesn't look like you can get a stack trace or something like that from the context.
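
As a minimal sketch of how a callback might use that value, the function below builds a one-line summary from the context. The names build_failure_message and on_failure are hypothetical, the context is treated as a plain dict-like object, and the print call stands in for whatever Slack-posting code you already have.

```python
def build_failure_message(context):
    """Return a one-line summary of the failure, suitable for a chat message."""
    # The key may be missing, so use .get() instead of indexing.
    exception = context.get("exception")
    if exception is None:
        return "Task failed (no exception details in context)"
    return f"Task failed with {type(exception).__name__}: {exception}"


def on_failure(context):
    # Hypothetical callback: replace print with your own Slack-posting code.
    print(build_failure_message(context))
```

Keeping the message building separate from the posting step makes it possible to check the text with an ordinary dict, without running a task.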



Answer 2:

An on_failure_callback can be supplied to the DAG and/or individual tasks.

In the first case (supplying to the DAG), there is no 'exception' in the context (the argument Airflow calls your on_failure_callback with).

In the second case (supplying to a task), there is.

The contained object should be a Python exception. Getting something like a stack trace from it is surprisingly unintuitive, but based on this answer I use the following to get a fairly readable stack trace:

import traceback

...

exception = context.get('exception')
# Pass the arguments positionally: the etype keyword was removed in Python 3.10.
formatted_exception = ''.join(
    traceback.format_exception(
        type(exception), exception, exception.__traceback__
    )
).strip()
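
Because the DAG-level callback receives no 'exception' entry, a slightly more defensive version may be useful. This is only a sketch: format_context_exception is a hypothetical helper name, and the branch for values that are not exception objects is an assumption about what a context might contain, not documented Airflow behavior.

```python
import traceback


def format_context_exception(context):
    """Return a readable stack trace for context['exception'], or a fallback string."""
    exception = context.get("exception")
    if exception is None:
        # For example, a callback attached to the DAG rather than to a task.
        return "No exception found in context"
    if not isinstance(exception, BaseException):
        # Assumption: the value might be something other than an exception object.
        return str(exception)
    lines = traceback.format_exception(
        type(exception), exception, exception.__traceback__
    )
    return "".join(lines).strip()
```

The same helper can then be shared by task-level and DAG-level callbacks without raising inside the callback itself.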



Answer 3:

I think there is probably no way to get the exception details inside the callback. Look at the source code:

# Handling callbacks pessimistically
try:
    if self.state == State.UP_FOR_RETRY and task.on_retry_callback:
        task.on_retry_callback(context)
    if self.state == State.FAILED and task.on_failure_callback:
        task.on_failure_callback(context)
except Exception as e3:
    logging.error("Failed at executing callback")
    logging.exception(e3)


Tags: airflow