气流 - 正确的方式来处理的DAG回调(Airflow - Proper way to handle

2019-09-27 06:49发布

我有一个DAG ,然后每当它成功或失败,我希望它引发的帖子懈怠其中的方法。

DAG args是象下面这样:

default_args = {
    [...]
    'on_failure_callback': slack.slack_message(sad_message),
    'on_success_callback': slack.slack_message(happy_message),
    [...]
}

DAG定义本身:

dag = DAG(
    dag_id = dag_name_id,
    default_args=default_args,
    description='load data from mysql to S3',
    schedule_interval='*/10 * * * *',
    catchup=False
      )

但是,当我检查松弛有每分钟超过100个的消息,仿佛每个调度心跳评估和为每个登录它并拼命地跑了成功和失败的方法,如果它的工作,并为同一任务实例没有工作(不精细)。

我应该如何正确使用on_failure_callbackon_success_callback处理DAG的状态和调用自定义的方法?

Answer 1:

它创建的消息的原因是因为当您要定义default_args ,您正在执行的功能。 你需要只是传递函数定义不执行它。

由于函数有一个参数,它会变得有点棘手。 您可以定义两个部分功能或定义两个包装函数。

所以,你可以这样做:

from functools import partial

success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

要么

def success_msg():
    slack.slack_message(happy_message);

def failure_msg():
    slack.slack_message(sad_message);

default_args = {
    [...]
    'on_failure_callback': failure_msg
    'on_success_callback': success_msg
    [...]
}

在这两种方法中,注意如何只是函数定义failure_msgsuccess_msg执行时,他们给被传递,而不是结果。



Answer 2:

什么是slack你指的方法是什么? 调度是解析您的DAG文件的每一次心跳,所以如果slack在你的代码中定义的一些功能,它是会得到运行的每一次心跳。

有几件事情,你可以尝试:

  • 定义你要调用的PythonOperators,然后在任务级别,而不是在DAG级别给他们打电话的功能。

  • 你也可以使用TriggerRules设置将触发基于故障或父任务的成功,你的ETL任务下游任务。

从文档 : defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy} defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy}

你可以找到如何做到这一点看一个例子这里 (充分披露-我是作者)。



文章来源: Airflow - Proper way to handle DAGs callbacks