多进程 - 读取大型输入数据 - 程序挂起

7

我希望能够对从文件中加载的某些输入数据进行并行计算。(由于文件可能非常大,因此我使用了生成器。)

在一定数量的项目上,我的代码运行正常,但超过这个阈值后,程序会挂起(一些工作进程无法结束)。

有什么建议吗?(我正在使用python2.7、8个CPU; 5000行还可以,7500行不行。)

首先,您需要一个输入文件。在bash中生成它:

for i in {0..10000}; do echo -e "$i"'\r' >> counter.txt; done

然后运行这个:
python2.7 main.py 100 counter.txt > run_log.txt

main.py:

#!/usr/bin/python2.7
import os, sys, signal, time
import Queue
import multiprocessing as mp

def eat_queue(job_queue, result_queue):
    """Eats input queue, feeds output queue
    """
    proc_name = mp.current_process().name
    while True:
        try:
            job = job_queue.get(block=False)
            if job == None:
                print(proc_name + " DONE")
                return
            result_queue.put(execute(job))
        except Queue.Empty:
            pass    

def execute(x):
    """Does the computation on the input data
    """
    return x*x

def save_result(result):
    """Saves results in a list
    """
    result_list.append(result)

def load(ifilename):
    """Generator reading the input file and
        yielding it row by row
    """
    ifile = open(ifilename, "r")
    for line in ifile:
        line = line.strip()
        num = int(line)
        yield (num)
    ifile.close()
    print("file closed".upper())

def put_tasks(job_queue, ifilename):
    """Feeds the job queue
    """
    for item in load(ifilename):
        job_queue.put(item)
    for _ in range(get_max_workers()):
        job_queue.put(None)

def get_max_workers():
    """Returns optimal number of processes to run
    """
    max_workers = mp.cpu_count() - 2
    if max_workers < 1:
        return 1
    return max_workers

def run(workers_num, ifilename):
    job_queue = mp.Queue()
    result_queue = mp.Queue()

    # decide how many processes are to be created
    max_workers = get_max_workers()
    print "processes available: %d" % max_workers
    if workers_num < 1 or workers_num > max_workers:
        workers_num = max_workers

    workers_list = []
    # a process for feeding job queue with the input file
    task_gen = mp.Process(target=put_tasks, name="task_gen",
                          args=(job_queue, ifilename))
    workers_list.append(task_gen)

    for i in range(workers_num):
        tmp = mp.Process(target=eat_queue, name="w%d" % (i+1),
                                      args=(job_queue, result_queue))
        workers_list.append(tmp)

    for worker in workers_list:
        worker.start()

    for worker in workers_list:
        worker.join()
        print "worker %s finished!" % worker.name

if __name__ == '__main__':
    result_list = []
    args = sys.argv
    workers_num = int(args[1])
    ifilename = args[2]
    run(workers_num, ifilename)
1个回答

8
这是因为你的代码中没有从result_queue中取出任何内容。这种行为取决于内部队列缓冲区的细节:如果等待的数据“不多”,则一切看起来都正常,但如果等待的数据“很多”,则一切都会被冻结。这里涉及内部魔法的层次,所以没有更多的解释了;-) 但文档已经发出了警告:

警告

如上所述,如果子进程已经将项目放入队列(并且它没有使用JoinableQueue.cancel_join_thread),那么该进程将不会终止,直到所有缓冲的项目都已刷新到管道中。

这意味着,如果您试图加入该进程,则可能会出现死锁,除非您确定已经消耗了放在队列中的所有项目。同样,如果子进程不是守护进程,则父进程在退出时尝试加入所有非守护进程子进程时可能会挂起。

请注意,使用管理器创建的队列不会出现此问题。请参阅编程指南。

修复这个问题的一种简单方法是:首先添加:

            result_queue.put(None)

eat_queue()返回之前添加:


count = 0
while count < workers_num:
    if result_queue.get() is None:
        count += 1

在主程序.join()之前,应先清空工作队列以确保程序正常关闭。

顺便提一句,这段代码相当奇怪:

while True:
    try:
        job = job_queue.get(block=False)
        if job == None:
            print(proc_name + " DONE")
            return
        result_queue.put(execute(job))
    except Queue.Empty:
        pass

为什么要使用非阻塞式的 get()? 当队列为空时,这会变成一个占用CPU的“忙循环”。.get() 的主要作用是提供一种有效的等待工作到来的方式。因此:

while True:
    job = job_queue.get()
    if job is None:
        print(proc_name + " DONE")
        break
    else:
        result_queue.put(execute(job))
result_queue.put(None)

做同样的事情,但效率更高。

队列大小警告

虽然您没有询问这个问题,但在它影响您之前让我们谈一下;-)默认情况下, Queue 没有大小限制。例如,如果向 Queue 添加十亿个项目,则它将需要足够的RAM来保存十亿个项目。因此,如果您的生产者可以比消费者更快地生成工作项,存储器使用量很快就会失控。

幸运的是,这很容易修复:指定最大队列大小即可。例如,

    job_queue = mp.Queue(maxsize=10*workers_num)
                         ^^^^^^^^^^^^^^^^^^^^^^^

那么job_queue.put(some_work_item)将会阻塞直到消费者减少队列大小到小于最大值。这样,您可以使用仅需要微不足道的RAM的队列处理巨大的问题。


谢谢!现在程序已经正确运行了。感谢所有的建议。(我在处理输入时实际上已经阅读了文档的这一部分,但在输出队列的情况下没有意识到这一点。)为什么我使用了 blocks=false:我只是从各种代码片段中编译了代码的方案。但我不确定它实际上是做什么的 - 我很困惑(即使在这种情况下也是如此):如果它是 false,那么要读取输入的子进程不会等待下一个项目到来?(如果是这样,会发生什么?) - galapah
1
block=False 表示调用立即返回,无论队列中是否有任何内容。一般情况下,忽略 "block" 和 "timeout" 参数。对于绝大多数任务来说,这些东西都可以完美地使用,而使用它们可能会导致各种问题。需要很多经验才能知道何时需要非阻塞或超时调用 - 即使是那些有经验的人也不一定需要。;-) - Tim Peters

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接