使用生成器进行Python多进程处理

32

我正在尝试处理一个文件(每行都是一个json文档)。该文件的大小可以达到数百mb到gb级别。因此,我编写了一个生成器代码,逐行从文件中获取每个文档。

def jl_file_iterator(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        for line in f:
            document = json.loads(line)
            yield document

我的系统有4个核心,因此我想并行处理文件中的4行。目前我有这段代码,它一次取4行,并调用并行处理的代码。

threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
    files.append(jl)
    if i % (threads) == 0:
        # pool.map(processFile, files)
        parallelProcess(files, o)
        files = []
    i += 1

if files:
    parallelProcess(files, o)
    files = []

这是我的代码,实际的处理发生在这里。

def parallelProcess(files, outfile):
    processes = []
    for i in range(len(files)):
        p = Process(target=processFile, args=(files[i],))
        processes.append(p)
        p.start()
    for i in range(len(files)):
        processes[i].join()

def processFile(doc):
    extractors = {}
    ... do some processing on doc
    o.write(json.dumps(doc) + '\n')

你可以看到,在我发送下一批4个文件进行处理之前,我会等待所有4个行的处理完成。但我想要的是,只要一个进程处理完文件,我就想开始分配下一个可用进程来处理下一行。我该怎么做?

PS:问题在于它是一个生成器,我不能加载所有文件并使用类似于map的东西运行进程。

谢谢你的帮助


6
最简单的方法是使用队列。但是并行化可能不会带来太多收益。如果你想要并行化,因为你的文件是按行结构化的文档,只需将文件分成N份,然后运行N个副本的脚本即可。 - pvg
啊,我觉得在我的情况下将文件分成N个块会有所帮助。我会尝试一下。谢谢! - Muthu Rg
5个回答

34

正如@pvg在评论中所说,(有界)队列是调解生产者和速度不同的消费者之间最自然的方式,确保它们都尽可能保持繁忙,但不让生产者超前。

这里是一个自包含的可执行示例。队列被限制为最大大小等于工作进程数。如果消费者运行速度比生产者快得多,则让队列变得更大可能会很有意义。

针对您的特定情况,将行传递给消费者并让他们并行执行document = json.loads(line)部分可能是有意义的。

import multiprocessing as mp

NCORE = 4

def process(q, iolock):
    from time import sleep
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)  # blocks until q below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):  # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()

1
很好的例子,谢谢!有一个问题 - iolock 的目的是什么?它只是用来防止打印混淆吗? - Bruce Edge
是的!只是为了保持输出的稳定,无论平台的时间有多么混乱。当然,更普遍地说,如果需要,可以使用相同的模式来确保在完成工作的性质不同的工人和主进程之间的互斥。 - Tim Peters
很好的回答!我有一个进一步的复杂情况,我需要处理函数的返回结果。为了增加第二个复杂性,我正在处理大量文档,并且需要在恒定的内存使用情况下保持此结果(到达这个最终点之前一直是生成器)。这可能吗? - CpILL

10

所以最终我成功地运行了它。通过从我的文件中创建行块并并行运行这些行。在这里发布它,以便将来有用。

def run_parallel(self, processes=4):
    processes = int(processes)
    pool = mp.Pool(processes)
    try:
        pool = mp.Pool(processes)
        jobs = []
        # run for chunks of files
        for chunkStart,chunkSize in self.chunkify(input_path):
            jobs.append(pool.apply_async(self.process_wrapper,(chunkStart,chunkSize)))
        for job in jobs:
            job.get()
        pool.close()
    except Exception as e:
        print e

def process_wrapper(self, chunkStart, chunkSize):
    with open(self.input_file) as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            document = json.loads(line)
            self.process_file(document)

# Splitting data into chunks for parallel processing
def chunkify(self, filename, size=1024*1024):
    fileEnd = os.path.getsize(filename)
    with open(filename,'r') as f:
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size,1)
            f.readline()
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd > fileEnd:
                break

5

提姆·彼得斯(Tim Peters)的答案 很好。但我的具体情况有些不同,我需要修改他的答案以适应我的需求。参考这里。
这回答了评论中 @CpILL 的问题。


在我的情况下,我使用了一系列生成器(创建流水线)。其中一个生成器正在进行大量计算,拖慢了整个流程。

类似于这样:

def fast_generator1():
    for line in file:
        yield line

def slow_generator(lines):
    for line in lines:
        yield heavy_processing(line)

def fast_generator2():
    for line in lines:
        yield fast_func(line)

if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)

为了加快速度,我们需要使用多个进程执行缓慢的生成器。
修改后的代码如下:
import multiprocessing as mp

NCORE = 4

def fast_generator1():
    for line in file:
        yield line

def slow_generator(lines):
    def gen_to_queue(input_q, lines):
        # This function simply consume our generator and write it to the input queue
        for line in lines:
            input_q.put(line)
        for _ in range(NCORE):    # Once generator is consumed, send end-signal
            input_q.put(None)

    def process(input_q, output_q):
        while True:
            line = input_q.get()
            if line is None:
                output_q.put(None)
                break
            output_q.put(heavy_processing(line))


    input_q = mp.Queue(maxsize=NCORE * 2)
    output_q = mp.Queue(maxsize=NCORE * 2)

    # Here we need 3 groups of worker :
    # * One that will consume the input generator and put it into a queue. It will be `gen_pool`. It's ok to have only 1 process doing this, since this is a very light task
    # * One that do the main processing. It will be `pool`.
    # * One that read the results and yield it back, to keep it as a generator. The main thread will do it.
    gen_pool = mp.Pool(1, initializer=gen_to_queue, initargs=(input_q, lines))
    pool = mp.Pool(NCORE, initializer=process, initargs=(input_q, output_q))

    finished_workers = 0
    while True:
        line = output_q.get()
        if line is None:
            finished_workers += 1
            if finished_workers == NCORE:
                break
        else:
            yield line

def fast_generator2():
    for line in lines:
        yield fast_func(line)

if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)

使用这种实现方式,我们拥有了一个多进程生成器:它的使用方式与其他生成器(如本答案的第一个示例)完全相同,但所有的繁重计算都是使用多进程完成的,从而加速了计算!


你的示例在Linux中运行良好,但在Windows中存在一些问题,因为将gen_to_queue和process函数进行pickle处理时,由于它们不在类的顶层模型中,所以会出现问题。另一方面,在Windows操作系统上无法pickle生成器,我认为这在Windows上是不可能的。 - user1814720
@user1814720 将这两个方法移动到全局命名空间中,它就可以工作了。 - sluki
我有一个非常类似的用例(用于处理内容的功能管道),现在正处于试图鼓励它执行超出单个进程的阶段。对于你来说,这个问题进展如何?它仍然是一个可行的选择吗?我的问题是管道以生成器的形式存在,因此当我走这条路时,我会得到“TypeError:无法pickle生成器对象”的错误。 - Thomas Kimber
@ThomasKimber,你试过sluki提到的解决方案了吗? - Astariul
我没有这样做,但我不确定该如何去做,是否有实际操作的示例?我的生成器是在一个类中定义的,潜在地,我可以将它们包装在模块级别定义的某些内容中,但这只是将它们表面上移动到全局命名空间,因为我不能从当前定义它们的位置实际重新定义基础代码链。 - Thomas Kimber

3

你也可以这样使用更高级别的concurrent.futures模块:

import concurrent.futures as cf
import time
from concurrent.futures import ProcessPoolExecutor

def read(infile):
    with open(infile, "r") as f:
        for line in f:
            yield line.strip()


def process(line):
    # Simulate doing some heavy processing on `line`
    time.sleep(3)
    return line.upper()


def run_parallel(num_workers, lines):
    with ProcessPoolExecutor(max_workers=num_workers) as p:
        futures = {p.submit(process, line) for line in lines}
        for future in cf.as_completed(futures):
            yield future.result()


def write(outfile, lines):
    with open(outfile, "w") as f:
        for line in lines:
            f.write(line + "\n")


NUM_WORKERS = 4

if __name__ == "__main__":
    start = time.time()
    lines = reader("infile.txt")
    lines = run_parallel(NUM_WORKERS, lines)
    write("outfile.txt", lines)
    print(time.time() - start)

输入文件:

a
b
c
d
e
f
g
h
i
j

输出文件:

A
F
B
G
E
D
C
H
I
J

标准输出:

9.016341924667358

似乎无法在Windows下运行,错误为:concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.。它提到了 return _ForkingPickler.loads(res),因此看起来这可能是一个仅适用于Linux的技术。 - Contango

1
晚到了。我有一个类似的问题。生产者和消费者基本上是这样的。就像一些人指出的那样,队列最适合解决这个问题。
您可以创建一个执行器池(线程或进程),并与信号量一起使用,以确保同时选择n个任务。如果生成器提交任何其他任务,则会阻塞,直到信号量计数器减少。
找到一个现成的解决方案。请查看此Gist

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接