Python套接字接收/发送多线程

7
我正在编写一个Python程序,在主线程中通过TCP套接字不断(循环)接收数据,使用recv函数。在回调函数中,我使用sendall函数通过同一套接字发送数据。触发回调的原因并不重要。我已将套接字设置为阻塞模式。
我的问题是,这样做是否安全?我的理解是,回调函数在单独的线程(而非主线程)上调用。Python套接字对象是否线程安全?从我的研究中,我得到了相互矛盾的答案。

这里有几个你试图一次解决的隐藏问题。希望我给了你一个详细的答案,但如果你不明白,请随时问。 - Bharel
考虑针对您的用例使用异步 I/O,这样您就不需要线程。 - nosklo
1个回答

10

在Python中,套接字不是线程安全的。

您试图一次解决几个问题:

  1. 套接字不是线程安全的。
  2. recv是阻塞的,会阻塞主线程。
  3. sendall正在从另一个线程中使用。

您可以通过使用asyncio或以类似asyncio内部的方式解决这些问题:通过与socketpair一起使用select.select,并使用传入数据的队列。

import select
import socket
import queue

# Any data received by this queue will be sent
send_queue = queue.Queue()

# Any data sent to ssock shows up on rsock
rsock, ssock = socket.socketpair()

main_socket = socket.socket()

# Create the connection with main_socket, fill this up with your code

# Your callback thread
def different_thread():
    # Put the data to send inside the queue
    send_queue.put(data)

    # Trigger the main thread by sending data to ssock which goes to rsock
    ssock.send(b"\x00")

# Run the callback thread

while True:
    # When either main_socket has data or rsock has data, select.select will return
    rlist, _, _ = select.select([main_socket, rsock], [], [])
    for ready_socket in rlist:
        if ready_socket is main_socket:
            data = main_socket.recv(1024)
            # Do stuff with data, fill this up with your code
        else:
            # Ready_socket is rsock
            rsock.recv(1)  # Dump the ready mark
            # Send the data.
            main_socket.sendall(send_queue.get())

我们在这里使用多个结构体,您需要使用您选择的代码填充空白空间。至于解释:

首先,我们创建一个send_queue,它是要发送数据的队列。然后,我们创建一对连接的套接字(socketpair())。稍后我们需要这个套接字对,以便唤醒主线程,因为我们不希望recv()阻塞并防止写入套接字。

然后,我们连接main_socket并启动回调线程。现在,这里就有了魔力:

在主线程中,我们使用select.select来知道rsockmain_socket是否有任何数据。如果它们中的一个有数据,主线程就会被唤醒。

向队列添加数据时,我们通过信号ssock唤醒主线程,该线程又唤醒rsock,从而从select.select返回。

为了完全理解这一点,您将需要阅读select.select()socketpair()queue.Queue()


@tobias.mcnulty在评论中提出了一个很好的问题:为什么我们应该使用Queue而不是通过套接字发送所有数据?

您也可以使用socketpair发送数据,这样做有其优点,但基于多个原因,通过队列发送可能更可取:

  1. 通过套接字发送数据是一项昂贵的操作。它需要系统调用,需要在系统缓冲区内来回传递数据,并涉及对TCP堆栈的完全使用。使用Queue可以保证只有1次调用 - 用于单字节信号 - 而不是多次调用(除了队列的内部锁定外,但该锁定非常便宜)。通过socketpair发送大量数据将导致多个系统调用。提示一下,您可以使用collections.deque,因为CPython保证它由于GIL而是线程安全的。这样您就不必要求任何系统调用除了socketpair
  2. 从架构上讲,使用队列允许您以稍后希望的任何类型发送数据,并在之后进行解码。这允许主循环变得更加智能,并可以帮助您创建更简单的接口。
  3. 您没有大小限制。它可能是一个错误或一个功能。我相信不鼓励更改系统缓冲区大小,这会自然地限制您可以发送的数据量。它可能是一种好处,但应用程序可能希望自行控制它。使用“自然”特性将导致调用线程挂起。
  4. 对于大数据,与socketpair.recv系统调用一样,您还将通过多个select调用传递。TCP没有消息边界。您要么必须创建人工边界,将套接字设置为非阻塞并处理异步套接字,或将其视为流并连续通过

我实际上正在使用MicroPython,但似乎MicroPython不支持socket.socketpair()。有没有什么解决方法?也许可以创建一个类似的函数或者只是创建两个额外的套接字?谢谢! - Software Dev
@Alon 请记住,在Unix上,select.select也适用于任何其他文件描述符,而不仅仅是套接字。如果有帮助的话,您可以使用不同的fd,但我认为这样做没有优势(我猜管道会更有效率,但不值得为此破坏跨平台兼容性)。 - Bharel
使用Queue存储数据并将其传递到底层套接字,是否比直接写入(和从)socketpair更好?据我所知,后者既更简单,还提供了一些合理的限制来确保底层套接字跟上正在发送的数据(取决于net.core.rmem_max)... - tobias.mcnulty
1
@tobias.mcnulty 很好的问题,已在上面回答了。 - Bharel
1
@Bharel,非常感谢您的出色回答!所有观点都很好,尤其是第5点。我可以看到,如果多个线程同时直接通过socketpair发送数据,那么非常容易发送无意义的数据到真实的套接字中。而且为每个线程维护一个socketpair听起来很麻烦。再次感谢您对这个问题的回答和响应! - tobias.mcnulty
显示剩余4条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接