我有一个DAG
,然后每当它成功或失败,我希望它引发的帖子懈怠其中的方法。
我DAG args
是象下面这样:
default_args = {
[...]
'on_failure_callback': slack.slack_message(sad_message),
'on_success_callback': slack.slack_message(happy_message),
[...]
}
和DAG
定义本身:
dag = DAG(
dag_id = dag_name_id,
default_args=default_args,
description='load data from mysql to S3',
schedule_interval='*/10 * * * *',
catchup=False
)
但是,当我检查松弛有每分钟超过100个的消息,仿佛每个调度心跳评估和为每个登录它并拼命地跑了成功和失败的方法,如果它的工作,并为同一任务实例没有工作(不精细)。
我应该如何正确使用on_failure_callback
和on_success_callback
处理DAG的状态和调用自定义的方法?
它创建的消息的原因是因为当您要定义default_args
,您正在执行的功能。 你需要只是传递函数定义不执行它。
由于函数有一个参数,它会变得有点棘手。 您可以定义两个部分功能或定义两个包装函数。
所以,你可以这样做:
from functools import partial
success_msg = partial(slack.slack_message, happy_message);
failure_msg = partial(slack.slack_message, sad_message);
default_args = {
[...]
'on_failure_callback': failure_msg
'on_success_callback': success_msg
[...]
}
要么
def success_msg():
slack.slack_message(happy_message);
def failure_msg():
slack.slack_message(sad_message);
default_args = {
[...]
'on_failure_callback': failure_msg
'on_success_callback': success_msg
[...]
}
在这两种方法中,注意如何只是函数定义failure_msg
和success_msg
执行时,他们给被传递,而不是结果。
什么是slack
你指的方法是什么? 调度是解析您的DAG文件的每一次心跳,所以如果slack
在你的代码中定义的一些功能,它是会得到运行的每一次心跳。
有几件事情,你可以尝试:
从文档 : defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy}
defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | one_success | one_failed | dummy}
你可以找到如何做到这一点看一个例子这里 (充分披露-我是作者)。