Python - Queues, Managers and multiprocessing: strange behavior

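In a pet project of mine I use the multiprocessing module to download several things at once. While using Queue objects created by a multiprocessing.Manager() object, I ran into some strange behavior.
Depending on how I put a Queue object created by a Manager inside another Queue object (also created by a Manager), I get different behavior, even though, as far as I can see, the approaches are essentially the same. Here is a minimal example: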
import multiprocessing
import Queue

def work(inbound_queue, keep_going):
    while keep_going.value == 1:
        try:
            outbound_queue = inbound_queue.get(False) # this fails in case 3
            #do some work
            outbound_queue.put("work done!")
        except Queue.Empty:
            pass #this is busy wait of course, it's just an example

class Weird:
    def __init__(self):
        self.manager = multiprocessing.Manager()
        self.queue = self.manager.Queue()
        self.keep_going = multiprocessing.Value("i", 1)
        self.worker = multiprocessing.Process(target = work, args = (self.queue, self.keep_going))
        self.worker.start()
    def stop(self): #close and join the second process
        self.keep_going.value = 0
        self.worker.join()
    def queueFromOutside(self, q):
        self.queue.put(q)
        return q
    def queueFromNewManager(self):
        q = multiprocessing.Manager().Queue()
        self.queue.put(q)
        return q
    def queueFromOwnManager(self):
        q = self.manager.Queue()
        self.queue.put(q)
        return q

if __name__ == '__main__':
    instance = Weird()
    # CASE 1
    queue = multiprocessing.Manager().Queue()
    q1 = instance.queueFromOutside(queue) # Works fine
    print "1: ", q1.get()

    # CASE 2
    q2 = instance.queueFromNewManager()   # Works fine
    print "2: ", q2.get()

    # CASE 3
    q3 = instance.queueFromOwnManager()   # Error
    print "3: ", q3.get()

    instance.stop() #sadly never called :(

This produces the following output (Python 2.7.10 x86, Windows).

Main process output:

1:  work done!
2:  work done!
3:

q3.get() hangs because the worker process has crashed.

Worker process output:

Process Process-2:
Traceback (most recent call last):
  File "C:\Python27\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "C:\Python27\lib\multiprocessing\process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "J:\Dropbox\Python\queues2.py", line 7, in work
    outbound_queue = inbound_queue.get(False) # this fails in case 3
  File "<string>", line 2, in get
  File "C:\Python27\lib\multiprocessing\managers.py", line 774, in _callmethod
    raise convert_to_error(kind, result)
RemoteError:
---------------------------------------------------------------------------
Unserializable message: ('#RETURN', <Queue.Queue instance at 0x025A22B0>)
---------------------------------------------------------------------------
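
A possible reading of the "Unserializable message" line, offered as an assumption rather than a confirmed diagnosis: in Python 2.7, when a proxy is unpickled inside the manager server process that owns its referent, the server appears to swap the proxy for the underlying plain Queue.Queue. That plain queue is then what self.queue stores, and it cannot be pickled when get() tries to send it back to the worker. A queue from a different manager stays a proxy, and proxies can be pickled. The sketch below is written for Python 3 (where the module is named queue) and only illustrates the pickling half of that story; can_pickle is a hypothetical helper, not part of multiprocessing.

```python
import pickle
import queue


def can_pickle(obj):
    """Return True if obj survives pickle.dumps, False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except Exception:
        # A plain queue.Queue holds thread locks, which pickle rejects.
        return False


if __name__ == "__main__":
    print("plain queue picklable:", can_pickle(queue.Queue()))
    print("string picklable:", can_pickle("work done!"))
```

The Python documentation states that, from version 3.6 on, shared objects can be nested, so case 3 may well behave differently on a modern interpreter than on 2.7.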

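So the question is: why does the third case raise a RemoteError? The example above does not closely resemble the structure of the code in my actual project, but sending queues to the running process with methods 1 and 2 works fine. Being able to use method 3 would be nice, because it would save me from creating a new Manager each time, which can take a long time (about 100 ms on the machine I am using right now). I am asking out of curiosity, as I am still learning all the cool stuff in the module.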
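
One way to avoid the per-call Manager cost while staying with the pattern from cases 1 and 2 might be to create a second manager once and take every reply queue from it. The following is a Python 3 sketch under that assumption, not a drop-in for the project: run_demo, inbox_manager, reply_manager and the stop Event are hypothetical names introduced for illustration, and the blocking get with a timeout replaces the busy wait of the original.

```python
import multiprocessing
import queue


def work(inbound_queue, stop_event):
    """Answer every reply queue that arrives until asked to stop."""
    while not stop_event.is_set():
        try:
            # Block briefly instead of spinning on get(False).
            outbound_queue = inbound_queue.get(timeout=0.1)
        except queue.Empty:
            continue
        outbound_queue.put("work done!")


def run_demo(n_requests=3):
    """Send n_requests reply queues to the worker and collect the answers."""
    inbox_manager = multiprocessing.Manager()   # owns the inbound queue
    reply_manager = multiprocessing.Manager()   # created once, reused below
    inbound_queue = inbox_manager.Queue()
    stop_event = multiprocessing.Event()
    worker = multiprocessing.Process(target=work,
                                     args=(inbound_queue, stop_event))
    worker.start()
    try:
        results = []
        for _ in range(n_requests):
            # Reply queues come from the *other* manager, as in cases 1 and 2.
            reply_queue = reply_manager.Queue()
            inbound_queue.put(reply_queue)
            results.append(reply_queue.get(timeout=10))
        return results
    finally:
        stop_event.set()
        worker.join()
        inbox_manager.shutdown()
        reply_manager.shutdown()


if __name__ == "__main__":
    print(run_demo())
```

The design choice here is simply that the inbound queue and the reply queues never share a manager, so the inbound manager only ever stores proxies to queues it does not own.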
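Update, to try to clarify the question: in case 3 (queueFromOwnManager), why can a queue created with self.manager.Queue() not be retrieved with self.queue.get() once it has been put into self.queue, while a queue created with multiprocessing.Manager().Queue() can? The order in which the three cases run does not matter. Ideally, instance.queue is empty before and after each of the method calls in the three examples.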
Update 2: made the example more similar to what I actually do in my code.

In main, change the naming from q1, q2, q3 so that it starts at q0, q1, q2, e.g. for queueFromNewManager(). - user1749431
1 Answer


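Updated answer: I have added code to the main block that fills the queue self.queue with items from the list ls and prints them. I have also added a statement that can be used in external functions to retrieve the items that were put into self.queue from ls.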

import multiprocessing
import Queue

def work(inbound_queue, keep_going):
    while keep_going.value == 1:
        try:
            pass
            #outbound_queue = inbound_queue.get(False) # this fails in case 3 #<--- an error here, wherefrom does it get truthvalues?
            #do some work
            #outbound_queue.put("work done!")
        except: #Queue.Empty:                            <--- self.queue.Empty(): when instantiated below
            pass #this is busy wait of course, it's just an example

class Weird:
    def __init__(self):
        self.manager = multiprocessing.Manager()
        self.queue = self.manager.Queue()
        self.queue2 = multiprocessing.Manager().Queue()
        self.keep_going = multiprocessing.Value("i", 1)
        self.worker = multiprocessing.Process(target = work, args = (self.queue, self.keep_going))
        self.worker.start()
    def stop(self): #close and join the second process
        self.keep_going.value = 0
        #self.worker.join()
    def queueFromOutside(self, q):

        ls = [1,2,3,4,5]
        # populate self.queue with elements from list ls
        for i in ls:
          self.queue.put(i)
        return self.queue
    def queueFromNewManager(self):
         #q = multiprocessing.Manager().Queue()  <---- note that you state that manager and not "queue"
                                                      # is the name of the queue in this step,
                                                      # therefor errormsg, at this step manager
                                                     # is empty and self.queue has been given ls


        ls = [5,6,7,8]
        # populate self.queue with elements from list ls
        for i in ls:

          self.queue.put(i)
        return self.queue


    def queueFromOwnManager(self):
        q = self.manager.Queue()


        ls = [5,6,7,8]
        # populate self.queue with elements from list ls
        for i in ls:
          self.queue.put(i)
        return self.queue

    def wait_completion(self):     #<---- function that waits until tasks are done, and joins all
                                            # tasks as a last step, check docs how to add tasks and data to manager
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    instance = Weird()
    # CASE 1

    q1 = instance.queueFromOutside(instance.queue2) # Works fine
    print "1: ", q1.get()

    #this code gets data from instance.queue in external functions
    if not instance.queue.empty():

        item = instance.queue.get(True)
        print item,"item"

    # CASE 2
    q2 = instance.queueFromNewManager()   # Works fine
    print "2: ", q2.get()

    # CASE 3
    q3 = instance.queueFromOwnManager()   # Error
    print "3: ", q3.get()

    instance.stop() #sadly never called :(

    # when all tasks done  wait_completion(self) can be printed here to join all tasks

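It looks like I asked the question in a confusing way, because this does not really answer it; have a look at the updated question :) - ytsejam_sih
OK, as I understood it, your first question was about that error. In any case, regarding "cannot be retrieved with self.queue.get()": you can retrieve items with self.queue.get() by looping over the queue object in the main class. It is hard to answer without a concrete example, but did you write a join() function, or where do you join the tasks? - user1749431
I modified the example a bit to reflect what I do in my project. The problem is that if the queue is created with the manager stored as an attribute of the class "Weird", the worker process cannot retrieve the queue, and I cannot think of any explanation for this behavior. - ytsejam_sih
I have updated the answer and description, including code that can be used to retrieve data from self.queue. - user1749431
Forgot the import; import Queue - user1749431
Answer for the third case: note that the queue is filled using a for loop, so queue(put) cannot work and fill the queue. multiprocessing.Manager().Queue() is a different type of queue that works differently from self.queue, which is a regular instance of the Queue class. Populate the manager like this: ls = manager.list([1,[1],[1]]) or di = manager.dict({0: 1, 1: [1], 2:[1]}) print 'before', ns, ls, di p = multiprocessing.Process(target=f, args=(ns, ls, di)); see https://dev59.com/iGox5IYBdhLWcg3wGwlo - user1749431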
