Java 8流的分区

73

如何在Java 8 Stream上实现“分区”操作?所谓的分区是指将一个流分成给定大小的子流。它与Guava Iterators.partition()方法相同,只是希望分区是惰性评估的流而不是列表。


9
根据我的经验,惰性评估分区通常是不可行的。如果你维护了对几个分区的引用并且按照错误的顺序访问它们,那么会发生什么呢? - Jon Skeet
3
@JonSkeet - 特别是如果它们是并行的。 - OldCurmudgeon
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - Trader001
我认为如果我更关注内存而不是并行性,我会想要使用“延迟计算流”的流。 - AlikElzin-kilaka
10个回答

57

对于任意的源数据流来说,将其划分为固定大小的批次是不可能的,因为这样会破坏并行处理。在并行处理时,您可能不知道拆分后第一个子任务中有多少元素,所以在第一个子任务完全处理之前无法为下一个子任务创建分区。

但是,可以从可随机访问的List创建分区流。例如,在我的StreamEx库中提供了此功能:

List<Type> input = Arrays.asList(...);

Stream<List<Type>> stream = StreamEx.ofSubLists(input, partitionSize);

或者如果你真的想要流式的流:

Stream<Stream<Type>> stream = StreamEx.ofSubLists(input, partitionSize).map(List::stream);

如果你不想依赖第三方库,可以手动实现ofSubLists方法:

public static <T> Stream<List<T>> ofSubLists(List<T> source, int length) {
    if (length <= 0)
        throw new IllegalArgumentException("length = " + length);
    int size = source.size();
    if (size <= 0)
        return Stream.empty();
    int fullChunks = (size - 1) / length;
    return IntStream.range(0, fullChunks + 1).mapToObj(
        n -> source.subList(n * length, n == fullChunks ? size : (n + 1) * length));
}

这个实现看起来有点长,但考虑了一些边角情况,比如接近MAX_VALUE的列表大小。


如果你想要无序流的并行友好解决方案(因此您不关心哪些流元素将组合成单个批次),您可以使用类似于以下方式的收集器(感谢@sibnick的启发):

public static <T, A, R> Collector<T, ?, R> unorderedBatches(int batchSize, 
                   Collector<List<T>, A, R> downstream) {
    class Acc {
        List<T> cur = new ArrayList<>();
        A acc = downstream.supplier().get();
    }
    BiConsumer<Acc, T> accumulator = (acc, t) -> {
        acc.cur.add(t);
        if(acc.cur.size() == batchSize) {
            downstream.accumulator().accept(acc.acc, acc.cur);
            acc.cur = new ArrayList<>();
        }
    };
    return Collector.of(Acc::new, accumulator,
            (acc1, acc2) -> {
                acc1.acc = downstream.combiner().apply(acc1.acc, acc2.acc);
                for(T t : acc2.cur) accumulator.accept(acc1, t);
                return acc1;
            }, acc -> {
                if(!acc.cur.isEmpty())
                    downstream.accumulator().accept(acc.acc, acc.cur);
                return downstream.finisher().apply(acc.acc);
            }, Collector.Characteristics.UNORDERED);
}

使用示例:

List<List<Integer>> list = IntStream.range(0,20)
                                    .boxed().parallel()
                                    .collect(unorderedBatches(3, Collectors.toList()));

结果:

[[2, 3, 4], [7, 8, 9], [0, 1, 5], [12, 13, 14], [17, 18, 19], [10, 11, 15], [6, 16]]

这种收集器是完全线程安全的,可以为顺序流生成有序批次。

如果您想对每个批次应用中间转换,可以使用以下版本:

public static <T, AA, A, B, R> Collector<T, ?, R> unorderedBatches(int batchSize,
        Collector<T, AA, B> batchCollector,
        Collector<B, A, R> downstream) {
    return unorderedBatches(batchSize, 
            Collectors.mapping(list -> list.stream().collect(batchCollector), downstream));
}

例如,您可以通过以下方式即时对批次中的数字进行求和:

List<Integer> list = IntStream.range(0,20)
        .boxed().parallel()
        .collect(unorderedBatches(3, Collectors.summingInt(Integer::intValue), 
            Collectors.toList()));

我非常希望看到像并行解决方案这样的东西被添加到StreamEx中(它已经成为我的项目中像Guava和Lombok一样必不可少的工具)。这并不是因为我关心并行性,而是因为它适用于流 - StreamEx.ofSubLists要求您已经拥有一个已折叠的列表,而我的用例通常是持续的流,我不想将其折叠成集合并一次性存储在内存中。 - Torque
有没有一个适用于有序流的并行友好解决方案?例如:将对象列表转换为按有序变量分组的列表流?例如:一旦有序变量发生变化,就将列表发送到下游? - TheJeff

13
我找到了一个优雅的解决方案:Iterable parts = Iterables.partition(stream::iterator, size)

8
可迭代对象源自Guava:https://github.com/google/guava/blob/master/guava/src/com/google/common/collect/Iterables.java 一些开发人员可能有不使用它的充分理由。您应该提及使用的第三方库。 - gouessej

10

如果您想顺序使用 Stream,可以对 Stream 进行分区(以及执行类似于分窗的相关函数-我认为这正是您在这种情况下真正想要的)。支持标准 Streams 的两个库分别是cyclops-react(我是作者)和 jOOλ,其中 cyclops-react 扩展了 jOOλ(添加了窗口分割等功能)。

对于操作 Java Streams,cyclops-streams 中有一组静态函数StreamUtils,以及一系列函数(如 splitAt、headAndTail、splitBy、partition)用于分区。

要将 Stream 划分为大小为 30 的嵌套流的流,可以使用 window 方法。

对于提问者的观点,在流处理术语中,将流拆分为给定大小的多个流是一个窗口操作(而不是分区操作)。

  Stream<Streamable<Integer>> streamOfStreams = StreamUtils.window(stream,30);

有一个名为ReactiveSeq的Stream扩展类,继承自jool.Seq并添加了窗口功能,这可能会使代码更加简洁。

  ReactiveSeq<Integer> seq;
  ReactiveSeq<ListX<Integer>> streamOfLists = seq.grouped(30);

正如Tagir在上面指出的,这并不适用于并行流。如果您想要对流进行窗口化或批处理以便以多线程方式执行,请考虑使用cyclops-react中的LazyFutureStream(窗口化在待办列表中,但简单的批处理现在已经可用)。

在这种情况下,数据将从执行流的多个线程传递到Multi-Producer/Single-Consumer无等待队列中,然后可以在再次分配给线程之前对该队列的顺序数据进行窗口化。

  Stream<List<Data>> batched = new LazyReact().range(0,1000)
                                              .grouped(30)
                                              .map(this::process);

6

正如Jon Skeet在他的评论中所展示的那样,似乎不可能使分区变成懒加载。 对于非懒加载的分区,我已经有了这段代码:

public static <T> Stream<Stream<T>> partition(Stream<T> source, int size) {
    final Iterator<T> it = source.iterator();
    final Iterator<Stream<T>> partIt = Iterators.transform(Iterators.partition(it, size), List::stream);
    final Iterable<Stream<T>> iterable = () -> partIt;

    return StreamSupport.stream(iterable.spliterator(), false);
}

23
虽然这是一个老话题,但我认为值得提一下——这不是纯Java 8:Iterators类来自Guava。 - Tomasz Stanczak

5
这是一个纯Java解决方案,采用惰性求值的方式评估,而不是使用List。
public static <T> Stream<List<T>> partition(Stream<T> stream, int batchSize){
    List<List<T>> currentBatch = new ArrayList<List<T>>(); //just to make it mutable 
    currentBatch.add(new ArrayList<T>(batchSize));
    return Stream.concat(stream
      .sequential()                   
      .map(new Function<T, List<T>>(){
          public List<T> apply(T t){
              currentBatch.get(0).add(t);
              return currentBatch.get(0).size() == batchSize ? currentBatch.set(0,new ArrayList<>(batchSize)): null;
            }
      }), Stream.generate(()->currentBatch.get(0).isEmpty()?null:currentBatch.get(0))
                .limit(1)
    ).filter(Objects::nonNull);
}

该方法返回 Stream<List<T>> 以提高灵活性。您可以通过 partition(something, 10).map(List::stream) 轻松地将其转换为 Stream<Stream<T>>

2
我找到了这个问题最优雅和纯粹的Java 8解决方案:
public static <T> List<List<T>> partition(final List<T> list, int batchSize) {
return IntStream.range(0, getNumberOfPartitions(list, batchSize))
                .mapToObj(i -> list.subList(i * batchSize, Math.min((i + 1) * batchSize, list.size())))
                .collect(toList());
}

//https://dev59.com/XGAg5IYBdhLWcg3wvNCF
private static <T> int getNumberOfPartitions(List<T> list, int batchSize) {
    return (list.size() + batchSize- 1) / batchSize;
}

1

这是一种高效的方式

import java.util.AbstractList;
import java.util.ArrayList;
import java.util.List;

public final class Partition<T> extends AbstractList<List<T>> {

private final List<T> list;
private final int chunkSize;

public Partition(List<T> list, int chunkSize) {
    this.list = new ArrayList<>(list);
    this.chunkSize = chunkSize;
}

public static <T> Partition<T> ofSize(List<T> list, int chunkSize) {
    return new Partition<>(list, chunkSize);
}

@Override
public List<T> get(int index) {
    int start = index * chunkSize;
    int end = Math.min(start + chunkSize, list.size());

    if (start > end) {
        throw new IndexOutOfBoundsException("Index " + index + " is out of the list range <0," + (size() - 1) + ">");
    }

    return new ArrayList<>(list.subList(start, end));
}

@Override
public int size() {
    return (int) Math.ceil((double) list.size() / (double) chunkSize);
}

}

使用方法
Partition<String> partition = Partition.ofSize(paCustomerCodes, chunkSize);

for (List<String> strings : partition) {
}

1
我认为在内部进行某种黑客攻击是有可能的:
创建批处理实用程序类:
public static class ConcurrentBatch {
    private AtomicLong id = new AtomicLong();
    private int batchSize;

    public ConcurrentBatch(int batchSize) {
        this.batchSize = batchSize;
    }

    public long next() {
        return (id.getAndIncrement()) / batchSize;
    }

    public int getBatchSize() {
        return batchSize;
    }
}

和方法:

public static <T> void applyConcurrentBatchToStream(Consumer<List<T>> batchFunc, Stream<T> stream, int batchSize){
    ConcurrentBatch batch = new ConcurrentBatch(batchSize);
    //hack java map: extends and override computeIfAbsent
    Supplier<ConcurrentMap<Long, List<T>>> mapFactory = () -> new ConcurrentHashMap<Long, List<T>>() {
        @Override
        public List<T> computeIfAbsent(Long key, Function<? super Long, ? extends List<T>> mappingFunction) {
            List<T> rs = super.computeIfAbsent(key, mappingFunction);
            //apply batchFunc to old lists, when new batch list is created
            if(rs.isEmpty()){
                for(Entry<Long, List<T>> e : entrySet()) {
                    List<T> batchList = e.getValue();
                    //todo: need to improve
                    synchronized (batchList) {
                        if (batchList.size() == batch.getBatchSize()){
                            batchFunc.accept(batchList);
                            remove(e.getKey());
                            batchList.clear();
                        }
                    }
                }
            }
            return rs;
        }
    };
    stream.map(s -> new AbstractMap.SimpleEntry<>(batch.next(), s))
            .collect(groupingByConcurrent(AbstractMap.SimpleEntry::getKey, mapFactory, mapping(AbstractMap.SimpleEntry::getValue, toList())))
            .entrySet()
            .stream()
            //map contains only unprocessed lists (size<batchSize)
            .forEach(e -> batchFunc.accept(e.getValue()));
}

你关于未记录的事实是正确的,这也是我称其为“hack”的原因。此外,你对于非原子性的computeIfAbsent也是正确的。我会很快编辑代码。但为什么它不是懒惰的呢?它不会在处理一个批次之前分配所有列表。而且并发批处理通常是无序的。 - sibnick
2
对于并行流,它根本不起作用。applyConcurrentBatchToStream(System.out::println, IntStream.range(0,100).boxed().parallel(), 3)会打印垃圾(随机收集的组,一些元素重复,甚至运行之间组数的数量都不同)。对于仅顺序流,有更简单和更有效的解决方案(例如OP提出的解决方案)。 - Tagir Valeev
但您也展示了错误的源头:非原子性的 computeIfAbsent - sibnick
2
关于排序,同时批处理无序并不常见。甚至有特殊的Stream API方法unordered(),您可以使用它来明确表示您不关心顺序。在许多情况下,您确实关心顺序。而且,我相信有一种更简单的替代方案来创建固定大小的无序批次... - Tagir Valeev
我已经添加了多线程问题的修复,但我还需要再考虑一下。 - sibnick
2
我发布了自己的无序并行解决方案。 - Tagir Valeev

0

这里有一个abacus-common的快速解决方案。

IntStream.range(0, Integer.MAX_VALUE).split(size).forEach(s -> N.println(s.toArray()));

声明:我是 abacus-common 的开发者。


0
这是一个纯Java 8解决方案 - 顺序和并行均可:
  public <T> Collection<List<T>> chunk(Collection<T> collection, int chunkSize) {
    final AtomicInteger index = new AtomicInteger();
    return collection.stream()
        .map(v -> new SimpleImmutableEntry<>(index.getAndIncrement() / chunkSize, v))
        // LinkedHashMap is used here just to preserve order
        .collect(groupingBy(Entry::getKey, LinkedHashMap::new, mapping(Entry::getValue, toList())))
        .values();
  }

  public <T> Collection<List<T>> chunkParallel(Collection<T> collection, int chunkSize) {
    final AtomicInteger index = new AtomicInteger();
    return collection.parallelStream()
        .map(v -> new SimpleImmutableEntry<>(index.getAndIncrement() / chunkSize, v))
        // So far it is parallel processing ordering cannot be preserved,
        // but we have to make it thread safe - using e.g. ConcurrentHashMap
        .collect(groupingBy(Entry::getKey, ConcurrentHashMap::new, mapping(Entry::getValue, toList())))
        .values();
  }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接