How do I “create”/“assign” a logging handler for Google Cloud Pubsub?

Posted 2019-10-29 05:14

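Work on the previous thread showed that the assumption behind my original question was off-topic (subprocess was not actually causing the problem), so I'm making a more focused post.

My error message:

No handlers could be found for logger "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"

My intent:

To pass Google PubSub message attributes on as Python variables for re-use in later code.

My code: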

import time
import logging

from google.cloud import pubsub_v1

project_id = "redacted"
subscription_name = "redacted"

def receive_messages_with_custom_attributes(project_id, subscription_name):
    """Receives messages from a pull subscription."""
    # [START pubsub_subscriber_sync_pull_custom_attributes]

    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)

    def callback(message):
        print('Received message: {}'.format(message.data))
        if message.attributes:
            #print('Attributes:')
            for key in message.attributes:
                value = message.attributes.get(key);
                #commented out to not print to terminal
                #which should not be necessary
                #print('{}: {}'.format(key, value))
        message.ack()

        print("this is before variables")
        dirpath = "~/subfolder1/"
        print(dirpath)
        namepath = message.data["name"]
        print(namepath)
        fullpath = dirpath + namepath
        print(fullpath)
        print("this is after variables")


    subscriber.subscribe(subscription_path, callback=callback)
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)
    # [END pubsub_subscriber_sync_pull_custom_attributes]

receive_messages_with_custom_attributes(project_id, subscription_name)

Running the above code, my full console output is:

Listening for messages on projects/[redacted]
Received message: {
  "kind": "storage#object",
  "id": "[redacted]/0.testing/1548033442364022",
  "selfLink": "https://www.googleapis.com/storage/v1/b/[redacted]/o/BSD%2F0.testing",
  "name": "BSD/0.testing",
  "bucket": "[redacted]",
  "generation": "1548033442364022",
  "metageneration": "1",
  "contentType": "application/octet-stream",
  "timeCreated": "2019-01-21T01:17:22.363Z",
  "updated": "2019-01-21T01:17:22.363Z",
  "storageClass": "MULTI_REGIONAL",
  "timeStorageClassUpdated": "2019-01-21T01:17:22.363Z",
  "size": "0",
  "md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
  "mediaLink": "https://www.googleapis.com/download/storage/v1/b/[redacted]/o/BSD%2F0.testing?generation=1548033442364022&alt=media",
  "crc32c": "AAAAAA==",
  "etag": "CPb0uvvZ/d8CEAE="
}

this is before variables
/home/[redacted]
No handlers could be found for logger "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"

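As can be seen, the first string and the string-defined-as-variable were printed, but the code breaks when it tries to define a variable from the just-generated dictionary, and no further print() calls were executed.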

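Potentially related thread: that user was publishing with cron jobs and found a workaround via the crontab env paths. My situation is receiving, and I'm not using any cron jobs, but it might hint at another layer behind or within Python?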

Can anyone please help me add a handler so that this code runs as intended?

Answer 1:

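First of all, if I understand correctly from what you are showing in your output, you are using Pub/Sub notifications to send a message whenever you make changes to a Cloud Storage object. This information could be helpful.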

Now, message.data["name"] is not going to work because message.data is a bytes object. Thus, it can't be indexed like a dict.
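
The "No handlers could be found" line comes from Python 2's logging module: a logger tried to emit a record while no handler was configured, so the real exception raised inside the callback was probably hidden. The sketch below reproduces that situation without any Pub/Sub dependency. It is written for Python 3; `attach_handler`, `run_callback`, and `fake_data` are hypothetical names, and only the logger name is taken from the error message.

```python
import io
import logging

# Logger name copied from the error message; everything else is illustrative.
LOGGER_NAME = "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"


def attach_handler(stream):
    """Attach a stream handler so records sent to the logger become visible."""
    logger = logging.getLogger(LOGGER_NAME)
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    return logger


def run_callback(logger, data):
    """Hypothetical stand-in for the library invoking a user callback."""
    try:
        return data["name"]  # bytes cannot be indexed with a string key
    except TypeError:
        # With a handler attached, the traceback is written to the stream.
        logger.exception("callback failed")
        return None


log_output = io.StringIO()
logger = attach_handler(log_output)
fake_data = b'{"name": "BSD/0.testing"}'  # made-up payload shaped like the output above
result = run_callback(logger, fake_data)
print(log_output.getvalue())
```

In a real script, a single logging.basicConfig(level=logging.INFO) call near the top should serve the same purpose, since the question's code imports logging but never configures it.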

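To treat it as a dict, you first have to decode it from base64 (import base64). After that, what you are left with is a string that looks like JSON. Then use json.loads() (don't forget to import json) to turn this string into a dict. Now you can index the message.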

The code for this would be:

print("This is before variables")
dirpath = "/subfolder1/"
print(dirpath)

#Transform the bytes object into a string by decoding it
namepath = base64.b64decode(message.data).decode('utf-8')

#Transform the JSON-formatted string into a dict
namepath = json.loads(namepath)

print(namepath["name"])
fullpath = dirpath + namepath["name"]
print(fullpath)
print("this is after variables")

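Now, if your intention is to read only the attributes, they are already handled correctly at the top of the callback, like this: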

    if message.attributes:
        print('Attributes:')
        for key in message.attributes:
            value = message.attributes.get(key)
            print('{}: {}'.format(key, value))

So, you could use:

    print("this is before variables")
    dirpath = "~/subfolder1/"
    print(dirpath)
    namepath = message.attributes["objectId"]
    print(namepath)
    fullpath = dirpath + namepath
    print(fullpath)
    print("this is after variables")

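Keep in mind that in this particular case, "objectId" is the name of the file, because it is the attribute used by Pub/Sub notifications for Cloud Storage. If you intend to send custom messages, change "objectId" to your desired attribute name.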
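
A small sketch of the attribute lookup described above, assuming the attributes behave like a mapping of strings. `FakeMessage`, `object_name`, and the sample values are hypothetical; only the "objectId" key comes from this answer.

```python
class FakeMessage(object):
    """Hypothetical stand-in for a received Pub/Sub message."""

    def __init__(self, data, attributes):
        self.data = data
        self.attributes = attributes


def object_name(message, attribute="objectId", default=None):
    """Return the named attribute, or `default` when it is absent."""
    if not message.attributes:
        return default
    return message.attributes.get(attribute, default)


# Sample message shaped like the notification in the question.
message = FakeMessage(b"{}", {"objectId": "BSD/0.testing"})
dirpath = "~/subfolder1/"
fullpath = dirpath + object_name(message)
print(fullpath)
```

Passing a different `attribute` name covers the custom-message case without touching the rest of the callback.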



Answer 2:

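As Nahuel and tripleee explained, the problem is that the messages are bytes instead of strings. However, their code did not entirely work and still threw errors, and I have no idea why. By cross-referencing Google's sample code for Pub/Sub on App Engine, and after a few more hours of trial and error, I found that the following code works. It may be inelegant and/or follow bad practices; in that case, please edit it to make it more robust.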

#Continues from after message.ack(), above code remains unchanged
#except needing to <import json>

    #this makes a message.data a true python dict with strings.
    payload = json.loads(message.data.decode('utf-8')) 

    #this finds the value of the dict with key "name"
    namepath = payload["name"]

    #this is just a static string to pre-pend to the file path
    dirpath = "/home/[redacted]/"

    #combine them into a single functioning path
    fullpath = dirpath + namepath

    #currently type 'unicode', so convert them to type 'str'
    fullpath = fullpath.encode("utf-8")

At the end we have fullpath as a pure 'str', usable by later functions and commands.
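
The decoding steps can be collected into one helper. This is a sketch under stated assumptions: it targets Python 3, where the final encode step is unnecessary because str is already text; `build_full_path`, the sample payload, and the directory are hypothetical. It also assumes, as the working code above suggests, that message.data arrives as UTF-8 JSON bytes with no extra base64 layer to remove.

```python
import json
import posixpath


def build_full_path(data, dirpath):
    """Decode a JSON payload given as bytes and join its "name" onto dirpath."""
    payload = json.loads(data.decode("utf-8"))
    return posixpath.join(dirpath, payload["name"])


# Made-up payload with the same shape as the notification in the question.
sample = b'{"kind": "storage#object", "name": "BSD/0.testing", "size": "0"}'
fullpath = build_full_path(sample, "/home/example/")
print(fullpath)
```

Using posixpath.join instead of string concatenation avoids a doubled or missing slash when the directory is written with or without a trailing separator.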



Source: How do I “create”/“assign” a logging handler for Google Cloud Pubsub?