Tasks added to DAG during runtime fail to be scheduled

Posted 2020-03-28 19:38

My idea is to have a task foo that generates a list of inputs (users, reports, log files, etc.), and then to launch a task for every element in that list. The goal is to make use of Airflow's retrying and other logic, instead of reimplementing it.

So, ideally, my DAG should look something like this: [diagram: foo, fanning out to a dynamically generated set of tasks, which all feed into bar]

The only thing that varies here is the number of tasks generated. I want to run some more tasks after all of these have completed, so spinning up a new DAG for every element does not seem appropriate.

This is my code:

import json
import random
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1)
}

dag = DAG('dynamic_dag_generator', schedule_interval=None, default_args=default_args)

# foo prints a random-length list and pushes it to XCom via xcom_push=True.
foo_operator = BashOperator(
    task_id='foo',
    bash_command="echo '%s'" % json.dumps(list(range(0, random.randint(40, 60)))),
    xcom_push=True,
    dag=dag)

# Runs as a task: reads the list pushed by foo and tries to add one task per element.
def gen_nodes(**kwargs):
    ti = kwargs['ti']
    workers = json.loads(ti.xcom_pull(task_ids='foo'))

    for wid in workers:
        print("Iterating worker %s" % wid)
        op = PythonOperator(
            task_id='test_op_%s' % wid,
            python_callable=lambda: print("Dynamic task!"),
            dag=dag
        )

        op.set_downstream(bar_operator)
        op.set_upstream(dummy_op)

gen_subdag_node_op = PythonOperator(
    task_id='gen_subdag_nodes',
    python_callable=gen_nodes,
    provide_context=True,
    dag=dag
)

gen_subdag_node_op.set_upstream(foo_operator)

dummy_op = DummyOperator(
    task_id='dummy',
    dag=dag
)

dummy_op.set_upstream(gen_subdag_node_op)

bar_operator = DummyOperator(
    task_id='bar',
    dag=dag)

bar_operator.set_upstream(dummy_op)

In the logs, I can see that gen_nodes executes correctly (e.g. Iterating worker 5, etc.). However, the new tasks are not scheduled, and there is no evidence that they were ever executed.

I found related code samples online, such as this, but could not get it to work. Am I missing something?

Alternatively, is there a more appropriate approach to this problem (isolating units of work)?

1 answer
兄弟一词,经得起流年.
Answered 2020-03-28 20:37

At this point in time, Airflow does not support adding or removing tasks while a DAG run is in progress.

The workflow shape and order will be whatever is evaluated at the start of the DAG run.

See the second paragraph here.

This means you cannot add/remove tasks based on something that happens in the run. You can add X tasks in a for loop based on something not related to the run, but after the run has begun there is no changing the workflow shape/order.
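A rough sketch of that pattern is below. The Variable name worker_count, the DAG id, and the task ids are my own illustrative choices, and the import paths assume an Airflow 1.x layout like the question's code; the point is only that the fan-out width is known at parse time, before any run exists.

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG('static_fan_out', schedule_interval=None,
          default_args={'owner': 'airflow', 'start_date': datetime(2015, 6, 1)})

start = DummyOperator(task_id='start', dag=dag)
bar = DummyOperator(task_id='bar', dag=dag)

# Evaluated every time the scheduler parses this file, i.e. before any run exists.
worker_count = int(Variable.get('worker_count', default_var=5))

def work(wid):
    print("Worker %s" % wid)

for wid in range(worker_count):
    op = PythonOperator(
        task_id='test_op_%s' % wid,
        python_callable=work,
        op_args=[wid],
        dag=dag)
    op.set_upstream(start)
    op.set_downstream(bar)

Changing the Variable changes how many tasks appear the next time the file is parsed, but never mid-run.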

Many times you can instead use a BranchPythonOperator to make a decision during a DAG run (and that decision can be based on your XCom values), but it must be a decision to go down a branch that already exists in the workflow.
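A minimal sketch of that, reusing the question's dag and foo_operator. The branch task ids few_workers / many_workers and the threshold of 50 are illustrative assumptions; the point is that the callable picks between targets that are already part of the DAG.

import json

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

def choose_branch(**kwargs):
    ti = kwargs['ti']
    workers = json.loads(ti.xcom_pull(task_ids='foo'))
    # The decision may depend on run-time data (here, the XCom pushed by foo),
    # but both possible targets must already exist in the DAG.
    return 'many_workers' if len(workers) > 50 else 'few_workers'

branch = BranchPythonOperator(
    task_id='choose_branch',
    python_callable=choose_branch,
    provide_context=True,
    dag=dag)

few_workers = DummyOperator(task_id='few_workers', dag=dag)
many_workers = DummyOperator(task_id='many_workers', dag=dag)

branch.set_upstream(foo_operator)
branch.set_downstream([few_workers, many_workers])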

DAG runs and DAG definitions are separated in Airflow in ways that aren't entirely intuitive, but more or less anything that is created or generated inside a DAG run (XCom, dag_run.conf, etc.) is not usable for defining the DAG itself.
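To make that split concrete, here is a rough sketch of what runs when. The names are illustrative; only the distinction between module level (parse time) and the python_callable body (run time) matters.

# Parse time: this runs whenever the scheduler parses the file. No run exists yet,
# so the number of tasks must come from something static (a constant, a Variable,
# a config file) -- it cannot come from XCom or dag_run.conf.
NUM_WORKERS = 3

# Run time: wired up as a PythonOperator with provide_context=True.
def runtime_only(**kwargs):
    # These objects exist only inside a specific DAG run, which is too late to
    # change the DAG's shape. Use them for per-run data, not for structure.
    conf = kwargs['dag_run'].conf or {}
    pulled = kwargs['ti'].xcom_pull(task_ids='foo')
    print(conf, pulled)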
