Run synchronous pull in Google Cloud Pub/Sub with

Posted 2019-06-16 11:50

Question:

I can't find the returnImmediately flag in the Python client API. Is there a specific reason for that? Is there another way to pull queued messages synchronously from a subscription in Python?

Answer 1:

Google doesn't provide anything like this, but you can easily work around it by implementing your own queue:

from queue import Queue  # Python 3; on Python 2 this module is named Queue

from google.cloud import pubsub

subscriber = pubsub.SubscriberClient()
topic = "projects/newproject-xxxxx/topics/tarunlalwani"
subscription_name = 'projects/newproject-xxxxx/subscriptions/sub1'

class SynchronousSubscription(object):

    def __init__(self, subscription):
        # Create the queue before subscribing, so the callback can never run before it exists
        self.pending_messages = Queue()
        self.subscription_future = subscriber.subscribe(subscription, self.callback)

    def callback(self, message):
        # Runs on the client's background thread for every delivered message
        print(message.data)
        message.ack()
        self.pending_messages.put(message)

    def consume_sync(self):
        # Block until the callback has queued a message
        return self.pending_messages.get()

sub = SynchronousSubscription(subscription_name)
data = sub.consume_sync()

It worked well for me when I tested it.
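To try the queue-bridging pattern without a live subscription, here is a minimal, self-contained sketch. FakeSubscriber is a hypothetical stand-in for SubscriberClient that calls the callback from a background thread, roughly how the real client delivers messages. The optional timeout on consume_sync is also an assumption, not part of the answer above; it lets a caller give up instead of blocking forever.

```python
import queue
import threading


class FakeSubscriber:
    """Hypothetical stand-in for SubscriberClient: delivers payloads via a callback on a worker thread."""

    def __init__(self, payloads):
        self._payloads = payloads

    def subscribe(self, subscription, callback):
        # Deliver each payload, in order, from a single background thread
        def deliver():
            for payload in self._payloads:
                callback(payload)

        worker = threading.Thread(target=deliver, daemon=True)
        worker.start()
        return worker


class SynchronousSubscription:
    def __init__(self, subscriber, subscription):
        # Create the queue first so the callback never runs before it exists
        self.pending_messages = queue.Queue()
        self.subscription_future = subscriber.subscribe(subscription, self.callback)

    def callback(self, message):
        # Called on the subscriber's thread; hand the message to the consumer
        self.pending_messages.put(message)

    def consume_sync(self, timeout=None):
        # Block until a message arrives; return None if the timeout expires first
        try:
            return self.pending_messages.get(timeout=timeout)
        except queue.Empty:
            return None


sub = SynchronousSubscription(FakeSubscriber([b"first", b"second"]), "projects/p/subscriptions/s")
print(sub.consume_sync(timeout=5))    # b'first'
print(sub.consume_sync(timeout=5))    # b'second'
print(sub.consume_sync(timeout=0.1))  # None
```

Note that the original answer acks inside the callback, before the consumer has processed the message; if processing can fail, acking after consume_sync returns is the safer choice.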



Answer 2:

Expanding on the previous answer:

There is currently a function with the desired functionality. Here is the relevant piece of documentation from subscriber_client.py:

def pull(self,
         subscription,
         max_messages,
         return_immediately=None,
         options=None):
    ...
    Args:
        ...
        return_immediately (bool): If this field is set to true, the system
          will respond immediately even if there are no messages available
          to return in the ``Pull`` response. Otherwise, the system may wait
          (for a bounded amount of time) until at least one message is
          available, rather than returning no messages. The client may cancel
          the request if it does not wish to wait any longer for the response.

However, when executed (read this comment first), it raises two exceptions; the one shown below is an aggregate of both:

RetryError(Exception occurred in retry method that was not classified as transient, caused by <_Rendezvous of RPC that terminated with (StatusCode.INVALID_ARGUMENT, A required argument is missing in the request: (argument="max_messages").)>)

There is a related issue if you want more details.
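One plausible reading of that error, assuming the request is serialized as a proto3 message: an integer field left at 0 is indistinguishable from an unset field, so the server can report max_messages as missing even when the argument was passed. The hypothetical helper below (not part of any Google library) rejects non-positive values before they reach pull.

```python
def build_pull_kwargs(subscription, max_messages, return_immediately=False):
    """Hypothetical helper: validate pull() arguments before sending them.

    In proto3, an int field equal to 0 is not serialized, so the server
    sees it as missing; only a positive max_messages is meaningful.
    """
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(max_messages, bool) or not isinstance(max_messages, int) or max_messages < 1:
        raise ValueError("max_messages must be a positive integer, got %r" % (max_messages,))
    return {
        "subscription": subscription,
        "max_messages": max_messages,
        "return_immediately": bool(return_immediately),
    }


kwargs = build_pull_kwargs("projects/newproject-xxxxx/subscriptions/sub1", 1, return_immediately=True)
# Assumed usage with the signature quoted above: subscriber.pull(**kwargs)
```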



Answer 3:

The Cloud Pub/Sub client library does not directly expose the pull method, instead offering an asynchronous API designed for efficiently receiving messages. If you have specific reasons to want to call the synchronous pull method (including using the returnImmediately property), then you'll need to generate the gRPC-based library. You will need to fetch the service definition and then generate the client. Alternatively, you can make an HTTP request using the REST API version of pull.
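For the REST route, a minimal sketch with only the standard library might look like the following. It targets the v1 subscriptions.pull endpoint (POST https://pubsub.googleapis.com/v1/{subscription}:pull) with a JSON body carrying returnImmediately and maxMessages. The access token is an assumption: an OAuth 2.0 token you obtain separately, for example with `gcloud auth print-access-token`. The function only builds the request; sending it is left to the caller.

```python
import json
import urllib.request

PUBSUB_API = "https://pubsub.googleapis.com/v1"


def build_pull_request(subscription, access_token, max_messages=1, return_immediately=True):
    """Build (but do not send) a REST v1 subscriptions.pull request."""
    body = json.dumps({
        "returnImmediately": return_immediately,
        "maxMessages": max_messages,
    }).encode("utf-8")
    return urllib.request.Request(
        url="%s/%s:pull" % (PUBSUB_API, subscription),
        data=body,
        headers={
            # Assumed: a valid OAuth 2.0 access token with Pub/Sub scope
            "Authorization": "Bearer " + access_token,
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_pull_request("projects/newproject-xxxxx/subscriptions/sub1", "ACCESS_TOKEN")
# To send it against a real project: response = json.load(urllib.request.urlopen(req))
```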



Answer 4:

Google Cloud's previous official gcloud Python library (the last version, 0.18.3, is still available on pip) has stable support for pull in idiomatic Python. The Cloud Pub/Sub API is GA, so this deprecated library should keep working, but be aware that it won't receive any updates. I've used it extensively over the past two years without incident.

from gcloud import pubsub

# Connect to pubsub
client = pubsub.Client(project='myproject')

topic = client.topic('mytopic')
sub = topic.subscription('mysub')
if not topic.exists():
    topic.create()
if not sub.exists():
    sub.create()

# In your code, use a try-except for this pull and handle failures appropriately
recv = sub.pull(return_immediately=False, max_messages=1)

# pull returns a list of (ack_id, message) pairs, which may be empty
if recv:
    ack_id, msg = recv[0]

    msg_attribute = msg.attributes.get('myattribute')
    msg_data = msg.data

    sub.acknowledge([ack_id])
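If you use the REST API from Answer 3 instead of this deprecated library, the pull response carries the same ack-ID/message pairs as JSON, with base64-encoded payloads, and acknowledging is a POST to .../v1/{subscription}:acknowledge with an ackIds list. The sketch below parses such a response; the sample dictionary is a hand-written illustration, not real API output.

```python
import base64
import json


def parse_pull_response(response_json):
    """Turn a REST v1 pull response into (ack_id, data, attributes) tuples."""
    results = []
    # An empty response may omit "receivedMessages" entirely
    for received in response_json.get("receivedMessages", []):
        message = received["message"]
        # Message payloads are base64-encoded in the JSON representation
        data = base64.b64decode(message.get("data", ""))
        results.append((received["ackId"], data, message.get("attributes", {})))
    return results


def build_ack_body(ack_ids):
    """JSON body for POST .../v1/{subscription}:acknowledge."""
    return json.dumps({"ackIds": list(ack_ids)})


# Hand-written sample shaped like a v1 pull response
sample = {
    "receivedMessages": [
        {
            "ackId": "ack-123",
            "message": {
                "data": base64.b64encode(b"hello").decode("ascii"),
                "attributes": {"myattribute": "value"},
                "messageId": "1",
            },
        }
    ]
}
ack_id, msg_data, attributes = parse_pull_response(sample)[0]
print(ack_id, msg_data, attributes["myattribute"])  # ack-123 b'hello' value
print(build_ack_body([ack_id]))  # {"ackIds": ["ack-123"]}
```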