Here is a simplified version of what I'm doing:
import multiprocessing as mp

def worker(shared_list, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # Use a lock to ensure nothing is added to the list in the meantime
    lock.acquire()
    # This lookup can take forever when the list is large
    if result_int not in shared_list:
        out_q.put(result_int)
        shared_list.append(result_int)
    lock.release()

manager = mp.Manager()
shared_list = manager.list()
lock = manager.Lock()
out_q = manager.Queue()

for i in range(8):
    p = mp.Process(target=worker, args=(shared_list, out_q, lock))
    p.start()
I previously tried using a set() instead of mp.Manager.list(), but it seems each process gets its own memory space, so when I update the set the change isn't synchronized between processes. That's why I switched to my current approach.
Here is roughly how my earlier attempt with set() looked:

import multiprocessing as mp

def worker(shared_set, out_q, lock):
    # Do some processing and get an integer
    result_int = some_other_code()

    # Use a lock to ensure nothing is added to the set in the meantime
    lock.acquire()
    # This lookup is fast, but the set doesn't reflect additions made by other processes.
    if result_int not in shared_set:
        out_q.put(result_int)
        shared_set.add(result_int)
    lock.release()

manager = mp.Manager()
lock = manager.Lock()
out_q = manager.Queue()

# This set will NOT synchronize between processes
shared_set = set()

for i in range(8):
    p = mp.Process(target=worker, args=(shared_set, out_q, lock))
    p.start()
Note: these examples are untested and just represent the relevant parts of my code.

Is there a way to share a set between processes, or some other way to do faster membership lookups?

EDIT: More information: out_q is consumed by another process, which writes the data to a single output file. There can be no duplicates. If I generate an integer and it turns out to be a duplicate, I need to go back and produce the next-best integer.
You could avoid the locking by using shared_dict.setdefault(result_int, (process_identifier, increasing_counter)) to atomically add result_int to the dict and find out whether it had already been added, although it's a bit clunky. - user2357112

Without any hint about the out_q consumer I really can't guess - but yes, that sounds most likely. - Tim Peters