How to properly handle Daylight Saving Time in Apache Airflow

Posted 2019-04-10 10:23

Question:

In Airflow, everything is supposed to be in UTC (which is not affected by DST).

However, we have workflows that deliver things based on time zones that are affected by DST.

An example scenario:

  • We have a job scheduled with a start date of 8:00 AM Eastern and a schedule interval of 24 hours.
  • Every day at 8 AM Eastern, the scheduler sees that it has been 24 hours since the last run and runs the job.
  • DST happens and we lose an hour.
  • Today at 8 AM Eastern, the scheduler sees that only 23 hours have passed (because the machine's clock is in UTC) and doesn't run the job until 9 AM Eastern, which is a late delivery.
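To see where the 23 hours come from, compute 8 AM Eastern in UTC on the two days around the 2019 US spring-forward change (10 March 2019). This sketch uses Python's standard-library zoneinfo module (Python 3.9+, which needs the system time zone database or the tzdata package) rather than anything Airflow-specific; the dates are only an illustration.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")


def local_8am_in_utc(year, month, day):
    """Return 8:00 AM New York time on the given date, expressed in UTC."""
    return datetime(year, month, day, 8, tzinfo=EASTERN).astimezone(timezone.utc)


before = local_8am_in_utc(2019, 3, 9)   # still EST (UTC-5)
after = local_8am_in_utc(2019, 3, 10)   # EDT (UTC-4) after the spring-forward change

# The two "8 AM Eastern" instants are only 23 hours apart...
gap = after - before
# ...so a fixed 24-hour interval after the previous run lands an hour late locally.
naive_next_run = (before + timedelta(hours=24)).astimezone(EASTERN)

print(before.isoformat())    # 2019-03-09T13:00:00+00:00
print(after.isoformat())     # 2019-03-10T12:00:00+00:00
print(gap)                   # 23:00:00
print(naive_next_run.hour)   # 9
```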

Is there a way to schedule DAGs so they run at the correct time after a time change?

Answer 1:

Off the top of my head:

If your machine is timezone-aware, set up your DAG to run at both 8 AM EST and 8 AM EDT, expressed in UTC: 8 AM EDT is 12:00 UTC and 8 AM EST is 13:00 UTC, so something like 0 12,13 * * *. Make the first task a ShortCircuitOperator. Then use something like pytz to localize the current time. If it is within your required time, continue (i.e., run the rest of the DAG); otherwise, return False. You'll have a tiny overhead of two extra task executions per day, but the latency should be minimal as long as your machine isn't overloaded.
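To double-check which UTC hours the cron line needs, convert 8 AM Eastern on one winter date and one summer date. A small sketch using the standard-library zoneinfo module; utc_hours_for_local_time is a hypothetical helper and the two sample dates are arbitrary (any date in each season works).

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def utc_hours_for_local_time(hour, tz_name, sample_dates):
    """Collect the UTC hour that `hour`:00 local time maps to on each sample date."""
    tz = ZoneInfo(tz_name)
    return sorted({
        datetime(y, m, d, hour, tzinfo=tz).astimezone(timezone.utc).hour
        for (y, m, d) in sample_dates
    })


# One date in standard time (EST, UTC-5) and one in daylight time (EDT, UTC-4).
hours = utc_hours_for_local_time(8, "America/New_York", [(2019, 1, 15), (2019, 7, 15)])
cron = "0 {} * * *".format(",".join(map(str, hours)))

print(hours)  # [12, 13]
print(cron)   # 0 12,13 * * *
```

With a twice-daily schedule like this, only one of the two runs each day falls at 8 AM local time; the other is the run the ShortCircuitOperator is there to skip.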

Sloppy example:

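from datetime import datetime
from pytz import utc, timezone
# Airflow 1.x import path; Airflow 2 moved this to airflow.operators.python
from airflow.operators.python_operator import ShortCircuitOperator

# ...

def is8AM(**kwargs):
    # Current wall-clock time as an aware UTC datetime
    curtime = utc.localize(datetime.utcnow())
    # If you want to use the execution date instead:
    # curtime = utc.localize(kwargs["ti"].execution_date)
    eastern = timezone('US/Eastern')  # From the pytz docs; check your local zone name
    loc_dt = curtime.astimezone(eastern)
    # True lets the downstream tasks run; False makes them skip
    return loc_dt.hour == 8

start_task = ShortCircuitOperator(
    task_id='check_for_8AM',
    python_callable=is8AM,
    provide_context=True,
    dag=dag
)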
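If you want to unit-test the check without waiting for the clock, one option is to pass the current time in explicitly. This is a sketch, not the answer's exact code: is_local_hour is a hypothetical helper, and it uses the standard-library zoneinfo module in place of pytz. A callable like is8AM could call it with datetime.now(timezone.utc).

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def is_local_hour(now_utc, tz_name, target_hour):
    """Return True if the aware datetime `now_utc` falls in `target_hour` local time."""
    if now_utc.tzinfo is None:
        raise ValueError("now_utc must be timezone-aware")
    return now_utc.astimezone(ZoneInfo(tz_name)).hour == target_hour


# 12:00 UTC is 8 AM in July (EDT) but 7 AM in January (EST).
print(is_local_hour(datetime(2019, 7, 15, 12, tzinfo=timezone.utc), "America/New_York", 8))  # True
print(is_local_hour(datetime(2019, 1, 15, 12, tzinfo=timezone.utc), "America/New_York", 8))  # False
print(is_local_hour(datetime(2019, 1, 15, 13, tzinfo=timezone.utc), "America/New_York", 8))  # True
```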

Hope this is helpful.

Edit: the run times were wrong (I subtracted the UTC offset instead of adding it). Additionally, because Airflow starts a DAG run only at the end of its schedule interval, you'll probably end up wanting to schedule for 7 AM with an hourly schedule if you want the runs to start at 8.



Answer 2:

We used @apathyman's solution, but instead of a ShortCircuitOperator we used a PythonOperator that fails if it's not the hour we want and retries once after a one-hour delay. That way we have only one run per day instead of two.

The schedule interval is set to run only at the first (earlier) of the two UTC hours.

So basically, something like this (most of the code is taken from the answer above; thanks, @apathyman):

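from datetime import datetime
from datetime import timedelta
from pytz import utc, timezone
from airflow.exceptions import AirflowException
# Airflow 1.x import path; Airflow 2 moved this to airflow.operators.python
from airflow.operators.python_operator import PythonOperator


def is8AM(**kwargs):
    # Current wall-clock time as an aware UTC datetime
    curtime = utc.localize(datetime.utcnow())
    # If you want to use the execution date instead:
    # curtime = utc.localize(kwargs["ti"].execution_date)
    eastern = timezone('US/Eastern')  # From the pytz docs; check your local zone name
    loc_dt = curtime.astimezone(eastern)
    if loc_dt.hour == 8:
        return True
    # Failing the task triggers the single retry one hour later
    raise AirflowException("Not the time yet, wait 1 hour")

start_task = PythonOperator(
    task_id='check_for_8AM',
    python_callable=is8AM,
    provide_context=True,
    retries=1,
    retry_delay=timedelta(hours=1),
    dag=dag
)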
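The fail-and-retry behaviour can be reasoned about without Airflow: the DAG fires once at the earlier UTC hour (12:00, which is 8 AM EDT), and if the check fails, the single retry comes one hour later (13:00 UTC, which is 8 AM EST). The sketch below is a hypothetical simulation (first_passing_attempt is not an Airflow API) that assumes each retry starts exactly retry_delay after the previous attempt; in practice scheduler latency adds a little on top.

```python
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo

EASTERN = ZoneInfo("America/New_York")


def first_passing_attempt(scheduled_utc, target_hour=8, retries=1,
                          retry_delay=timedelta(hours=1)):
    """Return the UTC time of the first attempt whose Eastern hour matches,
    or None if the initial run and every retry all fail."""
    attempt = scheduled_utc
    for _ in range(retries + 1):
        if attempt.astimezone(EASTERN).hour == target_hour:
            return attempt
        attempt += retry_delay
    return None


summer = first_passing_attempt(datetime(2019, 7, 15, 12, tzinfo=timezone.utc))
winter = first_passing_attempt(datetime(2019, 1, 15, 12, tzinfo=timezone.utc))
# Scheduling at the later hour (13:00 UTC) would miss in summer: 9 AM, then 10 AM EDT.
late = first_passing_attempt(datetime(2019, 7, 15, 13, tzinfo=timezone.utc))

print(summer.hour)  # 12 (passes on the first try, 8 AM EDT)
print(winter.hour)  # 13 (passes on the retry, 8 AM EST)
print(late)         # None
```

The last case is why the schedule has to use the earlier of the two UTC hours: a retry can only push the run later, never earlier.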


Answer 3:

I believe we just need a PythonOperator to handle this case.

If the DAG needs to run in a time zone with DST (e.g. America/New_York, Europe/London, Australia/Sydney), below are the workaround steps I can think of:

  1. Convert the DAG schedule to UTC.
    Because the time zone has DST, we need to use the larger (DST) offset when converting. For example:
    • With America/New_York, we must use the offset -4. So the schedule */10 11-13 * * 1-5 is converted to */10 15-17 * * 1-5
    • With Europe/London, we must use the offset +1. So the schedule 35 */4 * * * is converted to 35 3-23/4 * * *
    • With Australia/Sydney, we must use the offset +11. So the schedule 15 8,9,12,18 * * * is converted to 15 21,22,1,7 * * *
  2. Use a PythonOperator to add a task before all the main tasks. This task checks whether the current time is in DST for the specified time zone. If it is not, the UTC schedule fires one hour early in local time, so the task sleeps for 1 hour. This way we can handle time zones with DST.

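    import time
    from datetime import datetime, timedelta

    import pytz
    # Airflow 1.x import path; Airflow 2 moved this to airflow.operators.python
    from airflow.operators.python_operator import PythonOperator


    def is_DST(zonename):
        """Return True if the zone is currently observing daylight saving time."""
        tz = pytz.timezone(zonename)
        now = pytz.utc.localize(datetime.utcnow())
        return now.astimezone(tz).dst() != timedelta(0)


    def WQ_DST_handler(TZ, **kwargs):
        # The UTC schedule was built with the DST offset, so it is on time
        # during DST and one hour early otherwise.
        if is_DST(TZ):
            print('Currently in daylight saving time (DST) in {0}, proceeding to the next task now'.format(TZ))
        else:
            print('Currently not in daylight saving time (DST) in {0}, sleeping for 1 hour...'.format(TZ))
            time.sleep(60 * 60)


    # TZ_of_dag (e.g. 'America/New_York'), main_tasks and dag are defined elsewhere in the DAG file
    DST_handler = PythonOperator(
        task_id='DST_handler',
        python_callable=WQ_DST_handler,
        op_kwargs={'TZ': TZ_of_dag},
        dag=dag
    )

    DST_handler >> main_tasks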
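Step 1's conversion is plain arithmetic on the cron hour field: subtract the zone's DST (larger) UTC offset and wrap modulo 24. A minimal sketch, assuming the hour field has already been expanded into a list of hours; local_hours_to_utc is a hypothetical helper, not part of Airflow or any cron library. It reproduces the three examples above, and the last lines show (using the standard-library zoneinfo module) why the handler sleeps outside DST. It only shifts hours, so a shift that crosses midnight on a schedule with a restricted day-of-week field would need that field adjusted as well.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo


def local_hours_to_utc(local_hours, dst_offset_hours):
    """Shift local cron hours to UTC using the zone's DST (larger) offset.

    dst_offset_hours is the UTC offset during DST, e.g. -4 for
    America/New_York, +1 for Europe/London, +11 for Australia/Sydney.
    """
    return sorted((h - dst_offset_hours) % 24 for h in local_hours)


print(local_hours_to_utc(range(11, 14), -4))     # [15, 16, 17]
print(local_hours_to_utc(range(0, 24, 4), +1))   # [3, 7, 11, 15, 19, 23]
print(local_hours_to_utc([8, 9, 12, 18], +11))   # [1, 7, 21, 22]

# 15:00 UTC is 11 AM in New York during EDT but 10 AM during EST,
# which is why DST_handler sleeps for an hour outside DST.
ny = ZoneInfo("America/New_York")
summer = datetime(2019, 7, 15, 15, tzinfo=timezone.utc).astimezone(ny)
winter = datetime(2019, 1, 15, 15, tzinfo=timezone.utc).astimezone(ny)
print(summer.hour, winter.hour)  # 11 10
```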
    

This workaround has a disadvantage: every DAG that needs to run in a DST time zone needs one extra task (DST_handler in the example above), and this task still has to be sent to a worker node for execution (even though it is little more than a sleep command). Outside DST it also occupies a worker slot for the full hour it sleeps.



Answer 4:

This question was asked when Airflow was on version 1.8.x.

This functionality is built in as of Airflow 1.10.

https://airflow.apache.org/timezone.html

Set the time zone in airflow.cfg (the default_timezone option in the [core] section) and DST should be handled correctly.



Tags: dst airflow