I want one DAG to start after another DAG completes. One solution is to use the ExternalTaskSensor; my attempt is below. The problem I encounter is that the dependent DAG is stuck at poking. I checked this answer and made sure that both DAGs run on the same schedule. Any help would be appreciated. My simplified code is as follows:
leader dag:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

schedule = '* * * * *'

dag = DAG('leader_dag', default_args=default_args, catchup=False,
          schedule_interval=schedule)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
the dependent dag:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 8),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

schedule = '* * * * *'

dag = DAG('dependent_dag', default_args=default_args, catchup=False,
          schedule_interval=schedule)

wait_for_task = ExternalTaskSensor(
    task_id='wait_for_task',
    external_dag_id='leader_dag',
    external_task_id='t1',
    dag=dag)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t1.set_upstream(wait_for_task)
the log for leader_dag:
the log for dependent dag:
First, the task_id in the leader_dag is named print_date, but you set up your dependent_dag with a task wait_for_task which is waiting on leader_dag's task named t1. There is no task named t1. The Python variable you assigned the operator to in the .py file is not relevant; it is not stored in the Airflow db and therefore not seen by the sensor. The sensor should be waiting on the task named print_date.
Second, your logs do not line up: the leader_dag run you show is not the run that the dependent_dag is waiting for.
Finally, I can't recommend you use Airflow to schedule tasks every minute. Certainly not two dependent tasks together.
Consider writing streaming jobs in a different system like Spark, or rolling your own Celery or Dask environment for this.
You could also avoid the ExternalTaskSensor by adding a TriggerDagRunOperator to the end of your leader_dag to trigger the dependent_dag, and removing the schedule from the dependent_dag by setting its schedule_interval to None.
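A minimal sketch of that layout, assuming Airflow 1.10 (where, as far as I know, the operator is imported from airflow.operators.dagrun_operator and its python_callable argument is optional). The task id trigger_dependent and the start date are placeholders I made up for illustration, not something from the question:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
# Assumed import path for Airflow 1.10; in Airflow 2 the operator lives in
# airflow.operators.trigger_dagrun instead.
from airflow.operators.dagrun_operator import TriggerDagRunOperator

# The leader keeps its schedule.
leader = DAG('leader_dag', start_date=datetime(2018, 10, 8),
             catchup=False, schedule_interval='* * * * *')

print_date = BashOperator(task_id='print_date', bash_command='date', dag=leader)

# Hypothetical task id; it fires one run of dependent_dag when print_date succeeds.
trigger_dependent = TriggerDagRunOperator(
    task_id='trigger_dependent',
    trigger_dag_id='dependent_dag',
    dag=leader)

print_date >> trigger_dependent

# The dependent DAG has no schedule of its own and no sensor; it only runs when triggered.
dependent = DAG('dependent_dag', start_date=datetime(2018, 10, 8),
                catchup=False, schedule_interval=None)

dependent_print_date = BashOperator(task_id='print_date', bash_command='date',
                                    dag=dependent)
```

With this arrangement there are no execution dates to keep aligned, because the dependent run is created by the leader run itself.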
What I see in your logs is a log for the leader from 2018-10-13T19:08:11. This at best would be the dagrun for execution_date 2018-10-13 19:07:00 because the minute period starting 19:07 ends at 19:08 which is the earliest it can be scheduled. And I see some delay between scheduling and execution of about 11 seconds if this is the case. However there can be multiple minutes of scheduling lag in Airflow.
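The arithmetic behind that estimate can be written out in plain Python. This is only a sketch for a one-minute schedule; the helper name latest_execution_date is mine, and it models the rule described above (a run is scheduled only after its period has ended) rather than calling anything in Airflow:

```python
from datetime import datetime, timedelta

INTERVAL = timedelta(minutes=1)  # the '* * * * *' schedule from the question


def latest_execution_date(log_time):
    """Latest execution_date whose one-minute period had ended by log_time."""
    # The most recent period boundary at or before the log timestamp.
    period_end = log_time.replace(second=0, microsecond=0)
    # A run is stamped with the start of its period, not the end.
    return period_end - INTERVAL


leader_log_time = datetime(2018, 10, 13, 19, 8, 11)
execution_date = latest_execution_date(leader_log_time)

# Lag between the earliest moment the run could be scheduled and the log line.
lag = leader_log_time - (execution_date + INTERVAL)

print(execution_date)  # 2018-10-13 19:07:00
print(lag)             # 0:00:11
```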
I also see a log from the dependent_dag which runs from 19:14:04 to 19:14:34 and is looking for the completion of the corresponding 19:13:00 dagrun. There's no indication that your scheduler is lag-free enough to have started the 19:13:00 dagrun of leader_dag by 19:14:34. You could have convinced me better if you had shown it poking for 5 minutes or so. Of course, it is never going to sense leader_dag.t1, because that isn't the name of the task shown.
So, Airflow has scheduling delay. If you had a few thousand DAGs in the system, it might be higher than 1 minute, so that with catchup=False you are going to get some runs following each other (e.g. 19:08, 19:09) and some runs that skip a minute (or 6), like 19:10 followed by 19:16. Since the delay is a bit random on a DAG-by-DAG basis, you might get unaligned runs with the sensor waiting forever, even if you have the correct task id to wait for:
  wait_for_task = ExternalTaskSensor(
      task_id='wait_for_task',
      external_dag_id='leader_dag',
-     external_task_id='t1',
+     external_task_id='print_date',
      dag=dag)
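To make the unaligned-runs problem concrete, here is a toy model in plain Python. The run times are invented for illustration (they are not taken from your logs), and the model assumes the sensor's default behaviour of looking for a leader run with exactly the same execution_date:

```python
from datetime import datetime


def at(hour, minute):
    """Shorthand for an execution_date on the day shown in the logs."""
    return datetime(2018, 10, 13, hour, minute)


# Hypothetical execution dates each DAG actually got with catchup=False,
# after scheduler lag made each of them skip different minutes.
leader_runs = {at(19, 8), at(19, 9), at(19, 10), at(19, 16)}
dependent_runs = {at(19, 8), at(19, 9), at(19, 12), at(19, 16)}

# A dependent run whose execution_date has no leader counterpart has
# nothing to sense, so its sensor would keep poking.
never_satisfied = sorted(dependent_runs - leader_runs)

for run in never_satisfied:
    print(run)  # 2018-10-13 19:12:00
```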
While using ExternalTaskSensor you have to give both DAGs the same starting date. If that does not work for your use case then you need to use execution_delta or execution_date_fn in your ExternalTaskSensor.
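As a rough illustration of what those two parameters do, the sketch below models in plain Python how I understand the sensor to pick the execution date it looks for in the other DAG: the current execution_date minus execution_delta, or whatever execution_date_fn returns. The function name external_execution_date and the five-minute offset are assumptions for the example, not Airflow code:

```python
from datetime import datetime, timedelta


def external_execution_date(execution_date, execution_delta=None,
                            execution_date_fn=None):
    """Model of which run of the other DAG the sensor would wait for."""
    if execution_delta is not None:
        # A positive delta points at an earlier run of the other DAG.
        return execution_date - execution_delta
    if execution_date_fn is not None:
        # An arbitrary mapping from this run's date to the other DAG's date.
        return execution_date_fn(execution_date)
    # Default: the other DAG must have a run with the same execution_date.
    return execution_date


dependent_run = datetime(2018, 10, 13, 19, 13)

same_date = external_execution_date(dependent_run)
five_earlier = external_execution_date(
    dependent_run, execution_delta=timedelta(minutes=5))
on_the_hour = external_execution_date(
    dependent_run, execution_date_fn=lambda dt: dt.replace(minute=0))

print(same_date)     # 2018-10-13 19:13:00
print(five_earlier)  # 2018-10-13 19:08:00
print(on_the_hour)   # 2018-10-13 19:00:00
```

In the real sensor you would pass only one of the two, for example execution_delta=timedelta(minutes=5) when the leader is scheduled five minutes ahead of the dependent DAG.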