Java 8 Stream:使用多个收集器进行分组

14

我想使用Java 8 Stream和一个分类器进行分组,但有多个收集器函数。因此,在分组时,例如计算一个字段(或者另一个字段)的平均值和总和。

我尝试用一个例子简化一下:

public void test() {
    List<Person> persons = new ArrayList<>();
    persons.add(new Person("Person One", 1, 18));
    persons.add(new Person("Person Two", 1, 20));
    persons.add(new Person("Person Three", 1, 30));
    persons.add(new Person("Person Four", 2, 30));
    persons.add(new Person("Person Five", 2, 29));
    persons.add(new Person("Person Six", 3, 18));

    Map<Integer, Data> result = persons.stream().collect(
            groupingBy(person -> person.group, multiCollector)
    );
}

class Person {
    String name;
    int group;
    int age;

    // Contructor, getter and setter
}

class Data {
    long average;
    long sum;

    public Data(long average, long sum) {
        this.average = average;
        this.sum = sum;
    }

    // Getter and setter
}

结果应该是一个关联着分组结果的 Map

1 => Data(average(18, 20, 30), sum(18, 20, 30))
2 => Data(average(30, 29), sum(30, 29))
3 => ....

这个方案可以完美地使用像"Collectors.counting()"这样的一个函数,但是我想要链接多个函数(最理想的情况是从一个列表中无限链下去)。

List<Collector<Person, ?, ?>>

可以做类似这样的事情吗?


我理解的对吗,你的“Data”类只是一组flatmap函数的占位符?每个函数都必须对所有组执行相同的操作(例如首先计算组的平均年龄,其次是总年龄等)? - SME_Dev
当我理解你的意思后,我认为这只是用于将多个数据保存在一个对象中,同时仍能够识别每个数据的方法。它也可以是一个数组或一个键为函数名称、值为函数结果的映射。 - PhilippS
5个回答

18

对于求和和平均值的具体问题,可以使用collectingAndThensummarizingDouble

Map<Integer, Data> result = persons.stream().collect(
        groupingBy(Person::getGroup, 
                collectingAndThen(summarizingDouble(Person::getAge), 
                        dss -> new Data((long)dss.getAverage(), (long)dss.getSum()))));

对于更通用的问题(收集有关您的人的各种信息),您可以创建如下复杂的收集器:

// Individual collectors are defined here
List<Collector<Person, ?, ?>> collectors = Arrays.asList(
        Collectors.averagingInt(Person::getAge),
        Collectors.summingInt(Person::getAge));

@SuppressWarnings("unchecked")
Collector<Person, List<Object>, List<Object>> complexCollector = Collector.of(
    () -> collectors.stream().map(Collector::supplier)
        .map(Supplier::get).collect(toList()),
    (list, e) -> IntStream.range(0, collectors.size()).forEach(
        i -> ((BiConsumer<Object, Person>) collectors.get(i).accumulator()).accept(list.get(i), e)),
    (l1, l2) -> {
        IntStream.range(0, collectors.size()).forEach(
            i -> l1.set(i, ((BinaryOperator<Object>) collectors.get(i).combiner()).apply(l1.get(i), l2.get(i))));
        return l1;
    },
    list -> {
        IntStream.range(0, collectors.size()).forEach(
            i -> list.set(i, ((Function<Object, Object>)collectors.get(i).finisher()).apply(list.get(i))));
        return list;
    });

Map<Integer, List<Object>> result = persons.stream().collect(
        groupingBy(Person::getGroup, complexCollector)); 

映射值是列表,其中第一个元素是应用第一个收集器的结果等等。您可以使用Collectors.collectingAndThen(complexCollector, list -> ...)添加自定义完成步骤,将该列表转换为更合适的内容。


好的,这很有趣,我认为与Peter Lawrey的答案相关。但是这意味着它不适用于每种类型的函数。我想使用功能列表(收集器)。通过您的解决方案,我认为我受到summarizingDouble的限制。 - PhilippS
哇!谢谢你的回答。没想到会得到这么好的东西。我刚刚测试了一下,结果正是我所期望的。现在我只需要理解你的代码的所有内容。我认为这需要一些时间。然后我就可以稍微调整一下,以完全符合我的目标。但这真的是我一直在寻找的! - PhilippS
1
@PhilippS,它只是将个别收集器的个别操作结合起来(新供应商创建个别供应商结果列表,新累加器为每个个别列表项调用累加等)。如果对这个实现有具体问题,请随时提问。 - Tagir Valeev
我会通过将所有内容放入工厂方法中来重新设计它。collectors 可以是一个本地变量(实际上是方法参数),被 lambda 闭合。finisher lambda 还可以应用用户提供的函数,该函数(在 OP 的情况下)将获取列表并返回 Data 对象。 - Marko Topolnik
我将结合给定收集器的代码提取到一个方法中,并在https://gist.github.com/dpolivaev/50cc9eb1b75453d37195882a9fc9fb69中修复了泛型。 - dpolivaev

4
通过使用地图作为输出类型,可以有一个潜在的无限列表的减速器,每个都生成自己的统计信息并将其添加到地图中。
public static <K, V> Map<K, V> addMap(Map<K, V> map, K k, V v) {
    Map<K, V> mapout = new HashMap<K, V>();
    mapout.putAll(map);
    mapout.put(k, v);
    return mapout;
}

...

    List<Person> persons = new ArrayList<>();
    persons.add(new Person("Person One", 1, 18));
    persons.add(new Person("Person Two", 1, 20));
    persons.add(new Person("Person Three", 1, 30));
    persons.add(new Person("Person Four", 2, 30));
    persons.add(new Person("Person Five", 2, 29));
    persons.add(new Person("Person Six", 3, 18));

    List<BiFunction<Map<String, Integer>, Person, Map<String, Integer>>> listOfReducers = new ArrayList<>();

    listOfReducers.add((m, p) -> addMap(m, "Count", Optional.ofNullable(m.get("Count")).orElse(0) + 1));
    listOfReducers.add((m, p) -> addMap(m, "Sum", Optional.ofNullable(m.get("Sum")).orElse(0) + p.i1));

    BiFunction<Map<String, Integer>, Person, Map<String, Integer>> applyList
            = (mapin, p) -> {
                Map<String, Integer> mapout = mapin;
                for (BiFunction<Map<String, Integer>, Person, Map<String, Integer>> f : listOfReducers) {
                    mapout = f.apply(mapout, p);
                }
                return mapout;
            };
    BinaryOperator<Map<String, Integer>> combineMaps
            = (map1, map2) -> {
                Map<String, Integer> mapout = new HashMap<>();
                mapout.putAll(map1);
                mapout.putAll(map2);
                return mapout;
            };
    Map<String, Integer> map
            = persons
            .stream()
            .reduce(new HashMap<String, Integer>(),
                    applyList, combineMaps);
    System.out.println("map = " + map);

生成:

map = {Sum=10, Count=6}

3

不要使用收集器的链接方式,而应该构建一个聚合器来实现收集器:使用类接受一个收集器列表并将每个方法调用委托给它们。最后,您可以使用所有嵌套收集器产生的结果返回 new Data()

您可以避免创建具有所有方法声明的自定义类,而是利用Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics... characteristics)finisher Lambda将调用每个嵌套收集器的完成器,然后返回Data实例。


感谢你的回答。特别是与其他答案结合起来,这为我如何调整代码提供了良好的背景知识。 - PhilippS

3
您可以将它们链接起来。收集器只能生成一个对象,但该对象可以容纳多个值。例如,您可以返回一个Map,其中Map的每个条目都对应于您要返回的收集器。Collectors.of(HashMap::new, accumulator, combiner);可用。
您的accumulator将具有Collectors Map,其中生成的Map的键与Collector的名称匹配。当在并行执行时,combiner需要一种组合多个结果的方法。
通常内置的收集器使用数据类型来表示复杂结果。
来自Collectors
public static <T>
Collector<T, ?, DoubleSummaryStatistics> summarizingDouble(ToDoubleFunction<? super T> mapper) {
    return new CollectorImpl<T, DoubleSummaryStatistics, DoubleSummaryStatistics>(
            DoubleSummaryStatistics::new,
            (r, t) -> r.accept(mapper.applyAsDouble(t)),
            (l, r) -> { l.combine(r); return l; }, CH_ID);
}

并且处于它自己的类别中

public class DoubleSummaryStatistics implements DoubleConsumer {
    private long count;
    private double sum;
    private double sumCompensation; // Low order bits of sum
    private double simpleSum; // Used to compute right sum for non-finite inputs
    private double min = Double.POSITIVE_INFINITY;
    private double max = Double.NEGATIVE_INFINITY;

我正在努力弄清楚这个如何帮助我。也许我已经走错了方向。这难道不只是一个内置函数,就像提到的“Collectors.counting()”一样吗?但是我想做的是使用两个或更多的这些内置函数(在编译时未知)。也许你可以再解释一下。 - PhilippS
谢谢回答。这提供了很好的背景知识,特别是理解Tagir的答案并对其进行调整。 - PhilippS

1

еңЁJava12дёӯпјҢCollectors APIе·Із»Ҹжү©еұ•дәҶдёҖдёӘйқҷжҖҒзҡ„teeting(...)еҮҪж•°пјҡ

teetingвҖӢ(Collector<? super T,вҖӢ?,вҖӢR1> downstream1, Collector<? super T,вҖӢ?,вҖӢR2> downstream2, BiFunction<? super R1,вҖӢ? super R2,вҖӢR> merger)

иҝҷжҸҗдҫӣдәҶдёҖдёӘеҶ…зҪ®еҠҹиғҪпјҢеҸҜд»ҘеңЁдёҖдёӘStreamдёҠдҪҝз”ЁдёӨдёӘ收йӣҶеҷЁпјҢ并е°Ҷз»“жһңеҗҲ并дёәдёҖдёӘеҜ№иұЎгҖӮ

д»ҘдёӢжҳҜдёҖдёӘе°ҸдҫӢеӯҗпјҢе…¶дёӯе°Ҷе‘ҳе·ҘеҲ—иЎЁеҲҶжҲҗе№ҙйҫ„з»„пјҢ并еҜ№жҜҸдёӘз»„жү§иЎҢдёӨдёӘCollectors.summarizingInt()пјҲеҜ№е№ҙйҫ„е’Ңи–Әж°ҙпјүиҝ”еӣһдҪңдёәIntSummaryStatisticsеҲ—иЎЁпјҡ

import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CollectorTeeingTest {

public static void main(String... args){

    NavigableSet<Integer> age_groups = new TreeSet<>();
    age_groups.addAll(List.of(30,40,50,60,Integer.MAX_VALUE)); //we don't want to map to null

    Function<Integer,Integer> to_age_groups = age -> age_groups.higher(age);

    List<Employee> employees = List.of( new Employee("A",21,2000),
                                        new Employee("B",24,2400),
                                        new Employee("C",32,3000),
                                        new Employee("D",40,4000),
                                        new Employee("E",41,4100),
                                        new Employee("F",61,6100)
    );

    Map<Integer,List<IntSummaryStatistics>> stats = employees.stream()
            .collect(Collectors.groupingBy(
                employee -> to_age_groups.apply(employee.getAge()),
                Collectors.teeing(
                    Collectors.summarizingInt(Employee::getAge),
                    Collectors.summarizingInt(Employee::getSalary),
                    (stat1, stat2) -> List.of(stat1,stat2))));

    stats.entrySet().stream().forEach(entry -> {
        System.out.println("Age-group: <"+entry.getKey()+"\n"+entry.getValue());
    });
}

public static class Employee{

    private final String name;
    private final int age;
    private final int salary;

    public Employee(String name, int age, int salary){
        
        this.name = name;
        this.age = age;
        this.salary = salary;
    }
    public String getName(){return this.name;}
    public int getAge(){return this.age;}
    public int getSalary(){return this.salary;}
}

}

输出:

Age-group: <2147483647
[IntSummaryStatistics{count=1, sum=61, min=61, average=61,000000, max=61}, IntSummaryStatistics{count=1, sum=6100, min=6100, average=6100,000000, max=6100}]
Age-group: <50
[IntSummaryStatistics{count=2, sum=81, min=40, average=40,500000, max=41}, IntSummaryStatistics{count=2, sum=8100, min=4000, average=4050,000000, max=4100}]
Age-group: <40
[IntSummaryStatistics{count=1, sum=32, min=32, average=32,000000, max=32}, IntSummaryStatistics{count=1, sum=3000, min=3000, average=3000,000000, max=3000}]
Age-group: <30
[IntSummaryStatistics{count=2, sum=45, min=21, average=22,500000, max=24}, IntSummaryStatistics{count=2, sum=4400, min=2000, average=2200,000000, max=2400}]

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接