Python多进程:写入同一个Excel文件

5

我是 Python 的新手,我正在尝试将五个不同进程的结果保存到一个 Excel 文件中(每个进程都写入不同的工作表)。我已经阅读了这里的不同帖子,但仍然无法完成,因为我对于 pool.map、队列和锁非常困惑,而且我不确定在这里需要什么来完成此任务。 以下是目前我的代码:

list_of_days = ["2017.03.20", "2017.03.21", "2017.03.22", "2017.03.23", "2017.03.24"]
results = pd.DataFrame()

if __name__ == '__main__':
    global list_of_days
    writer = pd.ExcelWriter('myfile.xlsx', engine='xlsxwriter')
    nr_of_cores = multiprocessing.cpu_count()
    l = multiprocessing.Lock()
    pool = multiprocessing.Pool(processes=nr_of_cores, initializer=init, initargs=(l,))
    pool.map(f, range(len(list_of_days)))
    pool.close()
    pool.join()

def init(l):
    global lock
    lock = l

def f(k):
    global results

    *** DO SOME STUFF HERE***

    results = results[ *** finished pandas dataframe *** ]

    lock.acquire()
    results.to_excel(writer, sheet_name=list_of_days[k])
    writer.save()
    lock.release()

结果是只有一个工作表被创建在Excel中(我认为这是最后完成的进程)。关于这段代码,有一些问题:
  • 如何避免定义全局变量?
  • 是否可能传递数据框?
  • 我应该将锁定移到主要部分吗?
非常感谢您的参与,因为我认为掌握多进程对我至关重要。谢谢

如果您同时启动多个进程,您将遇到文件锁定问题,每个进程都试图同时访问同一个文件。 它并不重要它们是不同的表格,它仍然是同一个文件。 此外,您的代码编写方式会导致每次都覆盖myfile.xlsx - James
是的,当然。我需要将锁放置在正确的位置,以便只有一个进程向文件写入。关于您的第二点:您是正确的,我已经将写入器从f(k)中移除了,现在它只在主函数中,这应该可以防止每次覆盖文件,但是输出仍然只包含一个表格。 - SuperMartingale
1个回答

3

1) 为什么在第二种方法中你在几个地方都使用了time.sleep?

__main__中,time.sleep(0.1)是为了给启动的process一个时间片来启动。
f2(fq, q)中,是为了给queue一个时间片来刷新所有缓冲数据到管道中,并且使用了q.get_nowait()
w(q)中,只是用于测试模拟长时间运行writer.to_excel(...),我已经删除了这个。

2) pool.map和pool = [mp.Process( . )]之间有什么区别?

使用pool.map不需要Queue,不需要传递参数,代码较短。 worker_process必须立即返回result并终止。 pool.map在所有iteration完成时启动一个新的进程。 然后必须处理results

使用pool = [mp.Process( . )]会启动nprocessesprocess将在queue.Empty时终止。

你能想到一个情况,在这种情况下你会更喜欢其中一种方法吗?

方法1:快速设置,串行化,只关心结果以继续执行。
方法2:如果您想要并行处理所有工作负载。


在进程中不能使用global writer
writer实例必须属于一个process

例如使用mp.Pool

def f1(k):
  # *** DO SOME STUFF HERE***
  results = pd.DataFrame(df_)
  return results

if __name__ == '__main__':
    pool = mp.Pool()
    results = pool.map(f1, range(len(list_of_days)))

    writer = pd.ExcelWriter('../test/myfile.xlsx', engine='xlsxwriter')
    for k, result in enumerate(results):
        result.to_excel(writer, sheet_name=list_of_days[k])

    writer.save()
    pool.close()

这导致在__main__进程中按顺序调用.to_excel(...)

如果您想要并行使用.to_excel(...),则需要使用mp.Queue()
例如:

worker进程:

# mp.Queue exeptions have to load from
try:
    # Python3
    import queue
except:
    # Python 2
    import Queue as queue

def f2(fq, q):
    while True:
        try:
            k = fq.get_nowait()
        except queue.Empty:
            exit(0)

        # *** DO SOME STUFF HERE***

        results = pd.DataFrame(df_)
        q.put( (list_of_days[k], results) )
        time.sleep(0.1)  

writer 进程:

def w(q):
    writer = pd.ExcelWriter('myfile.xlsx', engine='xlsxwriter')
    while True:
        try:
            titel, result = q.get()
        except ValueError:
            writer.save()
            exit(0)

        result.to_excel(writer, sheet_name=titel)
< p > __main__进程:
if __name__ == '__main__':
    w_q = mp.Queue()
    w_p = mp.Process(target=w, args=(w_q,))
    w_p.start()
    time.sleep(0.1)

    f_q = mp.Queue()
    for i in range(len(list_of_days)):
        f_q.put(i)

    pool = [mp.Process(target=f2, args=(f_q, w_q,)) for p in range(os.cpu_count())]
    for p in pool:
        p.start()
        time.sleep(0.1)

    for p in pool:
        p.join()

    w_q.put('STOP')
    w_p.join()

测试过的Python版本:3.4.2 - pandas版本:0.19.2 - xlsxwriter版本:0.9.6


感谢@stovfl!你的两种方法都很好用。有趣的是,我们可以通过使用队列来避免锁定。问题:1)为什么在第二种方法中你在几个地方都实现了time.sleep? 2)pool.map和pool = [mp.Process( . )]之间有什么区别?你能想到一个情况,在这两种方法中你更喜欢哪一种吗? - SuperMartingale
1
@SuperMartingale:更新回答。请汇报一下:您有多少个“进程”,在没有使用多进程的情况下,您的节约时间有多大。请在问题中更新此信息。 - stovfl

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接