Python进程之间能否共享set()?

6
我正在使用Python 2.7中的多进程来处理一个非常大的数据集。每个进程运行时都会将整数添加到共享的mp.Manager.Queue(),但只有在其他进程尚未添加相同整数时才会添加。由于您无法对队列进行“in”样式的成员资格测试,所以我采用的方法是检查每个整数是否属于共享的mp.Manager.list()。该列表最终将有约3000万个条目,因此成员资格测试将非常缓慢,抵消了多进程的优势。
这是我正在做的事情的一个简化版本:
import multiprocessing as mp

def worker(shared_list, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # Use a lock to ensure nothing is added to the list in the meantime
    lock.acquire()
    # This lookup can take forever when the list is large
    if result_int not in shared_list:
        out_q.put(result_int)
        shared_list.append(result_int)
    lock.release()

manager = mp.Manager()
shared_list = manager.list()
lock = manager.lock()
out_q = manager.Queue()

for i in range(8):
   p = mp.Process(target=worker, args=(shared_list, out_q, lock))
   p.start()

我之前尝试使用set()代替mp.Manager.list(),但似乎每个进程都有自己的内存空间,因此当我更新set时,它不会在进程之间同步。因此,我改用了当前的方法。

以下是我之前尝试使用set()的大致方式: import multiprocessing as mp

def worker(shared_set, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # Use a lock to ensure nothing is added to the set in the meantime
    lock.acquire()
    # This lookup is fast, but the set doesn't reflect additions made by other processes.
    if result_int not in shared_set:
        out_q.put(result_int)
        shared_set.add(result_int)
    lock.release()

manager = mp.Manager()
lock = manager.lock()
out_q = manager.Queue()

# This set will NOT synchronize between processes
shared_set = set()


for i in range(8):
   p = mp.Process(target=worker, args=(shared_set, out_q, lock))
   p.start()

注意:这些示例未经测试,只是表示我代码中相关部分的内容。
有没有一种方法可以在进程间共享集合,或以其他方式进行更快的成员身份查找?
编辑: 更多信息:out_q由另一个进程消耗,该进程将数据写入单个输出文件。不能有重复项。如果我生成一个整数并发现它是重复的,则需要返回并生成下一个最佳整数。
1个回答

12

一个显而易见的改进是使用mp.Manager.dict()代替集合,然后使用任意值(例如,将the_dict[result_int] = 1设置为表示集合成员)。顺便说一句,在Python添加set类型之前,“每个人”都是这样实现集合的,而且现在字典和集合在内部基本上使用相同的代码实现。

后来补充道:我承认我不理解为什么在原始代码中同时使用了集合和列表,因为集合的键与列表的内容相同。如果进入顺序不重要,为什么不完全放弃列表?然后,您还可以删除原始列表中需要保持集合和列表同步的锁定层。

具体实现,使用字典建议后,整个函数将变得如下:

def worker(shared_dict):
    # Do some processing and get an integer
    result_int = some_other_code()
    shared_dict[result_int] = 1

其他进程可以使用shared_dict.pop()获取一个值(尽管它们无法像等待队列的.get()那样等待.pop())。

还有一个问题:考虑使用本地(进程本地)集合?它们将运行得更快。然后,每个工作进程将不会添加任何已知的重复项,但是可能会在进程之间存在重复项。您的代码没有给出关于out_q消费者的任何提示,但如果只有一个消费者,则该消费者中的本地集合也可以消除跨进程的重复项。或者内存负担过高了?从这里无法猜测;-)

重大修改

我要建议一种不同的方法:根本不要使用mp.Manager。大多数时候,我看到人们使用它后都感到后悔,因为它并没有做他们认为它正在做的事情。他们所认为的:它提供物理共享对象。它做的事情是:它提供语义上共享的对象。物理上,它们位于“另一个”处,在幕后的进程中,对象的操作被转发到该进程,该进程在其自己的地址空间中执行这些操作。它根本不是物理上共享的。因此,虽然它非常方便,但即使是最简单的操作也会产生实质性的进程间开销。

因此,我建议改为在一个进程中使用单个普通集合,该进程将是唯一关注消除重复项的代码。工作进程以不考虑重复项的方式生成整数-它们只是传递整数。对于此任务,mp.Queue很好用(再次强调:真的没有必要用mp.Manager.Queue)。

像这样,这是一个完整的可执行程序:

N = 20

def worker(outq):
    from random import randrange
    from time import sleep
    while True:    
        i = randrange(N)
        outq.put(i)
        sleep(0.1)

def uniqueifier(inq, outq):
    seen = set()
    while True:
        i = inq.get()
        if i not in seen:
            seen.add(i)
            outq.put(i)

def consumer(inq):
    for _ in range(N):
        i = inq.get()
        print(i)

if __name__ == "__main__":
    import multiprocessing as mp
    q1 = mp.Queue()
    q2 = mp.Queue()
    consume = mp.Process(target=consumer, args=(q2,))
    consume.start()
    procs = [mp.Process(target=uniqueifier, args=(q1, q2))]
    for _ in range(4):
        procs.append(mp.Process(target=worker, args=(q1,)))
    for p in procs:
        p.start()
    consume.join()
    for p in procs:
        p.terminate()

传递给uniqueifier的第二个队列发挥着原始队列的作用:它仅提供唯一的整数。没有尝试“共享内存”,因此没有因此产生的任何成本。唯一的进程间通信是通过简单明了的mp.Queue操作进行的。由于只有一个集合,并且它没有以任何方式共享,因此它运行得尽可能快。

实际上,这只是设置了一个简单的管道,但具有多个输入。


2
如果其他进程正在同时从队列中获取数据,那么需要使用单独的队列和字典来确保不会出现整数被添加、删除和再次添加的情况。你仍然可以通过类似 shared_dict.setdefault(result_int, (process_identifier, increasing_counter)) 的方式避免锁定,以原子方式将 result_int 添加到字典中,并确定它是否已经被添加,尽管这有点麻烦。 - user2357112
在没有任何关于out_q消费者的提示的情况下,我根本无法猜测 - 但是,是的,那听起来最有可能。 - Tim Peters
@TimPeters 我在原始代码中实际上没有同时使用set和list。那些只是同一代码的两个不同版本。你的第二个解决方案非常有道理,我会尝试一下。感谢帮助我理解mp正在发生的事情! - AJSmyth

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接