使用队列实现Python多进程同时写入同一文件

9

我知道在 Stack Exchange 上有许多与将多进程写入单个文件相关的帖子,我在阅读了这些帖子后开发了我的代码。我想要实现的是并行运行'RevMapCoord'函数,并使用multiprocess.queue将其结果写入一个单一文件中。但是我在将作业加入队列时遇到了问题。我的代码:

def RevMapCoord(list):
    "Read a file, Find String and Do something"

def feed(queue, parlist):
    for par in parlist:
        print ('Echo from Feeder: %s' % (par))
        queue.put(par)
    print ('**Feeder finished queing**')

def calc(queueIn, queueOut):
     print ('Worker function started')
     while True:
         try:
             par = queueIn.get(block = False)
             res = RevMapCoord(final_res)
             queueOut.put((par,res))
         except:
             break

def write(queue, fname):
    fhandle = open(fname, "w")
    while True:
         try:
            par, res = queue.get(block = False)
            print >>fhandle, par, res
         except:
            break
    fhandle.close()


feedProc = Process(target = feed , args = (workerQueue, final_res))
calcProc = [Process(target = calc , args = (workerQueue, writerQueue)) for i in range(nproc)]
writProc = Process(target = write, args = (writerQueue, sco_inp_extend_geno))

feedProc.start()
print ('Feeder is joining')
feedProc.join ()
for p in calcProc:
    p.start()
for p in calcProc:
    p.join()
writProc.start()
writProc.join ()

当我运行这段代码时,脚本会在“feedProc.start()”这一步卡住。屏幕上最后几行输出显示了“feedProc.start()”末尾的打印语句。
Echo from Feeder: >AK779,AT61680,50948-50968,50959,6,0.406808,Ashley,Dayne
Echo from Feeder: >AK832,AT30210,1091-1111,1102,7,0.178616,John,Caine
**Feeder finished queing**

但在执行下一行"feedProc.join()"之前挂起了。代码没有报错,但什么也没做(挂起)。请告诉我我犯了什么错误。

2个回答

11

我认为你应该将示例简化到基础部分。例如:

from multiprocessing import Process, Queue

def f(q):
    q.put('Hello')
    q.put('Bye')
    q.put(None)

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    with open('file.txt', 'w') as fp:
        while True:
            item = q.get()
            print(item)
            if item is None:
                break
            fp.write(item)
    p.join()

我这里有两个进程(主进程和一个p)。p将字符串放入队列中,由主进程检索。当主进程找到None(我正在使用的哨兵来表示:“我完成了”),它会退出循环。

将其扩展到多个进程(或线程)非常简单。


2
你应该尝试运行你的示例(它会出现错误)。你不能以这种方式将多个项目放入队列中。实际上,你只是放了一个单独的项目 - 一个列表。 - Gerrat
类型错误:期望字符缓冲区对象。我遇到了错误 :| - b1_
1
@b1- 新的(并且正确的,感谢Gerrat)版本适用于Python 2.7.5和3.2.3。试试看吧! - Hernan

0

我使用Python3中的'map_async'函数将多进程写入结果到单个文件中。以下是我编写的函数:

def PPResults(module,alist):##Parallel processing
    npool = Pool(int(nproc))    
    res = npool.map_async(module, alist)
    results = (res.get())###results returned in form of a list 
    return results

所以,我提供了一个带有'a_list'参数列表的函数,并且'module'是一个进行处理并返回结果的函数。上面的函数会持续收集结果并以列表形式返回,直到所有来自'a_list'的参数都被处理完毕。结果可能不是正确的顺序,但由于顺序对我来说并不重要,这个方法很有效。'result'列表可以迭代,将各个结果写入文件中,例如:
fh_out = open('./TestResults', 'w')
for i in results:##Write Results from list to file
    fh_out.write(i)

为了保持结果的顺序,我们可能需要使用“队列”,类似于我在问题中提到的(上面)。虽然我能够修复代码,但我相信没有必要在这里提及。
谢谢
AK

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接