异步IO多个并发服务器

4
我正在尝试使用Python的asyncio来同时运行多个服务器,实现它们之间的数据传递。对于我的特定情况,我需要一个带有Websockets的Web服务器,一个与外部设备的UDP连接,以及数据库和其他交互。我可以找到几乎任何一个单独存在的示例,但我很难弄清楚正确的方式来让它们并发运行,并在它们之间推送数据。
我找到的最接近的示例在这里:Communicate between asyncio protocol/servers(虽然我无法在Python 3.6上运行它)。
以更具体的示例为例:我如何使用以下来自https://github.com/aio-libs/aiohttp的aiohttp示例代码:
from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', "Anonymous")
    text = "Hello, " + name
    return web.Response(text=text)

async def wshandler(request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)

    async for msg in ws:
        if msg.type == web.MsgType.text:
            await ws.send_str("Hello, {}".format(msg.data))
        elif msg.type == web.MsgType.binary:
            await ws.send_bytes(msg.data)
        elif msg.type == web.MsgType.close:
            break

    return ws


app = web.Application()
app.router.add_get('/echo', wshandler)
app.router.add_get('/', handle)
app.router.add_get('/{name}', handle)

web.run_app(app)

以下是TCP回显服务器示例 (http://asyncio.readthedocs.io/en/latest/tcp_echo.html):

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')
    print("Received %r from %r" % (message, addr))

    print("Send: %r" % message)
    writer.write(data)
    await writer.drain()

    print("Close the client socket")
    writer.close()

loop = asyncio.get_event_loop()
coro = asyncio.start_server(handle_echo, '127.0.0.1', 8888, loop=loop)
server = loop.run_until_complete(coro)

# Serve requests until Ctrl+C is pressed
print('Serving on {}'.format(server.sockets[0].getsockname()))
try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

# Close the server
server.close()
loop.run_until_complete(server.wait_closed())
loop.close()

将它们合并为单个脚本,使通过WebSockets或TCP回显服务器接收到的任何消息都发送给两者所有客户端?

如何添加一段代码,以便每秒向所有客户端发送一条消息(例如当前时间戳)?


你好,我很好奇你是否使用我的答案中的代码(或其他方式)解决了这个问题? - user4815162342
我还在努力中,你的答案指引了我正确的方向,但我还没有完全到达目标! - Mark Rogers
@user4815162342 我已经把我的工作代码发布为答案,尽管它目前无法干净地退出,但是它基于你的建议并且可能对某些人有用。帮助使其能够干净退出将不胜感激! - Mark Rogers
2个回答

4

首先,您需要将所有协程放入单个事件循环中。您可以开始避免使用方便的API(例如run_app)来启动事件循环。而不是使用web.run_app(app),请编写类似以下的代码:

runner = aiohttp.web.AppRunner(app)
loop.run_until_complete(runner.setup())
# here you can specify the listen address and port
site = aiohttp.web.TCPSite(runner)    
loop.run_until_complete(site.start())

然后运行回显服务器设置,并且两者都准备好共享asyncio事件循环。在脚本末尾,使用loop.run_forever()(或以任何其他在应用程序中有意义的方式)启动事件循环。

要向客户端广播信息,请创建一个广播协程并将其添加到事件循环中:

# Broadcast data is transmitted through a global Future. It can be awaited
# by multiple clients, all of which will receive the broadcast. At each new
# iteration, a new future is created, to be picked up by new awaiters.
broadcast_data = loop.create_future()

async def broadcast():
    global broadcast_data
    while True:
        broadcast_data.set_result(datetime.datetime.now())
        broadcast_data = loop.create_future()
        await asyncio.sleep(1)

loop.create_task(broadcast())

最后,在为客户端创建的每个协程中等待广播,例如handle_echo

def handle_echo(r, w):
    while True:
        data = await broadcast_data
        # data contains the broadcast datetime - send it to the client
        w.write(str(data))

修改Websockets处理协程以使广播数据按相同方式等待和转发应该很简单。


1
我只是想指出AppRunner的用法,但你的回答太棒了!谢谢。 - Andrew Svetlov
我可能暴露了我缺乏Python背景的事实(虽然我是30多年的程序员,但只用了一年Python),但我找不到AppRunner?(“NameError:name 'AppRunner'未定义,我无法弄清楚我需要导入什么才能得到它?) - Mark Rogers
@MarkRogers AppRunneraiohttp 的一部分,你可以从 aiohttp.web_runner 导入它。有关详细信息,请参阅文档 - user4815162342
@user4815162342 找到了,我安装的是旧版本的aiohttp,来自发行版存储库,这就是为什么它没有任何意义的原因。通过pip安装解决了问题。 - Mark Rogers
请注意,如果“localhost”似乎可以工作但永远挂起,则很可能缺少“run_forever”。 - user202729
显示剩余2条评论

2

根据@user4815162342的建议,以下是我的“工作”代码。我把它发布为答案,因为它是一个完整的工作脚本,可以实现我最初问题的所有要求,但它并不是完美的,因为它当前无法干净地退出。

运行时,它将接受8080端口上的Web连接和8081上的TCP(例如telnet)连接。通过其Web表单或telnet收到的任何消息都将广播到所有连接。此外,每5秒钟时间将被广播。

如何干净地退出(建立Web连接时按ctrl+C 会生成多个“Task was destroyed but it is pending!”错误)的建议将不胜感激,这样我就可以更新这个答案。

(代码相当冗长,因为它包含了用于Websockets组件的嵌入式HTML和JS。)

import asyncio
from aiohttp import web
import aiohttp
import datetime
import re

queues = []

loop = asyncio.get_event_loop()

# Broadcast data is transmitted through a global Future. It can be awaited
# by multiple clients, all of which will receive the broadcast. At each new
# iteration, a new future is created, to be picked up by new awaiters.
broadcast_data = loop.create_future()

def broadcast(msg):
    global broadcast_data
    msg = str(msg)
    print(">> ", msg)
    if not broadcast_data.done():
        broadcast_data.set_result(msg)
    broadcast_data = loop.create_future()

# Dummy loop to broadcast the time every 5 seconds
async def broadcastLoop():
    while True:
        broadcast(datetime.datetime.now())
#       print('#',end='',flush=True)
        await asyncio.sleep(5)

# Handler for www requests
async def wwwhandler(r):
    host = re.search('https?://([^/]+)/', str(r.url)).group(1)
    name = r.match_info.get('name', "Anonymous")
    text = """<!DOCTYPE html>
<html>
    <head>
        <title>WebSocket PHP Open Group Chat App</title>
        <!-- <link type="text/css" rel="stylesheet" href="style.css" /> -->
        <script>
var output;
var websocket;

function WebSocketSupport() {
    if (browserSupportsWebSockets() === false) {
        document.getElementById("ws_support").innerHTML = "<h2>Sorry! Your web browser does not supports web sockets</h2>";
        var element = document.getElementById("wrapper");
        element.parentNode.removeChild(element);
        return;
    }

    output = document.getElementById("chatbox");

    websocket = new WebSocket('ws:{{HOST}}/ws');
    websocket.onopen    = function(e) { writeToScreen("You have have successfully connected to the server"); };
    websocket.onmessage = function(e) { onMessage(e) };
    websocket.onerror   = function(e) { onError(e) };
}

function onMessage(e)   { writeToScreen('<span style="color: blue;"> ' + e.data + '</span>'); }
function onError(e)     { writeToScreen('<span style="color: red;">ERROR:</span> ' + e.data); }

function doSend(message) {
    var validationMsg = userInputSupplied();
    if (validationMsg !== '') {
        alert(validationMsg);
        return;
    }
    var chatname = document.getElementById('chatname').value;

//  document.getElementById('msg').value = "";
//  document.getElementById('msg').focus();
    var msg = chatname + ' says: ' + message;
    websocket.send(msg);
    writeToScreen(msg);
}

function writeToScreen(message) {
    var pre = document.createElement("p");
    pre.style.wordWrap = "break-word";
    pre.innerHTML = message;
    output.appendChild(pre);
}

function userInputSupplied() {
    var chatname = document.getElementById('chatname').value;
    var msg = document.getElementById('msg').value;
    if (chatname === '') { return 'Please enter your username'; } 
    if (msg === '') { return 'Please the message to send'; } 
    return '';
}

function browserSupportsWebSockets() {
    if ("WebSocket" in window) { return true; } else { return false; }
}
    </script>
    </head>
    <body onload="javascript:WebSocketSupport()">
        <div id="ws_support"></div>

        <div id="wrapper">
            <div id="menu">
                <h3 class="welcome">Welcome to WebSocket PHP Open Group Chat App v1</h3>
            </div>

            <div id="chatbox"></div>

            <div id ="controls">
                <label for="name"><b>Name</b></label>
                <input name="chatname" type="text" id="chatname" size="67" placeholder="Type your name here" value="MyName" />
                <input name="msg" type="text" id="msg" size="63" placeholder="Type your message here" value="Test" />
                <input name="sendmsg" type="submit"  id="sendmsg" value="Send" onclick="doSend(document.getElementById('msg').value)" />
            </div>
        </div>
    </body>
</html>"""
    text = text.replace('{{HOST}}', host)
    return web.Response(text=text, headers={'content-type':'text/html'})

# Handler for websocket connections
async def wshandler(r):
    # Get the websocket connection
    ws = web.WebSocketResponse()
    await ws.prepare(r)    
    # Append it to list so we can manage it later if needed
    r.app['websockets'].append(ws)

    try:
        # Create the broadcast task, and add it to list for later management
        echo_task = asyncio.Task(echo_loop(ws))
        r.app['tasks'].append(echo_task)

        # Tell the world we've connected
        # Note: Connecting client won't get this message, not really sure why
        broadcast('Hello {}'.format(r.remote))
#       await ws.send_str('Hello {}'.format(r.remote))

        # Loop through any messages we get from the client
        async for msg in ws:
            # .. and broadcast them
            if msg.type == web.WSMsgType.TEXT:
                print('<< ', msg.data)
                broadcast(msg.data)
                #            await ws.send_str("Hello, {}".format(msg.data))
                #        elif msg.type == web.WSMsgType.BINARY:
                #            await ws.send_bytes(msg.data)
            elif msg.type == web.WSMsgType.CLOSE:
                print('WS Connection closed')
                break
            elif msg.type == web.WSMsgType.ERROR:
                print('WS Connection closed with exception %s' % ws.exception())
                break
            else:
                print('WS Connection received unknown message type %2' % msg.type)

        # ws has stopped sending us data so broadcast goodbye 
        broadcast('Goodbye {}'.format(r.remote))
    except GeneratorExit:
        pass
    finally:
        # Close the ws and remove it from the list
        await ws.close()
        r.app['websockets'].remove(ws)

        # Cancel the task and remove it from the list
        # Note: cancel() only requests cancellation, it doesn't wait for it
        echo_task.cancel()
        r.app['tasks'].remove(echo_task)

    return ws

# ws broadcast loop: Each WS connection gets one of these which waits for broadcast data then sends it
async def echo_loop(ws):
    while True:
        msg = await broadcast_data
        await ws.send_str(str(msg))

# web app shutdown code: cancels any open tasks and closes any open websockets
# Only partially working   
async def on_shutdown(app):
    print('Shutting down:', end='')
    for t in app['tasks']:
        print('#', end='')
        if not t.cancelled():
            t.cancel()
    for ws in app['websockets']:
        print('.', end='')
        await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server Shutdown')
    print(' Done!')

# Code to handle TCP connections
async def echo_loop_tcp(writer):
    while True:
        msg = await broadcast_data
        writer.write( (msg + "\r\n").encode() )
        await writer.drain()

async def handle_echo(reader, writer):
    echo_task = asyncio.Task(echo_loop_tcp(writer))
    while True:
        data = await reader.readline()
        if not data:
            break
        message = data.decode().strip()
#       addr = writer.get_extra_info('peername')
        broadcast(message)

    print("Connection dropped")
    echo_task.cancel()

tcpServer = loop.run_until_complete(asyncio.start_server(handle_echo, '0.0.0.0', 8081, loop=loop))
print('Serving on {}'.format(tcpServer.sockets[0].getsockname()))

# The application code:
app = web.Application()
app['websockets'] = []
app['tasks'] = []
app.router.add_get('/ws', wshandler)
app.router.add_get('/', wwwhandler)
app.router.add_get('/{name}', wwwhandler)
app.on_shutdown.append(on_shutdown)

def main():
    # Kick off the 5s loop
    tLoop=loop.create_task(broadcastLoop())

    # Kick off the web/ws server
    async def start():
        global runner, site
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, '0.0.0.0', 8080)
        await site.start()

    async def end():
        await app.shutdown()

    loop.run_until_complete(start())

    # Main program "loop"
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        # On exit, kill the 5s loop
        tLoop.cancel()
        # .. and kill the web/ws server
        loop.run_until_complete( end() )

    # Stop the main event loop
    loop.close()

if __name__ == '__main__':
    main()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接