How does Airflow's BranchPythonOperator work?

Posted 2019-06-15 08:09

Question:

I'm struggling to understand how BranchPythonOperator in Airflow works. I know it's primarily used for branching, but the documentation leaves me confused about what to pass into the task and what I need to pass to, or expect from, the upstream task.

Given the simple example in the documentation on this page, what would the source code look like for the upstream task called run_this_first and the two downstream tasks that are branched? How exactly does Airflow know to run branch_a instead of branch_b? Where does the upstream task's output get noticed/read?

Answer 1:

Your BranchPythonOperator is created with a python_callable, which is a function. Based on your business logic, that function must return the task_id of the immediately downstream task (or tasks) that should run, chosen from the ones you have connected. There can be anywhere from 1 to N tasks immediately downstream. The downstream tasks do not HAVE to read anything; however, you can pass them metadata using XCom.

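# Airflow 1.x import path; in Airflow 2.x use airflow.operators.python instead.
from airflow.operators.python_operator import BranchPythonOperator

# `dag`, `branch_a` and `branch_b` are assumed to be defined elsewhere in the DAG file.

def decide_which_path():
    # `something` is a placeholder for your own business logic.
    # The returned string must match the task_id of a directly downstream task.
    if something:
        return "branch_a"
    else:
        return "branch_b"


branch_task = BranchPythonOperator(
    task_id='run_this_first',
    python_callable=decide_which_path,
    trigger_rule="all_done",
    dag=dag)

# Both candidates must be wired directly downstream of the branch task.
branch_task.set_downstream(branch_a)
branch_task.set_downstream(branch_b)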
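
To make the "how does Airflow know" part concrete, here is a rough mental model in plain Python. It is only a sketch and not Airflow's actual code: the function run_branch and the state names "runnable" and "skipped" are made up for illustration. The idea it shows is that the branch operator itself reads the return value of python_callable, keeps the matching downstream task(s) eligible, and marks every other directly downstream task as skipped.

```python
# Simplified model of branch selection; this is NOT Airflow's real implementation.
from typing import Callable, Dict, Iterable, List, Union


def run_branch(
    python_callable: Callable[[], Union[str, Iterable[str]]],
    downstream_task_ids: List[str],
) -> Dict[str, str]:
    """Return a hypothetical state for each directly downstream task."""
    chosen = python_callable()
    # Normalize: the callable may hand back one task_id or several.
    chosen_ids = {chosen} if isinstance(chosen, str) else set(chosen)

    # Chosen tasks stay eligible to run; every other direct child is skipped.
    return {
        task_id: "runnable" if task_id in chosen_ids else "skipped"
        for task_id in downstream_task_ids
    }


def decide_which_path() -> str:
    something = True  # stand-in for real business logic
    return "branch_a" if something else "branch_b"


states = run_branch(decide_which_path, ["branch_a", "branch_b"])
print(states)  # {'branch_a': 'runnable', 'branch_b': 'skipped'}
```

Nothing in branch_a or branch_b has to inspect the decision; the skipping is done for them before they are scheduled.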

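It's important to pay attention to trigger_rule, because the default is all_success and a skipped upstream task does not count as a success. Any task that sits downstream of both branches (a join, for example) will therefore be skipped as well unless you give it a more permissive rule such as one_success or none_failed. The all_done set on run_this_first only controls when the branch task itself runs relative to its own upstream tasks.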
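
The interaction between a skipped branch and trigger rules can be sketched in plain Python as well. Again this is a simplified model, not the scheduler's real logic: should_run is a made-up helper, only three rules are modeled, and the "join" task is a hypothetical task placed downstream of both branch_a and branch_b. The rule names themselves (all_success, one_success, none_failed) are real Airflow trigger rules.

```python
# Simplified model of three trigger rules; NOT Airflow's scheduler code.
from typing import List


def should_run(trigger_rule: str, upstream_states: List[str]) -> bool:
    """Decide whether a task may run, given the final states of its upstream tasks."""
    if trigger_rule == "all_success":
        # Default rule: every upstream task must have succeeded.
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "one_success":
        # At least one upstream task succeeded.
        return any(s == "success" for s in upstream_states)
    if trigger_rule == "none_failed":
        # Upstream tasks either succeeded or were skipped.
        return all(s in ("success", "skipped") for s in upstream_states)
    raise ValueError(f"rule not modeled here: {trigger_rule}")


# Hypothetical join task: branch_a ran, branch_b was skipped by the branch operator.
join_upstream = ["success", "skipped"]

for rule in ("all_success", "one_success", "none_failed"):
    print(rule, should_run(rule, join_upstream))
# all_success False
# one_success True
# none_failed True
```

With the default rule the join does not run, which is the "everything after the branch gets skipped" symptom people usually hit.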