Incorrect behavior of scheduler interval and start time

Published 2019-04-15 20:16

Question:

I can't find the solution for the start time of my tasks. I have the code below and can't see where I'm wrong.

When I ran the DAG, the tasks for 25.03, 26.03, and 27.03 completed, but today (28.03) the tasks did not start at 6:48.

I have tried cron expressions, pendulum, and datetime, and the result is the same. My local time (UTC+3) and Airflow's time (UTC) are different. I've tried using each time (local, Airflow) in 'start_date' and 'schedule_interval', with no result.

Using: Ubuntu, Airflow v1.9.0, and the local executor.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

emailname = Variable.get('test_mail')
l_start_date = datetime(2018, 3, 25, 6, 48)
l_schedule_interval = '@daily'

WORKFLOW_DEFAULT_ARGS = {
    'owner': 'owner',
    'depends_on_past': True,
    'start_date': l_start_date,
    'email': emailname,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),  # note: the key is 'retry_delay', not 'retries_delay'
}

# initialize the DAG
dag = DAG(
    dag_id='test_dag_mail',
    default_args=WORKFLOW_DEFAULT_ARGS,
    schedule_interval=l_schedule_interval,
    start_date=l_start_date,
)

Answer 1:

This is a feature of Airflow:

Note that if you run a DAG on a schedule_interval of one day, the run stamped 2016-01-01 will be triggered soon after 2016-01-01T23:59. In other words, the job instance is started once the period it covers has ended.

Let’s Repeat That: The scheduler runs your job one schedule_interval AFTER the start date, at the END of the period.

From: https://airflow.apache.org/scheduler.html
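
In other words, the run stamped with execution_date E only fires at E + schedule_interval. A minimal sketch of that rule in plain Python, reusing the dates from the quoted docs:

from datetime import datetime, timedelta

# A daily run stamped with execution_date E is only triggered at E + interval.
schedule_interval = timedelta(days=1)
execution_date = datetime(2016, 1, 1)   # the run stamp from the docs' example

trigger_time = execution_date + schedule_interval
print(trigger_time)  # 2016-01-02 00:00:00 -- "soon after 2016-01-01T23:59"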



Answer 2:

The scheduler page does a poor job of explaining that the execution_date is set to the start of the period between the previous run and the current run. Why? Well, it's assumed that a monthly, daily, or hourly job needs to read all the data FROM that time UNTIL the current interval time. Yes, they could have switched it around.

Examples (with the sub-second digits chopped to one):

DAG start_date  DAG schedule_interval Task Started          Task execution_date
2017-12-01      '@daily'              2017-12-02 00:00:02.8 2017-12-01 00:00:00.0
                                      2017-12-03 00:00:01.4 2017-12-02 00:00:00.0
2017-12-01      '@weekly'             2017-12-08 00:00:01.5 2017-12-01 00:00:00.0
                                      2017-12-15 00:00:03.9 2017-12-08 00:00:00.0
2017-12-01      '33 03 * * *'         2017-12-02 03:33:01.6 2017-12-01 03:33:00.0
                                      2017-12-03 03:33:02.2 2017-12-02 03:33:00.0
2017-12-01      '33 03 * * 2'         2017-12-12 03:33:01.7 2017-12-05 03:33:00.0
                                      2017-12-19 03:33:03.1 2017-12-12 03:33:00.0

For this last case, note that the start date is not a Tuesday, as the interval requests. Because it's confusing when the start time doesn't align with an interval time (and I haven't tested this exactly), I recommend that if you want to run every Tuesday, your start date should fall on a Tuesday, like 2017-12-05 or 2017-11-28.
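
To check how a cron expression aligns with a start date, here's a quick sketch using the croniter library (a dependency Airflow itself uses); it prints the first interval boundary after the table's start_date:

from datetime import datetime
from croniter import croniter

# First '33 03 * * 2' (Tuesday 03:33) boundary after 2017-12-01:
itr = croniter('33 03 * * 2', datetime(2017, 12, 1))
print(itr.get_next(datetime))  # 2017-12-05 03:33:00 -- the first execution_date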

For reference, the calendar for 2017-12:

#  December 2017
Su Mo Tu We Th Fr Sa
                1  2
 3  4  5  6  7  8  9
10 11 12 13 14 15 16
17 18 19 20 21 22 23
24 25 26 27 28 29 30
31

So, in your DAG's tasks, the Jinja2 templates {{ds}} or {{execution_date}} from a run's provided context refer to the last column of the examples, as opposed to what datetime.now() would tell you, which corresponds to the second-to-last column.
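
For illustration, a minimal sketch of a templated task (the task_id and echoed message are made up; the dag object is the one defined in the question):

from airflow.operators.bash_operator import BashOperator

# {{ ds }} renders to the run's execution_date (the last column above),
# while $(date) is the wall clock at task start (the second-to-last column).
print_dates = BashOperator(
    task_id='print_dates',
    bash_command='echo "execution_date: {{ ds }}, clock now: $(date)"',
    dag=dag,
)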

These are provided so that your tasks can be idempotent: f(data) = f(f(data)).

If you run your DAG once, the final state should be the same as if you ran your DAG N times. That way, if you run your (linear) DAG and 3 of 5 tasks succeed but task 4 fails and task 5 never runs, you can re-run the whole DAG: tasks 1-3 will pre-clean or overwrite data so the output is unchanged from their prior successes, and then tasks 4 and 5 can hopefully succeed, leaving you in a good final state.
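
A minimal sketch of that pattern, assuming a hypothetical /data/output directory partitioned by {{ ds }} so a re-run overwrites rather than appends:

from airflow.operators.bash_operator import BashOperator

# Re-running this task for the same execution_date rewrites the same
# partition, so f(data) = f(f(data)) holds. The paths are hypothetical.
write_partition = BashOperator(
    task_id='write_partition',
    bash_command=(
        'rm -rf /data/output/{{ ds }} && '   # pre-clean this run's partition
        'mkdir -p /data/output/{{ ds }} && '
        'cp /data/input/batch.csv /data/output/{{ ds }}/'
    ),
    dag=dag,
)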