asyncio multiple concurrent servers

Posted 2019-06-26 02:59

Question:

I'm trying to use Python's asyncio to run multiple servers together, passing data between them. For my specific case I need a web server with websockets, a UDP connection to an external device, as well as database and other interactions. I can find examples of pretty much any of these individually but I'm struggling to work out the correct way to have them run concurrently with data being pushed between them.

The closest thing I have found is this question: Communicate between asyncio protocol/servers (although I have been unable to make it run on Python 3.6).

For a more concrete example: How would I take the following aiohttp example code from https://github.com/aio-libs/aiohttp:

from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT:
            await ws.send_str("Hello, {}".format(msg.data))
        elif msg.type == web.WSMsgType.BINARY:
            await ws.send_bytes(msg.data)
        elif msg.type == web.WSMsgType.CLOSE:
            break

    return ws


app = web.Application()
app.router.add_get('/echo', wshandler)
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)

web.run_app(app)

and the following TCP echo server sample (http://asyncio.readthedocs.io/en/latest/tcp_echo.html):

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    await writer.drain()

    print("Close the client socket")
    writer.close()

loop = asyncio.get_event_loop()
# The loop= argument was removed in Python 3.10; the current loop is used by default
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

and combine them into a single script where any messages received via either websockets or the TCP echo server were sent out to all clients of either?

And how would I add a piece of code that (say) every second sent a message to all clients (for the sake of argument the current timestamp)?

Answer 1:

First you need to get all of your coroutines into a single event loop. You can start by avoiding convenience APIs that start the event loop for you such as run_app. Instead of web.run_app(app), write something like:

runner = aiohttp.web.AppRunner(app)
loop.run_until_complete(runner.setup())
# here you can specify the listen address and port
site = aiohttp.web.TCPSite(runner)    
loop.run_until_complete(site.start())

Then run the echo server setup, and both are ready to share the asyncio event loop. At the end of the script, start the event loop using loop.run_forever() (or in any other way that makes sense in your application).
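
As a rough illustration of two servers sharing one event loop, here is a minimal sketch that uses only the standard library. The second TCP server (upper) merely stands in for the aiohttp site, and the names echo, upper, ask, demo and run_demo are made up for this example; they are not part of the question's code.

```python
import asyncio

async def echo(reader, writer):
    # First server: echo one line back to the client, then close.
    writer.write(await reader.readline())
    await writer.drain()
    writer.close()

async def upper(reader, writer):
    # Second server (stand-in for the web server): reply in upper case.
    writer.write((await reader.readline()).upper())
    await writer.drain()
    writer.close()

async def ask(port, payload):
    # Tiny client, used only to show that both servers answer.
    reader, writer = await asyncio.open_connection('127.0.0.1', port)
    writer.write(payload)
    await writer.drain()
    reply = await reader.readline()
    writer.close()
    return reply

async def demo():
    # Port 0 asks the OS for any free port; both servers live on this loop.
    first = await asyncio.start_server(echo, '127.0.0.1', 0)
    second = await asyncio.start_server(upper, '127.0.0.1', 0)
    ports = [s.sockets[0].getsockname()[1] for s in (first, second)]
    replies = await asyncio.gather(ask(ports[0], b'hi\n'),
                                   ask(ports[1], b'hi\n'))
    for server in (first, second):
        server.close()  # stop accepting new connections
    return replies

def run_demo():
    return asyncio.run(demo())
```

Calling run_demo() starts both servers, queries each once and returns the two replies. In the real script the second start_server call would be replaced by the AppRunner/TCPSite setup shown above.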

To broadcast information to clients, create a broadcast coroutine and add it to the event loop:

import datetime

# Broadcast data is transmitted through a global Future. It can be awaited
# by multiple clients, all of which will receive the broadcast. At each new
# iteration, a new future is created, to be picked up by new awaiters.
broadcast_data = loop.create_future()

async def broadcast():
    global broadcast_data
    while True:
        broadcast_data.set_result(datetime.datetime.now())
        broadcast_data = loop.create_future()
        await asyncio.sleep(1)

loop.create_task(broadcast())

Finally, await the broadcast in each coroutine created for a client, such as handle_echo:

async def handle_echo(r, w):
    while True:
        data = await broadcast_data
        # data contains the broadcast datetime - send it to the client
        # (StreamWriter.write() needs bytes, so encode the text first)
        w.write(str(data).encode())
        await w.drain()

It should be straightforward to modify the websockets handler coroutine to await and relay the broadcast data in the same manner.
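
To see the Future-swapping idea run end to end, here is a small self-contained sketch. The Broadcaster class and the client and demo coroutines are hypothetical names invented for this illustration. One detail goes beyond the snippets above: cancelling a task also cancels the future it is awaiting, so the sketch wraps the shared future in asyncio.shield() to keep one disconnecting client from cancelling the broadcast for everyone else.

```python
import asyncio

class Broadcaster:
    """Hypothetical wrapper around the 'replace the Future' idea."""

    def __init__(self):
        self._future = asyncio.get_running_loop().create_future()

    def publish(self, value):
        # Arm a fresh Future, then wake everyone waiting on the old one.
        done = self._future
        self._future = asyncio.get_running_loop().create_future()
        done.set_result(value)

    async def next_message(self):
        # shield() stops a cancelled client from cancelling the shared Future.
        return await asyncio.shield(self._future)

async def client(name, hub, inbox):
    # Each client loops forever, recording every broadcast it sees.
    while True:
        inbox.append((name, await hub.next_message()))

async def demo():
    hub, inbox = Broadcaster(), []
    tasks = [asyncio.ensure_future(client(n, hub, inbox))
             for n in ('ws', 'tcp')]
    for tick in range(3):
        await asyncio.sleep(0.01)  # let the clients reach their await
        hub.publish(tick)
    await asyncio.sleep(0.01)      # let the clients consume the last message
    tasks[0].cancel()              # the 'ws' client disconnects...
    await asyncio.sleep(0.01)
    hub.publish('after-cancel')    # ...and 'tcp' still receives broadcasts
    await asyncio.sleep(0.01)
    tasks[1].cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return inbox
```

Running asyncio.run(demo()) returns the list of (client, message) pairs that were delivered. Without the shield() call, cancelling the 'ws' task would also cancel the shared future and end the 'tcp' client's loop.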



Answer 2:

Based on the advice of @user4815162342 this is my "working" code. I'm posting it as an answer because it is a complete working script which achieves all the requirements of my original question, but it isn't perfect as it doesn't currently exit cleanly.

When run, it accepts web connections on port 8080 and TCP (e.g. telnet) connections on port 8081. Any message received via the web form or telnet is broadcast to all connections. Additionally, the time is broadcast every 5 seconds.

Advice on how to exit cleanly would be appreciated so that I can update this answer: pressing Ctrl+C with web connections established generates multiple "Task was destroyed but it is pending!" errors.
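
One possible direction, sketched with the standard library only: that message is emitted when a task is garbage-collected while still pending, and cancel() on its own only requests cancellation. Awaiting the cancelled tasks before the loop is closed gives them a chance to finish. The worker, shutdown and demo names below are placeholders for this sketch, and it has not been tried against the full script. For the aiohttp side, awaiting runner.cleanup() is the documented way to stop an AppRunner and may be needed as well.

```python
import asyncio

async def worker(log, name):
    # Stand-in for echo_loop / broadcastLoop: runs until it is cancelled.
    try:
        while True:
            await asyncio.sleep(3600)
    except asyncio.CancelledError:
        log.append(name + ' cancelled')
        raise  # re-raise so the task is marked as cancelled

async def shutdown(tasks):
    # cancel() only requests cancellation; gather() waits until every
    # task has actually processed it.
    for task in tasks:
        task.cancel()
    return await asyncio.gather(*tasks, return_exceptions=True)

async def demo():
    log = []
    tasks = [asyncio.ensure_future(worker(log, n)) for n in ('ws', 'tcp')]
    await asyncio.sleep(0.01)  # give the workers a chance to start
    results = await shutdown(tasks)
    return log, results, [t.cancelled() for t in tasks]
```

In the script this would roughly correspond to running loop.run_until_complete(shutdown(...)) over the stored tasks before loop.close().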

(The code is quite long as it contains embedded HTML and JS for the websockets component.)

import asyncio
from aiohttp import web
import aiohttp
import datetime
import re

queues = []

loop = asyncio.get_event_loop()

# Broadcast data is transmitted through a global Future. It can be awaited
# by multiple clients, all of which will receive the broadcast. At each new
# iteration, a new future is created, to be picked up by new awaiters.
broadcast_data = loop.create_future()

def broadcast(msg):
    global broadcast_data
    msg = str(msg)
    print(">> ", msg)
    if not broadcast_data.done():
        broadcast_data.set_result(msg)
    broadcast_data = loop.create_future()

# Dummy loop to broadcast the time every 5 seconds
async def broadcastLoop():
    while True:
        broadcast(datetime.datetime.now())
#       print('#',end='',flush=True)
        await asyncio.sleep(5)

# Handler for www requests
async def wwwhandler(r):
    host = re.search('https?://([^/]+)/', str(r.url)).group(1)
    name = r.match_info.get('name', "Anonymous")
    text = """<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket PHP Open Group Chat App</title>
        <!-- <link type="text/css" rel="stylesheet" href="style.css" /> -->
        <script>
var output;
var websocket;

function WebSocketSupport() {
    if (browserSupportsWebSockets() === false) {
        document.getElementById("ws_support").innerHTML = "<h2>Sorry! Your web browser does not support web sockets</h2>";
        var element = document.getElementById("wrapper");
        element.parentNode.removeChild(element);
        return;
    }

    output = document.getElementById("chatbox");

    websocket = new WebSocket('ws://{{HOST}}/ws');
    websocket.onopen    = function(e) { writeToScreen("You have successfully connected to the server"); };
    websocket.onmessage = function(e) { onMessage(e) };
    websocket.onerror   = function(e) { onError(e) };
}

function onMessage(e)   { writeToScreen('<span style="color: blue;"> ' + e.data + '</span>'); }
function onError(e)     { writeToScreen('<span style="color: red;">ERROR:</span> ' + e.data); }

function doSend(message) {
    var validationMsg = userInputSupplied();
    if (validationMsg !== '') {
        alert(validationMsg);
        return;
    }
    var chatname = document.getElementById('chatname').value;

//  document.getElementById('msg').value = "";
//  document.getElementById('msg').focus();
    var msg = chatname + ' says: ' + message;
    websocket.send(msg);
    writeToScreen(msg);
}

function writeToScreen(message) {
    var pre = document.createElement("p");
    pre.style.wordWrap = "break-word";
    pre.innerHTML = message;
    output.appendChild(pre);
}

function userInputSupplied() {
    var chatname = document.getElementById('chatname').value;
    var msg = document.getElementById('msg').value;
    if (chatname === '') { return 'Please enter your username'; }
    if (msg === '') { return 'Please enter the message to send'; }
    return '';
}

function browserSupportsWebSockets() {
    if ("WebSocket" in window) { return true; } else { return false; }
}
    </script>
    </head>
    <body onload="javascript:WebSocketSupport()">
        <div id="ws_support"></div>

        <div id="wrapper">
            <div id="menu">
                <h3 class="welcome">Welcome to WebSocket PHP Open Group Chat App v1</h3>
            </div>

            <div id="chatbox"></div>

            <div id ="controls">
                <label for="name"><b>Name</b></label>
                <input name="chatname" type="text" id="chatname" size="67" placeholder="Type your name here" value="MyName" />
                <input name="msg" type="text" id="msg" size="63" placeholder="Type your message here" value="Test" />
                <input name="sendmsg" type="submit"  id="sendmsg" value="Send" onclick="doSend(document.getElementById('msg').value)" />
            </div>
        </div>
    </body>
</html>"""
    text = text.replace('{{HOST}}', host)
    return web.Response(text=text, headers={'content-type':'text/html'})

# Handler for websocket connections
async def wshandler(r):
    # Get the websocket connection
    ws = web.WebSocketResponse()
    await ws.prepare(r)    
    # Append it to list so we can manage it later if needed
    r.app['websockets'].append(ws)

    try:
        # Create the broadcast task, and add it to list for later management
        echo_task = asyncio.Task(echo_loop(ws))
        r.app['tasks'].append(echo_task)

        # Tell the world we've connected
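        # Note: the connecting client misses this message. Its echo task has
        # not run yet, so it only starts awaiting after broadcast() has
        # already swapped in the next future.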
        broadcast('Hello {}'.format(r.remote))
#       await ws.send_str('Hello {}'.format(r.remote))

        # Loop through any messages we get from the client
        async for msg in ws:
            # .. and broadcast them
            if msg.type == web.WSMsgType.TEXT:
                print('<< ', msg.data)
                broadcast(msg.data)
                #            await ws.send_str("Hello, {}".format(msg.data))
                #        elif msg.type == web.WSMsgType.BINARY:
                #            await ws.send_bytes(msg.data)
            elif msg.type == web.WSMsgType.CLOSE:
                print('WS Connection closed')
                break
            elif msg.type == web.WSMsgType.ERROR:
                print('WS Connection closed with exception %s' % ws.exception())
                break
            else:
                print('WS Connection received unknown message type %s' % msg.type)

        # ws has stopped sending us data so broadcast goodbye 
        broadcast('Goodbye {}'.format(r.remote))
    except GeneratorExit:
        pass
    finally:
        # Close the ws and remove it from the list
        await ws.close()
        r.app['websockets'].remove(ws)

        # Cancel the task and remove it from the list
        # Note: cancel() only requests cancellation, it doesn't wait for it
        echo_task.cancel()
        r.app['tasks'].remove(echo_task)

    return ws

# ws broadcast loop: Each WS connection gets one of these which waits for broadcast data then sends it
async def echo_loop(ws):
    while True:
        # shield() keeps echo_task.cancel() from cancelling the shared future
        msg = await asyncio.shield(broadcast_data)
        await ws.send_str(str(msg))

# web app shutdown code: cancels any open tasks and closes any open websockets
# Only partially working   
async def on_shutdown(app):
    print('Shutting down:', end='')
    for t in app['tasks']:
        print('#', end='')
        if not t.cancelled():
            t.cancel()
    # Iterate over a copy: handlers remove their entry from the list as they close
    for ws in list(app['websockets']):
        print('.', end='')
        await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server Shutdown')
    print(' Done!')

# Code to handle TCP connections
async def echo_loop_tcp(writer):
    while True:
        # shield() keeps echo_task.cancel() from cancelling the shared future
        msg = await asyncio.shield(broadcast_data)
        writer.write( (msg + "\r\n").encode() )
        await writer.drain()

async def handle_echo(reader, writer):
    echo_task = asyncio.Task(echo_loop_tcp(writer))
    while True:
        data = await reader.readline()
        if not data:
            break
        message = data.decode().strip()
#       addr = writer.get_extra_info('peername')
        broadcast(message)

    print("Connection dropped")
    echo_task.cancel()

# The loop= argument was removed in Python 3.10; run_until_complete supplies the loop
tcpServer = loop.run_until_complete(asyncio.start_server(handle_echo, '0.0.0.0', 8081))
print('Serving on {}'.format(tcpServer.sockets[0].getsockname()))

# The application code:
app = web.Application()
app['websockets'] = []
app['tasks'] = []
app.router.add_get('/ws', wshandler)
app.router.add_get('/', wwwhandler)
app.router.add_get('/{name}', wwwhandler)
app.on_shutdown.append(on_shutdown)

def main():
    # Kick off the 5s loop
    tLoop=loop.create_task(broadcastLoop())

    # Kick off the web/ws server
    async def start():
        global runner, site
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, '0.0.0.0', 8080)
        await site.start()

    async def end():
        await app.shutdown()

    loop.run_until_complete(start())

    # Main program "loop"
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # On exit, kill the 5s loop
        tLoop.cancel()
        # .. and kill the web/ws server
        loop.run_until_complete( end() )

    # Stop the main event loop
    loop.close()

if __name__ == '__main__':
    main()
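
A caveat about the Future-based broadcast used in this script: if broadcast() is called twice before the waiting tasks get to run again, the second message lands on a future that nobody is awaiting yet and is lost. An alternative technique (not what the script above does) is to give every connection its own asyncio.Queue. The Hub class below is a hypothetical helper written for this sketch.

```python
import asyncio

class Hub:
    """Hypothetical fan-out helper: one asyncio.Queue per connected client."""

    def __init__(self):
        self._queues = set()

    def subscribe(self):
        # Called once per connection; the handler then reads from the queue.
        queue = asyncio.Queue()
        self._queues.add(queue)
        return queue

    def unsubscribe(self, queue):
        # Called when the connection goes away.
        self._queues.discard(queue)

    def broadcast(self, msg):
        # put_nowait() never blocks on an unbounded queue, so broadcast()
        # can stay an ordinary function, like the one in the script.
        for queue in self._queues:
            queue.put_nowait(str(msg))

async def demo():
    hub = Hub()
    ws_queue, tcp_queue = hub.subscribe(), hub.subscribe()
    # Two back-to-back broadcasts with no await in between: both are kept.
    hub.broadcast('first')
    hub.broadcast('second')
    hub.unsubscribe(ws_queue)
    hub.broadcast('third')
    ws_seen = [ws_queue.get_nowait() for _ in range(ws_queue.qsize())]
    tcp_seen = [await tcp_queue.get() for _ in range(3)]
    return ws_seen, tcp_seen
```

Each per-connection loop would then use msg = await queue.get() in place of awaiting the shared future. The trade-off is that a slow client's queue can grow without bound unless a maxsize is set and overflow is handled.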