Python multiprocessing queue error

I have this Python code that reads a file, does some processing, and writes the results in parallel:
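import csv
import multiprocessing as mp
import os
from io import StringIO  # on Python 2 (as in the traceback below): from StringIO import StringIO

import validation  # the asker's own module, provides RowValidator

def line_chunker(path):
    """
    Reads a file in chunks and yields each chunk.
    Each chunk is guaranteed to end at a line break (EOL).
    Each chunk is returned as a single string.

    The number of chunks the file is split into is approximately equal to
    the number of CPU cores available.
    """
    size = os.path.getsize(path)
    cores = mp.cpu_count()
    chunksize = size // cores  # integer division; readlines() expects an int size hint

    with open(path) as f:  # the with block closes the file exactly once
        f.readline()  # skip header
        while True:
            part = f.readlines(chunksize)
            if not part:
                break
            yield "".join(part)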

def _validate(chunk, outq):
    """ Performs format validation on a given chunk of a csv file """
    rows = csv.reader(StringIO(chunk))
    vld = validation.RowValidator(rows)
    vld.check_rows()
    outq.put(vld.errors)

def _write(outq):
    """Writes lines in the outq to a text file """
    outfile = open("C:/testoutput.txt", "w")
    while True:
        result = outq.get()
        if result is None:
            outfile.close()
            break
        else:
            for line in result:
                outfile.write(line)
                outfile.write("\n")

def validate_pll(path):    
    """ Perform validation in parallel """

    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize = 8)

    writer = mp.Process(target = _write, args = (outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))

    pool.close()
    pool.join()

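It reads the file in chunks and hands each chunk to a worker process in the pool for processing. The results are put on a queue, which is monitored by a separate writer process.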
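The chunking step can be checked on its own with the sketch below. It is a trimmed re-implementation of line_chunker, assuming Python 3, with a hypothetical n_chunks parameter standing in for mp.cpu_count() so the result does not depend on the machine; the sample CSV data is made up for illustration.

```python
import os
import tempfile


def line_chunker(path, n_chunks):
    """Yield the file (minus its header line) as roughly n_chunks strings,
    each ending on a line boundary. n_chunks is a hypothetical stand-in
    for mp.cpu_count()."""
    # Integer division: readlines() expects an int size hint.
    chunksize = os.path.getsize(path) // n_chunks
    with open(path) as f:
        f.readline()  # skip the header line
        while True:
            # readlines(hint) keeps reading whole lines until their total
            # size exceeds hint, so a chunk never splits a line.
            part = f.readlines(chunksize)
            if not part:
                break
            yield "".join(part)


if __name__ == "__main__":
    # Build a small CSV file to chunk (illustrative data only).
    with tempfile.NamedTemporaryFile("w", suffix=".csv", delete=False) as tmp:
        tmp.write("id,value\n")
        for i in range(20):
            tmp.write(f"{i},{i * i}\n")
        path = tmp.name

    chunks = list(line_chunker(path, 4))
    os.remove(path)
    for c in chunks:
        print(repr(c))
```

Because readlines() overshoots the hint by up to one line, each chunk is slightly larger than size // n_chunks, so the file usually splits into n_chunks pieces or fewer.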
The code runs, but after it finishes I get a strange EOFError.
I suspected this was because I never call writer.join(), but if I add that call, like this:
def validate_pll(path):    
    """ Perform validation in parallel """

    pool = mp.Pool()
    outq = mp.Manager().Queue(maxsize = 8)

    writer = mp.Process(target = _write, args = (outq,))
    writer.start()
    for chunk in line_chunker(path):
        pool.apply_async(_validate, (chunk, outq))

    pool.close()
    pool.join()
    writer.join()

the code hangs. Any idea what I'm doing wrong?
This is the error message:
Process Process-10:
Traceback (most recent call last):
  File "C:\Anaconda\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Anaconda\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "C:\SVN\PortfolioInspector\trunk\parallel.py", line 114, in _write
    result = outq.get()
  File "<string>", line 2, in get
  File "C:\Anaconda\lib\multiprocessing\managers.py", line 759, in _callmethod
    kind, result = conn.recv()
EOFError
1 Answer

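The writer process (running _write) is still waiting for items on outq when the main process finishes. It waits by holding a blocking connection to the Manager process that owns the shared Queue. When the main process finishes executing, the Manager process shuts down, which sends an EOF down the connection the writer has open, and that is the exception you see. If you add writer.join() without changing anything else, the main process waits forever for a writer that is itself blocked forever in outq.get(), which is why it hangs.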
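To fix this, you need to tell the writer to stop before the main process ends (and, by extension, before the Manager process shuts down). You actually already have a mechanism for this, you just aren't using it: put None on outq and _write will shut down in an orderly way. Do that after pool.join() returns, so every worker has already queued its results, and before calling writer.join(); then things should work.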
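Putting that fix together, here is a minimal sketch assuming Python 3. The stand-in _validate (which flags rows that don't have exactly two fields) and the out_path parameter are hypothetical replacements for validation.RowValidator and the hard-coded C:/testoutput.txt; the part that matters is the shutdown order at the end of validate_pll.

```python
import csv
import multiprocessing as mp
from io import StringIO


def _validate(chunk, outq):
    """Hypothetical stand-in validator: report rows without exactly 2 fields."""
    errors = [
        "bad row: %r" % (row,)
        for row in csv.reader(StringIO(chunk))
        if len(row) != 2
    ]
    outq.put(errors)


def _write(outq, out_path):
    """Consume result lists from outq until the None sentinel arrives."""
    with open(out_path, "w") as outfile:
        while True:
            result = outq.get()  # blocks until an item is available
            if result is None:   # sentinel: no more results are coming
                break
            for line in result:
                outfile.write(line + "\n")


def validate_pll(chunks, out_path):
    """Validate chunks in parallel, then shut the writer down cleanly."""
    with mp.Manager() as manager:  # keeps the Manager alive for the whole run
        outq = manager.Queue(maxsize=8)
        writer = mp.Process(target=_write, args=(outq, out_path))
        writer.start()

        pool = mp.Pool()
        for chunk in chunks:
            pool.apply_async(_validate, (chunk, outq))
        pool.close()
        pool.join()      # every worker has finished putting its results

        outq.put(None)   # now tell the writer to stop...
        writer.join()    # ...and wait for it while the Manager is still running


if __name__ == "__main__":
    validate_pll(["a,1\nb,2\n", "c\nd,4\n", "e,5,extra\n"], "validation_errors.txt")
```

Putting the sentinel only after pool.join() matters: if None is queued while workers are still running, the writer can exit early and drop later results, and with maxsize=8 the remaining workers may then block on put() forever.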

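Of course. I think I had assumed that the queue would automatically return None when it was empty, but if you call get() on an empty queue it just waits until there is something to return. I added outq.put(None) before pool.close() and now it works fine. - jramm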
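The blocking behaviour described in that comment is easy to confirm. This sketch uses a plain queue.Queue to stay single-process; multiprocessing.Queue and the Manager's Queue proxy have the same get(block=True, timeout=None) signature and also raise queue.Empty on timeout. The drain helper and its timeout are hypothetical, not part of the original code.

```python
import queue  # queue.Empty is raised when get(timeout=...) expires


def drain(q, timeout=0.5):
    """Collect items until the None sentinel, giving up if the queue stays
    empty for `timeout` seconds (a hypothetical safety net)."""
    items = []
    while True:
        try:
            item = q.get(timeout=timeout)  # get() never returns None on its own
        except queue.Empty:
            return items, "timed out"
        if item is None:
            return items, "sentinel"
        items.append(item)


if __name__ == "__main__":
    q = queue.Queue()
    q.put(["row 1 error"])
    print(drain(q))  # → ([['row 1 error']], 'timed out')
    q.put(["row 2 error"])
    q.put(None)
    print(drain(q))  # → ([['row 2 error']], 'sentinel')
```

A timeout only keeps a forgotten sentinel from hanging the program; the None sentinel is still the clean way to end the loop.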
If my outq is a dictionary, does sending None to it still work? - R_Moose

Content provided by Stack Overflow; the original post is in English.