Can a failed Airflow DAG Task Retry with changed p

Posted 2020-02-16 01:57

Question:

With Airflow, is it possible to restart an upstream task if a downstream task fails? This seems to be against the "Acyclic" part of the term DAG. I would think this is a common problem though.

Background

I'm looking into using Airflow to manage a data processing workflow that has been managed manually.

There is a task that will fail if a parameter x is set too high, but increasing the parameter value gives better quality results. We have not found a way to calculate the highest value of x that is still safe. When the process is run by hand, we restart the failed job with a lower value of x, repeating until it succeeds.

The workflow looks something like this:

Task A - Gather the raw data

Task B - Generate config file for job

Task C - Modify config file parameter x

Task D - Run the data manipulation Job

Task E - Process Job results

Task F - Generate reports

Issue

If task D fails because of parameter x being too high, I want to rerun task C and task D. This doesn't seem to be supported. I would really appreciate some guidance on how to handle this.

Answer 1:

First of all: that's an excellent question, and I wonder why it hasn't been discussed more widely until now.


I can think of two possible approaches

  1. Fusing Operators: As pointed out by @Kris, combining the operators into one appears to be the most obvious workaround

  2. Separate Top-Level DAGs: Read below
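For the first approach, here is a minimal sketch of what a fused "Task C + Task D" callable could look like. It is plain Python so it can be tried outside Airflow; the idea would be to pass something like it as the `python_callable` of a single `PythonOperator`. The names `JobFailed`, `run_with_decreasing_x`, `run_job`, `x_start`, `x_min` and `step` are all made up for illustration, and I'm assuming the job can signal a "parameter too high" failure as an exception.

```python
class JobFailed(Exception):
    """Hypothetical error raised when the job fails because x is too high."""


def run_with_decreasing_x(run_job, x_start, x_min, step):
    """Try the job with x_start, lowering x by `step` after each failure.

    Returns (x, result) for the first x that works. Raises RuntimeError
    if every value down to x_min fails.
    """
    x = x_start
    while x >= x_min:
        try:
            # Task C (set parameter x) and Task D (run the job) fused together.
            return x, run_job(x)
        except JobFailed:
            # Same as the manual process: lower x and try again.
            x -= step
    raise RuntimeError("job failed for every x down to %r" % (x_min,))
```

The trade-off is that Airflow only sees one task, so the individual attempts show up in that task's log rather than as separate task instances.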


Separate Top-Level DAGs approach

Given

  • Say you have tasks A & B
  • A is upstream to B
  • You want execution to resume (retry) from A if B fails

(Possible) Idea: if you're feeling adventurous

  • Put tasks A & B in separate top-level DAGs, say DAG-A & DAG-B
  • At the end of DAG-A, trigger DAG-B using TriggerDagRunOperator
    • In all likelihood, you will also have to use an ExternalTaskSensor after TriggerDagRunOperator
  • In DAG-B, put a BranchPythonOperator after Task-B with trigger_rule=all_done
  • This BranchPythonOperator should branch out to another TriggerDagRunOperator that then invokes DAG-A (again!)
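The branching step in DAG-B could be sketched roughly like this. It is only the callable you would hand to the `BranchPythonOperator`, which follows whichever task id the callable returns. It relies on `dag_run.get_task_instance` and `dag_run.conf` from the task context. The task ids `task_b`, `retrigger_dag_a` and `finish`, the `attempt` key in the run conf, and `MAX_ATTEMPTS` are all assumptions of mine, not something Airflow provides.

```python
MAX_ATTEMPTS = 3  # hypothetical cap so the two DAGs cannot retrigger each other forever


def choose_next_step(**context):
    """Return the task id to follow, based on how Task-B ended."""
    dag_run = context["dag_run"]
    task_b = dag_run.get_task_instance("task_b")  # "task_b" is an assumed task id

    # "attempt" is an assumed key that the triggering DAG would pass in the run conf.
    attempt = (dag_run.conf or {}).get("attempt", 0)

    if task_b is not None and task_b.state == "failed" and attempt < MAX_ATTEMPTS:
        # Assumed id of the TriggerDagRunOperator that starts DAG-A again.
        return "retrigger_dag_a"
    # Assumed id of a no-op task that ends the run normally.
    return "finish"
```

For the cap to work, the `TriggerDagRunOperator` that restarts DAG-A would have to pass `attempt + 1` along, which is not shown here. The `trigger_rule=all_done` setting from the list above is what lets this callable run even when Task-B has failed.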

Useful references

  • Fusing Operators Together
  • Wiring Top-Level DAGs together

EDIT-1

Here's a much simpler way to achieve similar behaviour:

How can you re-run upstream task if a downstream task fails in Airflow (using Sub Dags)