并行流调用Spliterator比其限制次数更多

4

我最近发现了一个错误,其中

StreamSupport.intStream(/* a Spliterator.ofInt */, true)
    .limit(20)

我曾经多次调用Spliterator.ofInt.tryAdvance超过20次。当我将其更改为

StreamSupport.intStream(/* a Spliterator.ofInt */, true)
    .sequential()
    .limit(20)

问题消失了。这是为什么?除了将其内置到Spliterator中,是否有方法可以在tryAdvance具有副作用时实现对并行流的严格限制?(这是为了测试一些返回无限流的方法,但是测试需要在没有“X毫秒的循环”构造的情况下最终结束。)

3
请提供一个“最小可复现代码示例”(MCVE)。 - shmosel
3个回答

5
似乎有一个关于如何交互 limittrySplit 的基本误解。假设不应该比指定的 limit 更多的调用 trySplit 是完全错误的。 trySplit 的目的是将源数据分成两部分,在最好的情况下,分成两个 ,因为 trySplit 应该尝试平衡划分。因此,如果您有一个包含一百万个元素的源数据集,成功的拆分会产生两个每个都有五十万个元素的源数据集。这与您可能已经应用到流中的 limit(20) 完全无关,除了我们预先知道,如果分裂器具有 SIZED|SUBSIZED 特征,则可以丢弃第二个数据集,因为请求的前 20 个元素只能在前五十万个元素中找到。
很容易计算出,在最好的情况下,即平衡分割的情况下,我们需要进行十五次拆分操作,每次删除上半部分,然后才能在前 20 个元素之间得到一个拆分,以便我们可以并行处理这前 20 个元素。
这可以很容易地证明:
class DebugSpliterator extends Spliterators.AbstractIntSpliterator {
    int current, fence;
    DebugSpliterator() {
        this(0, 1_000_000);
    }
    DebugSpliterator(int start, int end) {
        super(end-start, ORDERED|SIZED|SUBSIZED);
        current = start;
        fence = end;
    }
    @Override public boolean tryAdvance(IntConsumer action) {
        if(current<fence) {
            action.accept(current++);
            return true;
        }
        return false;
    }
    @Override public OfInt trySplit() {
        int mid = (current+fence)>>>1;
        System.out.println("trySplit() ["+current+", "+mid+", "+fence+"]");
        return mid>current? new DebugSpliterator(current, current=mid): null;
    }
}

StreamSupport.stream(new DebugSpliterator(), true)
    .limit(20)
    .forEach(x -> {});

在我的机器上,它打印出:
trySplit() [0, 500000, 1000000]
trySplit() [0, 250000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [0, 62500, 125000]
trySplit() [0, 31250, 62500]
trySplit() [0, 15625, 31250]
trySplit() [0, 7812, 15625]
trySplit() [0, 3906, 7812]
trySplit() [0, 1953, 3906]
trySplit() [0, 976, 1953]
trySplit() [0, 488, 976]
trySplit() [0, 244, 488]
trySplit() [0, 122, 244]
trySplit() [0, 61, 122]
trySplit() [0, 30, 61]
trySplit() [0, 15, 30]
trySplit() [15, 22, 30]
trySplit() [15, 18, 22]
trySplit() [15, 16, 18]
trySplit() [16, 17, 18]
trySplit() [0, 7, 15]
trySplit() [18, 20, 22]
trySplit() [18, 19, 20]
trySplit() [7, 11, 15]
trySplit() [0, 3, 7]
trySplit() [3, 5, 7]
trySplit() [3, 4, 5]
trySplit() [7, 9, 11]
trySplit() [4, 4, 5]
trySplit() [9, 10, 11]
trySplit() [11, 13, 15]
trySplit() [0, 1, 3]
trySplit() [13, 14, 15]
trySplit() [7, 8, 9]
trySplit() [1, 2, 3]
trySplit() [8, 8, 9]
trySplit() [5, 6, 7]
trySplit() [14, 14, 15]
trySplit() [17, 17, 18]
trySplit() [11, 12, 13]
trySplit() [12, 12, 13]
trySplit() [2, 2, 3]
trySplit() [10, 10, 11]
trySplit() [6, 6, 7]

当然,这远远超过了20次的拆分尝试,但这是完全合理的,因为数据集必须被拆分到目标范围内的子范围之内,以便能够并行处理。

我们可以通过删除导致此执行策略的元信息来强制执行不同的行为:

StreamSupport.stream(new DebugSpliterator(), true)
    .filter(x -> true)
    .limit(20)
    .forEach(x -> {});

由于流API不知道谓词的行为,所以管道失去了其SIZED特性,导致

trySplit() [0, 500000, 1000000]
trySplit() [500000, 750000, 1000000]
trySplit() [500000, 625000, 750000]
trySplit() [625000, 687500, 750000]
trySplit() [625000, 656250, 687500]
trySplit() [656250, 671875, 687500]
trySplit() [0, 250000, 500000]
trySplit() [750000, 875000, 1000000]
trySplit() [250000, 375000, 500000]
trySplit() [0, 125000, 250000]
trySplit() [250000, 312500, 375000]
trySplit() [312500, 343750, 375000]
trySplit() [125000, 187500, 250000]
trySplit() [875000, 937500, 1000000]
trySplit() [375000, 437500, 500000]
trySplit() [125000, 156250, 187500]
trySplit() [250000, 281250, 312500]
trySplit() [750000, 812500, 875000]
trySplit() [281250, 296875, 312500]
trySplit() [156250, 171875, 187500]
trySplit() [437500, 468750, 500000]
trySplit() [0, 62500, 125000]
trySplit() [875000, 906250, 937500]
trySplit() [62500, 93750, 125000]
trySplit() [812500, 843750, 875000]
trySplit() [906250, 921875, 937500]
trySplit() [0, 31250, 62500]
trySplit() [31250, 46875, 62500]
trySplit() [46875, 54687, 62500]
trySplit() [54687, 58593, 62500]
trySplit() [58593, 60546, 62500]
trySplit() [60546, 61523, 62500]
trySplit() [61523, 62011, 62500]
trySplit() [62011, 62255, 62500]

这段内容涉及IT技术,展示了使用较少的trySplit调用并不能改进程序的情况。通过查看数据可以发现,现在处理了结果元素范围之外的数据(如果我们知道所有元素会通过筛选),更糟糕的是,所有结果元素都被一个分裂器完全覆盖,导致我们的结果元素无法进行并行处理。其他线程则在处理后续被丢弃的元素。

当然,我们可以很容易地通过改变方式来实现我们任务的最优划分。

int mid = (current+fence)>>>1;

to

int mid = fence>20? 20: (current+fence)>>>1;

所以
StreamSupport.stream(new DebugSpliterator(), true)
    .limit(20)
    .forEach(x -> {});

导致

trySplit() [0, 20, 1000000]
trySplit() [0, 10, 20]
trySplit() [10, 15, 20]
trySplit() [10, 12, 15]
trySplit() [12, 13, 15]
trySplit() [0, 5, 10]
trySplit() [15, 17, 20]
trySplit() [5, 7, 10]
trySplit() [0, 2, 5]
trySplit() [17, 18, 20]
trySplit() [2, 3, 5]
trySplit() [5, 6, 7]
trySplit() [15, 16, 17]
trySplit() [6, 6, 7]
trySplit() [16, 16, 17]
trySplit() [0, 1, 2]
trySplit() [7, 8, 10]
trySplit() [8, 9, 10]
trySplit() [1, 1, 2]
trySplit() [3, 4, 5]
trySplit() [9, 9, 10]
trySplit() [18, 19, 20]
trySplit() [10, 11, 12]
trySplit() [13, 14, 15]
trySplit() [11, 11, 12]
trySplit() [4, 4, 5]
trySplit() [14, 14, 15]

但这并不是一个通用的Spliterator,如果限制不是20,它的性能就会变差。

如果我们将限制纳入Spliterator或更一般地纳入流源中,我们就不会遇到这个问题。所以,你可以使用 list.subList(0, Math.min(x, list.size())).stream() 代替 list.stream().limit(x),使用 random.ints(x) 代替 random.ints().limit(x),使用 LongStream.range(0, x).mapToObj(index -> generator.get()) 或使用此答案的工厂方法代替 Stream.generate(generator).limit(x)

对于任意流源/Spliterator,应用 limit 对于并行流来说可能非常昂贵,这甚至在文档中有所记录。嗯,在 trySplit 中具有副作用本来就是个坏主意。


显然不是通用的Spliterator,但我发现这个很聪明:int mid = fence>20? 20: (current+fence)>>>1; - Eugene

2

我不认为这是任何错误,但这仍然是一个非常有趣的想法,tryAdvance 可以具有副作用。

就我所理解的情况而言,当您的 trySplit 不将其拆分为单个元素批次时,完全可以实现该目标。

例如,您有一个数组,并且希望通过 trySplit 将其拆分为每个子数组不少于 4 个元素的部分。在这种情况下,当无法再次拆分时(例如,已达到当前 Spliterator 中最少的 4 个元素),处理将开始 - forEachRemaning 将被调用;反过来,它将默认为在当前 Spliterator 中的每个元素上调用 tryAdvance,如默认实现中所示:

default void forEachRemaining(Consumer<? super T> action) {
    do { } while (tryAdvance(action));
}

很显然,由于您正在并行工作,一旦线程开始工作(读取执行它的forEachRemaning),它就不能再停止了,所以会有更多的元素被传递到tryAdvance

因此,我真的认为除了将其集成到Spliterator中,没有其他方法可以做到这一点; 我认为这应该可行:

  static class LimitingSpliterator<T> implements Spliterator<T> {

    private int limit;

    private final Supplier<T> generator;

    private LimitingSpliterator(Supplier<T> generator, int limit) {
        this.limit = limit;
        this.generator = generator;
    }

    static <T> LimitingSpliterator<T> of(Supplier<T> supplier, int limit) {
        return new LimitingSpliterator<>(supplier, limit);
    }

    @Override
    public boolean tryAdvance(final Consumer<? super T> consumer) {
        Objects.requireNonNull(consumer);
        if (limit > 0) {
            --limit;
            generator.get();
            consumer.accept(generator.get());
            return true;
        }
        return false;
    }

    @Override
    public void forEachRemaining(final Consumer<? super T> consumer) {
        while (limit > 0) {
            consumer.accept(generator.get());
            --limit;
        }
    }

    @Override
    public LimitingSpliterator<T> trySplit() {
        int half = limit >> 2;
        limit = limit - half;
        return new LimitingSpliterator<>(generator, half);
    }

    @Override
    public long estimateSize() {
        return limit << 2;
    }

    @Override
    public int characteristics() {
        return SIZED;
    }
}

或者只需使用 LongStream.range(0, limit).unordered().mapToObj(x -> generator.get())… [.parallel()…] - Holger

0

针对我的用例,解决方案是使用以下代码:

LongStream.range(0, streamSize).unordered().parallel().mapToInt(ignored -> nextInt())

注:这适用于从可能会不断重新播种的PRNG中获取的随机数流。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接