I can't figure out why the scheduler isn't honoring the start time of my tasks. I have the code below and can't find where I'm wrong.
When I ran the DAG, the tasks for 25.03, 26.03, and 27.03 completed, but today (28.03) the tasks did not start at 6:48.
I have tried cron expressions, pendulum, and datetime, and the result is the same. Local time (UTC+3) and Airflow's time (UTC) differ, and I've tried each of them (local and Airflow) in start_date and schedule_interval with no result.
Using: Ubuntu, Airflow v1.9.0, and the LocalExecutor.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable

emailname = Variable.get('test_mail')
l_start_date = datetime(2018, 3, 25, 6, 48)
l_schedule_interval = '@daily'

WORKFLOW_DEFAULT_ARGS = {
    'owner': 'owner',
    'depends_on_past': True,
    'start_date': l_start_date,
    'email': emailname,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),  # note: the key is 'retry_delay', not 'retries_delay'
}

# initialize the DAG
dag = DAG(
    dag_id='test_dag_mail',
    default_args=WORKFLOW_DEFAULT_ARGS,
    schedule_interval=l_schedule_interval,
    start_date=l_start_date,
)
This is a feature of Airflow:
From: https://airflow.apache.org/scheduler.html
The scheduler page does a poor job of explaining that the execution_date is set to the start of the period between the previous run and the current run. Why? Well, it's assumed that a monthly, daily, or hourly job needs to read all the data FROM that time UNTIL the current interval time. Yes, they could have switched it around. Examples (chopping the nanos/millis to a single digit):
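To make the mapping concrete, here is a plain-Python sketch (assumed values, not Airflow's internals) using the question's daily schedule and start date. A run stamped with execution_date D covers the interval [D, D + interval) and is only triggered once that interval has fully elapsed:

```python
from datetime import datetime, timedelta

# Sketch of the scheduling rule described above: the run for
# execution_date D fires at roughly D + interval, one interval "late"
# relative to the data period it is stamped with.
start_date = datetime(2018, 3, 25, 6, 48)
interval = timedelta(days=1)  # '@daily'

for n in range(4):
    execution_date = start_date + n * interval  # start of the interval
    triggered_at = execution_date + interval    # earliest actual run time
    print(f"execution_date={execution_date}  runs at {triggered_at}")
```

So the run stamped 2018-03-28 06:48 would not be expected to fire until 2018-03-29 06:48, which matches nothing starting at 6:48 on 28.03.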
For the weekly case, note that the start date might not be a Tuesday as requested in the interval; because it's confusing whether the start time will then align with an interval time, and I haven't exactly tested this, I recommend that if you want to run every Tuesday, your start date should itself be a Tuesday, like 2017-12-05 or 2017-11-28.
So in your DAG's tasks, using the jinja2 templates from the provided context of a run, such as {{ ds }} or {{ execution_date }}, will give you the start of the schedule interval, as opposed to what datetime.now() would tell you about the moment the task actually runs. These are provided so that your tasks can be idempotent: f(data) = f(f(data)).
If you run your DAG once, the final state should be the same as if you run your DAG N times. That way, if you run your (linear) DAG and 3 of 5 tasks succeed, but task 4 fails and task 5 never runs, you can re-run the whole DAG: tasks 1-3 will pre-clean or overwrite data so that the output is unchanged from their prior successes, and then tasks 4 and 5 can hopefully succeed, leaving you in a good final state.
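As a minimal sketch of that idempotency property (a hypothetical file-based task, not from the question's DAG): each run overwrites the output partition keyed by its execution date, so running it N times leaves the same final state as running it once:

```python
import json
import tempfile
from pathlib import Path

# Hypothetical idempotent task: it overwrites (never appends to) the
# output file for its execution date, so a re-run reproduces the same
# final state instead of duplicating data.
def idempotent_task(output_dir, ds, records):
    out = Path(output_dir) / f"data_{ds}.json"
    out.write_text(json.dumps(sorted(records)))  # overwrite the partition
    return out

tmp = tempfile.mkdtemp()
first = idempotent_task(tmp, "2018-03-27", [3, 1, 2])
second = idempotent_task(tmp, "2018-03-27", [3, 1, 2])  # re-run: same state
```

Here {{ ds }} would supply the "2018-03-27" date string in a real template, which is exactly what makes the partition stable across re-runs.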