Airflow - Proper way to handle DAGs callbacks

Published 2019-08-13 13:50

Question:

I have a DAG, and whenever it succeeds or fails, I want it to trigger a method that posts to Slack.

My DAG default_args are like below:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}

And the DAG definition itself:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )

But when I check Slack, there are more than 100 messages every minute, as if the callbacks are being evaluated at each scheduler heartbeat, and both the success and the failure method fired for the same task instance, as if it had worked and not worked at the same time (not fine).

How should I properly use on_failure_callback and on_success_callback to handle DAG statuses and call a custom method?

Answer 1:

The reason it's creating so many messages is that you are executing the functions when you define your default_args. You need to pass the function itself, without executing it.

Since the function has an argument, it'll get a little trickier. You can either define two partial functions or define two wrapper functions.

So you can either do:

from functools import partial

success_msg = partial(slack.slack_message, happy_message)
failure_msg = partial(slack.slack_message, sad_message)

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}

or

def success_msg():
    slack.slack_message(happy_message)

def failure_msg():
    slack.slack_message(sad_message)

default_args = {
    [...]
    'on_failure_callback': failure_msg,
    'on_success_callback': success_msg,
    [...]
}

In either approach, note that only the callables failure_msg and success_msg are passed, not the result of calling them.
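One extra detail worth noting: Airflow invokes these callbacks with the task instance's context dict as their single argument, so it is safest to have the callable accept it. A minimal sketch, assuming slack.slack_message() takes a plain string (happy_message and sad_message are the variables from the question):

def success_msg(context):
    # Airflow passes the task's context dict to the callback
    slack.slack_message(happy_message)

def failure_msg(context):
    # the context lets you enrich the message, e.g. with the failing task id
    ti = context['task_instance']
    slack.slack_message(sad_message + ' (task: ' + ti.task_id + ')')

These are then passed in default_args exactly as shown above.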



Answer 2:

What is the slack method you are referring to? The scheduler parses your DAG file every heartbeat, so if slack.slack_message(...) is called at the top level of your code (as it is in your default_args), it is going to run every heartbeat.

A few things you can try:

  • Define the functions you want to call as PythonOperators and then call them at the task level instead of at the DAG level.

  • You could also use TriggerRules to set tasks downstream of your ETL task that will trigger based on failure or success of the parent task.

From the docs: defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy}

You can find an example of how this would look here (full disclosure - I'm the author).
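For illustration, here is a minimal sketch of that task-level approach, assuming the slack.slack_message helper and the dag object from the question plus an existing upstream etl_task; the task ids notify_success and notify_failure are made up for this example:

from airflow.operators.python_operator import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

notify_success = PythonOperator(
    task_id='notify_success',
    python_callable=lambda: slack.slack_message(happy_message),
    trigger_rule=TriggerRule.ALL_SUCCESS,  # default: runs only if upstream succeeded
    dag=dag,
)

notify_failure = PythonOperator(
    task_id='notify_failure',
    python_callable=lambda: slack.slack_message(sad_message),
    trigger_rule=TriggerRule.ONE_FAILED,  # runs when an upstream task fails
    dag=dag,
)

etl_task >> [notify_success, notify_failure]

Because the Slack calls now live inside task callables, they only execute when those tasks actually run, not every time the scheduler parses the file.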