如何使用joblib并行写入文件?JoinableQueue问题

4

我正在尝试将100k+个文件的计算结果写入单个文件。处理一个文件大约需要1秒钟,并将一行内容写入输出文件,问题本身是 "令人尴尬地并行" ,我只是在努力正确地将结果写入文件(例如CSV)。以下方式曾经在较长时间前 (Python 3.4?) 适用于我:

import os
from multiprocessing import Process, JoinableQueue
from joblib import Parallel, delayed

def save_to_file(q):
    with open('test.csv', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')
        q.task_done()

def process(x):
    q.put(str(os.getpid()) + '-' + str(x**2))

if __name__ == '__main__':
    q = JoinableQueue()
    p = Process(target=save_to_file, args=(q,))
    p.start()
    Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
    q.put(None) 
    p.join() 

现在(在Python 3.6+上),它会产生以下异常:

joblib.externals.loky.process_executor._RemoteTraceback: 
"""
(...)
RuntimeError: JoinableQueue objects should only be shared between processes through inheritance
"""

如何正确使用joblib向单个文件写入数据?
1个回答

1
原来实现该任务的一种方法是通过 multiprocessing.Manager,代码如下:
import os
from multiprocessing import Process, Manager
from joblib import Parallel, delayed

def save_to_file(q):
    with open('test.csv', 'w') as out:
        while True:
            val = q.get()
            if val is None: break
            out.write(val + '\n')

def process(x):
    q.put(str(os.getpid()) + '-' + str(x**2))

if __name__ == '__main__':
    m = Manager()
    q = m.Queue()
    p = Process(target=save_to_file, args=(q,))
    p.start()
    Parallel(n_jobs=-1)(delayed(process)(i) for i in range(100))
    q.put(None)
    p.join()

我们让Manager管理上下文,其余保持不变(除了使用普通的Queue代替JoinableQueue)。
如果有更好/更清晰的方法,我会很乐意接受它作为答案。

在代码行 q.put(str(os.getpid()) + '-' + str(x**2)) 中,q 的作用域是什么?Joblib 是否处理将整个外部作用域传递给 process,从而使得可以访问 q 对象? - Bar
跟进:我尝试传递一个文件对象而不是每次需要写入时打开“out”文件,但似乎我没有得到任何输出。有什么想法为什么会发生这种情况? - Bar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接