Java 8中的PriorityBlockingQueue流是无序的。

6
这两段代码的输出顺序不同。 第一段代码:
while(!jobQueue.isEmpty()) {
    TimeoutJobRequest job = jobQueue.peek();
    if(job.isReady()) {
        execute(job);
        jobQueue.poll();
    } else {
        return;
    }
}

第二部分:

jobQueue.stream()
        .filter(TimeoutJobRequest::isReady)
        .peek(jobQueue::remove)
        .forEach(this::execute);

请注意,jobQueue 是一个 PriorityBlockingQueue
只有当this::execute比较长(比如几秒钟)时才会重新排序。

3
问题是什么? - Alex - GlassEditor.com
6
PriorityBlockingQueue#stream 方法不会按照队列优先级对元素进行排序。 - Misha
4个回答

7
PriorityBlockingQueuestream 遵循 Iterator 的顺序,根据文档所述:

在方法 iterator() 中提供的迭代器不能保证以任何特定顺序遍历 PriorityBlockingQueue 的元素。

如果您想按优先级顺序获取元素,则需要从 PriorityBlockingQueuepoll 元素。
PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
pq.add(5);
pq.add(8);
pq.add(3);

System.out.println("-- Try 1 --");
pq.stream().forEach(System.out::println);

System.out.println("-- Try 2 --");
IntStream.range(0, pq.size()).map(i -> pq.poll()).forEach(System.out::println);

输出结果(可能取决于Java实现):
-- Try 1 --
3
8
5
-- Try 2 --
3
5
8

1
请注意,stream().sorted() 也应该可以工作(并且可能比轮询每个元素更快)。 - Tagir Valeev
1
好观点!虽然采用我的方法,我们不需要关心PriorityBlockingQueue底层的顺序,因为它可以通过Comparator进行初始化。确实,在这种情况下,我们也可以使用stream().sorted(pq.comparator()),但是我们需要小心,因为sorted不喜欢null - Helder Pereira

5
如果你想创建一个遵循队列顺序的流,可以尝试以下代码(它会清空队列):
Stream.generate(jobQueue::poll).limit(jobQueue.size())

1
很遗憾,迭代顺序!=优先级顺序。
我为使用Stream API遍历按优先级顺序排序的PriorityQueue准备了两个可复制粘贴的解决方案:
static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    return Stream.generate(queue::poll)
      .limit(queue.size());
}

static <T> Stream<T> asStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    Comparator<? super T> comparator = queue.comparator();
    return comparator != null
      ? queue.stream().sorted(comparator)
      : queue.stream().sorted();
}

drainToStream 方法会清空队列,而 asStream 方法不会改变原始队列。


非常好,谢谢Grzegorz。当asStream()被包含为实例方法时,该方法的主体可以简化为this.stream().sorted(comparator)。非常漂亮和简单。 - awwsmm

1
第一段代码与第二段不相等,当 job.isReady() 函数返回 false 时,第一段代码会终止,但第二段代码仍在运行,流的 filter 函数只是一个过滤操作。
您可以将第一段代码更改为:
while(!jobQueue.isEmpty()) {
    TimeoutJobRequest job = jobQueue.peek();
    if(job.isReady()) {
        execute(job);
        jobQueue.poll();
    } 
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接