Airflow: How to push xcom value from PostgreOperat

Posted 2019-02-18 18:53

Question:

I'm using Airflow 1.8.1 and I want to push the result of a SQL query from PostgresOperator to XCom.

Here are my tasks:

check_task = PostgresOperator(
    task_id='check_task',
    postgres_conn_id='conx',
    sql="check_task.sql",
    xcom_push=True,
    dag=dag)

def py_is_first_execution(**kwargs):
    # Pull the value that check_task pushed to XCom
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    print('count ----> %s' % value)
    if value == 0:
        return 'next_task'
    else:
        return 'end-flow'

check_branch = BranchPythonOperator(
    task_id='is-first-execution',
    python_callable=py_is_first_execution,
    provide_context=True,
    dag=dag)

And here is my SQL script:

select count(1) from table

When I check the XCom value from check_task, it is None.

Answer 1:

If I'm correct, Airflow automatically pushes to XCom whatever an operator's execute method returns. However, if you look at the code of PostgresOperator, you see that its execute method calls the run method of PostgresHook (an extension of dbapi_hook). Neither method returns anything, so nothing is pushed to XCom. To fix this, we created a CustomPostgresSelectOperator, a copy of PostgresOperator that does 'return hook.get_records(..)' instead of 'hook.run(..)'.
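
The mechanism can be tried out without an Airflow installation. The following is only a toy model, not Airflow code: FakeHook, RunOperator, SelectOperator and run_task are hypothetical stand-ins, an in-memory SQLite database replaces Postgres, and the events table is made up for the illustration. It assumes that the task runner pushes the return value of execute to XCom only when that value is not None.

```python
import sqlite3


class FakeHook(object):
    """Hypothetical stand-in for PostgresHook, backed by in-memory SQLite."""

    def __init__(self):
        self.conn = sqlite3.connect(":memory:")
        self.conn.execute("create table events (id integer)")

    def run(self, sql):
        # Executes the statement and returns nothing, like the run method
        # described in the answer.
        self.conn.execute(sql)
        self.conn.commit()

    def get_records(self, sql):
        # Returns every row of the result set as a list of tuples.
        return self.conn.execute(sql).fetchall()


class RunOperator(object):
    """Mimics the original behaviour: execute calls hook.run and returns None."""

    def __init__(self, hook, sql):
        self.hook = hook
        self.sql = sql

    def execute(self, context=None):
        self.hook.run(self.sql)


class SelectOperator(RunOperator):
    """Mimics the custom operator: execute returns hook.get_records(...)."""

    def execute(self, context=None):
        return self.hook.get_records(self.sql)


def run_task(operator, xcom):
    # Toy version of the task runner: only a non-None result is stored.
    result = operator.execute()
    if result is not None:
        xcom["return_value"] = result


hook = FakeHook()
sql = "select count(1) from events"

xcom_run = {}
run_task(RunOperator(hook, sql), xcom_run)

xcom_select = {}
run_task(SelectOperator(hook, sql), xcom_select)

print(xcom_run)
print(xcom_select)
```

Note that get_records returns a list of rows, so a downstream task has to unpack the count (for example value[0][0]) before comparing it with 0.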

Hope that helps you.



Answer 2:

In the end, I created a new operator, SqlExecuteOperator, as a plugin under $AIRFLOW_HOME/plugins.

I used CheckOperator as a template and changed the value it returns: the default behavior of that operator was exactly the reverse of what I needed.

Here's the code of the default operator: CheckOperator

And here is my customized operator: ReverseSqlSensor

import logging

# Import paths as found in Airflow 1.x
from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SqlExecuteOperator(BaseOperator):
    """
    Executes a sql query and returns the first column of the first row
    as an int. Because ``execute`` returns a value, Airflow pushes it
    to XCom. Modelled on ``CheckOperator``, which likewise expects
    a sql query that will return a single row.

    :param sql: the sql to be executed
    :type sql: string
    :param conn_id: id of the connection to run the query against
    :type conn_id: string
    """

    template_fields = ('sql',)
    template_ext = ('.hql', '.sql',)
    ui_color = '#fff7e6'

    @apply_defaults
    def __init__(
            self, sql,
            conn_id=None,
            *args, **kwargs):
        super(SqlExecuteOperator, self).__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.sql = sql

    def execute(self, context=None):
        logging.info('Executing SQL statement: %s', self.sql)
        # get_first returns only the first row of the result set
        records = self.get_db_hook().get_first(self.sql)
        logging.info('Record: %s', records)
        records_int = int(records[0])
        print(records_int)
        # The returned value is what ends up in XCom
        return records_int

    def get_db_hook(self):
        # Resolve the matching hook from the connection's type
        return BaseHook.get_hook(conn_id=self.conn_id)
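
Since the operator above returns a plain int, the branch callable from the question can compare the pulled value with 0 directly. The sketch below exercises that callable without Airflow. FakeTaskInstance is a hypothetical stub that models only xcom_pull, and the explicit None check is an addition for illustration, not part of the original code.

```python
class FakeTaskInstance(object):
    """Hypothetical stub for a task instance; only xcom_pull is modelled."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        # Returns None when the task pushed nothing, like the case in the question.
        return self._xcoms.get(task_ids)


def py_is_first_execution(**kwargs):
    value = kwargs['ti'].xcom_pull(task_ids='check_task')
    if value is None:
        # Added for illustration: fail loudly instead of silently
        # taking the 'end-flow' branch when nothing was pushed.
        raise ValueError('check_task pushed no XCom value')
    return 'next_task' if value == 0 else 'end-flow'


first_run = py_is_first_execution(ti=FakeTaskInstance({'check_task': 0}))
later_run = py_is_first_execution(ti=FakeTaskInstance({'check_task': 5}))
print(first_run)
print(later_run)
```

Raising on None is one possible design choice: it makes a missing XCom value visible as a failed task rather than as a quietly skipped branch.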