Function with Python multiprocessing - memory usage grows with each call of the function

I am processing a large amount of data in parallel, and it works fine the first time through. But when I try to wrap the program in a function and call that function several times with different arguments (for example, to process only a specific year), memory usage first doubles, then triples, and so on, until my machine runs out of memory.
I am not sure what is going on, but when I run the minimal example below I get strange multiprocessing logging output: basically, if I call the calc() function n times, the logger shows every message n times.
import multiprocessing
import time
import logging

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill means shutdown
                print '%s: Exiting' % proc_name
                self.task_queue.task_done()
                break
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b
    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
    def __str__(self):
        return '%s * %s' % (self.a, self.b)


def calc():

    multiprocessing.log_to_stderr(logging.DEBUG)
    # Establish communication queues
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Start consumers
    num_consumers = 1
    print 'Creating %d consumers' % num_consumers
    consumers = [ Consumer(tasks, results)
                  for i in xrange(num_consumers) ]
    for w in consumers:
        w.start()

    # Enqueue jobs
    num_jobs = 3
    for i in xrange(num_jobs):
        tasks.put(Task(i, i))

    # Add a poison pill for each consumer
    for i in xrange(num_consumers):
        tasks.put(None)

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1


if __name__ == '__main__':
    calc()
    print '--------------------------------------------'
    print 'RUNNING SECOND TIME ALL CALLS ARE DUPLICATED'
    print '--------------------------------------------'
    calc()

The logger output is:
[DEBUG/MainProcess] created semlock with handle 140730532954112
[DEBUG/MainProcess] created semlock with handle 140730532921344
[DEBUG/MainProcess] created semlock with handle 140730532888576
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] created semlock with handle 140730532855808
[DEBUG/MainProcess] created semlock with handle 140730532823040
[DEBUG/MainProcess] created semlock with handle 140730532790272
[DEBUG/MainProcess] created semlock with handle 140730532757504
[DEBUG/MainProcess] created semlock with handle 140730532724736
[DEBUG/MainProcess] created semlock with handle 140730494124032
[DEBUG/MainProcess] created semlock with handle 140730494091264
[DEBUG/MainProcess] created semlock with handle 140730494058496
[DEBUG/MainProcess] Queue._after_fork()
Creating 1 consumers
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/MainProcess] doing self._thread.start()
[DEBUG/Consumer-1] Queue._after_fork()
[DEBUG/Consumer-1] Queue._after_fork()
[INFO/Consumer-1] child process calling self.run()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
Consumer-1: 0 * 0
[DEBUG/Consumer-1] Queue._start_thread()
[DEBUG/Consumer-1] doing self._thread.start()
[DEBUG/Consumer-1] starting thread to feed data to pipe
[DEBUG/Consumer-1] ... done self._thread.start()
Consumer-1: 1 * 1
Consumer-1: 2 * 2
Consumer-1: Exiting
[INFO/Consumer-1] process shutting down
[DEBUG/Consumer-1] running all "atexit" finalizers with priority >= 0
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
--------------------------------------------
RUNNING SECOND TIME ALL CALLS ARE DUPLICATED
--------------------------------------------
[DEBUG/Consumer-1] telling queue thread to quit
[DEBUG/Consumer-1] running the remaining "atexit" finalizers
[DEBUG/Consumer-1] joining queue thread
[DEBUG/Consumer-1] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] created semlock with handle 140730485637120
[DEBUG/MainProcess] created semlock with handle 140730485637120
[DEBUG/MainProcess] created semlock with handle 140730485604352
[DEBUG/MainProcess] created semlock with handle 140730485604352
[DEBUG/Consumer-1] ... queue thread joined
[DEBUG/MainProcess] created semlock with handle 140730485571584
[DEBUG/MainProcess] created semlock with handle 140730485571584
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] Queue._after_fork()
[INFO/Consumer-1] process exiting with exitcode 0
[DEBUG/MainProcess] created semlock with handle 140730485538816
[DEBUG/MainProcess] created semlock with handle 140730485538816
[DEBUG/MainProcess] created semlock with handle 140730485506048
[DEBUG/MainProcess] created semlock with handle 140730485506048
[DEBUG/MainProcess] created semlock with handle 140730485473280
[DEBUG/MainProcess] created semlock with handle 140730485473280
[DEBUG/MainProcess] created semlock with handle 140730485440512
[DEBUG/MainProcess] created semlock with handle 140730485440512
[DEBUG/MainProcess] created semlock with handle 140730485407744
[DEBUG/MainProcess] created semlock with handle 140730485407744
[DEBUG/MainProcess] created semlock with handle 140730485374976
[DEBUG/MainProcess] created semlock with handle 140730485374976
[DEBUG/MainProcess] created semlock with handle 140730485342208
[DEBUG/MainProcess] created semlock with handle 140730485342208
[DEBUG/MainProcess] created semlock with handle 140730485309440
[DEBUG/MainProcess] created semlock with handle 140730485309440
[DEBUG/MainProcess] Queue._after_fork()
[DEBUG/MainProcess] Queue._after_fork()
Creating 1 consumers
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/MainProcess] Queue._start_thread()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/Consumer-2] Queue._after_fork()
[DEBUG/MainProcess] doing self._thread.start()
[INFO/Consumer-2] child process calling self.run()
[DEBUG/MainProcess] doing self._thread.start()
[INFO/Consumer-2] child process calling self.run()
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] starting thread to feed data to pipe
[DEBUG/MainProcess] ... done self._thread.start()
[DEBUG/MainProcess] ... done self._thread.start()
Consumer-2: 0 * 0
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] ... queue thread already dead
[DEBUG/MainProcess] ... queue thread already dead
[DEBUG/Consumer-2] Queue._start_thread()
[DEBUG/Consumer-2] Queue._start_thread()
[DEBUG/Consumer-2] doing self._thread.start()
[DEBUG/Consumer-2] doing self._thread.start()
[DEBUG/Consumer-2] starting thread to feed data to pipe
[DEBUG/Consumer-2] starting thread to feed data to pipe
[DEBUG/Consumer-2] ... done self._thread.start()
[DEBUG/Consumer-2] ... done self._thread.start()
Consumer-2: 1 * 1
Consumer-2: 2 * 2
Consumer-2: Exiting
Result: 0 * 0 = 0
Result: 1 * 1 = 1
Result: 2 * 2 = 4
[INFO/Consumer-2] process shutting down
[INFO/Consumer-2] process shutting down
[DEBUG/Consumer-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-2] running all "atexit" finalizers with priority >= 0
[DEBUG/Consumer-2] telling queue thread to quit
[DEBUG/Consumer-2] telling queue thread to quit
[INFO/MainProcess] process shutting down
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] telling queue thread to quit
[DEBUG/MainProcess] telling queue thread to quit
[INFO/MainProcess] calling join() for process Consumer-2
[DEBUG/Consumer-2] running the remaining "atexit" finalizers
[INFO/MainProcess] calling join() for process Consumer-2
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/MainProcess] feeder thread got sentinel -- exiting
[DEBUG/Consumer-2] running the remaining "atexit" finalizers
[DEBUG/Consumer-2] feeder thread got sentinel -- exiting
[DEBUG/Consumer-2] feeder thread got sentinel -- exiting
[DEBUG/Consumer-2] joining queue thread
[DEBUG/Consumer-2] joining queue thread
[DEBUG/Consumer-2] ... queue thread joined
[DEBUG/Consumer-2] ... queue thread joined
[INFO/Consumer-2] process exiting with exitcode 0
[INFO/Consumer-2] process exiting with exitcode 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers
[DEBUG/MainProcess] running the remaining "atexit" finalizers
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] joining queue thread
[DEBUG/MainProcess] ... queue thread joined
[DEBUG/MainProcess] ... queue thread joined

Do I need to initialize the multiprocessing environment somehow? Or is it simply not possible to do this inside a loop in the main process? I am using Ubuntu 12.04 with Python 2.7.5.


I don't see the memory increase when calling calc multiple times. - Bakuriu
Normally you do tasks.put(None) if there is a for task in iter(tasks.get, None): somewhere; otherwise tasks.join() is enough (though fragile) (see the sketch below these comments). - jfs
I would expect each call to calc() to produce its corresponding output. - jfs
@Bakuriu Unfortunately the memory problem does not really show up in this minimal example. What really puzzles me is that, with this setup, I would expect all objects to be destroyed once the first calc() call finishes and to be recreated in the next one. But in my real program the memory consumption doubles on the second run. If I restart the program with the arguments of the second run, it works fine, which is really strange. I will try to produce a minimal working example that actually shows it, but it is fairly complex and spread over several files. - cpaulik
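
For reference, here is a minimal sketch (my own, not from the post) of the consumer loop jfs describes: iter(self.task_queue.get, None) keeps calling get() until it returns the sentinel None, so Consumer.run() from the question could equivalently be written as:

    def run(self):
        proc_name = self.name
        # iter() keeps calling task_queue.get() until it returns the sentinel None
        for next_task in iter(self.task_queue.get, None):
            print '%s: %s' % (proc_name, next_task)
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        # mark the poison pill itself as done so tasks.join() can return
        self.task_queue.task_done()
        print '%s: Exiting' % proc_name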
1 Answer


Try adding these lines at the end of calc():

for w in consumers:
    w.join()

Calling join() on a joinable queue blocks until everything in the queue has been consumed, but it does not guarantee that the child processes have been garbage collected. I suspect you have some objects in your child processes that are still hanging around in memory because those processes have not been joined yet.
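
For illustration, a sketch of how the tail of calc() might then look (assuming the rest of the function stays exactly as posted; the poison pills already on the queue make each Consumer exit its run() loop, so join() returns promptly):

    # Wait for all of the tasks to finish
    tasks.join()

    # Start printing results
    while num_jobs:
        result = results.get()
        print 'Result:', result
        num_jobs -= 1

    # Wait for the consumer processes themselves to terminate before
    # returning, so each calc() call cleans up its own children
    for w in consumers:
        w.join()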

