Python 实现简单的并发处理

3

问题的目的: 了解在Python中实现并发的方式/进行实验。

背景: 我想统计匹配特定模式的所有文件中的所有单词数。我的想法是,我可以调用函数 count_words('/foo/bar/*.txt') ,并且所有单词(即由一个或多个空格字符分隔的字符串)将被计算。

在实现中,我正在寻找使用并发实现count_words的方法。到目前为止,我已经成功地使用了multiprocessingasyncio

您是否看到完成相同任务的其他替代方法?

由于Python GIL的限制,我没有使用threading,因为我注意到性能提升并不那么显著。

import asyncio
import multiprocessing
import time
from pathlib import Path
from pprint import pprint


def count_words(file):
    with open(file) as f:
        return sum(len(line.split()) for line in f)


async def count_words_for_file(file):
    with open(file) as f:
        return sum(len(line.split()) for line in f)


def async_count_words(path, glob_pattern):
    event_loop = asyncio.get_event_loop()
    try:
        print("Entering event loop")
        for file in list(path.glob(glob_pattern)):
            result = event_loop.run_until_complete(count_words_for_file(file))
            print(result)
    finally:
        event_loop.close()


def multiprocess_count_words(path, glob_pattern):
    with multiprocessing.Pool(processes=8) as pool:
        results = pool.map(count_words, list(path.glob(glob_pattern)))
        pprint(results)


def sequential_count_words(path, glob_pattern):
    for file in list(path.glob(glob_pattern)):
        print(count_words(file))


if __name__ == '__main__':
    benchmark = []
    path = Path("../data/gutenberg/")
    # no need for benchmark on sequential_count_words, it is very slow!
    # sequential_count_words(path, "*.txt")

    start = time.time()
    async_count_words(path, "*.txt")
    benchmark.append(("async version", time.time() - start))

    start = time.time()
    multiprocess_count_words(path, "*.txt")
    benchmark.append(("multiprocess version", time.time() - start))

    print(*benchmark)

为了模拟大量文件,我从古腾堡计划(http://gutenberg.org/)下载了一些书籍,并使用以下命令创建了几个相同文件的副本。
for i in {000..99}; do cp 56943-0.txt $(openssl rand -base64 12)-$i.txt; done
1个回答

0

async def 并不能神奇地使函数调用并发,在 asyncio 中,您需要显式地放弃执行权以允许其他协程通过在可等待对象上使用 await 并发运行。也就是说,您当前的 count_words_for_file 仍然是按顺序执行的。

您可能希望引入 aiofiles 将阻塞文件 I/O 推迟到线程中,从而允许不同协程中的并发文件 I/O。即使如此,计算单词数的 CPU 绑定代码片段仍然在同一主线程中按顺序运行。要并行化它,您仍需要多个进程和多个 CPU(或多台计算机,请查看 Celery)。

此外,您的asyncio代码存在一个问题 - for ... run_until_complete 会再次使函数调用按顺序运行。您需要使用 loop.create_task() 来同时启动它们,并使用 aysncio.wait() 来合并结果。
import aiofiles

...

async def count_words_for_file(file):
    async with aiofiles.open(file) as f:
        rv = sum(len(line.split()) async for line in f)
        print(rv)
        return rv


async def async_count_words(path, glob_pattern):
    await asyncio.wait([count_words_for_file(file)
                        for file in list(path.glob(glob_pattern))])
    # asyncio.wait() calls loop.create_task() for you for each coroutine

...

if __name__ == '__main__':

    ...

    loop = asyncio.get_event_loop()
    start = time.time()
    loop.run_until_complete(async_count_words(path, "*.txt"))
    benchmark.append(("async version", time.time() - start))

谢谢您的评论 - 我不知道aiofiles。我可以请您在答案中添加一些相关的更改代码吗? 此外,我以为event_loop.run_until_complete同时运行函数。当运行代码时,与纯顺序方法相比,我明显看到了差异。谢谢。 - Michael
已更新。我还没有尝试过,但你的时间差异真的很奇怪。 - Fantix King

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接