I have a simple Python script that uses Google Cloud Pub/Sub to detect new files in Google Cloud Storage. The script simply adds new messages to a queue, where another thread processes them:
    subscriber = pubsub.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project, subscription_name)
    subscriber.subscribe(subscription_path, callback=callback_fun)
    while True:
        if not message_queue:
            time.sleep(60)
            continue
        else:
            process_next_message(message_queue.pop())
Here, callback_fun simply adds the message to the queue:
    def callback_fun(message):
        message.ack()
        message_queue.append(message)
The problem I am having is that after a while (maybe a couple of days), the subscriber stops receiving new file notifications. If I stop and restart the script, it gets all of the notifications at once.
I was wondering if anyone else is having similar issues and/or can suggest ways to troubleshoot (maybe by printing debugging messages that are normally hidden). As a workaround I am now stopping and restarting the subscriber, but I am sure this is not the best approach for a production environment.
I am using google-cloud 0.32.0 and google-cloud-pubsub 0.30.1.
Thanks.
In general, there can be several reasons why a subscriber may stop receiving messages:
- If a subscriber does not ack or nack messages, the flow control limits can be reached, meaning no more messages can be delivered. That does not seem to be the case here, given that you ack messages immediately. As an aside, I would recommend against acking messages before your queue has processed them unless you are okay with the possibility of messages not being processed. If your app crashes after the ack but before the queue processes the message, that message will never be processed, and it will not be redelivered because it was already acked.
- If another subscriber starts up for the same subscription, it could be receiving the messages. In this scenario, one would expect the subscriber to receive a subset of the messages rather than no messages at all.
- The publishers have simply stopped publishing, so there are no messages to receive. If you restart the subscriber and it starts receiving messages again, this probably isn't the case. You can also verify that a backlog is building up by looking at the Stackdriver metric subscription/backlog_bytes.
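To illustrate the aside in the first point, here is a minimal sketch of deferring the ack until a message has actually been processed. It is not the asker's code: `work_queue`, `worker`, and `stop_event` are names introduced for illustration, and `process_next_message` is assumed to be the asker's own handler, raising an exception on failure. The only Pub/Sub calls it relies on are the message's `ack()` and `nack()` methods.

```python
import queue
import threading

# Thread-safe FIFO shared by the Pub/Sub callback and the worker thread.
work_queue = queue.Queue()


def callback_fun(message):
    # Only enqueue here; the ack is deferred until processing succeeds.
    work_queue.put(message)


def worker(process_next_message, stop_event):
    """Process queued messages, acking each one only after it succeeds."""
    while not stop_event.is_set():
        try:
            message = work_queue.get(timeout=1)
        except queue.Empty:
            continue
        try:
            process_next_message(message)
        except Exception:
            message.nack()  # processing failed: ask Pub/Sub to redeliver
        else:
            message.ack()  # processing succeeded: safe to drop the message
        finally:
            work_queue.task_done()
```

The worker would be started once with something like `threading.Thread(target=worker, args=(process_next_message, threading.Event()), daemon=True).start()`, and the subscription would be opened as in the question, with `callback=callback_fun`. Keep in mind that messages held without an ack count against the flow control limits, so a stalled worker would eventually stop delivery as well.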
If your problem does not fall into one of those categories, it would be best to reach out to Google Cloud support with your project name, topic name, and subscription name so that they can narrow down the issue to either your user code, the client library, or the service.
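Before escalating, it may help to surface the client library's own log output, which is what the question asks about. The sketch below uses only the standard `logging` module. The logger name `google.cloud.pubsub_v1` is an assumption based on the library's package layout; if nothing extra appears, setting the root logger to `DEBUG` is a blunter alternative.

```python
import logging

# Send log records to stderr with timestamps so gaps in activity are visible.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s: %(message)s",
)

# Assumption: the Pub/Sub client's loggers live under this dotted name.
pubsub_logger = logging.getLogger("google.cloud.pubsub_v1")
pubsub_logger.setLevel(logging.DEBUG)
```

Run this once at startup, before creating the `SubscriberClient`, so that records emitted while the subscription is being opened are captured too.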
Apart from the flow control suggestion I offered in my previous comment, you could also define a Cloud Function that is triggered any time a new message is published to a Pub/Sub topic. Such a function acts as a subscriber and is invoked every time a certain event (such as a message being published) occurs.
This tutorial will help you to develop a background Cloud Function that will get triggered when a message is published in a Pub/Sub topic.
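As a rough idea of what such a function could look like, here is a minimal sketch of a Python background function for a Pub/Sub trigger. The function name `on_new_file` is hypothetical. The sketch assumes the topic carries Cloud Storage notifications, where the `bucketId` and `objectId` attributes (or the `bucket` and `name` fields of the JSON payload) identify the new object, and it assumes the background-function signature `(event, context)` with base64-encoded message data.

```python
import base64
import json


def on_new_file(event, context):
    """Background Cloud Function triggered by a Pub/Sub message."""
    attributes = event.get("attributes") or {}

    # The message body arrives base64-encoded; for Cloud Storage
    # notifications it is assumed to be a JSON description of the object.
    payload = {}
    if event.get("data"):
        payload = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

    bucket = attributes.get("bucketId") or payload.get("bucket")
    name = attributes.get("objectId") or payload.get("name")
    print("New object: gs://{}/{}".format(bucket, name))
    return bucket, name
```

With this approach there is no long-running subscriber process to stall or restart, since the platform invokes the function once per published message.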