asyncio - 如何在不停止事件循环的情况下停止(并重新启动)服务器?

4

在这个例子中,我正在使用websockets模块。 一个典型的服务器实现如下:

import websockets
start_server = websockets.serve(counter, "localhost", 6789)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()  

但我希望能够在不停止事件循环的情况下停止和重新启动服务器。从我所看到的有关asyncio服务器的最少文档中,我不清楚如何做到这一点。而我也不知道websockets是否以相同的方式实现。

例如,如果我像这样做:

def counter():
    start_server = websockets.serve(connection_handler, 'localhost', 6789)
    this = loop.run_until_complete(start_server)
    try:
        loop.run_forever()
    finally:
        this.close()
        loop.run_until_complete(this.wait_closed())  


loop = asyncio.get_event_loop()
loop.create_task(anothertask)
startcounter = counter()

我可以调用 loop.stop() 来停止服务器。如何在不停止循环的情况下停止服务器(而不会影响运行在循环上的其他任务)?


你尝试过立即调用this.close()而不是loop.stop()来停止服务器并保持循环运行吗? - user4815162342
我的问题的一部分是被低估的第二部分。如何在已经运行的循环上启动服务器?我唯一找到的启动websockets服务器的方法是run_until_complete()。据我所知,这需要一个停止的循环。但肯定还有其他方法可以做到... - chmedly
当循环已经在运行时,可以使用asyncio.create_task()。 (我认为在它之前也可以-它将在循环启动时运行)。 run_until_complete()+ run_forever()模式来自早期的asyncio版本,这些版本还没有 serve_forever() asyncio.run()。 现在您可以在顶层使用asyncio.run()来启动异步入口点(通常定义为async def main()),并从那里执行所有操作。 然后,run_until_complete(foo)变成了简单的await foo,而run_forever()则不再需要,因为您只需等待asyncio.Event - user4815162342
谢谢!这帮助我克服了一些头脑中的障碍。这个代码结构大部分都是有效的。我稍微完善了一下,使得一个websocket客户端可以停止服务器,而你的“测试”例程会定期发送不同的事件来重新启动它。但是,我发现await server.wait_closed()这行代码会永久阻塞。简单地将其删除会导致代码按预期工作,但内存使用量会随着服务器的停止和新服务器的创建而缓慢上升。似乎服务器没有被垃圾回收。 - chmedly
我认为调用wait_closed()而不等待它实际上没有任何意义,因为它的唯一目的是等待先前执行的close()。如果您不想(或无法)等待关闭完成,则应该不要调用wait_closed()。如果您认为close()在客户端连接方面做得不对,也许您应该在websockets库上提交错误报告或在其讨论渠道上提问。 - user4815162342
显示剩余4条评论
2个回答

4
您可以使用asyncio.create_task在循环已经运行时提交任务。现在不推荐使用run_until_complete()run_forever()模式,因为它与asyncio.run不兼容,而asyncio.run现在是运行asyncio代码的首选方式。
建议的方法是在顶层使用asyncio.run来启动异步入口点(通常定义为async def main()),然后从那里开始进行其余操作。此时,run_until_complete(x)简单地变为await x,而run_forever()则不再需要,因为您可以等待诸如server.serve_forever()或您选择的asyncio.Event之类的东西。
由于似乎不存在websocket服务器的serve_forever,因此这里是带有事件的变体(未经测试):
async def connection_handler(...):
    ...

async def test(stop_request):
    # occasionally stop the server to test it
    while True:
        await asyncio.sleep(1)
        print('requesting stop')
        stop_request.set()

async def main():
    stop_request = asyncio.Event()
    asyncio.create_task(test(stop_request))
    while True:
        print('starting the server')
        server = await websockets.serve(connection_handler, 'localhost', 6789)
        await stop_request.wait()
        stop_request.clear()
        print('stopping the server')
        server.close()
        await server.wait_closed()

asyncio.run(main())

1
这个答案基于用户user4815162342提供的建议和答案,并借鉴了websockets模块文档中的一些内容。我回答的原因主要是为了提供一个更加详细的示例。我认为,使用asyncio最困难的部分之一是将其与其他代码集成而不破坏它。
关于这个实现,我想指出的一件事是,在user4815162342代码行中的"server"对象:
async def somefunction(): # requires async function
    server = await websockets.serve(handler, 'localhost', 6789)

这个“server”对象与WebSockets文档中经常出现的以下代码创建的对象相同:
def somefunction(): # non-async function
    start_server = websockets.serve(handler, 'localhost', 6789)
    server = asyncio.run_until_complete(start_server)

因此,它将响应close()方法。
此外,websocket的作者建议使用以下方式启动服务器:
async def somefunction(): # requires async function
    async with websockets.serve(handler, 'localhost', 6789)
    await something_that_says_stop

请注意,哪些可以从非异步函数调用,哪些必须发生“在循环中”。
在我的示例中(一个有点奇特的脚本),我集成了一个tkinter GUI。这是通过aiotkinter模块完成的,除了它的pypi页面之外,几乎没有其他提及,更不用说示例代码了。因此,这里是一个如何在websockets模块中使用它的示例,其中websockets服务器可以启动和停止。
请注意,我没有调用root.mainloop()。相反,在这个例子中,“阻塞行”是asyncio.run()行。还要注意set_event_loop_policy行放置的位置。
已在Mac上测试过Python 3.8.2和TclTk 8.6。感谢user4815162342的帮助。Asyncio的语言变得更加清晰,我认为我们需要发布更多使用新语言/结构的示例代码和答案……
import sys
from functools import partial
import asyncio
import aiotkinter
from tkinter import *
from tkinter import messagebox
import json
import websockets
from netifaces import interfaces, ifaddresses, AF_INET


async def register(w, label2):
    USERS.remove(w) if w in USERS else USERS.add(w)
    label2.config(text=f"User Count = {len(USERS)}")


async def connection_handler(websocket, path, stop_request,
                               ticker, start_request, label, label2):
    misterDict = {"Mr.White":"white", "Mr.Orange":"orange",
                "Mr.Blonde":"khaki", "Mr.Pink":"pink",
                "Mr.Blue":"blue", "Mr.Brown":"brown"}

    await register(websocket, label2)
    try:
        message = json.dumps("welcome")
        await websocket.send(message)
        # ignore this -> while server.get():
        async for message in websocket:
            msg = json.loads(message)
            if msg == "end":
                return
            elif msg == "stoptick":
                ticker.set(False)
            elif msg == "starttick":
                ticker.set(True)
                startTicker(ticker, label)
            elif msg == "stopserver":
                stop_request.set()
                break
            elif msg in misterDict.keys():
                color = misterDict[msg]
                changeColor(label, color)
            # ignore this -> await asyncio.sleep(.05)
    except Exception as E1:
        print(f"Error! {E1}")
    finally:
        await register(websocket, label2)

async def main():
    stop_request = asyncio.Event()
    start_request = asyncio.Event()
    # Some tkinter setup
    root = Tk()
    root.title("Color Server")
    root.minsize(250, 250)
    ticker = BooleanVar()
    ticker.set(0)
    server = BooleanVar()
    server.set(0)
    label = Label(master=root, height=4, width=20, bg="white",
                    bd=5)
    label.pack()
    buttonS = Checkbutton(master=root, text='Mr tick', variable=ticker,
                    height=3, command=partial(startTicker,
                    ticker, label))
    buttonS.pack()
    buttonW = Checkbutton(master=root, text='Websocket Server',
                    variable=server, height=3,
                    command=partial(startWeb, server,
                    label, stop_request, start_request))
    buttonW.pack()

    def on_closing():
        if messagebox.askokcancel("Quit", "Do you want to quit?"):
            QUITTER.add("yes")
            if server.get():
                stop_request.set()
            else:
                start_request.set()


    buttonX = Button(master=root, text='Shut down everything', height=2,
                    command=on_closing)
    buttonX.pack()
    label2 = Label(master=root, text='User Count = 0',
                        height=2, bg="white", bd=2)
    label2.pack()
    root.protocol("WM_DELETE_WINDOW", on_closing)

    # websocket server setup code.
    serverlist = set()
    iplist = [ifaddresses(face)[AF_INET][0]["addr"]
        for face in interfaces() if AF_INET in ifaddresses(face)]
    print(f"the interfaces found = {iplist}")
    while True:
        await start_request.wait()
        start_request.clear()
        if "yes" in QUITTER:
            break
        server.set(1)
        for ipadd in iplist: # for each IP address in the system
                                # setup a websocket server
            bound_handler = partial(connection_handler,
                                stop_request=stop_request, ticker=ticker,
                                start_request=start_request,
                                label=label, label2=label2)
            sockserver = await websockets.serve(bound_handler, ipadd, 6789)
            serverlist.add(sockserver)

        await stop_request.wait()
        stop_request.clear()
        for sockserver in serverlist:
            sockserver.close()
            await sockserver.wait_closed()
        serverlist.clear()
        if "yes" in QUITTER:
            break
        server.set(0)


def startWeb(server, label, stop_request, start_request):
    if not server.get():
        stop_request.set()
        return
    else:
        start_request.set()


def changeColor(label, color):
    label.config(bg=color)


async def mRtick(ticker, label):
    ticktock = [("tick", "w"), ("tock", "e")]
    tocker = 0
    while ticker.get():
        a,b = ticktock[tocker]
        label.config(text=a, anchor=b)
        tocker = not tocker
        await asyncio.sleep(0.5)
    label.config(text="")


def startTicker(ticker, label):
    if not ticker.get():
        return
    asyncio.create_task(mRtick(ticker, label))


QUITTER = set()
USERS = set()

asyncio.set_event_loop_policy(aiotkinter.TkinterEventLoopPolicy())
asyncio.run(main())
print("we've ended...")
sys.exit()

请注意,我发现代码中有一个不必要的while循环。现在已经将其注释掉。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接