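I have adapted the answer to this question to your case. The custom Spliterator forks the source stream into several streams, each of which is collected by different properties.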
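    // Imports for the members below, which are meant to live inside an enclosing class.
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Spliterator;
    import java.util.Spliterators.AbstractSpliterator;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.function.Consumer;
    import java.util.stream.Stream;
    import java.util.stream.StreamSupport;

    @SafeVarargs
    public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        // Draining the forking stream pushes every source element to all forks.
        return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
    }

    public static class ForkingSpliterator<T>
        extends AbstractSpliterator<T>
    {
        private final Spliterator<T> sourceSpliterator;
        private final List<BlockingQueue<T>> queues = new ArrayList<>();
        // Written by the draining thread and read by the fork threads, hence volatile.
        private volatile boolean sourceDone;

        @SafeVarargs
        private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
        {
            super(Long.MAX_VALUE, 0);
            sourceSpliterator = source.spliterator();
            for (Consumer<Stream<T>> fork : consumers)
            {
                LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
                queues.add(queue);
                // Each fork consumes its own copy of the stream on a separate thread.
                new Thread(() -> fork.accept(StreamSupport.stream(new ForkedConsumer(queue), false))).start();
            }
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            boolean advanced = sourceSpliterator.tryAdvance(t ->
            {
                // Copy the element into every fork's queue.
                queues.forEach(queue -> queue.offer(t));
                // Also pass it downstream so that count() reports the number of source elements.
                action.accept(t);
            });
            sourceDone = !advanced;
            return advanced;
        }

        private class ForkedConsumer
            extends AbstractSpliterator<T>
        {
            private final BlockingQueue<T> queue;

            private ForkedConsumer(BlockingQueue<T> queue)
            {
                super(Long.MAX_VALUE, 0);
                this.queue = queue;
            }

            @Override
            public boolean tryAdvance(Consumer<? super T> action)
            {
                T next = queue.poll();
                // Busy-wait until an element arrives or the source is exhausted.
                while (next == null)
                {
                    // Read the flag before polling again, so an element offered just
                    // before the source finished is not lost.
                    boolean done = sourceDone;
                    next = queue.poll();
                    if (next == null && done)
                    {
                        return false;
                    }
                }
                action.accept(next);
                return true;
            }
        }
    }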
You can use it as follows:
    streamForked(Stream.of(new Row("content1", "client1", "location1", 1),
                           new Row("content2", "client1", "location1", 2),
                           new Row("content1", "client1", "location2", 3),
                           new Row("content2", "client2", "location2", 4),
                           new Row("content1", "client2", "location2", 5)),
                 rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                                               Collectors.groupingBy(Row::getContent,
                                                                                                     Collectors.summingInt(Row::getConsumption))))),
                 rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                                               Collectors.groupingBy(Row::getLocation,
                                                                                                     Collectors.summingInt(Row::getConsumption))))),
                 rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getContent,
                                                                               Collectors.groupingBy(Row::getLocation,
                                                                                                     Collectors.summingInt(Row::getConsumption))))));
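The usage above relies on a Row class that is not shown. A minimal sketch of what it might look like follows. The constructor order (content, client, location, consumption) is an assumption inferred from the constructor calls and getter names, and GroupingSketch is a hypothetical helper that runs the first grouping on a plain sequential stream, so the result can be checked without any threads involved.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical value class; the constructor order is assumed from the usage example.
final class Row {
    private final String content;
    private final String client;
    private final String location;
    private final int consumption;

    Row(String content, String client, String location, int consumption) {
        this.content = content;
        this.client = client;
        this.location = location;
        this.consumption = consumption;
    }

    String getContent() { return content; }
    String getClient() { return client; }
    String getLocation() { return location; }
    int getConsumption() { return consumption; }

    @Override
    public String toString() {
        return "Row(" + content + ", " + client + ", " + location + ", " + consumption + ")";
    }
}

// Hypothetical helper: the grouping used by the first fork, run sequentially.
final class GroupingSketch {
    static List<Row> sampleRows() {
        return Arrays.asList(
            new Row("content1", "client1", "location1", 1),
            new Row("content2", "client1", "location1", 2),
            new Row("content1", "client1", "location2", 3),
            new Row("content2", "client2", "location2", 4),
            new Row("content1", "client2", "location2", 5));
    }

    // Groups by client, then by content, and sums the consumption of each group.
    static Map<String, Map<String, Integer>> byClientThenContent(List<Row> rows) {
        return rows.stream().collect(
            Collectors.groupingBy(Row::getClient,
                Collectors.groupingBy(Row::getContent,
                    Collectors.summingInt(Row::getConsumption))));
    }
}
```

For the sample rows, client1 maps to content1=4 and content2=2, and client2 maps to content1=5 and content2=4.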
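Note that you can do whatever you want with the copies of the stream. Following your example, I used a stacked groupingBy collector to group the rows by two properties and then sum the int property, so the result is a Map<String, Map<String, Integer>>. But you can also use it for other scenarios: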
    rows -> System.out.println(rows.count())

    rows -> rows.forEach(row -> System.out.println(row))

    rows -> System.out.println(rows.anyMatch(row -> row.getConsumption() > 3))
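Each fork runs on its own thread and a Consumer returns nothing, so the forks shown here print their results. If the caller needs the value instead, one option is to wrap the operation so that it completes a CompletableFuture. The following is only a sketch of that idea; ForkResults and capturing are hypothetical names and not part of the code above.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

// Hypothetical helper, not part of the original answer.
final class ForkResults {
    // Wraps a stream operation as a Consumer that stores the outcome in the given future.
    static <T, R> Consumer<Stream<T>> capturing(Function<Stream<T>, R> operation,
                                                CompletableFuture<R> result) {
        return stream -> {
            try {
                result.complete(operation.apply(stream));
            } catch (RuntimeException e) {
                // Surface failures to the caller instead of losing them on the fork thread.
                result.completeExceptionally(e);
            }
        };
    }
}
```

A fork could then be passed as capturing(Stream::count, counted), where counted is a CompletableFuture<Long>, and the caller would read the value with counted.join(), which blocks until that fork has finished.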