Java的BlockingQueue没有阻塞的peek方法?

40
我有一个阻塞对象队列。
我想编写一个线程,它会一直阻塞到队列中有一个对象为止,就像BlockingQueue.take()函数提供的功能一样。
然而,由于我不知道是否能够成功处理该对象,所以我只想进行peek()操作,而不是删除该对象。只有在成功处理该对象时才想要将其删除。
所以,我需要一个阻塞的peek()函数。但目前peek()根据javadocs仅在队列为空时才返回。
我是否漏掉了什么?是否有其他方法实现这个功能?
编辑:
如果我只使用一个线程安全队列,并进行peek()和sleep()操作,您对此有何想法?
public void run() {
    while (!exit) {
        while (queue.size() != 0) {
            Object o =  queue.peek();
            if (o != null) {
                if (consume(o) == true) {
                    queue.remove();
                } else {
                    Thread.sleep(10000); //need to backoff (60s) and try again
                }
            }
        }
        Thread.sleep(1000); //wait 1s for object on queue
    }
}

请注意,我只有一个消费者线程和一个(独立的)生产者线程。我猜这不如使用BlockingQueue高效... 欢迎任何评论。

8个回答

16
您可以使用LinkedBlockingDeque并从队列中物理移除该项 (使用takeLast()),但如果处理失败,则将其重新放回队列的末尾(使用 putLast(E e))。与此同时,“生产者”将使用putFirst(E e)在队列的前面添加元素。
您始终可以将此行为封装在自己的Queue实现中,并提供一个blockingPeek()方法,在底层的LinkedBlockingDeque上执行takeLast(),然后在幕后执行putLast()。因此,从调用客户端的角度来看,元素从未从您的队列中移除。

3
这是一个很好的建议。我唯一能看到的问题是,如果队列在我处理一个项目时填满了,那么我将无法将当前项目排队回去。 - rouble
你可以通过在包装实现中使用额外的同步来解决这个问题,从而使得take + put成为一个原子操作。你也可以使用无界队列。 - Adamski
2
我建议不要删除并重新添加,因为这样会使您将队列的状态更改暴露给其他线程。也许可以使用繁忙轮询来实现peek()。或者,如果您不想轮询,可以在包装器中使用与队列相关联的信号量。 - Ustaman Sangat
@Ustaman:是的,我同意。任何将这种行为封装在自己的队列实现中的人都必须处理这个问题。这不是一个很好的解决方案,因为除了 LinkedBlockingDeque 中定义的 ReentrantLock 之外,你可能还需要全局锁(鉴于它是包私有的)。如果我在 OP 的位置上,我会想看看是否可以通过重新设计来完全避免这种 "peek" 行为。 - Adamski

6

然而,由于我不知道是否能成功处理该对象,因此我只想使用peek()方法查看对象而不删除它。只有在成功处理它后才想要删除它。

一般来说,这种方式并不是线程安全的。如果您使用peek()方法确定对象可以成功处理,但在您使用take()方法删除和处理之前,另一个线程将该对象取走了怎么办?


1
在我看来,您可以通过包装和添加一些同步逻辑来解决此问题。 - yerlilbilgin

2

您是否可以在阻塞队列中添加一个事件监听器队列,然后当有东西被添加到(阻塞)队列时,向您的监听器发送一个事件?您的线程可以一直阻塞,直到调用了它的actionPerformed方法。


2
我知道唯一可以实现这个功能的是 Apache Commons Collections 中的 BlockingBuffer

如果在空缓冲区上调用 getremove,则调用线程会等待通知,直到 addaddAll 操作完成。

get() 相当于 peek(),通过使用 BlockingBuffer 装饰一个 UnboundedFifoBuffer,可以使 BufferBlockingQueue 一样工作。

2

虽然不是一个明确的答案,但是:JDK-6653412声称这不是一个有效的用例。


你能否从这个资源中添加更多信息吗?这样会让它成为一个可以接受的答案,我认为。 - Filnor

1
快速的答案是,实际上没有一种方法可以有一个阻塞的peek,除非你自己实现一个带有阻塞peek()的阻塞队列。
“我有什么遗漏吗?” peek()在并发方面可能会有问题 -
- 如果您无法处理已经peek()的消息,则它将留在队列中,除非您有多个消费者。 - 如果您无法处理该对象,谁将把它从队列中取出? - 如果您有多个消费者,则您会在peek()和另一个线程同时处理项目之间出现竞争条件,导致重复处理或更糟糕的情况。
听起来您最好实际上删除该项并使用责任链模式进行处理。
编辑:关于您的最后一个示例:如果您只有一个消费者,则永远无法摆脱队列中的对象 - 除非在此期间更新了该对象 - 在这种情况下,您最好非常小心地考虑线程安全性,并且可能不应该将该项放入队列中。

0

看起来 BlockingQueue 本身没有您指定的功能。

不过,我可能会尝试重新构思一下问题:对于您无法“正确处理”的对象,您会怎么做?如果您只是将它们留在队列中,那么您最终必须将它们取出并处理。我建议要么找出如何处理它们(通常,如果 queue.get() 返回任何无效或错误的值,您可能可以直接将其丢弃),要么选择一个不同于 FIFO 的数据结构。


0

最简单的解决方案

在处理成功之前,不要处理下一个元素。

public void run() {

Object lastSuccessfullyProcessedElement = null;

    while (!exit) {
        Object obj =  lastSuccessfullyProcessedElement == null ? queue.take() : lastSuccessfullyProcessedElement; // blocking
        
        boolean successful = process(obj);
        
        if(!successful) {
            lastSuccessfullyProcessedElement = obj;
        } else {
            lastSuccessfullyProcessedElement = null;
        }
    }
}
  1. 调用 peek() 并检查值是否为 null 不是 CPU 效率高的做法。

当队列为空时,我曾看到过以下程序在我的系统上 CPU 使用率达到了 10%。

while (true) {
   Object o = queue.peek();
   if(o == null) continue;
   // omitted for the sake of brevity
}

2. 添加 sleep() 会增加程序的运行时间。 3. 使用 putLast 将其添加回队列中将扰乱原有的顺序。此外,这是一个需要锁定的阻塞操作。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接