AWS IoT Python SDK and asyncio

Posted 2019-07-13 08:36

Question:

I need to use the AWS IoT MQTT service. I am currently experimenting with https://github.com/aws/aws-iot-device-sdk-python.

My application will be using websockets to communicate with another service, and then publish / subscribe to MQTT topics to forward / receive messages.

Is it likely that this library will be blocking code execution? I am still trying to get my head around asyncio, and am not sure what things I should be looking out for. How do I know if it will cause problems?

I believe I will only need to use AWSIoTMQTTClient from the above library.

This is an extract from the working code I have:

class AWSIoTClient:

    def __init__(self):
        ...
        self.client = AWSIoTMQTTClient(...)

    def subscribe(self, callback):
        self.client.subscribe(f'{self.TOPIC}/subscribe/', 0, callback)

    def publish(self, message):
        self.client.publish(self.TOPIC, message, 0)


class MyWSProtocol(WebSocketClientProtocol):

    def set_aws_client(self, client: AWSIoTClient):
        client.subscribe(self.customCallback)
        self.client = client

    def customCallback(self, client, userdata, message):
        # This will be called when we send message from AWS
        if message.payload:
            message = json.loads(message.payload.decode('utf-8').replace("'", '"'))
            message['id'] = self.next_id()
            self.sendMessage(json.dumps(message).encode('utf-8'))

    def onMessage(self, payload, isBinary):
        message = json.loads(payload)

        # This will forward message to AWS
        self.client.publish(str(payload))

Answer 1:

Is it likely that this library will be blocking code execution?

How do I know if it will cause problems?

You should not run long-running blocking (synchronous) code inside any of your coroutines. Doing so blocks the global event loop, which in turn stalls every other coroutine running on it.

async def main():
    await asyncio.sleep(3)  # async sleeping, it's ok

    time.sleep(3)           # synchronous sleeping, this freezes event loop 
                            # and all coroutines for 3 seconds, 
                            # you should avoid it!

    await asyncio.sleep(3)  # async sleeping, it's ok

If you need to run blocking code inside a coroutine, run it in an executor (see the asyncio documentation for loop.run_in_executor).
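As a rough illustration of the executor approach, here is a minimal sketch. blocking_publish is a hypothetical stand-in for any synchronous call (it is not part of the AWS SDK), and the code assumes Python 3.7+ for asyncio.run and asyncio.get_running_loop. The ticker coroutine keeps making progress while the blocking call runs in a worker thread.

```python
import asyncio
import time


def blocking_publish(message):
    # Hypothetical stand-in for a synchronous, blocking call.
    time.sleep(0.2)
    return f"published: {message}"


async def ticker(ticks):
    # Keeps running while the blocking call executes in a worker thread.
    for i in range(3):
        await asyncio.sleep(0.05)
        ticks.append(i)


async def main():
    loop = asyncio.get_running_loop()
    ticks = []
    ticker_task = asyncio.ensure_future(ticker(ticks))
    # Passing None selects the loop's default ThreadPoolExecutor.
    result = await loop.run_in_executor(None, blocking_publish, "hello")
    await ticker_task
    return result, ticks


result, ticks = asyncio.run(main())
print(result, ticks)
```

If blocking_publish were called directly inside main instead, the ticker would not advance until the call returned.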

Keep this in mind when writing coroutines. asyncio will usually warn you about this mistake if you enable debug mode:

import asyncio
import time


async def main():
    await asyncio.sleep(3)
    time.sleep(3)
    await asyncio.sleep(3)


loop = asyncio.get_event_loop()
loop.set_debug(True)  # debug mode
try:
    loop.run_until_complete(main())
finally:
    loop.run_until_complete(loop.shutdown_asyncgens())
    loop.close()

You'll see a warning like this:

Executing <Handle <TaskWakeupMethWrapper object at 0x000002063C2521F8>(<Future finis...events.py:275>) created at C:\Users\gmn\AppData\Local\Programs\Python\Python36\Lib\asyncio\futures.py:348> took 3.000 seconds
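The warning is emitted through the "asyncio" logger whenever a single step takes longer than loop.slow_callback_duration (0.1 seconds by default). The sketch below, which assumes Python 3.7+, lowers that threshold and collects the messages in a small helper handler so they can be inspected programmatically. ListHandler is a hypothetical name used only for this illustration.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    # Hypothetical helper: stores formatted log messages in a list.
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


async def main():
    loop = asyncio.get_running_loop()
    # Report any step that holds the loop for 50 ms or more.
    loop.slow_callback_duration = 0.05
    time.sleep(0.2)  # blocking call, should be reported
    await asyncio.sleep(0)


handler = ListHandler()
asyncio_logger = logging.getLogger("asyncio")
asyncio_logger.addHandler(handler)
asyncio.run(main(), debug=True)  # debug=True enables slow-callback reporting
asyncio_logger.removeHandler(handler)

slow_reports = [m for m in handler.messages if " took " in m]
print(slow_reports)
```

Running your own code this way during development is one way to find out whether a library call holds the event loop for too long.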