反转上游/生成气流的多个任务时下游关系(Reversed upstream/downstream r

2019-09-26 15:04发布

有关这个问题的原代码,可以发现在这里 。

我通过了两个位移操作符和困惑set_upstream / set_downstream方法,我在我的DAG已经定义了一个任务循环中工作。 当DAG的主执行回路被配置如下:

for uid in dash_workers.get_id_creds():
    clear_tables.set_downstream(id_worker(uid))

要么

for uid in dash_workers.get_id_creds():
    clear_tables >> id_worker(uid)

该图是这样的(字母数字序列是用户ID,这也定义了任务ID):

当我配置DAG这样的主要执行循环:

for uid in dash_workers.get_id_creds():
    clear_tables.set_upstream(id_worker(uid))

要么

for uid in dash_workers.get_id_creds():
    id_worker(uid) >> clear_tables

该图是这样的:

第二个图是我想要的东西/我本来期望的代码的前两个片段已根据我的文档阅读产生的。 如果我想clear_tables到触发我的一批数据分析任务不同的用户ID之前先执行我应该指出这是clear_tables >> id_worker(uid)

编辑 -下面是完整的代码,因为我张贴的最后几个问题,供参考已更新:

from datetime import datetime
import os
import sys

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

import ds_dependencies

SCRIPT_PATH = os.getenv('DASH_PREPROC_PATH')
if SCRIPT_PATH:
    sys.path.insert(0, SCRIPT_PATH)
    import dash_workers
else:
    print('Define DASH_PREPROC_PATH value in environmental variables')
    sys.exit(1)

ENV = os.environ

default_args = {
  'start_date': datetime.now(),
}

DAG = DAG(
  dag_id='dash_preproc',
  default_args=default_args
)

clear_tables = PythonOperator(
  task_id='clear_tables',
  python_callable=dash_workers.clear_db,
  dag=DAG)

def id_worker(uid):
    return PythonOperator(
        task_id=id,
        python_callable=dash_workers.main_preprocess,
        op_args=[uid],
        dag=DAG)

for uid in dash_workers.get_id_creds():
    preproc_task = id_worker(uid)
    clear_tables << preproc_task

实施@ LadislavIndra的建议后,我继续有同样的逆转实现位位移操作,以获得正确的依赖关系图。

UPDATE @ AshBerlin -泰勒的回答是这是怎么回事的。 我认为图形视图和树视图都做同样的事情,但他们没有。 下面是id_worker(uid) >> clear_tables看起来像在图形视图:

我当然不希望在我的数据前准备程序的最后一步是删除所有数据表!

Answer 1:

在气流中的树视图是“倒退”怎么你(和我!)首先想到它。 在您的第一张截图则显示出“clear_tables”必须的“AAAG5608078M2”运行任务之前运行。 和DAG状态取决于每个ID工人的任务。 因此,而不是一个任务顺序,它的地位链的树。 如果让任何意义可言。

(这似乎在第一怪,但它是因为DAG可以分支出来,在分支回来。)

你可能有更好的运气在寻找你的DAG中的图形视图。 这其中有箭头,并显示在一个更直观的方式执行顺序。 (虽然我现在发现树视图是有用的。这只是不太清楚下手)



Answer 2:

通过您的其他代码来看,似乎get_id_creds是你的任务,你通过它试图循环,这是创造一些奇怪的相互作用。

将工作模式是:

clear_tables = MyOperator()

for uid in uid_list:
  my_task = MyOperator(task_id=uid)
  clear_tables >> my_task


文章来源: Reversed upstream/downstream relationships when generating multiple tasks in Airflow