Python -> 多进程模块

6

这是我想要实现的目标:

  1. 我有大约一百万个文件,需要解析并将已解析的内容附加到一个单独的文件中。
  2. 由于单个进程需要很长时间,因此此选项不可行。
  3. 不使用Python中的线程,因为它实际上只运行单个进程(由于GIL)。
  4. 因此使用multiprocessing模块。即生成4个子进程以利用所有原始核心能力:)

到目前为止还不错,现在我需要一个所有子进程都可以访问的共享对象。我正在使用multiprocessing模块中的Queues。此外,所有子进程都需要将其输出写入单个文件。可能需要使用锁定的地方。使用此设置运行时,我没有收到任何错误(因此父进程似乎正常),它只是停滞不前。当我按下ctrl-C时,我会看到一个回溯(每个子进程一个)。输出文件中也没有输出。以下是代码(请注意,如果没有多进程,一切都正常运行)-

import os
import glob
from multiprocessing import Process, Queue, Pool

data_file  = open('out.txt', 'w+')

def worker(task_queue):
    for file in iter(task_queue.get, 'STOP'):
        data = mine_imdb_page(os.path.join(DATA_DIR, file))
        if data:
            data_file.write(repr(data)+'\n')
    return

def main():
    task_queue = Queue()
    for file in glob.glob('*.csv'):
        task_queue.put(file)
    task_queue.put('STOP') # so that worker processes know when to stop

    # this is the block of code that needs correction.
    if multi_process:
        # One way to spawn 4 processes
        # pool = Pool(processes=4) #Start worker processes
        # res  = pool.apply_async(worker, [task_queue, data_file])

        # But I chose to do it like this for now.
        for i in range(4):
            proc = Process(target=worker, args=[task_queue])
            proc.start()
    else: # single process mode is working fine!
        worker(task_queue)
    data_file.close()
    return

我做错了什么?我也尝试在生成进程时将打开的文件对象传递给每个进程。但是没有效果。例如- Process(target=worker, args=[task_queue, data_file])。但这并没有改变任何事情。我觉得子进程由于某种原因无法写入文件。要么file_object实例没有被复制(在生成时),要么其他问题......有人有想法吗?
额外信息:还有没有办法保持一个持久的mysql_connection连接并将其传递给子进程?所以我在父进程中打开了一个mysql连接,所有子进程都可以访问打开的连接。基本上这相当于Python中的共享内存。这里有什么想法吗?

如果你不写入文件,而是使用print语句,那么它能正常工作吗?(在Linux上,我会使用python script.py > out.dat来防止屏幕泛滥)。 - extraneon
1
我认为proc.start是非阻塞的,所以你可能应该在某个地方等待一下,让进程有机会在执行datafile.close()之前完成一些工作。 - extraneon
@extraneon:发现得好,但如果程序试图写入关闭的文件,则应引发异常。 - badp
现在我正在尝试将数据写入文件,但如果由于某些原因无法成功,我想要将数据写入 MySQL(不进行读取)... - Srikar Appalaraju
我会在线程中解析文件,然后将信息发送回另一个“队列”中的主进程。然后我会在主线程中存储信息,从而避免传递文件句柄的问题。不过,我假设“mine_imdb_page”是您的瓶颈,而不是写入。哦,对于挖掘旧话题,我很抱歉。 - Opal
显示剩余2条评论
2个回答

4
虽然与Eric的讨论是富有成果的,但事后我发现了更好的方法。在multiprocessing模块中,有一个名为“Pool”的方法非常适合我的需求。
它会自动优化到我的系统核心数。也就是说,只会生成与核心数量相同的进程。当然,这是可以定制的。以下是代码,希望对其他人有所帮助-
from multiprocessing import Pool

def main():
    po = Pool()
    for file in glob.glob('*.csv'):
        filepath = os.path.join(DATA_DIR, file)
        po.apply_async(mine_page, (filepath,), callback=save_data)
    po.close()
    po.join()
    file_ptr.close()

def mine_page(filepath):
    #do whatever it is that you want to do in a separate process.
    return data

def save_data(data):
    #data is a object. Store it in a file, mysql or...
    return

我还在研究这个庞大的模块。不确定是父进程执行save_data()函数,还是派生子进程使用该函数。如果是子进程保存数据,可能会在某些情况下导致并发问题。如果有人在使用此模块方面有更多经验,欢迎分享知识...


3

多进程文档提供了几种在进程间共享状态的方法:

http://docs.python.org/dev/library/multiprocessing.html#sharing-state-between-processes

我确定每个进程都会得到一个新的解释器,然后目标(函数)和参数会被加载到其中。在这种情况下,来自您脚本的全局命名空间将绑定到您的工作函数,因此数据文件将在那里。但是,我不确定文件描述符在复制过程中会发生什么。您是否尝试将文件对象作为其中之一的参数传递?
另一种选择是传递另一个队列,用于保存工作程序的结果。工作程序将结果放入队列中,主代码获取结果并将其写入文件。

是的!我可以这样做。我可以有另一个队列,类似于进程写入的out_queue。由于父进程可以访问它,因此它可以不断读取此队列并写入文件。这可能有效!另外,我尝试将文件对象作为其中一个参数传递。它似乎不起作用。线程没有写入文件。还有Eric,有什么办法将持久的mysql连接传递给子进程吗? - Srikar Appalaraju
@Srikar,希望这能帮到你。至于mysql连接,我不确定。我会建议每个进程都使用单独的连接。即使你可以共享一个连接,我也不确定它有多“线程安全”。如果你真的必须共享一个连接,那么你可能需要做一些奇怪的事情。另外,你也可以将连接的查询/响应机制代理到队列中。然后主进程(或单独的mysql处理程序进程)从队列中获取查询,运行它们,并将结果放回...或者类似这样的操作。 - Eric Snow

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接