Question:
I can't find the returnImmediately flag in the Python client API. Is there a specific reason for that? Is there another way to pull queued messages synchronously from a subscription in Python?
Answer 1:
Google doesn't provide this flag in the Python client, but you can easily work around it by implementing your own queue:
    try:
        from queue import Queue  # Python 3
    except ImportError:
        from Queue import Queue  # Python 2

    from google.cloud import pubsub

    subscriber = pubsub.SubscriberClient()
    topic = "projects/newproject-xxxxx/topics/tarunlalwani"
    subscription_name = 'projects/newproject-xxxxx/subscriptions/sub1'


    class SynchronousSubscription(object):
        def __init__(self, subscription):
            # Create the queue before subscribing, so the callback can never
            # fire while pending_messages is still missing.
            self.pending_messages = Queue()
            self.subscription_future = subscriber.subscribe(subscription, self.callback)

        def callback(self, message):
            # Invoked by the client library for every delivered message.
            print(message.data)
            message.ack()
            self.pending_messages.put(message)

        def consume_sync(self):
            # Blocks until a message is available.
            return self.pending_messages.get()


    sub = SynchronousSubscription(subscription_name)
    data = sub.consume_sync()
This worked well for me when I tested it.
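The wrapper in this answer blocks indefinitely in consume_sync when no message has arrived. Below is a minimal sketch of how the same queue idea could mimic returnImmediately, assuming Python 3 and only the standard queue module. MessageBuffer, consume, and its parameters are hypothetical names introduced for illustration, and plain strings stand in for Pub/Sub messages.

```python
import queue


class MessageBuffer(object):
    """Hypothetical stand-in for the queue inside SynchronousSubscription."""

    def __init__(self):
        self.pending_messages = queue.Queue()

    def callback(self, message):
        # In the real wrapper this would be invoked by subscriber.subscribe().
        self.pending_messages.put(message)

    def consume(self, return_immediately=False, timeout=None):
        """Return the next message, or None if none is available.

        return_immediately=True never blocks; otherwise wait up to
        `timeout` seconds (None means wait indefinitely).
        """
        try:
            if return_immediately:
                return self.pending_messages.get_nowait()
            return self.pending_messages.get(timeout=timeout)
        except queue.Empty:
            return None


buf = MessageBuffer()
empty_result = buf.consume(return_immediately=True)  # nothing queued yet
buf.callback("hello")
first = buf.consume(timeout=1.0)
```

Returning None rather than raising keeps the caller's loop simple; whether that suits a given application is a design choice, not something the client library prescribes.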
Answer 2:
Expanding on the previous answer:
A function with the desired behavior currently exists. Here is the relevant piece of documentation from subscriber_client.py:
    def pull(self,
             subscription,
             max_messages,
             return_immediately=None,
             options=None):
        ...
        Args:
          ...
          return_immediately (bool): If this field set to true, the system
            will respond immediately even if
            it there are no messages available to return in the ``Pull`` response.
            Otherwise, the system may wait (for a bounded amount of time) until at
            least one message is available, rather than returning no messages. The
            client may cancel the request if it does not wish to wait any longer for
            the response.
However, calling it (read this comment first) raises two exceptions. The one shown here is an aggregate of the two:
RetryError(Exception occurred in retry method that was not classified as transient, caused by <_Rendezvous of RPC that terminated with (StatusCode.INVALID_ARGUMENT, A required argument is missing in the request: (argument="max_messages").)>)
There is a related issue if you want more details.
Answer 3:
The Cloud Pub/Sub client library does not directly expose the pull method, instead offering an asynchronous API designed for efficiently receiving messages. If you have specific reasons to want to call the synchronous pull method (including using the returnImmediately property), then you'll need to generate the gRPC-based library. You will need to fetch the service definition and then generate the client. Alternatively, you can make an HTTP request using the REST API version of pull.
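As a rough illustration of the REST route, the sketch below builds the URL and JSON body for a pull call and decodes a response body. It assumes the v1 endpoint at pubsub.googleapis.com; build_pull_request and decode_pull_response are hypothetical helper names, and the sample response is made up for the demonstration. Sending the request is left out because it also needs an OAuth 2.0 bearer token in the Authorization header.

```python
import base64
import json

PUBSUB_ENDPOINT = "https://pubsub.googleapis.com/v1"


def build_pull_request(subscription, max_messages=1, return_immediately=True):
    """Return (url, body_bytes) for a REST pull on the given subscription."""
    url = "{}/{}:pull".format(PUBSUB_ENDPOINT, subscription)
    body = json.dumps(
        {"returnImmediately": return_immediately, "maxMessages": max_messages}
    ).encode("utf-8")
    return url, body


def decode_pull_response(response_text):
    """Return a list of (ack_id, data_bytes) from a pull response body."""
    payload = json.loads(response_text)
    return [
        # Message data arrives base64-encoded in the JSON representation.
        (item["ackId"], base64.b64decode(item["message"].get("data", "")))
        for item in payload.get("receivedMessages", [])
    ]


url, body = build_pull_request("projects/newproject-xxxxx/subscriptions/sub1")

# Made-up response body, shaped like a pull response with one message.
sample = json.dumps(
    {
        "receivedMessages": [
            {
                "ackId": "abc",
                "message": {"data": base64.b64encode(b"hi").decode("ascii")},
            }
        ]
    }
)
decoded = decode_pull_response(sample)
```

The url and body can then be POSTed with any HTTP client, and the returned ackId values passed to the acknowledge method of the same API.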
Answer 4:
Google Cloud's previous official Python library, gcloud (the last version was 0.18.3, available via pip), has stable support for pull functions in idiomatic Python. The Cloud Pub/Sub API is GA, so this deprecated library should remain stable, but be aware that it won't be getting any updates. I've used it extensively over the past two years without incident.
    from gcloud import pubsub

    # Connect to pubsub
    client = pubsub.Client(project='myproject')
    topic = client.topic('mytopic')
    sub = topic.subscription('mysub')
    if not topic.exists():
        topic.create()
    if not sub.exists():
        sub.create()

    # In your code, use a try-except for this pull and handle failures appropriately
    recv = sub.pull(return_immediately=False, max_messages=1)
    ack_id, msg = recv[0]
    msg_attribute = msg.attributes['myattribute']
    msg_data = msg.data
    sub.acknowledge([ack_id, ])