如何在Python中判断连接是否已断开

73

我希望我的Python应用程序能够判断另一端的socket是否已经关闭。有没有相关的方法可以实现这个功能?


你是在读取(并获取EOF)吗?还是在写入(并获取I/O错误)?或者只是使用select等待? - S.Lott
6个回答

50

简短回答:

使用非阻塞 recv(),或带有极短超时的阻塞 recv() / select()。

详细回答:

处理套接字连接的方法是在需要时读取或写入,并准备好处理连接错误。

TCP 区分了三种“断开”连接的形式:超时、重置、关闭。

其中,超时实际上无法检测到,TCP 只会告诉您时间还没有过期。但即使它告诉了你,时间可能仍然会在此之后过期。

同时请记住,使用 shutdown(),您或者您的对等体(连接的另一端)可以仅关闭传入的字节流并保持传出的字节流运行,或者关闭传出的字节流并保持传入的字节流运行。

因此,严格来说,您需要检查读取流是否关闭,或写入流是否关闭,或两者是否都关闭。

即使连接已经“断开”,您仍应该能够读取网络缓冲区中仍存在的任何数据。只有在缓冲区为空时,您才会从 recv() 中收到一个断开通知。

检查连接是否已经断开就像问“在读取当前缓冲区中的所有数据之后,我将收到什么?”要找出答案,您只需要读取当前缓冲区中的所有数据。

我能理解“读取所有缓冲数据”以达到其末端可能对某些人来说是个问题,这些人仍然认为 recv() 是一个阻塞函数。使用阻塞 recv() 时,“检查”一个已经为空的缓冲区会阻塞,这就破坏了“检查”的目的。

在我看来,任何被记录为有可能无限期地阻塞整个进程的功能都是设计上的缺陷,但我想出于历史原因而仍然存在,因为当像常规文件描述符一样使用套接字时,这是个很酷的想法。

你可以:

  • 将套接字设置为非阻塞模式,但这时你会得到一个系统依赖错误,表示接收缓冲区为空或发送缓冲区已满
  • 坚持使用阻塞模式,但设置非常短的套接字超时时间。这将允许您使用recv()“ping”或“检查”套接字,基本上就是您想要做的。
  • 使用select()调用或asyncore模块并设置非常短的超时时间。错误报告仍然是特定于系统的。

对于问题的写部分,保持读缓冲区为空几乎就解决了。在非阻塞读取尝试后,您会发现连接“断开”,并且在读取返回关闭通道后,您可以选择停止发送任何内容。

我想确定发送的数据已到达另一端(而不是仍在发送缓冲区中)的唯一方法是:

  • 在同一套接字上接收到与您发送的确切消息对应的正确响应。基本上,您正在使用更高级别的协议提供确认。
  • 在套接字上执行成功的shutdown()和close()操作

Python socket howto指出,如果通道关闭,send()将返回0个写入字节。您可以使用非阻塞或带超时的socket.send(),如果它返回0,则无法再在该套接字上发送数据。但是,如果它返回非零值,您已经发送了一些数据,请祝您好运:)

此外,在此处我没有考虑OOB(带外)套接字数据作为解决问题的手段,但我认为OOB不是您的意思。


请注意,这样做将会删除一些可供读取的数据。如果您在另一个线程从套接字中读取数据时执行此操作,那么读取的数据中将会缺少一小块。 - byxor
超时并不是TCP“断开连接”的一种形式。它仅表示在超时期间没有数据到达。连接仍然存在,可能仍然可用。 - user207421

42

这取决于你所说的“断开连接”的含义。对于TCP套接字,如果另一端通过close()或进程终止关闭连接,您将通过读取文件结束或获取读取错误来发现,通常errno被设置为操作系统中'connection reset by peer'的任何内容。对于Python,当您尝试从套接字读取或写入时,您将读取零长度字符串,或者将抛出socket.error。


16

从Jweede发布的链接中:

异常 socket.timeout:

This exception is raised when a timeout occurs on a socket
which has had timeouts enabled via a prior call to settimeout().
The accompanying value is a string whose value is currently
always “timed out”.

以下是来自Python文档socket模块的演示服务器和客户端程序:

# Echo server program
import socket

HOST = ''                 # Symbolic name meaning all available interfaces
PORT = 50007              # Arbitrary non-privileged port
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((HOST, PORT))
s.listen(1)
conn, addr = s.accept()
print 'Connected by', addr
while 1:
    data = conn.recv(1024)
    if not data: break
    conn.send(data)
conn.close()

还有客户端:

# Echo client program
import socket

HOST = 'daring.cwi.nl'    # The remote host
PORT = 50007              # The same port as used by the server
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((HOST, PORT))
s.send('Hello, world')
data = s.recv(1024)
s.close()
print 'Received', repr(data)

在文档示例页面中,我选取了这些内容。还有一些更复杂的例子采用了这个方法,但这里是一个简单的答案:

假设你正在编写客户端程序,只需将所有使用可能会被断开的套接字的代码放入 try 块中...

try:
    s.connect((HOST, PORT))
    s.send("Hello, World!")
    ...
except socket.timeout:
    # whatever you need to do when the connection is dropped

谢谢。这正是我需要的,以便回答一个相关但不同的问题... - Deacon
send() 不会触发超时,只有 connect()recv() 会触发超时。 - user207421

4

我将这篇博客文章中的代码示例翻译成了Python: 如何检测客户端何时关闭连接?,并且它对我运行良好:

from ctypes import (
    CDLL, c_int, POINTER, Structure, c_void_p, c_size_t,
    c_short, c_ssize_t, c_char, ARRAY
)


__all__ = 'is_remote_alive',


class pollfd(Structure):
    _fields_ = (
        ('fd', c_int),
        ('events', c_short),
        ('revents', c_short),
    )


MSG_DONTWAIT = 0x40
MSG_PEEK = 0x02

EPOLLIN = 0x001
EPOLLPRI = 0x002
EPOLLRDNORM = 0x040

libc = CDLL('libc.so.6')

recv = libc.recv
recv.restype = c_ssize_t
recv.argtypes = c_int, c_void_p, c_size_t, c_int

poll = libc.poll
poll.restype = c_int
poll.argtypes = POINTER(pollfd), c_int, c_int


class IsRemoteAlive:  # not needed, only for debugging
    def __init__(self, alive, msg):
        self.alive = alive
        self.msg = msg

    def __str__(self):
        return self.msg

    def __repr__(self):
        return 'IsRemoteAlive(%r,%r)' % (self.alive, self.msg)

    def __bool__(self):
        return self.alive


def is_remote_alive(fd):
    fileno = getattr(fd, 'fileno', None)
    if fileno is not None:
        if hasattr(fileno, '__call__'):
            fd = fileno()
        else:
            fd = fileno

    p = pollfd(fd=fd, events=EPOLLIN|EPOLLPRI|EPOLLRDNORM, revents=0)
    result = poll(p, 1, 0)
    if not result:
        return IsRemoteAlive(True, 'empty')

    buf = ARRAY(c_char, 1)()
    result = recv(fd, buf, len(buf), MSG_DONTWAIT|MSG_PEEK)
    if result > 0:
        return IsRemoteAlive(True, 'readable')
    elif result == 0:
        return IsRemoteAlive(False, 'closed')
    else:
        return IsRemoteAlive(False, 'errored')

你的解决方案看起来非常好,但是出现了以下错误: File "C:\Python37\lib\ctypes_init_.py", line 356, in init self._handle = _dlopen(self._name, mode) TypeError: LoadLibrary() argument 1 must be str, not None?? - JDOaktown
你有解决方法吗?它在这里出错了:libc = CDLL(None) - JDOaktown
我正在使用64位的Windows 10操作系统。 - JDOaktown
1
@JDOaktown,在Linux中,您可以使用CDLL('libc.so.6')。我不知道在Windows的Linux子系统中是否相同。很抱歉,现在我没有Windows安装来测试它。 - Kijewski

4
如果我没记错的话,通常是通过超时来处理这个问题的。

1

试图改进@kay的回答。我制作了一个更符合Python风格的版本。

(注意,它尚未在“真实环境”中进行测试,仅在Linux上进行了测试)

这可以检测到远程端是否关闭了连接,而不实际消耗数据:

import socket
import errno


def remote_connection_closed(sock: socket.socket) -> bool:
    """
    Returns True if the remote side did close the connection

    """
    try:
        buf = sock.recv(1, socket.MSG_PEEK | socket.MSG_DONTWAIT)
        if buf == b'':
            return True
    except BlockingIOError as exc:
        if exc.errno != errno.EAGAIN:
            # Raise on unknown exception
            raise
    return False

以下是一个来自异步io回显服务器的简单示例:

import asyncio


async def handle_echo(reader, writer):
    addr = writer.get_extra_info('peername')
    sock = writer.get_extra_info('socket')
    print(f'New client: {addr!r}')

    # Initial of client command
    data = await reader.read(100)
    message = data.decode()

    print(f"Received {message!r} from {addr!r}")

    # Simulate a long async process
    for _ in range(10):
        if remote_connection_closed(sock):
            print('Remote side closed early')
            return
        await asyncio.sleep(1)

    # Write the initial message back
    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()
    writer.close()


async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()


if __name__ == '__main__':
    asyncio.run(main())

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接