For some reason I can't deploy DAG files on Google Composer if I import google.cloud.storage in the DAG. If I try to deploy such a DAG file then it doesn't get added to the DagBag so ends up with a non-link entry in the Airflow website and is not usable. At this point there's the usual information icon saying: This DAG isn't available in the web server's DagBag object. It shows up in this list because the scheduler marked it as active in the metadata database. Unlike an actual syntax error there is no error message at the top of the page.
I have boiled this down precisely as to whether I import google.cloud.storage or not. Not even whether I actually use it. For example this dag works fine if I comment out the storage import line, does not install in Composer if I replace it. Would anyone have any clue as to why?
import datetime
from airflow import DAG
from google.cloud import storage
from airflow.operators.python_operator import PythonOperator
default_args = {
'owner': 'Airflow',
'depends_on_past': False,
'email': ['kevin@here.com'],
'email_on_failure': True,
'email_on_retry': True,
'retries': 1,
'retry_delay': datetime.timedelta(minutes=5),
'start_date': datetime.datetime(2017,1,1),
}
def ingest_file(**kwargs):
status = 'OK'
return status
# Not scheduled, trigger only
dag = DAG('ingest_test', default_args=default_args, schedule_interval=None)
ingest = PythonOperator(task_id = 'ingest', provide_context = True,
python_callable = ingest_file, dag = dag)