使用多进程写入文件

16
我在Python中遇到了如下问题。我需要并行进行一些计算,将结果顺序地写入文件。因此,我创建了一个函数,该函数接收一个multiprocessing.Queue和一个文件句柄,执行计算并将结果打印到文件中:
import multiprocessing
from multiprocessing import Process, Queue
from mySimulation import doCalculation   

# doCalculation(pars) is a function I must run for many different sets of parameters and collect the results in a file

def work(queue, fh):
while True:
    try:
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
    except:
        break


if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fh = open("foo", "w")
    workQueue = Queue()
    parList = # list of conditions for which I want to run doCalculation()
    for x in parList:
        workQueue.put(x)
    processes = [Process(target = writefh, args = (workQueue, fh)) for i in range(nthreads)]
    for p in processes:
       p.start()
    for p in processes:
       p.join()
    fh.close()

脚本运行后,文件却变成了空的。我尝试修改worker()函数为:

def work(queue, filename):
while True:
    try:
        fh = open(filename, "a")
        parameter = queue.get(block = False)
        result = doCalculation(parameter) 
        print >>fh, string
        fh.close()
    except:
        break

将文件名作为参数传递。然后它按照我预期的方式工作。当我尝试顺序执行相同的操作,而没有使用多进程时,它也正常工作。

为什么第一个版本没有起作用?我看不出问题所在。

另外:我能保证两个进程不会同时尝试写入文件吗?


编辑:

谢谢。现在我明白了。这是可行的版本:

import multiprocessing
from multiprocessing import Process, Queue
from time import sleep
from random import uniform

def doCalculation(par):
    t = uniform(0,2)
    sleep(t)
    return par * par  # just to simulate some calculation

def feed(queue, parlist):
    for par in parlist:
            queue.put(par)

def calc(queueIn, queueOut):
    while True:
        try:
            par = queueIn.get(block = False)
            print "dealing with ", par, "" 
            res = doCalculation(par)
            queueOut.put((par,res))
        except:
            break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
        try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
        except:
            break
    fhandle.close()

if __name__ == "__main__":
    nthreads = multiprocessing.cpu_count()
    fname = "foo"
    workerQueue = Queue()
    writerQueue = Queue()
    parlist = [1,2,3,4,5,6,7,8,9,10]
    feedProc = Process(target = feed , args = (workerQueue, parlist))
    calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nthreads)]
    writProc = Process(target = write, args = (writerQueue, fname))


    feedProc.start()
    for p in calcProc:
        p.start()
    writProc.start()

    feedProc.join ()
    for p in calcProc:
        p.join()
    writProc.join ()

3
请专注于一组代码,只保留必要的并且相关的代码,避免使用“编辑”,确保问题清晰、完整和一致。 - S.Lott
3个回答

20

你确实应该使用两个队列和三种不同的处理方式。

  1. 将东西放入队列 #1。

  2. 从队列 #1 中取出东西并进行计算,将东西放入队列 #2。由于它们能够安全地从一个队列中获取,并将数据放入另一个队列中,因此您可以有很多这样的操作。

  3. 从队列 #2 中取出东西并将其写入文件。必须恰好有一个此类操作,不能再多。它“拥有”该文件,保证原子访问,并绝对确保文件被清洁、一致地编写。


1
工作者和消费者队列都要加1。记得在队列上设置maxsize,否则你的工作者可能会耗尽内存并使写入程序停滞。 - Bittrance
1
哦,算了那个多次运行的问题吧...我太蠢了,没有注意到我启动了feedProc和writProc多次。¬¬ 我已经纠正了代码。但是我仍然得到了一个空文件。 - Rafael S. Calsaverini
好的。我想我明白了。在我有任何结果之前,我启动了写入文件的进程...现在它可以工作了。谢谢。 - Rafael S. Calsaverini

9
如果有人正在寻找一种简单的方法来做同样的事情,这可以帮助你。我不认为用这种方式做有任何缺点,如果有,请告诉我。
import multiprocessing 
import re

def mp_worker(item):
    # Do something
    return item, count

def mp_handler():
    cpus = multiprocessing.cpu_count()
    p = multiprocessing.Pool(cpus)
    # The below 2 lines populate the list. This listX will later be accessed parallely. This can be replaced as long as listX is passed on to the next step.
    with open('ExampleFile.txt') as f:
        listX = [line for line in (l.strip() for l in f) if line]
    with open('results.txt', 'w') as f:
        for result in p.imap(mp_worker, listX):
            # (item, count) tuples from worker
            f.write('%s: %d\n' % result)

if __name__=='__main__':
    mp_handler()

来源: Python:使用多进程池时使用队列写入单个文件


这篇文章讨论了如何在使用Python的multiprocessing Pool模块时,将多个进程写入同一个文件。使用队列和锁可以确保线程安全,避免数据丢失或重复。

这是我发现的一般使用 multiprocessing 最简单的方法。非常感谢你! - blehblehblecksheep

0

写 worker 代码中存在错误,如果 block 为 false,则 worker 永远无法获取任何数据。应如下所示:

par, res = queue.get(block = True)

您可以通过添加行来检查它

 print "QSize",queueOut.qsize()

queueOut.put((par,res))之后

如果使用block=False,队列的长度会不断增加直到填满,而使用block=True时,始终只有“1”。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接