I'm using Airflow 1.8.1 and I want to push the result of a SQL query from PostgresOperator to XCom.
Here are my tasks:
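from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.python_operator import BranchPythonOperator

# `dag` is the DAG object defined earlier in the DAG file.
check_task = PostgresOperator(
    task_id='check_task',
    postgres_conn_id='conx',
    sql="check_task.sql",
    xcom_push=True,
    dag=dag)


def py_is_first_execution(**kwargs):
    # Read the value that check_task is expected to push
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    print('count ----> %s' % value)
    if value == 0:
        return 'next_task'
    else:
        return 'end-flow'


check_branch = BranchPythonOperator(
    task_id='is-first-execution',
    python_callable=py_is_first_execution,
    provide_context=True,
    dag=dag)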
And here is my SQL script:
select count(1) from table
When I check the XCom value from check_task, it retrieves a None value.
If I'm correct, Airflow automatically pushes to XCom whatever an operator's execute method returns. However, when you look at the code of the PostgresOperator, you see that its execute method calls the run method of the PostgresHook (an extension of DbApiHook). Neither method returns anything, so nothing is pushed to XCom. What we did to fix this is create a CustomPostgresSelectOperator, a copy of the PostgresOperator that does 'return hook.get_records(..)' instead of 'hook.run(..)'.
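A minimal sketch of the branch callable adapted to that fix. It assumes the custom operator's execute returns hook.get_records(...), which yields a list of row tuples; for select count(1) that is something like [(0,)], so comparing the pulled value directly with 0 is never true. The helper first_cell is a hypothetical name introduced here, and no Airflow import is needed because the callable only uses the ti object from the context.

```python
def first_cell(records):
    """Return the first column of the first row, or None if there are no rows."""
    if not records:
        return None
    return records[0][0]


def py_is_first_execution(**kwargs):
    # Assumption: check_task returned hook.get_records(...), so XCom holds
    # a list of rows such as [(0,)] rather than a bare integer.
    records = kwargs['ti'].xcom_pull(task_ids='check_task')
    count = first_cell(records)
    print('count ----> %s' % count)
    if count == 0:
        return 'next_task'
    return 'end-flow'
```

If XCom values end up JSON-serialized rather than pickled, rows come back as lists (for example [[0]]); indexing with [0][0] handles both shapes.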
Hope that helps you.
Finally, I created a new sensor, ExecuteSqlOperator, in the plugin manager under $AIRFLOW_HOME/plugins. I used CheckOperator as an example and modified the returned value: the basic behavior of this operator was exactly the reverse of what I needed. Here is the code of the default ExecuteSqlOperator (CheckOperator) and of my customized SqlSensor (ReverseSqlSensor).
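To illustrate what reversing the check means, here is a minimal sketch in plain Python. As far as I can tell, the 1.8 SqlSensor keeps poking (returns False) while the query returns no rows or its first cell is 0 or empty, and succeeds otherwise; the reversed version succeeds exactly when that check fails, for example when select count(1) returns 0. Both function names are hypothetical; in a ReverseSqlSensor, poke would fetch the records through the connection's hook and return the reversed result.

```python
def sql_sensor_check(records):
    """Approximation of the SqlSensor criterion: true once the first cell is non-zero and non-empty."""
    if not records:
        return False  # no rows: the regular sensor keeps poking
    return str(records[0][0]) not in ('0', '')


def reverse_sql_sensor_check(records):
    """Hypothetical reversed criterion: true when there are no rows, or the first cell is 0 or empty."""
    return not sql_sensor_check(records)
```

Keeping the reversed check as a plain negation makes it easy to confirm that the custom sensor succeeds in exactly the cases where the stock one would keep waiting.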