I am trying to use Airflow to execute a simple Python task.
from __future__ import print_function
from airflow.operators.python_operator import PythonOperator
from airflow.models import DAG
from datetime import datetime, timedelta
from pprint import pprint

seven_days_ago = datetime.combine(datetime.today() - timedelta(7),
                                  datetime.min.time())

args = {
    'owner': 'airflow',
    'start_date': seven_days_ago,
}

dag = DAG(dag_id='python_test', default_args=args)

def print_context(ds, **kwargs):
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=print_context,
    dag=dag)
If I try, for example:
airflow test python_test print 2015-01-01
It works!
Now I want to put my print_context(ds, **kwargs) function in another Python file, so I create another file called simple_test.py and change:
run_this = PythonOperator(
    task_id='print',
    provide_context=True,
    python_callable=simple_test.print_context,
    dag=dag)
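For reference, simple_test.py just holds the print_context function shown above, and the DAG file now imports it; roughly:

# at the top of the DAG file
import simple_test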
Now I try to run again:
airflow test python_test print 2015-01-01
And OK! It still works!
But if I create a module, for example a worker module with a file SimplePython.py, import it (from worker import SimplePython) and try:
airflow test python_test print 2015-01-01
It gives the message:
ImportError: No module named worker
The questions:
- Is it possible to import a module inside a DAG definition?
- How is Airflow+Celery going to distribute all the necessary Python source files across the worker nodes?
For your second question: how is Airflow+Celery going to distribute all the necessary Python source files across the worker nodes?
From the documentation: The worker needs to have access to its DAGS_FOLDER, and you need to synchronize the filesystems by your own means. A common setup would be to store your DAGS_FOLDER in a Git repository and sync it across machines using Chef, Puppet, Ansible, or whatever you use to configure machines in your environment. If all your boxes have a common mount point, having your pipelines files shared there should work as well.
http://pythonhosted.org/airflow/installation.html?highlight=chef
You can package dependencies of your DAG as per:
https://airflow.apache.org/concepts.html#packaged-dags
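As a rough illustration, the sketch below builds such a packaged DAG with the standard library: the DAG file goes at the root of the zip and a helper package keeps its directory structure alongside it. The file and directory names (python_test.py, worker/) are assumptions for illustration, not something Airflow requires:

import os
import zipfile

def build_packaged_dag(zip_path, dag_file, package_dir):
    # DAG file at the zip root; the helper package keeps its directory
    # structure alongside it, as the packaged-DAGs docs describe.
    parent = os.path.dirname(os.path.abspath(package_dir))
    with zipfile.ZipFile(zip_path, 'w') as zf:
        zf.write(dag_file, arcname=os.path.basename(dag_file))
        for root, _dirs, files in os.walk(package_dir):
            for name in files:
                full = os.path.join(root, name)
                zf.write(full, arcname=os.path.relpath(os.path.abspath(full), parent))

# e.g. build_packaged_dag('dags/python_test.zip', 'python_test.py', 'worker')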
When using the CeleryExecutor, you need to manually sync DAG directories; Airflow doesn't take care of that for you:
https://airflow.apache.org/configuration.html?highlight=scaling%20out%20celery#scaling-out-with-celery
While packaging your dags into a zip as covered in the docs is the only supported solution I have seen, you can also import modules that live inside the dags folder. This is useful if you sync the dags folder automatically using other tools like Puppet and Git.
I am not clear on your directory structure from the question, so here is an example dags folder based on a typical Python project structure:
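(The layout below is hypothetical; apart from my_test_globals, the project and file names are made up for illustration.)

airflow/dags/                     # root Airflow dags folder where all dags live
└── my_dags/                      # git repo project root
    ├── my_dags/                  # Python source root, often named like the project
    │   ├── my_test_globals.py    # the module we want to import
    │   ├── dag_in_package.py
    │   └── dags/
    │       └── dag_in_subpackage.py
    ├── README.md                 # setup.py, LICENSE, etc. also live here
    └── dag_in_project_root.py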
I have left out the required __init__.py files. Note the location of the three example dags. You would almost certainly use only one of these places for all your dags; I include them all here for the sake of example because it shouldn't matter for the import. To import my_test_globals from any of them:
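# assuming the hypothetical layout above
from my_dags.my_dags import my_test_globals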
I believe this means that Airflow runs with the Python path set to the dags directory, so each subdirectory of the dags folder can be treated as a Python package. In my case it was the additional intermediate project root directory getting in the way of doing a typical intra-package absolute import. Thus, we could restructure this Airflow project like this:
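(Again a hypothetical layout; apart from my_test_globals, the names are made up for illustration.)

airflow/dags/                     # root Airflow dags folder
└── my_dags/                      # project root and Python source root combined
    ├── my_test_globals.py        # the module we want to import
    ├── dag_in_package.py
    ├── dags/
    │   └── dag_in_subpackage.py
    ├── README.md                 # setup.py, LICENSE, etc. also live here
    └── dag_in_project_root.py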
So that imports look as we expect them to:
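# with the flattened hypothetical layout above
from my_dags import my_test_globals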
For your first question, it is possible.
And I guess you should create an empty file named __init__.py in the same directory as SimplePython.py (that is the worker directory in your case). By doing that, the worker directory will be regarded as a Python package. Then in your DAG definition, try:
from worker.SimplePython import print_context
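For concreteness, the resulting layout would look roughly like this (assuming the DAG file from the question is named python_test.py):

dags/
├── python_test.py            # the DAG definition from the question
└── worker/
    ├── __init__.py           # empty file; makes worker importable
    └── SimplePython.py       # defines print_context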
In your case, I guess it would be better if you write a plugin for Airflow, because you might want to upgrade the Airflow core project without removing your customized functions.
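If you go that route, a minimal plugin skeleton looks roughly like this (the class and file names are made up for illustration; the file would live in your Airflow plugins folder, by default $AIRFLOW_HOME/plugins):

# plugins/worker_plugin.py: a hypothetical minimal Airflow plugin skeleton
from airflow.plugins_manager import AirflowPlugin

class WorkerPlugin(AirflowPlugin):
    name = 'worker_plugin'
    # custom operator/hook classes would be listed here, e.g.
    # operators = [MyCustomOperator]
    operators = []
    hooks = []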