Python Asyncio run_forever() and Tasks

Published 2020-03-30 04:09

Question:

I adapted this code for using Google Cloud PubSub in Async Python: https://github.com/cloudfind/google-pubsub-asyncio

import asyncio
import datetime
import functools
import os

from google.cloud import pubsub
from google.gax.errors import RetryError
from grpc import StatusCode

async def message_producer():
    """ Publish messages which consist of the current datetime """
    while True:
        await asyncio.sleep(0.1)


async def proc_message(message):
    await asyncio.sleep(0.1)
    print(message)
    message.ack()


def main():
    """ Main program """
    loop = asyncio.get_event_loop()

    topic = "projects/{project_id}/topics/{topic}".format(
        project_id=PROJECT, topic=TOPIC)
    subscription_name = "projects/{project_id}/subscriptions/{subscription}".format(
        project_id=PROJECT, subscription=SUBSCRIPTION)

    subscription = make_subscription(
        topic, subscription_name)

    def create_proc_message_task(message):
        """ Callback handler for the subscription; schedule a task on the event loop """
        print("Task created!")
        task = loop.create_task(proc_message(message))

    subscription.open(create_proc_message_task)
    # Produce some messages to consume

    loop.create_task(message_producer())

    print("Subscribed, let's do this!")
    loop.run_forever()


def make_subscription(topic, subscription_name):
    """ Make a publisher and subscriber client, and create the necessary resources """
    subscriber = pubsub.SubscriberClient()
    try:
        subscriber.create_subscription(subscription_name, topic)
    except Exception:
        # Ignore creation errors (presumably the subscription already exists)
        pass
    subscription = subscriber.subscribe(subscription_name)

    return subscription


if __name__ == "__main__":
    main()

I basically removed the publishing code and kept only the subscription code. Initially, however, I did not include the loop.create_task(message_producer()) line. The tasks were created as expected, but they never actually ran. Only when I add that line does the code execute properly and all created tasks run. What causes this behaviour?

Answer 1:

PubSub is calling the create_proc_message_task callback from a different thread. Since create_task is not thread-safe, it must only be called from the thread that runs the event loop (typically the main thread). To correct the issue, replace loop.create_task(proc_message(message)) with asyncio.run_coroutine_threadsafe(proc_message(message), loop). Once that is done, message_producer is no longer needed.
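The fix can be tried without Pub/Sub at all. The following is a minimal sketch in which a plain threading.Thread stands in for the library's callback thread; run_demo, worker, callback and the results list are hypothetical names made up for illustration, and proc_message records the message instead of acking it.

```python
import asyncio
import threading


async def proc_message(message, results):
    """Stand-in for the question's coroutine; records the message instead of acking."""
    await asyncio.sleep(0.01)
    results.append(message)


def run_demo(messages):
    """Deliver messages from a worker thread and process them on the event loop."""
    loop = asyncio.new_event_loop()
    results = []
    futures = []

    def callback(message):
        # Runs in the worker thread, so the coroutine is handed over thread-safely.
        future = asyncio.run_coroutine_threadsafe(
            proc_message(message, results), loop)
        futures.append(future)

    def worker():
        # Plays the role of the thread that invokes the subscription callback.
        for message in messages:
            callback(message)
        # Block until the loop has finished every coroutine, then stop the loop.
        for future in futures:
            future.result(timeout=5)
        loop.call_soon_threadsafe(loop.stop)

    thread = threading.Thread(target=worker)
    thread.start()
    loop.run_forever()  # no message_producer task is needed to keep things moving
    thread.join()
    loop.close()
    return results


if __name__ == "__main__":
    print(sorted(run_demo(["a", "b", "c"])))
```

run_coroutine_threadsafe returns a concurrent.futures.Future, which is why the worker thread can block on future.result() to wait for the coroutine to finish.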

As for why message_producer appeared to fix the code, consider that run_coroutine_threadsafe does two additional things compared to create_task:

  • It operates in a thread-safe fashion, so the event loop's data structures are not corrupted when it is called from another thread while the loop is running.
  • It ensures that the event loop wakes up at the soonest possible opportunity, so that it can process the new task.

In your case create_task added the task to the loop's runnable queue (without any locking), but failed to ensure the wakeup, because a wakeup is not needed when create_task is called from the event loop thread. The message_producer then served to force the loop to wake up at regular intervals, which is when it also checks for and executes the runnable tasks.
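The wakeup itself can be observed with loop.call_soon_threadsafe, the lower-level thread-safe counterpart of call_soon. This is only a sketch: wake_idle_loop and the two threading.Event objects are hypothetical helpers, and the loop is deliberately given no timers or tasks, so nothing but the thread-safe call can wake it.

```python
import asyncio
import threading


def wake_idle_loop(timeout=2.0):
    """Return True if an idle loop in another thread runs our callback in time."""
    loop = asyncio.new_event_loop()
    started = threading.Event()
    ran = threading.Event()

    def run_loop():
        asyncio.set_event_loop(loop)
        # Scheduled from the loop's own thread, so plain call_soon is fine here.
        loop.call_soon(started.set)
        # With no timers and no tasks, the loop now sits idle waiting for events.
        loop.run_forever()
        loop.close()

    thread = threading.Thread(target=run_loop)
    thread.start()
    started.wait(timeout)

    # Called from a foreign thread: queues the callback and wakes the idle loop.
    loop.call_soon_threadsafe(ran.set)
    woke = ran.wait(timeout)

    loop.call_soon_threadsafe(loop.stop)
    thread.join()
    return woke


if __name__ == "__main__":
    print(wake_idle_loop())
```

With the non-thread-safe call_soon or create_task in place of call_soon_threadsafe, the callback would only be queued, and it would run whenever something else (such as the periodic message_producer) happened to wake the loop.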