Cannot deploy DAGs on Google Composer if I import

Published: 2019-09-05 03:06

Question:

For some reason I can't deploy DAG files on Google Composer if I import google.cloud.storage in the DAG. If I try to deploy such a DAG file, it doesn't get added to the DagBag, so it appears as a non-clickable entry in the Airflow web UI and is unusable. At that point there's the usual information icon saying: "This DAG isn't available in the web server's DagBag object. It shows up in this list because the scheduler marked it as active in the metadata database." Unlike an actual syntax error, there is no error message at the top of the page.

I have narrowed this down precisely to whether or not I import google.cloud.storage, regardless of whether I actually use it. For example, the DAG below works fine if I comment out the storage import line, but fails to deploy in Composer if I put it back. Does anyone have any clue why?

import datetime
from airflow import DAG
from google.cloud import storage
from airflow.operators.python_operator import PythonOperator

default_args = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'email': ['kevin@here.com'],
    'email_on_failure': True,
    'email_on_retry': True,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    'start_date': datetime.datetime(2017,1,1),
}

def ingest_file(**kwargs):
    status = 'OK'
    return status

# Not scheduled, trigger only
dag = DAG('ingest_test', default_args=default_args, schedule_interval=None)

ingest = PythonOperator(task_id='ingest', provide_context=True,
                        python_callable=ingest_file, dag=dag)
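One common way to avoid a parse-time dependency like this is to defer the import into the task callable, so the scheduler can parse the DAG file even if the package isn't importable at parse time. A minimal sketch (the client construction inside the function is illustrative and assumes default credentials on the worker):

```python
def ingest_file(**kwargs):
    # Deferred import: this line only runs when the task executes,
    # not when the scheduler parses the DAG file.
    from google.cloud import storage

    # Hypothetical use of the client; replace with real ingestion logic.
    client = storage.Client()
    status = 'OK'
    return status
```

Defining the function does not execute the import, so the DAG file parses cleanly; the import error, if any, surfaces only when the task runs.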

Answer 1:

If your DAG or custom operators require PyPI packages that are not installed in the Composer environment, you don't get an error message; the DAG simply doesn't deploy. If you're seeing this, make sure every package you import is installed in the Composer environment.
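For reference, PyPI packages can be added to an existing Composer environment with the gcloud CLI (or via the environment's page in the Cloud Console). A sketch, where ENV_NAME, LOCATION, and the version pin are placeholders for your setup:

```shell
# Install google-cloud-storage into an existing Composer environment.
# The environment restarts its workers after the update, which can take
# several minutes.
gcloud composer environments update ENV_NAME \
    --location LOCATION \
    --update-pypi-package "google-cloud-storage>=1.0.0"
```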

Note that the behaviour of the DAG appearing and then disappearing can persist for a short while after the package is installed, but it does settle down.