更改多进程队列中的缓冲区大小

9
所以我有一个生产者和一个消费者通过一个无限大小的队列连接在一起,但如果消费者反复调用get直到抛出Empty异常,则不会清除队列。
我认为这是因为在消费者端的队列中将对象序列化到套接字的线程一旦套接字缓冲区已满就会被阻塞,因此它会等待直到缓冲区有空间,然而,消费者可能会“过快”调用get,因此它认为队列为空,而实际上另一侧的线程有更多的数据要发送,但是不能序列化得足够快以防止套接字对消费者出现为空的情况。
我认为如果我可以更改底层套接字的缓冲区大小(我基于Windows),那么这个问题就会得到缓解。据我所见,我需要做的就是:
import multiprocessing.connections as conns
conns.BUFSIZE = 2 ** 16 # is typically set as 2 ** 13 for windows
import multiprocessing.Queue as q

如果我执行上述操作,那么当多进程初始化队列时,它会使用我已经导入的multiprocessing.connections版本中设置的新缓冲区大小,这是正确的吗?
此外,我相信这只会影响Windows系统,因为BUFSIZE在Linux机器上不被使用,因为它们所有的套接字默认都设置为60千字节?
有人之前尝试过这个吗?这对Windows会产生什么副作用?在Windows上套接字缓冲区大小的基本限制是什么?
===================一个代码示例===================
# import multiprocessing.connection as conn
# conn.BUFSIZE = 2 ** 19
import sys
import multiprocessing as mp
from Queue import Empty
from time import sleep

total_length = 10**8

def supplier(q):
    print "Starting feeder"
    for i in range(total_length) :
        q.put(i)


if __name__=="__main__":

    queue = mp.Queue()

    p = mp.Process(target=supplier, args=(queue,))

    p.start()

    sleep(120)

    returned = []
    while True :
        try :
            returned.append(queue.get(block=False))
        except Empty :
            break

    print len(returned)
    print len(returned) == total_length

    p.terminate()
    sys.exit()

当在Windows上运行时,这个样例通常只会从队列中获取大约16万个项目,因为主线程可以更快地清空缓冲区,而供应者重新填充缓冲区的速度较慢,最终当缓冲区为空时,它会尝试从队列中获取数据并报告它为空。
理论上,您可以通过增加缓冲区大小来缓解这个问题。我相信,在Windows系统上,顶部的两行代码将增加管道的默认缓冲区大小。
如果您取消对它们的注释,则此脚本将在退出之前拉取更多的数据,因为它有一个更高的缓冲区大小。我的主要问题是: 1)这是否真的有效。 2)是否有办法使此代码在Windows和Linux中使用相同大小的底层缓冲区。 3)设置管道的大缓冲区大小是否有任何意想不到的副作用。
我知道一般来说,没有办法知道是否已经从队列中获取了所有数据(鉴于供应者永久运行,并且产生的数据非常不均匀),但是我正在寻找改进的方法,以尽力而为。

1
我认为提供一个演示这个问题的小例子会很有用。 - pradyunsg
1个回答

8

更新:

对于未来需要使用的Windows管道,以下链接是由OP phil_20686提供的有用链接: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx

原文:

仅当平台为win32时,BUFSIZE才起作用。

multiprocessing.Queue建立在Pipe之上,如果更改BUFSIZE,则您生成的Queue将使用更新的值。请参见下面:

class Queue(object):

    def __init__(self, maxsize=0):
        if maxsize <= 0:
            maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
        self._maxsize = maxsize
        self._reader, self._writer = Pipe(duplex=False)

当平台为win32时,管道代码将调用以下代码:
def Pipe(duplex=True):
    '''
    Returns pair of connection objects at either end of a pipe
    '''
    address = arbitrary_address('AF_PIPE')
    if duplex:
        openmode = win32.PIPE_ACCESS_DUPLEX
        access = win32.GENERIC_READ | win32.GENERIC_WRITE
        obsize, ibsize = BUFSIZE, BUFSIZE
    else:
        openmode = win32.PIPE_ACCESS_INBOUND
        access = win32.GENERIC_WRITE
        obsize, ibsize = 0, BUFSIZE

    h1 = win32.CreateNamedPipe(
        address, openmode,
        win32.PIPE_TYPE_MESSAGE | win32.PIPE_READMODE_MESSAGE |
        win32.PIPE_WAIT,
        1, obsize, ibsize, win32.NMPWAIT_WAIT_FOREVER, win32.NULL
        )

您可以看到,当 duplex 为 False 时,outbuffer 的大小为0,inbuffer 的大小为 BUFSIZE。
inbuffer 是为输入缓冲区保留的字节数。2 ** 16 = 65536,这是在不阻塞的情况下可以写入的最大字节数,但缓冲区大小的容量因系统而异,在同一系统上也会有所变化,因此很难确定设置管道的最大量时的副作用。

在 https://msdn.microsoft.com/en-us/library/windows/desktop/aa365150(v=vs.85).aspx 的备注部分有关于Windows管道行为的一些重要/好的信息。如果您添加一个链接/摘要,我将接受这个答案。 - phil_20686
@phil_20686,我更新了我的答案,并在那里发布了你提供的链接。 - Haifeng Zhang
已接受此答案,但顺便提一下,栈溢出的礼仪是要总结链接内容,以避免在未来几年中网站搬迁等情况下出现链接失效。谢谢。 - phil_20686

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接