数据流无法从PubSub的邮件推送到的BigQuery(Dataflow failing to pu

2019-10-31 06:36发布

我想现在的工作数据管道。 我使用Python客户端库插入记录到PubSub的。 从那里数据流应该把它捡起来,然后推入BQ。 数据流是failing.My猜测是因为我没有对数据进行正确的编码。 我的代码如下所示:

data = base64.b64encode(message) publisher.publish(topic_path, data=data)

其中,消息是一个字符串。 这是我试图把JSON对象:

{ “current_speed”: “19.77”, “_east”: “-87.654561”, “_last_updt”: “2018年7月17日15:31:30.0”, “_region_id”: “1”, “_north”: “42.026444” “_south”: “41.997946”, “区”: “罗杰斯公园 - 西山脊”, “_west”: “-87.709645”, “_description”:} “德文Kedzie到湖岸的北方。”

我已经试过几个这样的变化,我可以看到B64或在发布 - 订阅JSON数据。

当我看到JSON我认为是这样的:

Kedzie到湖岸 'u'_east ':U'87.654561',u'region ':u'Rogers公园 - 西山脊',u'_w​​est ':U'87.709645',u'current_speed':u'21.82 ”,u'_last_updt ':u'2018-07-18 10:10:48.0',u'_region_id ':U'1'}│154626108014988││└────────────── ────────────────────────────────────────────────── ───────────

注意在每个元件的前方的其他与u。 那是因为我在做一个UTF-8编码。 是搞乱的东西了? 我使用data = data.encode('utf-8')这一段代码做UTF-8如下所述: https://cloud.google.com/pubsub/docs/publisher

我使用这个命令检查我的内容发布订阅: gcloud pubsub subscriptions pull --auto-ack debug_subscription

问:我应该怎么我的主题中看到了什么? JSON或二进制? 是否有任何Python示例这表明有效载荷进行加密,以便它可以通过PubSub的到BQ模板拿起正确的方式?

Answer 1:

为什么使用data = base64.b64encode(message) ? 什么是message到底是什么?

我想这个片段与发布/订阅BigQuery的数据流提供的模板,它的工作原理:

def publish_messages(project, topic_name):
    """Publishes multiple messages to a Pub/Sub topic."""
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project, topic_name)

    for n in range(1, 6):
        data = u'{"column1": "value1","column2": "value2"}'
        # Data must be a bytestring
        data = data.encode('utf-8')
        publisher.publish(topic_path, data=data)

    print('Published messages.')

试试这个没有base64编码。

[ 发布/订阅Python代码 ] [ 数据流模板 ]



文章来源: Dataflow failing to push messages to BigQuery from PubSub