Python等待队列和事件

7

我有一个队列和一个事件。 当事件设置为True时,我想退出循环,但循环中有一个queue.get()会一直阻塞直到队列中有东西。

当closeEvent事件标志被设置时,如何中止等待self._commandQueue.get()?

注意:我想避免依赖队列的阻塞性,并希望基于队列和事件标志的条件进行阻塞。

def _execute(self):
    while not self._closeEvent.isSet():
        nextCommand = self._commandQueue.get()
        self._commandExecutor.execute(nextCommand)
        self._commandQueue.task_done()
4个回答

4
你需要类似于Windows的WaitForMultipleObjects()调用,但是Python事件和队列API并没有提供这样的功能(除非你只在Windows上使用win32api),所以如果你真的需要同时检查两个事件源,答案是“你不能,除非进行轮询(或者修补Event类以允许它)”。
但是,如果你有一定的灵活性,你可以通过重新定义命令队列来实现类似的功能。如果命令队列是一个PriorityQueue,你可以将普通优先级的常规作业加入队列,并使用更高的优先级让额外的进程队列中止标记一旦事件发生信号。
STOP = None

def _execute(self):
    while 1:
        nextCommand = self._commandQueue.get()[1]
        if nextCommand is STOP:
           break
        self._commandExecutor.execute(nextCommand)
        self._commandQueue.task_done()

def wait_for_stop_signal(self):
    self._closeEvent.wait()
    self._commandQueue.put((-1, STOP))

现在你在单独的线程中运行 wait_for_stop_signal 函数,这样你就可以获得想要的结果(但会浪费一个线程,而不是轮询,根据你的使用情况选择更糟糕的选项)。

是的,这就是我试图弄清楚的。很遗憾发现在Python API中没有这样的东西可用 :( - Har

4

你发布的帖子提供了另一种选择,这对于schlenk所描述的内容非常有用。非常感谢 :) - Har
这种等待私有 API 信号的方式可能不太好,而且在这种情况下也没有帮助。但是其他线程链接很有用,特别是那种(肮脏的,猴子补丁)重新定义 Event 做法的版本。 - schlenk

2
我可以提供两种方案,可能会适合您:

解决方案1

在您的队列中放置一个“魔法退出标记”。我将使用None,如果需要,您可以使用特殊的退出消息。这与使用优先级队列方法基本相同,但是 (a) 它更简单,(b) 但是队列必须清空到标记点,这可能或可能不可接受。

例如:

# Untested
# Processing thread
def run(self):
    while True:
        msg = self.main_queue.get()
        if msg == None:
            break
        ...do..queue..work...

def shutdown():
    self.main_queue.put(None)

解决方案2

不要直接调用Queue.get方法。创建一个事件对象,当以下情况之一发生时触发: * 将某些内容放入队列中。 * 想要关闭队列。

不要将队列暴露给外部,而是定义一个向队列添加元素的方法:

def add_to_queue(self, msg):
    self.main_queue.put(msg)
    self.my_event.set() # Signal the consumer

def shutdown(self, msg):
    self.quit_event.set() # Set an event / condvar / msg to indicate exit
    self.my_event.set() # Signal something has happened.




def run(self)
    while True:
        self.my_event.wait()
        if self.quit_event.is_set()
            break
        msg = self.main_queue.get(False) # Empty exception should not happen here

您不一定需要一个退出事件,因为您可以假设一个空队列和一个已发出信号的my_event意味着是时候退出了。尽管如此,我认为最好明确地使用适当的指示器 - 消息、事件、条件变量等。


为什么这个答案被低估了呢...解决方案1才是最优的!!! - Kris Jobs

2

Queue.get方法在没有参数的情况下调用时是一种阻塞方法。

根据文档:

将项目放入队列中。如果可选参数block为true且timeout为None(默认值),则必要时阻止,直到有一个空闲插槽可用。如果timeout是正数,则最多阻塞timeout秒,并在该时间内没有可用的空闲插槽时引发Full异常。否则(block为false),如果立即有一个空闲插槽,则将项目放入队列中,否则引发Full异常(在这种情况下忽略timeout)。

你需要做类似于这样的事情:

try:
    # If `False`, the program is not blocked, it will throw the Queue.Empty exception.
    my_data = queue.get(False)  
    .....Some Code......
except Queue.Empty:
    my_data = None # or whatever you want

更多选项

  1. get_nowait:

    Queue.get_nowait()相当于get(False)

  2. 使用超时:

    my_data = queue.get(True, 5)。如果获取失败(没有获取到任何内容),它将尝试获取5秒钟,然后引发相同的异常Queue.Empty


是的,这就是我想要避免尝试的。我不希望它仅基于队列阻止/不阻止并永远循环,我希望它阻止直到队列不为空closeEvent标志未设置。 - Har
1
然后在你的while条件中使用Queue.empty() - Kobi K
那样做并不能解决阻塞的问题,它仍然会绕过while循环。我希望它能一直阻塞,直到队列中有东西且事件标志未设置。我认为你所建议的会使while循环迭代但不会阻塞。另外请注意,while循环有点像while True语句。 - Har

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接