Airflow depends_on_past for whole DAG

Published 2019-04-30 04:01

Question:

Is there a way in Airflow to apply depends_on_past to an entire DagRun, not just to a single task?

I have a daily DAG, and the Friday DagRun errored on the 4th task, but the Saturday and Sunday DagRuns still ran as scheduled. Using depends_on_past = True would have paused each of those DagRuns at that same 4th task, but the first 3 tasks would still have run.

I can see that the DagRun DB table has a state column that contains failed for the Friday DagRun. What I want is a way to configure a DagRun so it does not start at all if the previous DagRun failed, rather than starting and running until it reaches the task that previously failed.

Does anyone know if this is possible?

Answer 1:

On the first task of the DAG, set depends_on_past=True and wait_for_downstream=True; the combination means the current DagRun only proceeds if the previous run succeeded.

This works because the first task of the current DagRun will wait for its own previous instance to succeed (depends_on_past) and for the tasks immediately downstream of that previous instance to succeed as well (wait_for_downstream).
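
A minimal sketch of that setup, assuming Airflow 1.10-style imports; the DAG id, dates, and downstream tasks are illustrative only, not from the original answer:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG(
    dag_id="daily_pipeline",            # hypothetical DAG, for illustration
    schedule_interval="@daily",
    start_date=datetime(2019, 1, 1),
) as dag:
    # Gate the whole run on the first task: depends_on_past waits for this
    # task's previous instance, and wait_for_downstream additionally waits for
    # the tasks immediately downstream of that previous instance to succeed.
    start = DummyOperator(
        task_id="start",
        depends_on_past=True,
        wait_for_downstream=True,
    )

    task_1 = DummyOperator(task_id="task_1")
    task_2 = DummyOperator(task_id="task_2")

    start >> task_1 >> task_2
```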



Answer 2:

One possible solution would be to use xcom:

  1. Add two PythonOperators, start_task and end_task, to the DAG.
  2. Make all other tasks depend on start_task.
  3. Make end_task depend on all other tasks (set_upstream).
  4. end_task will always push a variable last_success = context['execution_date'] to XCom (xcom_push). (This requires provide_context = True in the PythonOperators.)
  5. start_task will always check XCom (xcom_pull) to see whether there is a last_success variable whose value equals the previous DagRun's execution_date, or the DAG's start_date (to let the process start), as sketched below.

Example use of xcom:
https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py
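
A rough sketch of this XCom-based gate, assuming an Airflow 1.10-style PythonOperator with provide_context=True; the DAG id, task callables, and the use of prev_execution_date from the task context are assumptions for illustration, not the answer's exact code:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def check_last_success(**context):
    # Pull the value end_task pushed during an earlier run;
    # include_prior_dates=True lets xcom_pull look beyond the current execution date.
    last_success = context["ti"].xcom_pull(
        task_ids="end_task", key="last_success", include_prior_dates=True
    )
    prev = context.get("prev_execution_date")  # None on the very first run
    if prev is None:
        return  # let the first DagRun start
    if last_success != str(prev):
        raise ValueError("Previous DagRun did not reach end_task; refusing to start.")


def record_success(**context):
    # Runs only if every upstream task succeeded, so pushing here marks the whole
    # DagRun as successful. Pushed as a string to keep XCom serialization simple.
    context["ti"].xcom_push(key="last_success", value=str(context["execution_date"]))


with DAG(
    dag_id="daily_pipeline_xcom_gate",   # hypothetical DAG, for illustration
    schedule_interval="@daily",
    start_date=datetime(2019, 1, 1),
) as dag:
    start_task = PythonOperator(
        task_id="start_task",
        python_callable=check_last_success,
        provide_context=True,
    )
    end_task = PythonOperator(
        task_id="end_task",
        python_callable=record_success,
        provide_context=True,
    )

    # All real tasks sit between the two gates, e.g. start_task >> work >> end_task
    start_task >> end_task
```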