Python async websocket client with async timer

Posted 2019-07-18 00:19

Question:

I need a long-running websocket client that receives push messages from a websocket server, and I need to monitor the client's connection state: if the connection goes down, I need to find out.

My approach is to periodically log a constant string and trigger an alarm whenever the log message is not detected.

My idea: 1) have a websocket client that responds to irregularly incoming messages, and 2) at the same time have a loop that stops logging a message when the websocket client throws a ConnectionClosed exception.

I am intrigued by the new Python 3.5 async syntax. The websockets library is built specifically on asyncio. The client in the docs looks exactly like what I need.

However, I have no idea how to add a second coroutine that emits my log statements and stops when the websocket connection throws ConnectionClosed.

Here is something to start the conversation, but it doesn't work because the alive method blocks the event loop. What I am looking for is an elegant way to run both methods concurrently.

#!/usr/bin/env python

import asyncio
import logging

import websockets

logger = logging.getLogger(__name__)

is_alive = True


async def alive():
    while is_alive:
        logger.info('alive')
        await asyncio.sleep(300)


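async def async_processing():
    # Without this declaration, the assignment below would only create a local variable.
    global is_alive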
    async with websockets.connect('ws://localhost:8765') as websocket:
        while True:
            try:
                message = await websocket.recv()
                print(message)

            except websockets.exceptions.ConnectionClosed:
                print('ConnectionClosed')
                is_alive = False
                break


asyncio.get_event_loop().run_until_complete(alive())
asyncio.get_event_loop().run_until_complete(async_processing())

Answer 1:

Actually, it is run_until_complete that blocks here: it waits until alive finishes, so the second call is not reached while alive keeps looping.

You can solve it in two steps:

  1. Schedule the coroutines with asyncio.ensure_future, which returns a task for each one right away without waiting for its result.
  2. Wait for the tasks to finish with asyncio.wait.

The code looks like this:

tasks = [
   asyncio.ensure_future(alive()),
   asyncio.ensure_future(async_processing())
]
asyncio.get_event_loop().run_until_complete(asyncio.wait(tasks))
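
The difference between two consecutive run_until_complete calls and tasks scheduled together can be seen in a small sketch. It does not use websockets at all: worker is a hypothetical stand-in for alive and async_processing, and the events list only records the order in which the coroutines make progress.

```python
import asyncio


async def worker(name, steps, events):
    """Hypothetical stand-in for alive() or async_processing()."""
    for i in range(steps):
        events.append(f"{name}{i}")
        await asyncio.sleep(0)  # hand control back to the event loop


def run_sequentially():
    """Two run_until_complete calls: the second starts only after the first returns."""
    events = []
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(worker("a", 2, events))
        loop.run_until_complete(worker("b", 2, events))
    finally:
        loop.close()
    return events


def run_concurrently():
    """Schedule both coroutines as tasks first, then wait for both."""
    events = []

    async def main():
        tasks = [
            asyncio.ensure_future(worker("a", 2, events)),
            asyncio.ensure_future(worker("b", 2, events)),
        ]
        await asyncio.wait(tasks)

    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
    return events


if __name__ == "__main__":
    print(run_sequentially())
    print(run_concurrently())
```

In the sequential case worker "b" does nothing until worker "a" has returned, which is why a never-ending alive keeps async_processing from starting. In the concurrent case the two workers take turns at every await.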

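As @Vincent mentioned, wait also accepts coroutines and wraps them in tasks itself, so ensure_future is unnecessary (on Python versions before 3.11; passing bare coroutines to asyncio.wait was deprecated in 3.8 and removed in 3.11):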

asyncio.get_event_loop().run_until_complete(asyncio.wait([
   alive(),
   async_processing()
]))
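
One possible way to make the logging loop stop as soon as the connection ends, without the shared is_alive flag, is to cancel the heartbeat task when the processing coroutine finishes. The following is only a sketch under assumptions: fake_processing is a hypothetical stand-in for async_processing so that the example runs without a websocket server, supervise and the short intervals are made up for illustration, and asyncio.run requires Python 3.7 or later.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


async def alive(interval):
    """Log a heartbeat until this task is cancelled."""
    while True:
        logger.info("alive")
        await asyncio.sleep(interval)


async def fake_processing(messages):
    """Hypothetical stand-in for async_processing(); returns when the 'connection' ends."""
    received = []
    for message in messages:
        await asyncio.sleep(0.01)  # pretend to wait for websocket.recv()
        received.append(message)
    return received


async def supervise(processing, interval=0.005):
    """Run the heartbeat next to `processing` and stop it once processing ends."""
    heartbeat = asyncio.ensure_future(alive(interval))
    try:
        return await processing
    finally:
        # Runs on normal return and on exceptions such as ConnectionClosed.
        heartbeat.cancel()
        # Wait until the cancellation has actually been processed.
        await asyncio.gather(heartbeat, return_exceptions=True)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    print(asyncio.run(supervise(fake_processing(["first", "second"]))))
```

With the real client, supervise(async_processing()) would be passed instead. Cancellation interrupts the sleep inside alive immediately, so the heartbeat does not have to wait for its 300 second interval to expire before noticing that the connection is gone.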