Python多进程队列实现

5

我对如何将队列(queue)实现到下面的多进程示例中感到困惑。基本上,我希望代码能够:

1)生成2个进程(已完成)

2)将我的id_list分成两个部分(已完成)

3)让每个进程迭代列表并打印出每个项目,仅在完成列表后关闭。 我知道我必须实现某种类型的队列系统,并将其传递给每个工作者,但我不确定如何做。 任何帮助都将不胜感激。

from multiprocessing import Pool,Queue
id_list = [1,2,3,4,5,6,7,8,9,10]

def mp_worker(record):
    try:  
        print record
        sleep(1)
    except: pass
    print "worker closed"

def mp_handler():
    p = Pool(processes = 2) #number of processes
    p.map(mp_worker, id_list)  #devides id_list between 2 processes, defined above
    p.close()
    p.join()

mp_handler()

注意-该代码会打印出"worker closed"10次。我希望这个语句只打印两次(每个worker打印id_list中的5个数字后,各一次)。

只是出于好奇,您需要自己的任务队列/分块实现,而不是pool.map已经为您提供的吗? - a spaghetto
代码是一个经过精简的爬虫(worker)版本,使用了Selenium WebDriver。当我使用pool.map时,我无法弄清如何将队列中的项目分配给已经打开的WebDriver(每个worker)。队列项将被分配给WebDriver,WebDriver将打开,处理队列项,关闭...然后worker将再次启动 - 这会降低我的性能,因为我必须为每个单独的队列项启动WebDriver。通过下面的解决方案,我可以为每个worker仅启动一次WebDriver,然后将队列项传递给它。如果有更好的方法,请告诉我。 - FlyingZebra1
看我的回答 - 工作进程的持续时间与池相同。如果工作进程正在重新启动,则是因为您正在关闭并重新创建池。 - a spaghetto
如果问题是保持Web驱动程序进程处于打开状态,您可以通过将init移出函数本身并移到模块的顶层来解决。然后它将为每个池工作进程运行一次。 - a spaghetto
问题在于将队列中的项目一个接一个地传递给每个工作人员的webdriver,而它是开放的。下面的解决方案正好实现了我所期望的。已测试并按预期工作。 - FlyingZebra1
4个回答

8

对我来说这很有效(在Python 3上)。我没有使用Pool,而是自己生成了两个进程:

from multiprocessing import Process, Queue
from time import sleep


id_list = [1,2,3,4,5,6,7,8,9,10]

queue = Queue()

def mp_worker(queue):

    while queue.qsize() >0 :
        record = queue.get()
        print(record)
        sleep(1)

    print("worker closed")

def mp_handler():

    # Spawn two processes, assigning the method to be executed 
    # and the input arguments (the queue)
    processes = [Process(target=mp_worker, args=(queue,)) for _ in range(2)]

    for process in processes:
        process.start()
        print('Process started')

    for process in processes:
        process.join()



if __name__ == '__main__':

    for id in id_list:
        queue.put(id)

    mp_handler()

虽然要处理的元素长度是硬编码的。但它可以作为mp_worker方法的第二个输入参数。


谢谢,这正是我想要的。我用 while queue.qsize() >0 : 替换了第一个 for _ in range(5): ,以消除一些硬编码。 - FlyingZebra1
如果你确保在调用工作进程之前填充队列,那肯定会起作用!否则,进程可能会发现队列为空并退出循环,而你想让它们获得一些额外的输入来进行计算。 - rickyalbert

1
你那里的打印语句有误导性——工作进程在函数结束时不会终止。实际上,工作进程会一直保持活动状态,直到池关闭。此外,multiprocessing 已经负责将列表分成块并为每个任务排队。
至于你的另一个问题,通常情况下,如果你想在整个列表完成后触发异步事件,你会向 map_async 传递回调函数。对每个块进行一次调用需要一些内部操作,但如果你真的想这样做,你可以:
def mapstar_custom(args):
    result = list(map(*args))
    print "Task completed"
    return result
...

pool._map_async(f, x, mapstar_custom, None, None, None).get()

编辑:我们似乎混淆了术语。当我说“worker”时,我指的是池生成的进程,而你似乎指的是Selenium从这些进程中生成的进程(这不在你的问题中)。只需一次打开webdriver就足够简单:如果您有pool.map(module.task, ...),那么在module.py中只需执行以下操作:
# ... selenium init here ...

def task(...):
    # ... use webdriver ...

该模块只会被池工作进程导入一次,无论你分派该任务多少次。因此,顶层初始化仅会发生一次。

1

使用池和队列的一种解决此问题的方法是


    from time import sleep
    from multiprocessing import Pool,Queue
    id_list = [1,2,3,4,5,6,7,8,9,10]

    def mp_worker(q):
        try:  
            print(q.get())
            sleep(.1)
        except: pass
        print ("worker closed")

    if __name__ == "__main__":
        q = Queue()
        p = Pool(processes = 2) #number of processes
        for x in id_list:
            q.put(x)
        p.map(mp_worker, id_list)  #devides id_list between 2 processes, defined above


在您的代码主要部分中使用put将值添加到队列中,在函数中使用get从队列中读取值。


如果我删除try-except块,执行将失败并显示AttributeError: 'int' object has no attribute 'get'。因此,q.get()是无法访问的。q.get从队列中获取进程,所以这是正确的行为吗? - Jane
我刚刚纠正了问题的代码,现在它通过队列工作。你的问题的答案是,mp_worker()函数可以接受任何值,如果输入的不是队列变量,它就不会将其视为队列变量。 - PyMatFlow

0

由于这是“Python多进程队列实现”的谷歌搜索结果,我将发布一个稍微更为概括的示例。

请考虑以下脚本:

import time
import math
import pprint

def main():
    print('\n' + 'starting . . .' + '\n')

    startTime = time.time()
    my_list = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
    result_list = []

    for num in my_list:
        result_list.append(squareNum(num))
    # end for

    elapsedTime = time.time() - startTime

    print('result_list: ')
    pprint.pprint(result_list)
    print('\n' + 'program took ' + '{:.2f}'.format(elapsedTime) + ' seconds' + '\n')
# end function

def squareNum(num: float) -> float:
    time.sleep(1.0)
    return math.pow(num, 2)
# end function

if __name__ == '__main__':
    main()

这个脚本声明了10个浮点数,对它们进行平方运算(每次平方都会睡眠1秒钟以模拟一些重要的过程),然后将结果收集到一个新列表中。这需要大约10秒钟才能运行完毕。
下面是使用Multiprocessing Process和Queue进行重构的版本:
from multiprocessing import Process, Queue
import time
import math
from typing import List
import pprint


def main():
    print('\n' + 'starting . . .' + '\n')

    startTime = time.time()
    my_list = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0]
    result_list = []

    multiProcQueue = Queue()

    processes: List[Process] = []
    for num in my_list:
        processes.append(Process(target=squareNum, args=(num, multiProcQueue,)))
    # end for

    for process in processes:
        process.start()
    # end for

    for process in processes:
        process.join()
    # end for

    while not multiProcQueue.empty():
        result_list.append(multiProcQueue.get())
    # end for

    elapsedTime = time.time() - startTime

    print('result_list: ')
    pprint.pprint(result_list)
    print('\n' + 'program took ' + '{:.2f}'.format(elapsedTime) + ' seconds' + '\n')
# end function

def squareNum(num: float, multiProcQueue: Queue) -> None:
    time.sleep(1.0)
    result = math.pow(num, 2)
    multiProcQueue.put(result)
# end function

if __name__ == '__main__':
    main()

这个脚本大约在1秒内运行。据我所知,这是同时让多个进程并行地写入相同数据结构的最干净的方法。我希望https://docs.python.org/3/library/multiprocessing.html 文档中有像这样的示例。

注意结果列表的顺序通常不会与输入列表的顺序匹配,如果必须维护顺序,则需要使用不同的方法。


在我看来,被接受的答案易于理解、简短且自包含 - 正是第一次设置此项的人所需要的。 - FlyingZebra1

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接