Python中的多进程队列

16

我正在尝试在Python中使用多进程库中的队列。执行以下代码后(打印语句有效),但在调用队列上的join后,进程仍未退出并且仍然处于活动状态。如何终止剩余的进程?

谢谢!

def MultiprocessTest(self):
  print "Starting multiprocess."
  print "Number of CPUs",multiprocessing.cpu_count()

  num_procs = 4
  def do_work(message):
    print "work",message ,"completed"

  def worker():
    while True:
      item = q.get()
      do_work(item)
      q.task_done()

  q = multiprocessing.JoinableQueue()
  for i in range(num_procs):
    p = multiprocessing.Process(target=worker)
    p.daemon = True
    p.start()

  source = ['hi','there','how','are','you','doing']
  for item in source:
    q.put(item)
  print "q close"
  q.join()
  #q.close()
  print "Finished everything...."
  print "num active children:",multiprocessing.active_children()
5个回答

10

尝试这个:

import multiprocessing

num_procs = 4
def do_work(message):
  print "work",message ,"completed"

def worker():
  for item in iter( q.get, None ):
    do_work(item)
    q.task_done()
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  procs.append( multiprocessing.Process(target=worker) )
  procs[-1].daemon = True
  procs[-1].start()

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

q.join()

for p in procs:
  q.put( None )

q.join()

for p in procs:
  p.join()

print "Finished everything...."
print "num active children:", multiprocessing.active_children()

你在完成后将None放入队列的原因是什么?我认为task_done()可以帮助避免这个问题?我试图按照这个页面底部的示例来编写我的代码:http://docs.python.org/library/queue.html - aerain
不对解决方案进行评级,但提示如何运行它:将“q =”声明行移动到 def worker() 的第一次使用之前即可。;-) - Dilettant
@aerain - 但它确实有效...我将None放入队列中是有原因的。for item in iter(q.get, None): 这一行非常关键。它告诉循环在从队列获取None值后退出。这就是使实际进程退出的原因。q.join等待所有task_done调用。p.join等待进程终止,只有在工作进程中断(或者您在进程上调用terminate,但这不太理想)时才会发生。 - underrun
2
@Dilettant - 不,那并不会真正改变什么。当进程被创建时,q 在全局命名空间中可用,因此在调用它时,工作进程实际上会拥有 q 的副本。最好在调用 multiprocessing.Process 时指定 args=(q,),因为这样我们明确地共享了该项——这是一个很好的习惯,可以避免意外共享你不应或无法共享的内容。 - underrun
我将上面的代码复制并粘贴到一个文件中,然后运行它。当我在中间部分按下q.join()时,进程会无限循环地生成,导致我的机器死机。我没有看到任何代码表明会发生这种情况,但是我已经尝试了两次。这是使用Python 2.7.1版本。 - Doo Dah

6
你的工作人员需要一个哨兵来终止,否则他们将一直处于阻塞读取状态。请注意,在 Q 上使用 sleep 而不是在 P 上使用 join,可以让你显示状态信息等。
我推荐的模板是:

def worker(q,nameStr):
  print 'Worker %s started' %nameStr
  while True:
     item = q.get()
     if item is None: # detect sentinel
       break
     print '%s processed %s' % (nameStr,item) # do something useful
     q.task_done()
  print 'Worker %s Finished' % nameStr
  q.task_done()

q = multiprocessing.JoinableQueue()
procs = []
for i in range(num_procs):
  nameStr = 'Worker_'+str(i)
  p = multiprocessing.Process(target=worker, args=(q,nameStr))
  p.daemon = True
  p.start()
  procs.append(p)

source = ['hi','there','how','are','you','doing']
for item in source:
  q.put(item)

for i in range(num_procs):
  q.put(None) # send termination sentinel, one for each process

while not q.empty(): # wait for processing to finish
  sleep(1)   # manage timeouts and status updates etc.

2
当使用 while not q.empty() 判断处理是否完成时,并不是一种可靠的方式,只有当工作线程抓住最后一个要完成的任务时才能确定。坦率地说,如果你不正确地使用 JoinableQueue,就不需要一个 JoinableQueue。如果你选择不使用它,你就不需要让工作线程标记 task_done。使用这样的队列的目的是为了让你可以加入它,这正是你想在程序结束时做的,而不是等待队列变空。 - leetNightshade
是的,使用这种方法,工作会提前结束。 - Forethinker

5
这是一个针对相对简单情况的无哨兵方法,您可以将多个任务放入JoinableQueue中,然后启动消费任务的工作进程,它们在读取完队列内容后就会退出。这个技巧是使用JoinableQueue.get_nowait()而不是get()。正如其名称,get_nowait()以非阻塞方式尝试从队列中获取值,如果没有可获取的内容,则会引发queue.Empty异常。工作者通过处理此异常来退出。
以下是示范该原理的基本代码:
import multiprocessing as mp
from queue import Empty

def worker(q):
  while True:
    try:
      work = q.get_nowait()
      # ... do something with `work`
      q.task_done()
    except Empty:
      break # completely done

# main
worknum = 4
jq = mp.JoinableQueue()

# fill up the task queue
# let's assume `tasks` contains some sort of data
# that your workers know how to process
for task in tasks:
  jq.put(task)

procs = [ mp.Process(target=worker, args=(jq,)) for _ in range(worknum) ]
for p in procs:
  p.start()

for p in procs:
  p.join()

优点是你不需要将“毒丸”放在队列中,所以代码会更短。 重要提示:在更复杂的情况下,生产者和消费者在“交错”方式下使用同一队列,并且工作人员可能需要等待新任务出现,“毒丸”方法应该被使用。我上面的建议适用于简单的情况,在这种情况下,工作人员“知道”如果任务队列为空,则没有必要再挂起等待。

3

在加入进程之前,您必须清空队列,但是q.empty()不可靠。

清空队列的最好方法是计算成功获取的数量或循环直到收到哨兵值,就像具有可靠网络的套接字一样。


1
下面的代码可能与主题不太相关,但我发布它是为了听取您的意见/反馈,以便我们一起学习。谢谢!
import multiprocessing

def boss(q,nameStr):
  source = range(1024)
  for item in source:
    q.put(nameStr+' '+str(item))
  q.put(None) # send termination sentinel, one for each process

def worker(q,nameStr):
  while True:
     item = q.get()
     if item is None: # detect sentinel
       break
     print '%s processed %s' % (nameStr,item) # do something useful

q = multiprocessing.Queue()

procs = []

num_procs = 4
for i in range(num_procs):
  nameStr = 'ID_'+str(i)
  p = multiprocessing.Process(target=worker, args=(q,nameStr))
  procs.append(p)
  p = multiprocessing.Process(target=boss,   args=(q,nameStr))
  procs.append(p)

for j in procs:
  j.start()
for j in procs:
  j.join()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接