如何在asyncio服务器中实现超时?

5
以下是一个简单的回显服务器。但如果客户端在10秒内没有发送任何内容,我希望关闭连接。
import asyncio


async def process(reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
    print("awaiting for data")
    line = await reader.readline()
    print(f"received {line}")
    writer.write(line)
    print(f"sent {line}")
    await writer.drain()
    print(f"Drained")


async def timeout(task: asyncio.Task, duration):
    print("timeout started")
    await asyncio.sleep(duration)
    print("client unresponsive, cancelling")
    task.cancel()
    print("task cancelled")


async def new_session(reader, writer):
    print("new session started")
    task = asyncio.create_task(process(reader, writer))
    timer = asyncio.create_task(timeout(task, 10))
    await task
    print("task complete")
    timer.cancel()
    print("timer cancelled")
    writer.close()
    print("writer closed")


async def a_main():
    server = await asyncio.start_server(new_session, port=8088)
    await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(a_main())

如果客户端发送消息,一切正常。但在另一种情况下,即客户端保持沉默时,它就无法工作。
当客户端发送消息时:
new session started
awaiting for data
timeout started
received b'slkdfjsdlkfj\r\n'
sent b'slkdfjsdlkfj\r\n'
Drained
task complete
timer cancelled
writer closed

当客户端在打开连接后保持静默

new session started
awaiting for data
timeout started
client unresponsive, cancelling
task cancelled

没有 任务完成计时器取消写入器关闭

  1. 以上代码有什么问题?
  2. 有更好的实现超时的方法吗?

更新

找到了问题,看起来任务已经被取消了,但异常被静默忽略了,通过捕获 CancelledError 解决了这个问题。

async def new_session(reader, writer):
    print("new session started")
    task = asyncio.create_task(process(reader, writer))
    timer = asyncio.create_task(timeout(task, 10))
    try:
        await task
    except asyncio.CancelledError:
        print(f"Task took too long and was cancelled by timer")
    print("task complete")
    timer.cancel()
    print("timer cancelled")
    writer.close()
    print("writer closed")

第二部分仍然存在。有没有更好的方法来实现超时?


更新2

使用wait_for的完整代码。不再需要超时代码。检查下面接受的解决方案

async def new_session(reader, writer):
    print("new session started")
    try:
        await asyncio.wait_for(process(reader, writer), timeout=5)
    except asyncio.TimeoutError as te:
        print(f'time is up!{te}')
    finally:
        writer.close()
        print("writer closed")

2
你可以使用 asyncio.wait_for 代替 timeout - user4815162342
@user4815162342 谢谢。你想把它添加为答案吗?我会接受的。 - balki
2个回答

10

在建立连接时,我使用以下代码。我建议你也类似地在你的代码中使用wait_for。

fut = asyncio.open_connection( self.host, self.port, loop=self.loop )
try:
   r, w = await asyncio.wait_for(fut, timeout=self.connection_timeout)
except asyncio.TimeoutError:
   pass

“open_connection” 是客户端的操作。这个问题是关于服务器的。 - balki
1
这是一个使用wait_for的示例,它是Pythonic解决方案,可以实现您想要做的事情。 - MarkReedZ
1
我没有意识到它可以用于任何可等待的对象。已点赞。 - balki

3

有更好的实现超时的方法吗?

您可以使用 asyncio.wait_for 替代 timeout。它具有类似的语义,但已经随着 asyncio 一起提供。此外,您可以等待其返回的 future 来检测是否发生了超时。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接