如何在Python多进程中共享数据?

3
我希望能够在给定的文章中搜索预定义的关键词列表,并在文章中找到关键词时将分数增加1。由于预定义的关键词列表非常大(10k个关键词),并且文章数量为100k,因此我希望使用多进程。
我看到了this这个问题,但它没有解决我的问题。
我尝试了这个实现,但结果是None
keywords = ["threading", "package", "parallelize"]

def search_worker(keyword):
    score = 0
    article = """
    The multiprocessing package also includes some APIs that are not in the threading module at all. For example, there is a neat Pool class that you can use to parallelize executing a function across multiple inputs."""

   if keyword in article:
        score += 1
    return score

我会翻译成中文:

我尝试了以下两种方法,但结果得到了三个None

方法1:

 pool = mp.Pool(processes=4)
 result = [pool.apply(search_worker, args=(keyword,)) for keyword in keywords]

方法2:

result = pool.map(search_worker, keywords)
print(result)

实际输出:[None, None, None]

期望输出:3

我考虑将预定义的关键词列表和文章一起发送给工人,但我不确定是否朝着正确的方向前进,因为我没有多进程的先前经验。

提前致谢。


为什么不使用ElasticSearch作为您的搜索引擎? - Frank AK
我不确定如何使用ElasticSearch来实现这个功能。我想针对一组关键字计算每篇文章的置信度分数,并将文章与置信度分数进行索引。 - Om Prakash
ElasticSearch很容易做到这一点!你真的应该试一试。 - Frank AK
你的情况有不同的解决方案。首先,你可以使用共享内存,例如数据库。Redis非常简单且运行非常稳定。根据你的规模计划和预期复杂度,可以采用一些Map-Reduce技术。 - Andre Pastore
2
当我运行你的代码(python3.5)时,它基本上可以正常工作(我得到了[1, 1, 1],你只需要一个全局计数或对结果求和)。你记得使用if __name__ == '__main__'来运行method 1method 2吗? - e.s.
显示剩余2条评论
2个回答

1
这是一个使用Pool的函数,你可以传递文本和关键词列表,它会起作用。你可以使用Pool.starmap来传递元组(text, keyword),但你需要处理一个包含10k个对text的引用的可迭代对象。
from functools import partial
from multiprocessing import Pool

def search_worker(text, keyword):
    return int(keyword in text)

def parallel_search_text(text, keyword_list):
    processes = 4
    chunk_size = 10
    total = 0
    func = partial(search_worker, text)
    with Pool(processes=processes) as pool:
        for result in pool.imap_unordered(func, keyword_list, chunksize=chunk_size):
            total += result

    return total

if __name__ == '__main__':
    texts = []  # a list of texts
    keywords = []  # a list of keywords
    for text in texts:
        print(parallel_search_text(text, keywords))

创建工作进程池会带来一定的开销。因此,可以通过进行 timeit 测试,将其与简单的单进程文本搜索函数进行比较,以确定是否值得使用。通过创建一个 Pool 实例并将其传递到函数中,可以加快重复调用的速度。

def parallel_search_text2(text, keyword_list, pool):
    chunk_size = 10
    results = 0
    func = partial(search_worker, text)

    for result in pool.imap_unordered(func, keyword_list, chunksize=chunk_size):
        results += result
    return results

if __name__ == '__main__':
    pool = Pool(processes=4)
    texts = []  # a list of texts
    keywords = []  # a list of keywords
    for text in texts:
        print(parallel_search_text2(text, keywords, pool))

0

用户e.s在他的评论中解决了主要问题,但我会发布一个解决方案来回应Om Prakash的评论请求:

将文章和预定义关键字列表都传递给工作方法

这里有一个简单的方法来实现。你只需要构建一个包含你想让工作程序处理的参数的元组:

from multiprocessing import Pool

def search_worker(article_and_keyword):
    # unpack the tuple
    article, keyword = article_and_keyword

    # count occurrences
    score = 0
    if keyword in article:
        score += 1

    return score

if __name__ == "__main__":
    # the article and the keywords
    article = """The multiprocessing package also includes some APIs that are not in the threading module at all. For example, there is a neat Pool class that you can use to parallelize executing a function across multiple inputs."""
    keywords = ["threading", "package", "parallelize"]

    # construct the arguments for the search_worker; one keyword per worker but same article
    args = [(article, keyword) for keyword in keywords]

    # construct the pool and map to the workers
    with Pool(3) as pool:
        result = pool.map(search_worker, args)
    print(result)

如果你使用的是较新版本的Python,我建议尝试使用starmap,这样会使代码更加简洁。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接