How to delete XCOM objects once the DAG finishes i

2020-08-17 18:00发布

I have a huge json file in the XCOM which later I do not need once the dag execution is finished, but I still see the Xcom Object in the UI with all the data, Is there any way to delete the XCOM programmatically once the DAG run is finished.

Thank you

3条回答
Explosion°爆炸
2楼-- · 2020-08-17 18:11

Below is the code that worked for me,this will delete xcom of all tasks in DAG(Add task_id to SQL if xcom of only specific task needs to be deleted):

As dag_id is dynamic and dates should follow respective syntax of SQL.

from airflow.operators.postgres_operator import PostgresOperator

delete_xcom_task_inst = PostgresOperator(task_id='delete_xcom',
                                            postgres_conn_id='your_conn_id',
                                            sql="delete from xcom where dag_id= '"+dag.dag_id+"' and date(execution_date)=date('{{ ds }}')"
                                            )
查看更多
Animai°情兽
3楼-- · 2020-08-17 18:18

You can perform the cleanup programmatically through sqlalchemy so your solution won't break if the database structure changes:

from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.dag_id == "your dag id").delete()

You can also purge old XCom data:

from airflow.utils.db import provide_session
from airflow.models import XCom
from sqlalchemy import func

@provide_session
def cleanup_xcom(session=None):
    session.query(XCom).filter(XCom.execution_date <= func.date('2019-06-01')).delete()

If you want to purge the XCom once the dag is finished I think the cleanest solution is to use the "on_success_callback" property of the DAG model class:

from airflow.models import DAG
from airflow.utils.db import provide_session
from airflow.models import XCom

@provide_session
def cleanup_xcom(context, session=None):
    dag_id = context["ti"]["dag_id"]
    session.query(XCom).filter(XCom.dag_id == dag_id).delete()

dag = DAG( ...
    on_success_callback=cleanup_xcom,
)
查看更多
SAY GOODBYE
4楼-- · 2020-08-17 18:21

You have to add a task depends on you metadatadb (sqllite, PostgreSql, MySql..) that delete XCOM once the DAG run is finished.

delete_xcom_task = PostgresOperator(
      task_id='delete-xcom-task',
      postgres_conn_id='airflow_db',
      sql="delete from xcom where dag_id=dag.dag_id and 
           task_id='your_task_id' and execution_date={{ ds }}",
      dag=dag)

You can verify your query before you run the dag.

Data Profiling -> Ad Hoc Query -> airflow_db -> query -> Run!

xcom metadata

查看更多
登录 后发表回答