根据@user4815162342的建议,以下是我的“工作”代码。我把它发布为答案,因为它是一个完整的工作脚本,可以实现我最初问题的所有要求,但它并不是完美的,因为它当前无法干净地退出。
运行时,它会在8080端口接受Web连接,并在8081端口接受TCP(例如telnet)连接。通过Web表单或telnet收到的任何消息都会广播给所有连接。此外,每隔5秒会广播一次当前时间。
如果有人能建议如何让它干净地退出,我将不胜感激(目前在已建立Web连接的情况下按Ctrl+C,会产生多条“Task was destroyed but it is pending!”错误),届时我会更新这个答案。
(代码相当冗长,因为它包含了用于Websockets组件的嵌入式HTML和JS。)
import asyncio
from aiohttp import web
import aiohttp
import datetime
import re
# NOTE(review): `queues` is not referenced anywhere else in the visible source.
queues = []
# Event loop used below to start the TCP server, the web server and the
# periodic broadcast task.
loop = asyncio.get_event_loop()
# Pending future that the echo loops await to receive the next message;
# broadcast() resolves it and then rebinds this name to a fresh future.
broadcast_data = loop.create_future()
def broadcast(msg):
    """Publish ``msg`` to every coroutine currently awaiting ``broadcast_data``.

    The message is converted with ``str()``, echoed to stdout, and delivered
    by resolving the current module-level future. The global is always
    rebound to a fresh pending future so later awaits wait for the next
    message.
    """
    global broadcast_data
    text = str(msg)
    print(">> ", text)
    # Swap in the next future first, then resolve the one waiters hold.
    current, broadcast_data = broadcast_data, loop.create_future()
    if not current.done():
        current.set_result(text)
async def broadcastLoop():
    """Broadcast the current local date/time every five seconds, forever."""
    period_seconds = 5
    while True:
        broadcast(datetime.datetime.now())
        await asyncio.sleep(period_seconds)
async def wwwhandler(r):
    """Serve the single-page chat client.

    The page is a static HTML/JS document whose ``{{HOST}}`` placeholder is
    replaced with the host (and port) the request was addressed to, so the
    browser opens its websocket against the same server.

    Args:
        r: the incoming aiohttp request.

    Returns:
        A ``web.Response`` carrying the HTML page.
    """
    # Take the authority part of the request URL; if the URL does not have
    # the expected shape, fall back to the request's host instead of failing
    # on a None match.
    found = re.search('https?://([^/]+)/', str(r.url))
    host = found.group(1) if found else r.host
    text = """<!DOCTYPE html>
<html>
<head>
<title>WebSocket PHP Open Group Chat App</title>
<!-- <link type="text/css" rel="stylesheet" href="style.css" /> -->
<script>
var output;
var websocket;
function WebSocketSupport() {
if (browserSupportsWebSockets() === false) {
document.getElementById("ws_support").innerHTML = "<h2>Sorry! Your web browser does not supports web sockets</h2>";
var element = document.getElementById("wrapper");
element.parentNode.removeChild(element);
return;
}
output = document.getElementById("chatbox");
websocket = new WebSocket('ws:{{HOST}}/ws');
websocket.onopen = function(e) { writeToScreen("You have have successfully connected to the server"); };
websocket.onmessage = function(e) { onMessage(e) };
websocket.onerror = function(e) { onError(e) };
}
function onMessage(e) { writeToScreen('<span style="color: blue;"> ' + e.data + '</span>'); }
function onError(e) { writeToScreen('<span style="color: red;">ERROR:</span> ' + e.data); }
function doSend(message) {
var validationMsg = userInputSupplied();
if (validationMsg !== '') {
alert(validationMsg);
return;
}
var chatname = document.getElementById('chatname').value;
// document.getElementById('msg').value = "";
// document.getElementById('msg').focus();
var msg = chatname + ' says: ' + message;
websocket.send(msg);
writeToScreen(msg);
}
function writeToScreen(message) {
var pre = document.createElement("p");
pre.style.wordWrap = "break-word";
pre.innerHTML = message;
output.appendChild(pre);
}
function userInputSupplied() {
var chatname = document.getElementById('chatname').value;
var msg = document.getElementById('msg').value;
if (chatname === '') { return 'Please enter your username'; }
if (msg === '') { return 'Please the message to send'; }
return '';
}
function browserSupportsWebSockets() {
if ("WebSocket" in window) { return true; } else { return false; }
}
</script>
</head>
<body onload="javascript:WebSocketSupport()">
<div id="ws_support"></div>
<div id="wrapper">
<div id="menu">
<h3 class="welcome">Welcome to WebSocket PHP Open Group Chat App v1</h3>
</div>
<div id="chatbox"></div>
<div id ="controls">
<label for="name"><b>Name</b></label>
<input name="chatname" type="text" id="chatname" size="67" placeholder="Type your name here" value="MyName" />
<input name="msg" type="text" id="msg" size="63" placeholder="Type your message here" value="Test" />
<input name="sendmsg" type="submit" id="sendmsg" value="Send" onclick="doSend(document.getElementById('msg').value)" />
</div>
</div>
</body>
</html>"""
    text = text.replace('{{HOST}}', host)
    return web.Response(text=text, headers={'content-type':'text/html'})
async def wshandler(r):
    """Handle one websocket client for the lifetime of its connection.

    The socket is registered in ``r.app['websockets']`` and a companion task
    (registered in ``r.app['tasks']``) pushes every broadcast message to it.
    Text frames received from the client are re-broadcast to everybody.

    Args:
        r: the aiohttp request being upgraded to a websocket.

    Returns:
        The ``web.WebSocketResponse`` used for the connection.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(r)
    r.app['websockets'].append(ws)
    # Bound before the try so the finally clause can always inspect it.
    echo_task = None
    try:
        echo_task = asyncio.ensure_future(echo_loop(ws))
        r.app['tasks'].append(echo_task)
        broadcast('Hello {}'.format(r.remote))
        async for msg in ws:
            if msg.type == web.WSMsgType.TEXT:
                print('<< ', msg.data)
                broadcast(msg.data)
            elif msg.type == web.WSMsgType.CLOSE:
                print('WS Connection closed')
                break
            elif msg.type == web.WSMsgType.ERROR:
                print('WS Connection closed with exception %s' % ws.exception())
                break
            else:
                # '%s' (the original '%2' is an incomplete format specifier
                # and raised ValueError instead of printing).
                print('WS Connection received unknown message type %s' % msg.type)
        broadcast('Goodbye {}'.format(r.remote))
    except GeneratorExit:
        # NOTE(review): kept from the original; swallowing GeneratorExit and
        # then awaiting in ``finally`` is questionable -- confirm intent.
        pass
    finally:
        # Do the synchronous bookkeeping first so the registries are cleaned
        # up even if closing the socket fails or is cancelled.
        r.app['websockets'].remove(ws)
        if echo_task is not None:
            echo_task.cancel()
            r.app['tasks'].remove(echo_task)
        await ws.close()
    return ws
async def echo_loop(ws):
    """Forward every broadcast message to the websocket ``ws``, forever."""
    while True:
        # Look the global up on every pass: broadcast() rebinds it to a new
        # future after resolving the current one.
        payload = await broadcast_data
        await ws.send_str(str(payload))
async def on_shutdown(app):
    """Cancel the per-connection echo tasks and close all websockets.

    Registered on ``app.on_shutdown``. Progress is printed as one ``#`` per
    task and one ``.`` per websocket.

    Args:
        app: mapping that holds the ``'tasks'`` and ``'websockets'`` lists.
    """
    print('Shutting down:', end='')
    # Iterate over snapshots: the websocket handlers remove their own
    # entries from these lists while they unwind, and mutating a list that
    # is being iterated makes the loop skip elements.
    for t in list(app['tasks']):
        print('#', end='')
        if not t.cancelled():
            t.cancel()
    for ws in list(app['websockets']):
        print('.', end='')
        await ws.close(code=aiohttp.WSCloseCode.GOING_AWAY, message='Server Shutdown')
    print(' Done!')
async def echo_loop_tcp(writer):
    """Forward every broadcast message to a TCP client as a CRLF-ended line."""
    while True:
        # Look the global up on every pass: broadcast() rebinds it to a new
        # future after resolving the current one.
        text = await broadcast_data
        line = text + "\r\n"
        writer.write(line.encode())
        await writer.drain()
async def handle_echo(reader, writer):
    """Serve one raw TCP (e.g. telnet) client.

    Every line received from the client is stripped and broadcast, while a
    companion task writes all broadcast messages back to the client. The
    loop ends when the peer closes the connection (``readline()`` returns
    an empty bytes object).

    Args:
        reader: stream reader for the client connection.
        writer: stream writer for the client connection.
    """
    echo_task = asyncio.ensure_future(echo_loop_tcp(writer))
    try:
        while True:
            data = await reader.readline()
            if not data:
                break
            # Clients may send bytes that are not valid UTF-8 (telnet option
            # negotiation, for instance); replace them rather than letting a
            # decode error kill the handler.
            message = data.decode(errors='replace').strip()
            broadcast(message)
    finally:
        # Runs on normal disconnect, on read errors and on cancellation, so
        # the echo task and the transport are always released.
        print("Connection dropped")
        echo_task.cancel()
        writer.close()
# Start the raw TCP listener on port 8081. The explicit ``loop=`` argument
# was removed from asyncio.start_server() in Python 3.10; the server binds
# to the loop that runs the coroutine, which is ``loop`` here.
tcpServer = loop.run_until_complete(asyncio.start_server(handle_echo, '0.0.0.0', 8081))
print('Serving on {}'.format(tcpServer.sockets[0].getsockname()))

# aiohttp application: the two lists are the registries that wshandler
# fills in and on_shutdown drains.
app = web.Application()
app['websockets'] = []
app['tasks'] = []
app.router.add_get('/ws', wshandler)
app.router.add_get('/', wwwhandler)
app.router.add_get('/{name}', wwwhandler)
app.on_shutdown.append(on_shutdown)
def main():
    """Run the web server and the periodic broadcaster until interrupted.

    Starts the aiohttp site on port 8080 and the time broadcaster, then runs
    the event loop until Ctrl+C. On the way out it stops the TCP listener,
    cleans up the aiohttp runner, and cancels and awaits every task that is
    still pending, so the loop is not closed with live tasks (the cause of
    "Task was destroyed but it is pending!" messages).

    Requires Python 3.7+ for ``asyncio.all_tasks``.
    """
    tLoop = loop.create_task(broadcastLoop())

    async def start():
        # runner/site are kept as module globals, as in the original script.
        global runner, site
        runner = web.AppRunner(app)
        await runner.setup()
        site = web.TCPSite(runner, '0.0.0.0', 8080)
        await site.start()

    async def end():
        # Stop accepting new TCP clients; existing client handlers are
        # cancelled together with the other leftover tasks below.
        tcpServer.close()
        # Per the aiohttp docs, AppRunner.cleanup() stops the sites and sends
        # the on_shutdown/on_cleanup signals, so on_shutdown still runs.
        await runner.cleanup()

    loop.run_until_complete(start())
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        tLoop.cancel()
        loop.run_until_complete(end())
        # Cancel whatever is still pending and give each task a chance to
        # process its cancellation before the loop goes away.
        leftovers = asyncio.all_tasks(loop)
        for task in leftovers:
            task.cancel()
        if leftovers:
            loop.run_until_complete(
                asyncio.gather(*leftovers, return_exceptions=True)
            )
    loop.close()


if __name__ == '__main__':
    main()