I want to create DAGs dynamically from the result of a database table query. When I generate DAGs dynamically from either a fixed range of numbers or from objects available in the Airflow settings, it works. However, when I use a PostgresHook and create a DAG for each row of my table, a new DAG is generated whenever I add a new row to the table, but I can't click the newly created DAG in the Airflow web server UI. For more context, I'm using Google Cloud Composer. I have already followed the steps mentioned in "DAGs not clickable on Google Cloud Composer webserver, but working fine on a local Airflow", but it still doesn't work in my case.
Here's my code
from datetime import datetime, timedelta
from airflow import DAG
import psycopg2
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from psycopg2.extras import NamedTupleCursor
import os
# Task-level defaults handed to every DAG built in this file.
# Further optional keys that could be added here: 'queue' (e.g. 'bash_queue'),
# 'pool' (e.g. 'backfill'), 'priority_weight' (e.g. 10),
# 'end_date' (e.g. datetime(2016, 1, 1)).
default_args = dict(
    owner="debug",
    depends_on_past=False,
    start_date=datetime(2018, 10, 17),
    email=["airflow@airflow.com"],
    email_on_failure=False,
    email_on_retry=False,
    retries=1,
    retry_delay=timedelta(minutes=5),
)
def create_dag(dag_id,
               schedule,
               default_args):
    """Build a single-task DAG whose task prints a greeting naming the DAG.

    Args:
        dag_id: Identifier of the DAG; also reused as the task_id of its
            only task.
        schedule: Value forwarded to the DAG as ``schedule_interval``.
        default_args: Default task arguments forwarded to the DAG.

    Returns:
        The constructed DAG, with one PythonOperator attached.
    """

    def hello_world_py(*args):
        # Parenthesised single-argument print behaves the same on
        # Python 2 and Python 3.
        print('Hello from DAG: {}'.format(dag_id))

    # Use the caller-supplied schedule instead of a hard-coded interval.
    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    # Attach the task through the operator's `dag` argument; `dag_id` is not
    # an operator argument.
    PythonOperator(
        task_id=dag_id,
        python_callable=hello_world_py,
        dag=dag)

    return dag
dag = DAG("dynamic_yolo_pg_", default_args=default_args,
          schedule_interval=timedelta(hours=1))

# Behavior:
# Build one DAG per row of the airflow_test_command table and publish each
# one as a module-level global, following the pattern described at
# https://www.astronomer.io/guides/dynamically-generating-dags/
#
# NOTE(review): this query runs at import time, i.e. whenever this file is
# loaded. Presumably every Airflow component that loads DAG files must be
# able to reach the 'some_db' connection -- confirm this for the web server.
pg_hook = PostgresHook(postgres_conn_id='some_db')
conn = pg_hook.get_conn()
try:
    cursor = conn.cursor(cursor_factory=NamedTupleCursor)
    try:
        cursor.execute("SELECT * FROM airflow_test_command;")
        commands = cursor.fetchall()
    finally:
        # Release the cursor even if the query fails.
        cursor.close()
finally:
    # The rows are already fetched; do not keep a connection open per parse.
    conn.close()

# The same schedule is used for every generated DAG, so build it once.
schedule = timedelta(days=1)

for command in commands:
    # Named to avoid shadowing the builtin `id`.
    generated_dag_id = "dynamic_yolo_" + str(command.id)
    print(generated_dag_id)
    # Store the DAG under its own id in the module globals.
    globals()[generated_dag_id] = create_dag(generated_dag_id,
                                             schedule,
                                             default_args)
Best,