How to retrieve a value from Airflow XCom pushed v

Posted 2019-05-14 17:07

I have the following DAG with two SSHExecuteOperator tasks. The first task executes a stored procedure which returns a parameter. The second task needs this parameter as an input.

Could you please explain how to pull the value from the XCom pushed in task1, in order to use it in task2?

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.hooks.ssh_hook import SSHHook
from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator
from airflow.models import Variable

default_args = {
  'owner': 'airflow',
  'depends_on_past': False,
  'start_date': datetime.now(),
  'email': ['my@email.com'],
  'email_on_failure': True,
  'retries': 0
}

#server must be changed to point to the correct environment, to do so update DataQualitySSHHook variable in Airflow admin
DataQualitySSHHook = Variable.get('DataQualitySSHHook')
print('Connecting to: ' + DataQualitySSHHook)
sshHookEtl = SSHHook(conn_id=DataQualitySSHHook)
sshHookEtl.no_host_key_check = True 

#create dag
dag = DAG(
  'ed_data_quality_test-v0.0.3', #update version whenever you change something
  default_args=default_args,
  schedule_interval="0 0 * * *",
  dagrun_timeout=timedelta(hours=24),
  max_active_runs=1)

#create tasks
task1 = SSHExecuteOperator(
  task_id='run_remote_sp_audit_batch_register',
  bash_command="bash /opt/scripts/data_quality/EXEC_SP_AUDIT_BATCH.sh 'ED_DATA_QUALITY_MANUAL' 'REGISTER' '1900-01-01 00:00:00.000000' '2999-12-31 00:00:00.000000' ", #keep the space at the end
  ssh_hook=sshHookEtl,
  xcom_push=True,
  retries=0,
  dag=dag)

task2 = SSHExecuteOperator(
  task_id='run_remote_sp_audit_module_session_start',
  bash_command="echo {{ ti.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}",
  ssh_hook=sshHookEtl,
  retries=0,
  dag=dag)

#create dependencies
task1.set_downstream(task2)

Tags: ssh airflow
2 answers
可以哭但决不认输i
#2 · 2019-05-14 17:28

Instead of xcom_push=True, try do_xcom_push=True. It pushes all of the stdout to XCom under the key return_value.

女痞
#3 · 2019-05-14 17:42

The solution I found: when task1 executes the shell script, make sure the value you want captured in XCom is the last thing the script prints (using echo).

Then I was able to retrieve the XCom variable value with the following code snippet:

{{ task_instance.xcom_pull(task_ids='run_remote_sp_audit_batch_register') }}
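
To illustrate why the value has to be printed last, here is a rough sketch. It assumes the operator keeps only the final line of the script's stdout when xcom_push=True, which is what the behaviour described above suggests. The function name and the sample output are hypothetical; this is not the operator's actual code, just a stand-alone model of it.

```python
def last_stdout_line(stdout_text):
    """Return the final line of captured stdout, stripped of whitespace.

    Hypothetical helper: models an operator that logs every line of output
    but keeps only the last one as the value pushed to XCom.
    """
    last = ''
    for line in stdout_text.splitlines():
        last = line.strip()
    return last


# Sample output of a remote script (made up for illustration).
# The value needed by task2 is echoed as the very last line.
script_output = (
    "Connecting to database\n"
    "Executing stored procedure\n"
    "12345\n"
)

batch_id = last_stdout_line(script_output)
print(batch_id)
```

If the script printed anything after the value (a status message, for example), that later line would be captured instead, so the echo of the value should be the final statement in the script.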
