使用流在列表的列表中查找对象

7

我正在尝试编写一个方法,找到列表中对象的索引并利用并行处理。这是我的代码。

// returns [i, j] where lists.get(i).get(j) equals o, or null if o is not present.
public static int[] indices(List<? extends List<?>> lists, Object o) {
    return IntStream.range(0, lists.size())
                    .boxed()
                    .flatMap(i -> IntStream.range(0, lists.get(i).size()).mapToObj(j -> new int[]{i, j}))
                    .parallel()
                    .filter(a -> {
                        System.out.println(Arrays.toString(a));     // For testing only
                        return Objects.equals(o, lists.get(a[0]).get(a[1]));
                    })
                    .findAny()
                    .orElse(null);
}

当我运行以下代码时:
List<List<String>> lists = Arrays.asList(
        Arrays.asList("A", "B", "C"),
        Arrays.asList("D", "E", "F", "G"),
        Arrays.asList("H", "I"),
        Collections.nCopies(5, "J")
);
System.out.println("Indices are " + Arrays.toString(indices(lists, "J")));

输出结果大致如下
[0, 0]
[0, 1]
[0, 2]
[3, 0]
[3, 1]
[3, 2]
[3, 3]
[2, 0]
[3, 4]
[1, 0]
[1, 1]
[2, 1]
[1, 2]
[1, 3]
Indices are [3, 0]

换句话说,即使找到了目标对象,搜索仍在继续。难道findAny不应该是一项短路操作吗?我漏掉了什么?此外,在迭代列表或嵌套数组时利用并行性的最佳方法是什么?

编辑

根据@Sotirios答案中的想法,我得到了一个输出

Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 0]
Thread[main,5,main] [2, 0]
Thread[main,5,main] [2, 1]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 0]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 1]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 2]
Thread[ForkJoinPool.commonPool-worker-1,5,main] [1, 3]
Thread[main,5,main] [0, 0]
Thread[main,5,main] [0, 1]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 1]
Thread[main,5,main] [0, 2]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 2]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 3]
Thread[ForkJoinPool.commonPool-worker-3,5,main] [3, 4]
Indices are [3, 0]

请注意
Thread[ForkJoinPool.commonPool-worker-3,5,main]

即使找到答案,也会继续搜索。


请使用findFirst()代替。 - Tahar Bakir
@TaharBakir 它仍在继续搜索。 - Paul Boddington
1
此外,并行性可能需要一些时间,才能让一个线程通知其他线程它们不需要继续执行。 - Louis Wasserman
@LouisWasserman 当我按照Sotirios的答案打印线程时,我发现即使在同一个线程中,在找到对象之后搜索仍然会继续。 - Paul Boddington
3个回答

7

短路操作不能保证只拉取最少的元素来生成结果。它们可能会这样做,但不是必须的。

flatMap的当前实现总是将子流的整个内容下推到下游。因此,即使您的流不是并行的,您也可能看到比满足findAny所需的更多元素通过流。


看起来这个答案是正确的,flatMap().filter().findAny() 基本上不会短路。我不知道为什么会这样实现。 - Paul Boddington
1
“短路计算”仅意味着它可能在检查整个流之前终止。 它没有任何其他保证。 - Misha

2
关于“为什么要这样实现”,问题根源在Stream API的实现中。flatMap体内经常会创建一个带有某些中间操作(例如.flatMap(list -> list.stream().map(...).filter(...)))的流。可以在flatMap实现内部使用stream.spliterator()并多次调用tryAdvance直到请求取消。但是,当流包含中间操作时,spliterator()调用返回的是一种人工的spliterator(如果没有,则返回原始流spliterator)。这种人工spliterator的tryAdvance()实现不太高效,因此与消耗整个flatMapped流相比,使用该实现可能被认为是更糟糕的性能缺陷。在许多情况下,您可以对一些短流进行flatMap,因此在当前实现中,您可能会获得性能提升。

1

它并不是一直在运行,而是已经分派了各种线程来尝试查找结果,并等待这些线程完成后才返回结果。

换句话说,findAny 终端操作会将“搜索”任务提交给多个线程。这些任务只是应用 filter Predicate 并在某个值返回 true 时返回。findAny 可能会等待其中一个返回值。它没有办法真正取消已经提交的任何内容,而且似乎该实现将阻塞直到整批返回。它只能停止提交任何未来的批次。

您可以通过记录当前线程来验证此操作:

System.out.println(Thread.currentThread() + " " + Arrays.toString(a)); // For testing only

我有点困,可能这是个愚蠢的问题,但如果一堆工作线程被提前分配任务,并且整个方法直到它们全部完成才能返回,那么什么是短路运算? - Paul Boddington
1
@PaulBoddington 我不认为是“全部”,我认为是一些子集。 - Sotirios Delimanolis
1
例如,我启动了5个线程进行搜索。这5个线程可能都会返回结果。但在做出决定之前,我必须等待所有5个线程完成。(实际上,你只需要等待其中一个,但你无法取消其他线程。而且这种实现似乎想要等待这5个任务全部完成后再汇总。) - Sotirios Delimanolis
2
@PaulBoddington 我认为这是Misha所说的flatMap的结果。我得赶上公交车,一会儿回来。 - Sotirios Delimanolis

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接