如何在 asyncio 中重用 socket?

3

如何在 asyncio 中重复使用已连接到服务器的套接字,而不是为每个查询都创建新的连接?

这是我的代码:

async def lookup(server, port, query, sema):
    """Send *query* to *server*:*port* on a fresh connection and return the reply.

    *sema* is an asyncio semaphore that bounds how many lookups run at once.
    The reply is read until the server closes the connection (EOF) and is
    decoded as ISO-8859-1.  Returns ``{}`` if the connection cannot be opened.
    """
    # Semaphore.__aenter__ returns None, so don't bind it with ``as``.
    async with sema:
        try:
            reader, writer = await asyncio.open_connection(server, port)
        except Exception:
            # Best effort: an unreachable server yields an empty result.
            # Unlike a bare ``except:``, this lets KeyboardInterrupt (and, on
            # Python 3.8+, task cancellation) propagate.
            return {}
        try:
            writer.write(query.encode("ISO-8859-1"))
            await writer.drain()
            chunks = []
            while True:
                chunk = await reader.read(4096)
                if not chunk:  # b"" means the server closed the connection
                    break
                chunks.append(chunk)
        finally:
            # Close the socket even when writing or reading fails.
            writer.close()
        return b"".join(chunks).decode("ISO-8859-1")

你连接的是什么类型的服务器?标准的 43 端口 WHOIS 协议不支持重用套接字;服务器发送完响应后就会直接关闭连接。这正是你的代码要一直读取、直到读到空数据为止的原因:读到空数据意味着连接的读取端已经关闭,不会再有更多数据了。 - Martijn Pieters
对于保持连接开放的协议,将会有一种方法来检测消息何时完成(例如CRLF组合、固定大小的消息或以长度信息开头的消息)。假设您正在使用一个循环,当reader.read()返回一个空值时结束,那么您几乎肯定正在使用标准的whois连接。 - Martijn Pieters
我实际上正在连接到一个whois服务器,这很有意义,因为我无法通过一些手动测试使套接字重用工作。谢谢! - Jonathan
2个回答

4

您只需调用 asyncio.open_connection(server, port) 协程一次,之后持续使用它返回的读取器和写入器即可(当然,前提是服务器端不会关闭连接)。

我建议您为连接创建一个单独的异步上下文管理器对象,并使用连接池来管理连接,以便为许多并发任务创建和重复使用套接字连接。通过使用(异步)上下文管理器,Python会在您的代码完成时通知连接,以便将连接释放回池中:

import asyncio
import contextlib

from collections import OrderedDict
from types import TracebackType
from typing import Any, List, Optional, Tuple, Type


# contextlib.AbstractAsyncContextManager only exists on Python 3.7+;
# older interpreters fall back to a plain ``object`` base class.
base = getattr(contextlib, "AbstractAsyncContextManager", object)

# Type aliases for the address that pooled connections are keyed on.
Server = str  # the host passed to asyncio.open_connection
Port = int
Host = Tuple[Server, Port]  # (server, port) pair


class ConnectionPool(base):
    """Pool of reusable stream connections keyed by ``(server, port)``.

    At most ``max_connections`` connections are checked out at any time;
    further ``connect()`` calls wait until one is released.  Released
    connections stay open for reuse.  When the pool already holds
    ``max_connections`` connections and a new one must be opened, idle
    connections of the least-recently-used hosts are closed first.
    """

    def __init__(
        self,
        max_connections: int = 1000,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ):
        self.max_connections = max_connections
        self._loop = loop or asyncio.get_event_loop()

        # host -> its connections.  connect() moves the host it serves to the
        # front, so iterating in reverse visits least-recently-used hosts first.
        self._connections: OrderedDict[Host, List["Connection"]] = OrderedDict()
        # One permit per checked-out connection; Connection.release() hands
        # the permit back through _notify_release().
        self._semaphore = asyncio.Semaphore(max_connections)

    async def connect(self, server: Server, port: Port) -> "Connection":
        """Check out a connection to *server*:*port*, opening one if needed.

        Waits while ``max_connections`` connections are checked out.  The
        returned connection must be released again (``async with conn:``
        does that).

        Raises:
            Whatever ``asyncio.open_connection`` raises (e.g. ``OSError``) when
            a new connection cannot be opened; the acquired slot is returned
            to the pool before the exception propagates.
        """
        host = (server, port)

        # Enforce the checked-out limit; releasing a connection releases the
        # semaphore again.
        await self._semaphore.acquire()
        try:
            connections = self._connections.setdefault(host, [])
            # Reuse an idle connection for this host if there is one.  No
            # await happens between finding it and marking it in use below.
            connection = next(
                (conn for conn in connections if not conn.in_use), None
            )
            if connection is None:
                await self._make_room()
                reader, writer = await asyncio.open_connection(server, port)
                connection = Connection(self, host, reader, writer)
                # Look the list up again: close() may have replaced
                # self._connections while we were awaiting.
                self._connections.setdefault(host, []).append(connection)
        except BaseException:
            # Opening failed or the task was cancelled: give the permit back,
            # otherwise every failure permanently shrinks the pool.
            self._semaphore.release()
            raise

        connection.in_use = True
        # Mark this host as the most recently used one.
        self._connections.move_to_end(host, last=False)
        return connection

    def _total_connections(self) -> int:
        """Return the number of open connections tracked by the pool."""
        return sum(len(conns) for conns in self._connections.values())

    def _least_recently_used_idle(self) -> Optional["Connection"]:
        """Return an idle connection of the least-recently-used host, if any."""
        for conns in reversed(self._connections.values()):
            for conn in conns:
                if not conn.in_use:
                    return conn
        return None

    async def _make_room(self) -> None:
        """Close LRU idle connections until one more fits under the limit."""
        while self._total_connections() >= self.max_connections:
            victim = self._least_recently_used_idle()
            if victim is None:
                # Every connection is checked out; none may be closed.
                break
            # The victim is chosen and closed without iterating the dict
            # across an await, so concurrent connect() calls cannot break
            # the iteration.
            await victim.close()
            # close() normally removes it already; make sure it is gone so
            # this loop always makes progress.
            self._remove(victim)

    async def close(self) -> None:
        """Close all connections, whether idle or checked out."""
        connections = [c for cs in self._connections.values() for c in cs]
        self._connections = OrderedDict()
        for connection in connections:
            await connection.close()

    def _remove(self, connection: "Connection") -> None:
        """Forget *connection*; a no-op if the pool no longer tracks it."""
        conns_for_host = self._connections.get(connection._host)
        if not conns_for_host:
            return
        # Mutate in place so code holding a reference to this list sees it.
        conns_for_host[:] = [c for c in conns_for_host if c is not connection]

    def _notify_release(self) -> None:
        """Return one checked-out slot to the pool."""
        self._semaphore.release()

    async def __aenter__(self) -> "ConnectionPool":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        await self.close()

    def __del__(self) -> None:
        # getattr: __init__ may have raised before _connections was set.
        by_host = getattr(self, "_connections", None) or {}
        connections = [repr(c) for cs in by_host.values() for c in cs]
        if not connections:
            return

        context = {
            "pool": self,
            "connections": connections,
            "message": "Unclosed connection pool",
        }
        self._loop.call_exception_handler(context)


class Connection(base):
    """A connection checked out of a ConnectionPool.

    Proxies its StreamReader and StreamWriter: attributes not defined on this
    class (``read``, ``write``, ``drain``, ...) are looked up on the reader
    first, then on the writer.  Use it as ``async with conn:`` so that it is
    released back to the pool when the block ends.
    """

    # Attributes that __init__ sets on the instance.  __getattr__ must never
    # delegate these: on an instance whose __init__ did not run, doing so
    # would recurse forever.
    _OWN_ATTRS = frozenset(
        {"_host", "_pool", "_reader", "_writer", "_closed", "in_use"}
    )

    def __init__(
        self,
        pool: ConnectionPool,
        host: Host,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ):
        self._host = host
        self._pool = pool
        self._reader = reader
        self._writer = writer
        self._closed = False
        # True while checked out; ConnectionPool.connect() sets it.
        self.in_use = False

    def __repr__(self):
        host = f"{self._host[0]}:{self._host[1]}"
        return f"Connection<{host}>"

    @property
    def closed(self):
        """Whether close() has been called."""
        return self._closed

    def release(self) -> None:
        """Hand the connection back to the pool; calling it twice is a no-op."""
        if not self.in_use:
            # A second release would free an extra slot in the pool semaphore.
            return
        self.in_use = False
        self._pool._notify_release()

    async def close(self) -> None:
        """Close the stream and drop it from the pool (idempotent)."""
        if self._closed:
            return
        self._closed = True
        # Leave the pool first so the connection is gone from it even if
        # closing the transport fails.
        self._pool._remove(self)
        self._writer.close()
        try:
            await self._writer.wait_closed()
        except AttributeError:  # wait_closed is new in 3.7
            pass

    def __getattr__(self, name: str) -> Any:
        """Delegate unknown attributes to the reader, then to the writer."""
        if name in self._OWN_ATTRS or name.startswith("__"):
            # A missing own attribute (instance created without __init__) or
            # a protocol probe such as __deepcopy__: report it as absent
            # instead of recursing or raising ValueError.
            raise AttributeError(name)
        if self._closed or not self.in_use:
            raise ValueError("Can't use a closed or unacquired connection")
        if hasattr(self._reader, name):
            return getattr(self._reader, name)
        return getattr(self._writer, name)

    async def __aenter__(self) -> "Connection":
        if self._closed or not self.in_use:
            raise ValueError("Can't use a closed or unacquired connection")
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Release the connection, closing it first if the block raised."""
        try:
            if exc_type is not None:
                # The stream may hold a half-written request or an unread
                # reply; never hand that state to the next user of the pool.
                try:
                    await self.close()
                except OSError:
                    pass  # let the block's own exception be the one raised
        finally:
            self.release()

    def __del__(self) -> None:
        # Read __dict__ directly: __init__ may not have run, and going
        # through __getattr__ here must be avoided.
        if self.__dict__.get("_closed", True):
            return
        context = {"connection": self, "message": "Unclosed connection"}
        self._pool._loop.call_exception_handler(context)

然后将池对象传递给您的 lookup 协程;连接对象充当读取器和写入器的代理(未在连接类上定义的属性会被转发给它们):

async def lookup(pool, server, port, query):
    """Send *query* to *server*:*port* through *pool* and return the reply.

    The reply is read until the server closes its side of the connection
    (EOF) and is decoded as ISO-8859-1.  Returns ``{}`` if no connection
    could be obtained from the pool.
    """
    try:
        conn = await pool.connect(server, port)
    except (ValueError, OSError):
        return {}

    async with conn:
        conn.write(query.encode("ISO-8859-1"))
        await conn.drain()
        chunks = []
        while True:
            chunk = await conn.read(4096)
            if not chunk:
                break
            chunks.append(chunk)
        # read() only returns b"" at EOF: the server has closed its end, so
        # this connection can never carry another reply.  Close it so the
        # pool drops it instead of handing a dead socket to the next caller.
        await conn.close()
    return b"".join(chunks).decode("ISO-8859-1")

请注意,标准WHOIS协议(RFC 3912或其前身)规定每次查询后连接都会关闭。如果您连接到端口43上的标准WHOIS服务,则没有重用套接字的必要。
在这种情况下,读取器将已经到达了EOF(reader.at_eof()为true),任何进一步的读取尝试都将简单地返回空值(reader.read(...)将始终返回一个空的b''值)。在远程方超时终止套接字连接之前,向写入器写入不会出现错误。您可以随意向连接写入,但WHOIS服务器只会忽略查询。

如果我理解有误,请纠正我。在 ConnectionPool.connect 的 while 循环中,多个任务可能各自在等待一个 Future(fut)。这些 Future 只是被当作可等待对象使用。事件循环会检查正在等待的对象;当它发现某个 fut 已经完成时,等待该 fut 的任务就会结束 while 循环并获得连接。如果上述理解正确,这是否意味着事件循环中同时挂起着多个 ConnectionPool.connect 调用?先谢谢了(TIA)。(注:这条评论针对的是答案的早期版本,当时 connect() 用 while 循环等待 Future;当前代码已改用 asyncio.Semaphore。) - wwii
@wwii:是的,这正是这里使用 future 的目的。在该任务能拿到空闲连接之前,connect() 调用不会返回;释放连接的其他任务会通过 future 发出信号,告知可能已有可用连接,等待中的任务由此得知可以继续。 - Martijn Pieters
我很好奇为什么你要维护一个明确的获取计数和一个等待者字典。使用asyncio.Semaphore来获取锁不是更简单吗?这样可以自动完成所有工作。 - user4815162342
@user4815162342:习惯,比任何事情都重要;我提到了我的漏桶实现作为参考,以及其他连接池。aiohttp也使用他们自己的waiters,但是漏桶和aiohttp池的要求有点更复杂。在这种情况下,一个信号量就可以了。 - Martijn Pieters
谢谢更新。另一个好处是使用信号量不需要获取当前任务,因此可以删除_current_task函数(及其调用)。我已经编辑了答案以删除它们。 - user4815162342
1
@user4815162342 谢谢。我忘了用 flake8 检查最后一次编辑,糟糕。 - Martijn Pieters

2

您可以通过将读/写器对存储到全局字典中来创建连接缓存。

Original Answer: 最初的回答

# Module-level cache of open streams: (server, port) -> (reader, writer).
connections = dict()

然后在lookup函数中,用检查字典的代码替换对open_connection函数的调用:

将原始答案翻译成“最初的回答”。

if (server, port) in connections:
    reader, writer = connections[server, port]
else:
    reader, writer = await asyncio.open_connection(server, port)
    connections[server, port] = reader, writer

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接