I want one DAG to start after another DAG has completed. One solution is to use ExternalTaskSensor, and you can find my attempt below. The problem I encounter is that the dependent DAG gets stuck poking. I checked this answer and made sure that both DAGs run on the same schedule. My simplified code is as follows; any help would be appreciated.

Leader DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2015, 6, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

schedule = '* * * * *'

dag = DAG('leader_dag', default_args=default_args, catchup=False,
          schedule_interval=schedule)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)
The dependent DAG:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 8),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

schedule = '* * * * *'

dag = DAG('dependent_dag', default_args=default_args, catchup=False,
          schedule_interval=schedule)

wait_for_task = ExternalTaskSensor(
    task_id='wait_for_task',
    external_dag_id='leader_dag',
    external_task_id='t1',
    dag=dag)

t1 = BashOperator(
    task_id='print_date',
    bash_command='date',
    dag=dag)

t1.set_upstream(wait_for_task)
The log for the dependent DAG:
First, the task_id in the leader_dag is named print_date, but you set up your dependent_dag with a task wait_for_task which is waiting on leader_dag's task named t1. There is no task named t1. What you assigned the task to in the .py file is not relevant; that variable name is not used in the Airflow db, and consequently not by the sensor either. It should be waiting on the task named print_date.

Second, your logs do not line up: the leader_dag run you show is not the run the dependent_dag is waiting for.
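To see why the sensor never succeeds, here is a minimal pure-Python sketch of the lookup it performs. As I understand ExternalTaskSensor in Airflow 1.10, each poke counts rows in the metadata DB's task-instance table whose dag_id, task_id and execution_date match, and whose state is in allowed_states (default ['success']). The in-memory `succeeded` set and the `poke` helper below are hypothetical stand-ins for that table and query, not Airflow APIs. In your real DAG file the fix is simply external_task_id='print_date'.

```python
from datetime import datetime

# Hypothetical stand-in for the task_instance table: one entry per
# (dag_id, task_id, execution_date) that finished with state 'success'.
# Entries are keyed by the task_id string, never by the Python variable
# name (t1) used in the DAG file.
succeeded = {
    ("leader_dag", "print_date", datetime(2018, 10, 13, 19, 13)),
}


def poke(external_dag_id, external_task_id, execution_date):
    """Simplified model of one ExternalTaskSensor poke (hypothetical helper)."""
    return (external_dag_id, external_task_id, execution_date) in succeeded


run = datetime(2018, 10, 13, 19, 13)
print(poke("leader_dag", "t1", run))          # False, on every poke, forever
print(poke("leader_dag", "print_date", run))  # True
```

Because the lookup is an exact match on the task_id string, no amount of waiting makes the 't1' query succeed.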
Finally, I can't recommend using Airflow to schedule tasks every minute, and certainly not two dependent tasks together. Consider writing streaming jobs in a different system like Spark, or rolling your own Celery or Dask environment for this.
You could also avoid the ExternalTaskSensor entirely by adding a TriggerDagRunOperator to the end of your leader_dag to trigger the dependent_dag, and removing the dependent_dag's own schedule by setting its schedule_interval to None.

What I see in your logs is a log for the leader from 2018-10-13T19:08:11. At best this would be the dagrun for execution_date 2018-10-13 19:07:00, because the minute-long period starting at 19:07 ends at 19:08, which is the earliest it can be scheduled. If that is the case, I see a delay of about 11 seconds between scheduling and execution. However, there can be multiple minutes of scheduling lag in Airflow.
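The timing argument can be made concrete with a small stdlib-only sketch. It assumes a simplified model of the scheduler for a `* * * * *` DAG with catchup=False: whenever the scheduler gets around to the DAG at wall-clock time `now`, it creates at most one run, for the most recent fully elapsed minute (execution_date = `now` floored to the minute, minus one minute). Apart from the two timestamps quoted from the logs, the scheduler visit times below are made up for illustration.

```python
from datetime import datetime, timedelta

INTERVAL = timedelta(minutes=1)  # period of the '* * * * *' schedule


def latest_execution_date(now):
    """execution_date of the newest fully elapsed one-minute interval.

    A run for the interval [T, T + 1 min) can start no earlier than
    T + 1 min, and its execution_date is T, the start of the interval.
    """
    return now.replace(second=0, microsecond=0) - INTERVAL


def runs_created(check_times):
    """Execution dates created when the scheduler visits a catchup=False DAG
    at the given (hypothetical) wall-clock times. Each visit creates a run
    for the latest interval only, so intervals between visits are skipped."""
    return sorted({latest_execution_date(t) for t in check_times})


def at(hh, mm, ss=0):
    return datetime(2018, 10, 13, hh, mm, ss)


# The two timestamps quoted from the logs.
print(latest_execution_date(at(19, 8, 11)))  # 2018-10-13 19:07:00
print(latest_execution_date(at(19, 14, 4)))  # 2018-10-13 19:13:00

# Hypothetical, uneven scheduler visits for each DAG.
leader = runs_created([at(19, 8, 11), at(19, 9, 5), at(19, 16, 30)])
dependent = runs_created([at(19, 8, 40), at(19, 12, 2), at(19, 16, 10)])

# Dependent runs with no leader run at the same execution_date: their
# sensors can never succeed, whatever task_id they wait on.
orphans = sorted(set(dependent) - set(leader))
print(orphans)  # [datetime.datetime(2018, 10, 13, 19, 11)]
```

In this made-up trace the leader jumps from 19:08 to 19:15, so the dependent run for 19:11 has nothing to sense.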
I also see a log from the dependent_dag which runs from 19:14:04 to 19:14:34 and is looking for the completion of the corresponding 19:13:00 dagrun. There's no indication that your scheduler was lag-free enough to have started the 19:13:00 dagrun of leader_dag by 19:14:34. You would have convinced me better if you had shown it poking for 5 minutes or so. Of course, it's never going to sense leader_dag.t1, because that isn't what you named the task shown.

So, Airflow has scheduling delay. If you had a few thousand DAGs in the system, it might be higher than 1 minute, so that with catchup=False you're going to get some runs that follow each other (e.g. 19:08, 19:09) and some runs that skip a minute (or 6), like 19:10 followed by 19:16. Since the delay is somewhat random on a DAG-by-DAG basis, you might get unaligned runs with the sensor waiting forever, EVEN if you have the correct task id to wait for.

While using ExternalTaskSensor, you have to give both DAGs the same start date (and schedule), so that their execution dates line up. If that does not work for your use case, then you need to use execution_delta or execution_date_fn in your ExternalTaskSensor.
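Here is a sketch of how those two parameters change the execution_date the sensor looks for. It mirrors my understanding of ExternalTaskSensor in Airflow 1.10: with execution_delta, the sensor checks the external run at its own execution_date minus the delta; with execution_date_fn, it checks whatever date the callable returns for its own execution_date; otherwise it uses its own execution_date unchanged, and Airflow rejects setting both. The `target_execution_date` function and the hourly schedules in the usage are hypothetical. Check the execution_date_fn signature for your Airflow version, since newer releases can also pass the task context to it.

```python
from datetime import datetime, timedelta


def target_execution_date(execution_date, execution_delta=None,
                          execution_date_fn=None):
    """Execution date of the external run a sensor would check (sketch).

    execution_date is the sensor's own run; at most one of
    execution_delta / execution_date_fn may be given.
    """
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("pass only one of execution_delta or execution_date_fn")
    if execution_delta is not None:
        return execution_date - execution_delta  # fixed offset
    if execution_date_fn is not None:
        return execution_date_fn(execution_date)  # arbitrary mapping
    return execution_date  # default: same execution_date in both DAGs


# Hypothetical setup: leader runs hourly at :00 ('0 * * * *'),
# dependent runs hourly at :30 ('30 * * * *').
dependent_run = datetime(2018, 10, 13, 19, 30)

# Without an offset the sensor looks for a 19:30 leader run that never exists.
print(target_execution_date(dependent_run))  # 2018-10-13 19:30:00

# A fixed 30-minute offset points it at the 19:00 leader run.
print(target_execution_date(dependent_run,
                            execution_delta=timedelta(minutes=30)))  # 2018-10-13 19:00:00

# The same mapping via a function: floor to the start of the hour.
print(target_execution_date(
    dependent_run,
    execution_date_fn=lambda d: d.replace(minute=0, second=0, microsecond=0)))  # 2018-10-13 19:00:00
```

In the DAG file this corresponds to passing execution_delta=timedelta(minutes=30), or an equivalent execution_date_fn, to ExternalTaskSensor alongside external_dag_id and external_task_id. A fixed delta is simpler; a function is useful when the offset between the two schedules is not constant.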