Airflow : ExternalTaskSensor doesn't trigger t

2020-02-14 02:35发布

I have already seen this and this questions on SO and made the changes accordingly. However, my dependent DAG still gets stuck in poking state. Below is my master DAG:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime
from airflow.operators.bash_operator import BashOperator

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('call-procedure-and-bash', default_args=default_args)

call_procedure = JdbcOperator(
    task_id='call_procedure',
    jdbc_conn_id='airflow_db2',
    sql='CALL AIRFLOW.TEST_INSERT (20)',
    dag=dag
)

call_procedure

Below is my dependent DAG:

from airflow import DAG
from airflow.operators.jdbc_operator import JdbcOperator
from datetime import datetime, timedelta
from airflow.sensors.external_task_sensor import ExternalTaskSensor

today = datetime.today()

default_args = {
    'depends_on_past': False,
    'retries': 0,
    'start_date': datetime(today.year, today.month, today.day),
    'schedule_interval': '@once'
}

dag = DAG('external-dag-upstream', default_args=default_args)

task_sensor = ExternalTaskSensor(
    task_id='link_upstream',
    external_dag_id='call-procedure-and-bash',
    external_task_id='call_procedure',
    execution_delta=timedelta(minutes=-2),
    dag=dag
)

count_rows = JdbcOperator(
    task_id='count_rows',
    jdbc_conn_id='airflow_db2',
    sql='SELECT COUNT(*) FROM AIRFLOW.TEST',
    dag=dag
)

count_rows.set_upstream(task_sensor)

Below are the logs of dependent DAG once the master DAG gets executed:

[2019-01-10 11:43:52,951] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:44:52,955] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:45:52,961] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:46:52,949] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:47:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:48:52,928] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 
[2019-01-10 11:49:52,905] {{external_task_sensor.py:91}} INFO - Poking for call-procedure-and-bash.call_procedure on 2019-01-10T11:45:47.893735+00:00 ... 

Below are the logs of master DAG execution:

[2019-01-10 11:45:20,215] {{jdbc_operator.py:56}} INFO - Executing: CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:21,477] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:21,476] {{dbapi_hook.py:166}} INFO - CALL AIRFLOW.TEST_INSERT (20)
[2019-01-10 11:45:24,139] {{logging_mixin.py:95}} INFO - [2019-01-10 11:45:24,137] {{jobs.py:2627}} INFO - Task exited with return code 0

My assumption is, Airflow should trigger the dependent DAG if the master runs fine? I have tried playing around with execution_delta but that doesn't seem to work.

Also, schedule_interval and start_date are same for both of the DAGs so don't think that should cause any trouble.

Am I missing anything?

2条回答
放我归山
2楼-- · 2020-02-14 02:47

Make sure both DAGs start at the same time and you don't start either DAGs manually.

查看更多
Fickle 薄情
3楼-- · 2020-02-14 02:58

It may be that you should use a positive timedelta: https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/external_task_sensor.html because when subtracting the execution delta it's going to end up looking for a task that ran 2 minutes after itself.

However the delta isn't really a range, the TI has to have a matching Dag ID, Task ID, successful result and also an execution date in the list of datetimes. Which when you give execution_delta as a delta, is a list of one datetime taking the current execution date and subtracting the timedelta.

This probably comes down to you either removing the timedelta so that the two execution dates match and the sensor will wait until the other task is successful, OR your start date and schedule interval being set as basically today and @once are getting execution dates not in predictable lock-step with each other. You could try setting say datetime(2019,1,10) and 0 1 * * * to get them to both run daily at 1am (again without an execution_delta).

查看更多
登录 后发表回答