Python套接字接收大量数据

80

当我尝试接收更大量的数据时,它会被截断,我必须按回车键才能获取剩余的数据。起初,我能够稍微增加一点,但仍无法接收全部数据。正如您所看到的,我已经增加了conn.recv()中的缓冲区,但仍无法获取所有数据。它在某个特定点被截断。我必须在raw_input上按回车键才能接收剩余的数据。有没有办法让我一次性获取所有数据?这是代码。

port = 7777
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.bind(('0.0.0.0', port))
sock.listen(1)
print ("Listening on port: "+str(port))
while 1:
    conn, sock_addr = sock.accept()
    print "accepted connection from", sock_addr
    while 1:
        command = raw_input('shell> ')
        conn.send(command)
        data = conn.recv(8000)
        if not data: break
        print data,
    conn.close()
13个回答

165

TCP/IP是一种基于流的协议,而非基于消息的协议。不能保证一个对等方的每个send()调用都会导致另一个对等方通过单个recv()调用接收到完全相同的数据,可能会因为分组传输而分散在多个recv()调用中。

为了区分消息边界,您需要在TCP之上定义自己的基于消息的协议。然后,要读取消息,您需要继续调用recv(),直到读取整个消息或出现错误。

发送消息的一种简单方法是在每个消息前加上其长度。然后要读取消息,您首先读取长度,然后读取该长度的字节数。以下是如何实现此操作的示例:

def send_msg(sock, msg):
    # Prefix each message with a 4-byte length (network byte order)
    msg = struct.pack('>I', len(msg)) + msg
    sock.sendall(msg)

def recv_msg(sock):
    # Read message length and unpack it into an integer
    raw_msglen = recvall(sock, 4)
    if not raw_msglen:
        return None
    msglen = struct.unpack('>I', raw_msglen)[0]
    # Read the message data
    return recvall(sock, msglen)

def recvall(sock, n):
    # Helper function to recv n bytes or return None if EOF is hit
    data = bytearray()
    while len(data) < n:
        packet = sock.recv(n - len(data))
        if not packet:
            return None
        data.extend(packet)
    return data

然后您可以使用 send_msgrecv_msg 函数发送和接收完整消息,它们不会出现在网络层分裂或合并数据包方面的任何问题。


2
我不确定我是否完全理解了这个。我知道应该发生什么,但似乎无法得到它。我得到了“异常:套接字EOF尝试接收4个字节”的错误。我正在使用以下内容:http://pastebin.com/raw.php?i=AvdN5RyW - user2585107
@user2585107:尝试使用更新版本,当流结束时使用return None而不是引发异常。 - Adam Rosenfield
5
对于大型消息,data += packet 这行代码可能导致接收速度非常慢。更好的方法是使用 data = bytearray() 然后执行 data.extend(packet) - Stan Kriventsov
@StanKriventsov:这是一个很好的观点,我已经更新了示例。我猜Python运行时(当然有许多不同的运行时实现)会针对这种情况进行优化,以避免复制并在缓冲区的引用计数为1时内部执行扩展,但这并不是一定的,我们不应该依赖它。 - Adam Rosenfield
我肯定没有尝试过每个Python运行时来完成这个任务,但至少我使用的版本中没有这种优化。 - Stan Kriventsov
显示剩余5条评论

36

您可以这样使用: data = recvall(sock)

def recvall(sock):
    BUFF_SIZE = 4096 # 4 KiB
    data = b''
    while True:
        part = sock.recv(BUFF_SIZE)
        data += part
        if len(part) < BUFF_SIZE:
            # either 0 or end of data
            break
    return data

7
这可以用于检测“文件结尾”,但不能用于保持连接并检测消息的结束。只有在对等方关闭其套接字的一部分,或至少半关闭它时,才会到达“文件结尾”。 - glglgl
10
如果接收到的字符串少于4096个字符,它将再次循环并使用sock.recv()重新检查是否有更多数据。由于没有更多的数据进来,这将会挂起。如果part的长度小于RECV_BUFFER的长度,则代码可以安全地跳出循环。 - SomeGuyOnAComputer
3
@JadedTuna似乎没有被修复。代码中的“part = sock.recv(BUFF_SIZE)”是一个阻塞调用,因此一旦完整的信息被接收,执行就会停留在这一行。 - sh37211
1
这段代码应该被修改为: 如果len(part) < BUFF_SIZE: # 可能为0或者是数据结束 break - Hungry Mind
4
这似乎错误地假设TCP套接字的一端发送一次,另一端就相应地接收相同数量的字节(例如在这里这里)。因此,即使客户端使用一个send准确发送了4kb数据,服务器可能在第一次recv时仅接收到前面的1kb,这会导致while循环中断。 - OfirD

23

被接受的答案是不错的,但是处理大型文件时速度会非常慢- string是一个不可变类,这意味着每次使用+符号时都会创建更多的对象,使用list作为堆栈结构将更加高效。

这应该会更好地工作。

while True: 
    chunk = s.recv(10000)
    if not chunk: 
        break
    fragments.append(chunk)

print "".join(fragments)

这正是让我将大型二进制文件下载速度提高了约30-50倍的原因...谢谢。 - nmz787

21

大多数答案描述了一种recvall()方法。如果您在接收数据时的瓶颈是在for循环中创建字节数组,我对在recvall()方法中分配接收到的数据的三种方法进行了基准测试:

字节字符串方法:

arr = b''
while len(arr) < msg_len:
    arr += sock.recv(max_msg_size)
列表方法:
fragments = []
while True: 
    chunk = sock.recv(max_msg_size)
    if not chunk: 
        break
    fragments.append(chunk)
arr = b''.join(fragments)

预分配 bytearray 方法:

arr = bytearray(msg_len)
pos = 0
while pos < msg_len:
    arr[pos:pos+max_msg_size] = sock.recv(max_msg_size)
    pos += max_msg_size
结果:

在此输入图片描述


5

声明: 非常少见的情况下,您才需要这样做。如果可能,请使用现有的应用层协议或自定义协议,例如在每个消息前面添加一个固定长度的整数以指示随后的数据长度,或者在每个消息结尾处加上一个 '\n' 字符。(Adam Rosenfield 的回答讲得非常好)

话虽如此,确实有一种方法可以读取套接字上可用的所有数据。然而,依赖这种通信方式是一个不好的主意,会导致数据丢失的风险。请极度谨慎地使用此解决方案,并在阅读下面的说明后再使用。

def recvall(sock):
    BUFF_SIZE = 4096
    data = bytearray()
    while True:
        packet = sock.recv(BUFF_SIZE)
        if not packet:  # Important!!
            break
        data.extend(packet)
    return data

现在 if not packet: 行是非常关键的!很多答案建议使用像 if len(packet) < BUFF_SIZE: 这样的条件语句,但这是有问题的,并且很可能会导致您过早关闭连接并丢失数据。它错误地假设TCP套接字一端的一个发送对应于另一端的一个接收或发送的字节数。实际上并非如此。即使仍有数据等待接收,非常有可能 sock.recv(BUFF_SIZE) 返回比 BUFF_SIZE 小的块。 这里有一个很好的解释 (链接)(链接)
通过使用以上方法,如果连接的另一端写入数据速度慢于您的读取速度,则仍存在数据丢失的风险。 您可能只是简单地消耗了端点上的所有数据,并在更多数据到来时退出。有一些解决方法需要使用并发编程,但这是另一个主题。

5
你可能需要调用conn.recv()多次才能接收所有数据。仅调用一次无法保证获取发送的所有数据,因为TCP流不维护帧边界(即它们仅作为原始字节流而不是结构化消息流)。请参阅this answer了解该问题的另一个描述。
请注意,这意味着您需要知道何时已接收所有数据。如果发送方始终发送确切的8000字节,则可以计算当前已接收到的字节数并从8000中减去以知道剩余数量。如果数据大小可变,则可以使用其他各种方法,例如在发送消息之前让发送方发送一个字节数头,或者如果发送的是ASCII文本,则可以寻找换行符或NUL字符。

2
使用生成器函数的变种(我认为更符合Python风格):
def recvall(sock, buffer_size=4096):
    buf = sock.recv(buffer_size)
    while buf:
        yield buf
        if len(buf) < buffer_size: break
        buf = sock.recv(buffer_size)
# ...
with socket.create_connection((host, port)) as sock:
    sock.sendall(command)
    response = b''.join(recvall(sock))

如果响应小于缓冲区大小,则该方法似乎无法正常工作。 - Shadur
@Shadur,这很有趣,你尝试过吗?你能分享一下重现问题的代码吗?按照写法,只要不为空,recvall 应该会产生每个缓冲区接收到的内容。 - yoniLavi
2
根据添加的调试语句,它在第一个块中吸入整个响应,然后挂起等待下一个块。下面的“块”答案有相同的问题,我最终通过第二个测试来修复它,以查看块的长度是否小于缓冲区大小。我将测试是否也可以修复您的解决方案。--编辑:确实可以。 - Shadur

2
你可以使用序列化来实现。
from socket import *
from json import dumps, loads

def recvall(conn):
    data = ""
    while True:
    try:
        data = conn.recv(1024)
        return json.loads(data)
    except ValueError:
        continue

def sendall(conn):
    conn.sendall(json.dumps(data))

注意:如果您想使用上面的代码共享文件,则需要将其编码/解码为base64。


1

我认为这个问题已经得到了很好的回答,但我想添加一个使用Python 3.8和新的赋值表达式(海象运算符)的方法,因为它在样式上很简单。

import socket

host = "127.0.0.1"
port = 31337
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host,port))
s.listen()
con, addr = s.accept()
msg_list = []

while (walrus_msg := con.recv(3)) != b'\r\n':
    msg_list.append(walrus_msg)

print(msg_list)

在这种情况下,从套接字接收3个字节并立即分配给。一旦套接字接收到'\r\n',它就会退出循环。将添加到中,并在退出循环后打印。该脚本基本但已经测试并可用于telnet会话。
注意: 需要在周围加上括号。如果没有这样做,则将把评估为而不是套接字上的实际数据。

0
修改Adam Rosenfield的代码:
import sys


def send_msg(sock, msg):
    size_of_package = sys.getsizeof(msg)
    package = str(size_of_package)+":"+ msg #Create our package size,":",message
    sock.sendall(package)

def recv_msg(sock):
    try:
        header = sock.recv(2)#Magic, small number to begin with.
        while ":" not in header:
            header += sock.recv(2) #Keep looping, picking up two bytes each time

        size_of_package, separator, message_fragment = header.partition(":")
        message = sock.recv(int(size_of_package))
        full_message = message_fragment + message
        return full_message

    except OverflowError:
        return "OverflowError."
    except:
        print "Unexpected error:", sys.exc_info()[0]
        raise

然而,我强烈鼓励使用原始方法。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接