How to access the response from Airflow SimpleHttp

2019-04-09 19:08发布

问题:

I'm learning Airflow and have a simple quesiton. Below is my DAG called dog_retriever

import airflow
from airflow import DAG
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.sensors import HttpSensor
from datetime import datetime, timedelta
import json



default_args = {
    'owner': 'Loftium',
    'depends_on_past': False,
    'start_date': datetime(2017, 10, 9),
    'email': 'rachel@loftium.com',
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 3,
    'retry_delay': timedelta(minutes=3),
}

dag = DAG('dog_retriever',
    schedule_interval='@once',
    default_args=default_args)

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    dag=dag)

t2 = SimpleHttpOperator(
    task_id='get_breeds',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breeds/list',
    headers={"Content-Type": "application/json"},
    dag=dag)

t2.set_upstream(t1)

As a means to test out Airflow, I'm simply making two GET requests to some endpoints in this very simple http://dog.ceo API. The goal is to learn how to work with some data retrieved via Airflow

The execution is working- my code successfully calls the enpoints in tasks t1 and t2, I can see them being logged in the Airflow UI, in the correct order based on the set_upstream rule I wrote.

What I cannot figure out is how to ACCESS the json response of these 2 tasks. It seems so simple, but I cannot figure it out. In the SimpleHtttpOperator I see a param for response_check, but nothing to simply print, or store, or view the json response.

Thanks.

回答1:

So since this is SimpleHttpOperator and the actual json is pushed to XCOM and you can get it from there. Here is the line of code for that action: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/http_operator.py#L87

What you need to do is set xcom_push=True, so your first t1 will be the following:

t1 = SimpleHttpOperator(
    task_id='get_labrador',
    method='GET',
    http_conn_id='http_default',
    endpoint='api/breed/labrador/images',
    headers={"Content-Type": "application/json"},
    xcom_push=True,
    dag=dag)

You should be able to find all JSON with return value in XCOM, more detail of XCOM can be found at: https://airflow.incubator.apache.org/concepts.html#xcoms