多进程队列的实现和普通队列的实现

19

我正在寻找有关Python中队列实现的更多见解,而不仅仅是文档中提供的内容。

根据我所理解的(如果我错了,请原谅我的无知):

queue.Queue():通过基本的内存数组实现,因此不能在多个进程之间共享,但可以在线程之间共享。

multiprocessing.Queue():通过管道(man 2 pipes)实现,具有大小限制(相当小:man 7 pipe在Linux上说,未调整为65536字节):

自Linux 2.6.35以来,默认管道容量为65536字节,但可以使用fcntl(2)F_GETPIPE_SZF_SETPIPE_SZ操作进行查询和设置容量。

但是,在Python中,无论何时尝试将数据写入管道中,如果其大小大于65536字节,则会正常工作 - 这种情况下可能会将内存填满:

import multiprocessing
from time import sleep

def big():
    result = ""
    for i in range(1,70000):
        result += ","+str(i)
    return result # 408888 bytes string

def writequeue(q):
    while True:
        q.put(big())
        sleep(0.1)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=writequeue, args=(q,))
    p.start()
    while True:
        sleep(1) # No pipe consumption, we just want to flood the pipe

所以这是我的问题:

  • Python是否调整了管道限制?如果是,增加了多少?欢迎提供Python源代码。

  • Python的管道通信是否与其他非Python进程兼容?如果是,欢迎提供工作示例(最好是JS)和资源链接。


1
你可以看一下这个模块的代码 :) 这是了解事物如何运作和查看优秀Python代码的最佳方式之一。 - cgte
路易斯提供了一份详尽的答案,关于最后一部分,如果您对两个程序之间的通信感兴趣(无论它们使用什么语言),您可能需要查看实现AMQP协议rabbitmqzeromq...等)的代理。 - Adonis
消息代理在每秒需要执行数千个函数调用时比直接文件描述符消耗要慢得多...因此,人们对Python的数据格式互操作性感兴趣(我猜它会“pickle”对象...) - Fabien
1个回答

20

为什么q.put()不会阻塞??

multiprocessing.Queue创建了一个管道,如果管道已满,则会阻塞。当然,写入超过管道容量将导致write调用阻塞,直到读取端清除足够的数据。好的,那么如果管道在达到容量时阻塞,为什么q.put()一旦管道已满也不会阻塞呢?即使示例中对q.put()的第一次调用应该填满管道,所有操作都应该在那里阻塞,不是吗?

不,它不会阻塞,因为multiprocessing.Queue实现将.put()方法与管道写入分离。 .put()方法将传递给它的数据排队在内部缓冲区中,有一个单独的线程负责从此缓冲区读取并写入管道。当管道已满时,此线程将阻塞,但它不会阻止.put()将更多数据排队到内部缓冲区中。

实现.put()将数据保存到self._buffer,如果没有正在运行的线程,则启动一个线程:
def put(self, obj, block=True, timeout=None):
    assert not self._closed
    if not self._sem.acquire(block, timeout):
        raise Full

    with self._notempty:
        if self._thread is None:
            self._start_thread()
        self._buffer.append(obj)
        self._notempty.notify()

._feed()方法是从self._buffer读取数据并将其提供给管道的方法。而._start_thread()则是设置运行._feed()的线程的方法。

如何限制队列大小?

如果您想限制可以写入队列的数据量,我没有看到通过指定字节数来实现它的方法,但您可以通过向multiprocessing.Queue传递一个数字来限制在任何时候存储在内部缓冲区中的项目数:

q = multiprocessing.Queue(2)

当我使用上述参数并使用您的代码时,q.put()会将两个项目排队,并在第三次尝试时阻塞。
Python管道通信是否与其他非Python进程兼容?
这取决于情况。 multiprocessing模块提供的功能不易与其他语言交互。我认为,使multiprocessing与其他语言互操作是可能的,但实现这一目标将是一个重大的企业。该模块编写时预期涉及的进程正在运行Python代码。
如果您查看更普遍的方法,则答案是肯定的。您可以使用套接字作为两个不同进程之间的通信管道。例如,读取具有命名套接字的JavaScript进程:
var net = require("net");
var fs = require("fs");

sockPath = "/tmp/test.sock"
try {
    fs.unlinkSync(sockPath);
}
catch (ex) {
    // Don't care if the path does not exist, but rethrow if we get
    // another error.
    if (ex.code !== "ENOENT") {
        throw ex;
    }
}

var server = net.createServer(function(stream) {
  stream.on("data", function(c) {
    console.log("received:", c.toString());
  });

  stream.on("end", function() {
    server.close();
  });
});

server.listen(sockPath);

还有一个写入它的Python进程:

import socket
import time

sockfile = "/tmp/test.sock"

conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect(sockfile)

count = 0
while True:
    count += 1
    conn.sendall(bytes(str(count), "utf-8"))
    time.sleep(1)

如果您想尝试上述方法,需要先启动JavaScript端,以便Python端有东西可以写入。这只是一个概念验证。完整的解决方案需要更多的改进。
为了将复杂结构从Python传递到其他语言,您需要找到一种方法将数据序列化为可以在两个端点读取的格式。腌制品很不幸是特定于Python的。每当我需要在语言之间进行序列化时,我通常选择JSON,或者使用自定义格式(如果JSON无法胜任)。

你的回答很有见地,我因此点赞。问题的第二部分是关于如何与其他使用不同编程语言构建的进程进行队列互操作。例如:使用类似兼容的put()get()操作将字符串从/到Python传递给Javascript进程。或者Python是否使用了不可靠的格式来传递对象?如果有必要,可以随时更新你的答案以涵盖另一个部分。 - Fabien
1
抱歉,我确实漏掉了那一部分。我已经编辑了我的答案来解决这个问题。 - Louis
这对我来说听起来相当不错,谢谢。我会等一段时间,以确保公正,然后批准答案,如果有人愿意分享自己的观点作为答案。 - Fabien
我错过了赏金截止日期,否则我会进行验证。很抱歉给您带来不便,我在现实生活中很忙。您的答案非常好,所以我相信它的价值超过了一些声望点数 :-) - Fabien
这将限制队列为两个项目,但仍不限制队列的大小。 - laocius
@laocius,答案中已经承认了这一点:“如果您想限制可以写入队列的数据量,我不认为有一种方法可以通过指定字节数来实现,但是您可以通过向multiprocessing.Queue传递数字来限制在任何时候存储在内部缓冲区中的项目数。” - Louis

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接