How to implement polling in Airflow?

Posted 2019-08-18 00:12

I want to use Airflow to implement data flows that periodically poll external systems (FTP servers, etc.), check for new files matching certain conditions, and then run a bunch of tasks for those files. I'm new to Airflow and have read that Sensors are what you would use for this kind of case, and I actually managed to write a sensor that works fine when I run it with "airflow test". But I'm a bit confused about the relationship between the sensor's poke_interval and the DAG's schedule. How should I define those settings for my use case? Or should I use some other approach? I just want Airflow to run the tasks when those files become available, and not flood the dashboard with failures whenever no new files have been available for a while.

1 answer
聊天终结者
#2 · 2019-08-18 01:02

Your understanding is correct: a sensor is the way to go when you want to poll, either by using an existing sensor or by implementing your own.
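A custom sensor only needs to subclass BaseSensorOperator and implement poke(), which Airflow calls repeatedly (every poke_interval seconds) until it returns True or the timeout expires. Here is a minimal sketch along those lines, assuming a 1.10-era Airflow; the class name, constructor parameters, and the anonymous-FTP listing logic are illustrative, not anything from your actual sensor:

```python
import fnmatch
from ftplib import FTP

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class NewFileSensor(BaseSensorOperator):
    """Succeeds once a file matching `pattern` appears in `remote_dir`."""

    @apply_defaults
    def __init__(self, ftp_host, remote_dir, pattern, *args, **kwargs):
        super(NewFileSensor, self).__init__(*args, **kwargs)
        self.ftp_host = ftp_host      # hypothetical parameters for this sketch
        self.remote_dir = remote_dir
        self.pattern = pattern

    def poke(self, context):
        # Airflow re-invokes poke() until it returns True or the
        # sensor's timeout is reached.
        ftp = FTP(self.ftp_host)
        ftp.login()  # anonymous login; use real credentials in practice
        try:
            names = ftp.nlst(self.remote_dir)
        finally:
            ftp.quit()
        return any(fnmatch.fnmatch(name, self.pattern) for name in names)
```

Note that some FTP servers return full paths from NLST and others return bare file names, so you may need to adjust the matching for your server.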

Sensors are, however, always part of a DAG and do not execute outside of its boundaries. DAG execution depends on the start_date and schedule_interval, but you can combine the schedule with a sensor to make a DAG effectively depend on the state of an external system: one possible approach is to start the whole DAG with a sensor that checks whether the condition is met and skips the rest of the DAG otherwise (you can make a sensor mark itself and its downstream tasks as skipped rather than failed by setting its soft_fail parameter to True). You can get a polling interval of one minute by using the most frequent cron schedule (* * * * *). If you need even shorter polling within each run, you can tweak the sensor's poke_interval and timeout parameters.
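A minimal sketch of that wiring, reusing the NewFileSensor from above; the dag_id, host, directory, pattern, and the DummyOperator placeholder are all hypothetical, and the interval values are just examples:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG(
    dag_id="poll_ftp_for_new_files",  # hypothetical name
    start_date=datetime(2019, 8, 1),
    schedule_interval="* * * * *",    # a new DAG run every minute
    catchup=False,                    # don't backfill missed minutes
    max_active_runs=1,                # avoid overlapping polls
) as dag:

    wait_for_files = NewFileSensor(   # the sensor sketched above
        task_id="wait_for_files",
        ftp_host="ftp.example.com",   # hypothetical server
        remote_dir="/incoming",       # hypothetical directory
        pattern="*.csv",              # hypothetical file pattern
        soft_fail=True,               # on timeout: skip, don't fail
        poke_interval=10,             # seconds between pokes
        timeout=45,                   # give up before the next run starts
    )

    process_files = DummyOperator(task_id="process_files")  # real work here

    wait_for_files >> process_files
```

With soft_fail=True, runs where no file shows up end as "skipped" rather than "failed", which addresses the concern about flooding the dashboard with failures; keeping the timeout shorter than the schedule interval prevents runs from piling up behind a still-poking sensor.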

Keep in mind, however, that exact execution times are probably not guaranteed by Airflow itself, so for very short polling intervals you may want to investigate alternatives (or at least consider variations of the approach I've just shared).
