I'm trying to access some XCom data from the parent DAG at SubDAG-creation time. I searched for a way to achieve this on the internet, but I didn't find anything.
def test(task_id):
    logging.info(f' execution of task {task_id}')


def load_subdag(parent_dag_id, child_dag_id, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
        default_args=args,
        schedule_interval="@daily",
    )
    with dag_subdag:
        r = DummyOperator(task_id='random')
        for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
            t = PythonOperator(
                task_id='load_subdag_{0}'.format(i),
                default_args=args,
                python_callable=print_context,
                op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
                dag=dag_subdag,
            )
    return dag_subdag


load_tasks = SubDagOperator(
    task_id='load_tasks',
    subdag=load_subdag(dag.dag_id, 'load_tasks', args),
    default_args=args,
)
I got this error with my code:
airflow_1 | Traceback (most recent call last):
airflow_1 | File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 374, in process_file
airflow_1 | m = imp.load_source(mod_name, filepath)
airflow_1 | File "/usr/local/lib/python3.6/imp.py", line 172, in load_source
airflow_1 | module = _load(spec)
airflow_1 | File "<frozen importlib._bootstrap>", line 684, in _load
airflow_1 | File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
airflow_1 | File "<frozen importlib._bootstrap_external>", line 678, in exec_module
airflow_1 | File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
airflow_1 | File "/app/dags/airflow_dag_test.py", line 75, in <module>
airflow_1 | 'load_tasks', args),
airflow_1 | File "/app/dags/airflow_dag_test.py", line 55, in load_subdag
airflow_1 | for i in range(r.xcom_pull(task_ids='take_Ana', key='the_message', dag_id=parent_dag_id)):
airflow_1 | TypeError: xcom_pull() missing 1 required positional argument: 'context'
The error is simple: you are missing the context argument required by the xcom_pull() method. But you can't really just create a context to pass into this method; it is a Python dictionary that Airflow passes to anchor methods like pre_execute() and execute() of BaseOperator (the parent class of all operators).

In other words, context becomes available only when an operator is actually executed, not during DAG definition. And that makes sense, because in the taxonomy of Airflow, XComs are a communication mechanism between tasks in real time: they talk to each other while they are running.
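To illustrate, here is a minimal sketch of how xcom_pull() is normally used: inside the callable of a running task, through the TaskInstance found in the context. The task_ids / key values are just taken from your snippet, and the dag_id placeholder is an assumption, since you want to read from the parent DAG.

def consume_message(**context):
    # At execution time the TaskInstance ('ti') is available in the context
    # dict, so xcom_pull() can be called on it without any issue.
    n = context['ti'].xcom_pull(
        task_ids='take_Ana',
        key='the_message',
        dag_id='your_parent_dag_id',  # assumption: the parent DAG's id
    )
    logging.info('pulled value: %s', n)

consumer = PythonOperator(
    task_id='consume_message',
    python_callable=consume_message,
    provide_context=True,  # needed on Airflow 1.x to receive the context kwargs
    dag=dag,
)

Note that this only works while the task is running; it doesn't help you decide, at parse time, how many tasks to create.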
But at the end of the day, XComs, just like every other Airflow model, are persisted in the backend meta-db. So of course you can retrieve them directly from there (obviously only the XComs of tasks that have actually run in the past). While I don't have a code snippet at hand, you can have a look at cli.py, where the SQLAlchemy ORM is used to work with the models and the backend db. Do understand that this would mean a query being fired to your backend db every time the DAG-definition file is parsed, which happens rather frequently.
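For completeness, here is a rough sketch of what such a direct meta-db lookup could look like on Airflow 1.x. Treat it as an assumption-laden illustration rather than an official API: the exact handling of the stored value depends on your Airflow version and the enable_xcom_pickling setting.

from airflow.models import XCom
from airflow.settings import Session

def get_message_count(parent_dag_id):
    # Query the XCom table directly; note this runs on every DAG-file parse.
    session = Session()
    try:
        row = (
            session.query(XCom)
            .filter(XCom.dag_id == parent_dag_id,
                    XCom.task_id == 'take_Ana',
                    XCom.key == 'the_message')
            .order_by(XCom.execution_date.desc())
            .first()
        )
    finally:
        session.close()
    # Depending on version / pickling settings, the stored value may still need
    # deserializing; fall back to 0 when the upstream task has never run.
    return row.value if row else 0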
EDIT-1
After looking at your code snippet, I got alarmed. Assuming the value returned by xcom_pull() keeps changing frequently, the number of tasks in your DAG will also keep changing. This can lead to unpredictable behaviour (you should do a fair bit of research on it, but I don't have a good feeling about it).

I'd suggest you revisit your entire task workflow and condense it down to a design where the number of tasks and the structure of the DAG are known ahead of time (at the time the dag-definition file is executed). You can of course iterate over a json file / the result of a SQL query (like the SQLAlchemy thing mentioned earlier) etc. to spawn your actual tasks, but that file / db / whatever shouldn't be changing frequently; a sketch of this pattern follows below.

Do understand that merely iterating over a list to generate tasks is not problematic; what's NOT possible is to have the structure of your DAG depend on the result of an upstream task. For example, you can't have n tasks created in your DAG based on an upstream task that calculates the value of n at runtime.

So the former (a DAG whose shape depends on an upstream task's output) is not possible, but the latter (spawning tasks from a slowly-changing external source) is possible, and it covers what you are trying to achieve, even though the way you are doing it doesn't seem like a good idea.
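As an illustration of the "known ahead of time" approach, here is a minimal sketch based on your SubDAG factory. The tasks.json file and its num_load_tasks field are hypothetical names, only the structure matters: the task count is read at parse time from a static config that rarely changes, never from an upstream task.

import json

def load_subdag(parent_dag_id, child_dag_id, args):
    dag_subdag = DAG(
        dag_id='{0}.{1}'.format(parent_dag_id, child_dag_id),
        default_args=args,
        schedule_interval="@daily",
    )
    # Read the task count from a slowly-changing config file, so the
    # DAG structure stays stable across parses of the definition file.
    with open('/app/dags/tasks.json') as f:
        n = json.load(f)['num_load_tasks']
    with dag_subdag:
        for i in range(n):
            PythonOperator(
                task_id='load_subdag_{0}'.format(i),
                python_callable=test,
                op_kwargs={'task_id': 'load_subdag_{0}'.format(i)},
            )
    return dag_subdag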