Dataflow failing to push messages to BigQuery from PubSub

Published 2019-08-19 14:08

Question:

I am trying to get a data pipeline working. I am using the Python client library to insert records into Pub/Sub. From there, Dataflow is supposed to pick them up and push them into BigQuery. Dataflow is failing; my guess is that I don't have the right encoding for the data. My code looks like this:

data = base64.b64encode(message)
publisher.publish(topic_path, data=data)

where message is a string. This is the json object which I am trying to push:

{
  "current_speed": "19.77",
  "_east": "-87.654561",
  "_last_updt": "2018-07-17 15:31:30.0",
  "_region_id": "1",
  "_north": "42.026444",
  "_south": "41.997946",
  "region": "Rogers Park - West Ridge",
  "_west": "-87.709645",
  "_description": "North of Devon. Kedzie to Lake Shore"
}
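For reference, the record above can be serialized with the standard `json` module into the bytestring that `publisher.publish` expects, with no base64 step; a minimal sketch:

```python
import json

# The record from the question, as a Python dict
record = {
    "current_speed": "19.77",
    "_east": "-87.654561",
    "_last_updt": "2018-07-17 15:31:30.0",
    "_region_id": "1",
    "_north": "42.026444",
    "_south": "41.997946",
    "region": "Rogers Park - West Ridge",
    "_west": "-87.709645",
    "_description": "North of Devon. Kedzie to Lake Shore",
}

# json.dumps() yields a str; encode() turns it into the bytestring
# that publisher.publish(topic_path, data=...) expects
payload = json.dumps(record).encode("utf-8")
print(type(payload))                  # <class 'bytes'>
print(json.loads(payload) == record)  # True: round-trips cleanly
```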

I have tried a couple of variations of this, and I can see the data in Pub/Sub either as base64 or as JSON.

When I see json I see it like this:

┌──────┬────────────┬────────────┐
│ DATA │ MESSAGE_ID │ ATTRIBUTES │
├──────┼────────────┼────────────┤
DATA: {u'_south': u'41.997946', u'_north': u'42.026444', u'_description': u'North of Devon. Kedzie to Lake Shore', u'_east': u'-87.654561', u'region': u'Rogers Park - West Ridge', u'_west': u'-87.709645', u'current_speed': u'21.82', u'_last_updt': u'2018-07-18 10:10:48.0', u'_region_id': u'1'}
MESSAGE_ID: 154626108014988
ATTRIBUTES: (empty)

Notice the additional u in front of each element. That is because I was doing UTF-8 encoding. Is that messing things up? I am encoding with data = data.encode('utf-8'), as described here: https://cloud.google.com/pubsub/docs/publisher
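A side note on those u prefixes: they come from printing the Python 2 repr of a dict, not from UTF-8 encoding itself. A minimal sketch of the difference (run on Python 3, whose repr drops the u but is still not JSON):

```python
import json

record = {"_south": "41.997946", "region": "Rogers Park - West Ridge"}

# str()/repr() of a dict is Python literal syntax (with u'' prefixes
# on Python 2), not JSON -- a JSON parser rejects it
try:
    json.loads(str(record))
    print("parsed")
except json.JSONDecodeError:
    print("repr of a dict is not valid JSON")

# json.dumps() emits real JSON: double quotes, no u prefixes
print(json.dumps(record))
```

So if the DATA column shows u-prefixed, single-quoted output, the message body is a dict's repr rather than JSON, and the template cannot parse it.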

I am checking my content in pubsub using this command: gcloud pubsub subscriptions pull --auto-ack debug_subscription

Q. What should I see in my topic: JSON or binary? Is there any Python sample that shows the right way to encode the payload so that it can be picked up by the Pub/Sub to BigQuery template?

Answer 1:

Why do you use data = base64.b64encode(message)? What exactly is message?

I tried this snippet with the provided Pub/Sub to BigQuery Dataflow template, and it works:

from google.cloud import pubsub_v1

def publish_messages(project, topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, topic_name)

    for n in range(1, 6):
        data = u'{"column1": "value1","column2": "value2"}'
        # Data must be a bytestring
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)

    print('Published messages.')

Try publishing like this, without base64-encoding the data.
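To catch encoding mistakes before they reach Dataflow, you could add a pre-publish check; a sketch with a hypothetical helper (the function name and column list are illustrative, not part of any library) that confirms the payload is valid JSON carrying the columns the BigQuery schema expects:

```python
import json

def check_payload(data, expected_columns):
    """Hypothetical helper: verify a payload parses as JSON and carries
    the columns the BigQuery table expects, before publishing it."""
    row = json.loads(data.decode("utf-8"))
    missing = set(expected_columns) - set(row)
    if missing:
        raise ValueError("missing columns: %s" % sorted(missing))
    return row

payload = u'{"column1": "value1","column2": "value2"}'.encode("utf-8")
print(check_payload(payload, ["column1", "column2"]))
```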

[Pub/Sub python code] [Dataflow templates]