如何在Python的多进程池中创建全局锁/信号量?

27

我想限制子进程的资源访问。例如- 限制http下载磁盘io等。如何在扩展此基本代码的同时实现它?

请分享一些基本代码示例。

pool = multiprocessing.Pool(multiprocessing.cpu_count())
while job_queue.is_jobs_for_processing():
  for job in job_queue.pull_jobs_for_processing:
    pool.apply_async(do_job, callback = callback)
pool.close()
pool.join()

你想限制资源访问的方式是使用LockSemaphore吗?有没有理由不直接使用multiprocessing.Lock/multiprocessing.Semaphore - dano
@dano 怎样将 multiprocessing.Lock() 或 Semaphore() 传递给进程池(pool)?有哪些选项可以实现锁的全局共享? - Chameleon
限制资源访问的需求并不意味着进程池需要同步,而是工作任务需要。为什么不详细解释一下你想要实现什么? - Michael Foukarakis
@MichaelFoukarakis 重要的不是为什么,而是如何。我可以回答你为什么,因为随机IO比顺序IO慢 - 我回答了你的问题吗?请看统计数据 - http://goo.gl/TbC2xp。Memcache与磁盘、硬盘和闪存工作方式不同(它经常被称为磁盘,但它不是磁盘)或WWW服务器 - 有些需要信号量,有些则不需要 - 无论如何,我需要学习Python中的全局信号量模式,就像许多其他人一样。 - Chameleon
@MichaelFoukarakis 的 WWW 服务器需要使用信号量来保持礼貌,避免拒绝大量并行请求的站点 - 它不是由设计限制,而是受互联网伦理约束。 - Chameleon
2个回答

35

在创建进程池时,可以使用initializer和initargs参数来定义一个全局变量,以便在所有子进程中都能访问到它。

例如:

from multiprocessing import Pool, Lock
from time import sleep

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def init_child(lock_):
    global lock
    lock = lock_

def main():
    lock = Lock()
    poolsize = 4
    with Pool(poolsize, initializer=init_child, initargs=(lock,)) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()

这段代码将按升序(作业提交的顺序)打印出0-3之间的数字,因为它使用了锁。注释掉with lock:这一行代码,可以看到它按降序打印数字。

这个解决方案在Windows和Unix上都适用。但是,在Unix系统上,由于进程可以进行fork操作,所以只需要在模块范围内声明全局变量即可。子进程会得到父进程的内存副本,其中包括仍然有效的锁对象。因此,初始化器并不是必需的,但它可以帮助说明代码的预期工作方式。当多进程能够通过fork创建进程时,以下代码也可以正常工作。

from multiprocessing import Pool, Lock
from time import sleep

lock = Lock()

def do_job(i):
    "The greater i is, the shorter the function waits before returning."
    with lock:
        sleep(1-(i/10.))
        return i

def main():
    poolsize = 4
    with Pool(poolsize) as pool:
        results = pool.imap_unordered(do_job, range(poolsize))
        print(list(results))

if __name__ == "__main__":
    main()

我正在学习第二个例子,但是看起来 lock = Lock() 不是全局的,因为没有被主程序传递 - 我错了吗? - Chameleon
如果第一个例子是同样的问题 - 我会测试它 - 代码看起来不错,但我认为在Windows下,子进程将不知道父进程的任何信息。 - Chameleon
在第二个例子中,当池创建一个子进程(在Unix上),整个父进程的内存都会被复制到子进程中(包括锁对象)。由于您正在使用Windows,请不要使用第二个例子。 - Dunes

8

如果您正在访问资源,请使用全局信号量并获取它。例如:

import multiprocessing
from time import sleep

semaphore = multiprocessing.Semaphore(2)

def do_job(id):
    with semaphore:
        sleep(1)
    print("Finished job")

def main():
    pool = multiprocessing.Pool(6)
    for job_id in range(6):
        print("Starting job")
        pool.apply_async(do_job, [job_id])
    pool.close()
    pool.join()

if __name__ == "__main__":
    main()

由于其他线程正在等待信号量,因此该程序每秒仅完成两项任务。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接