Java Stream GroupBy 和 Reduce

4

我有一个Item类,它包含代码、数量和金额字段以及许多项(具有相同代码)的项目列表。我想按照代码将项目分组并汇总其数量和金额。

使用流的groupingByreduce,我已经完成了其中一半。分组是成功的,但是reduce将所有分组的项目缩减为一个重复的单个项目,即不同代码(groupingBy键)下的相同项目。

在这里,reduce不应该将映射中每个代码的项目列表缩减吗?为什么它会为所有代码返回相同的组合项目?

以下是示例代码:

import java.util.List;
import java.util.Arrays;
import java.util.stream.Collectors;
import java.util.Map;

class HelloWorld {
    public static void main(String[] args) {
        List<Item> itemList = Arrays.asList(
            createItem("CODE1", 1, 12),
            createItem("CODE2", 4, 22),
            createItem("CODE3", 5, 50),
            createItem("CODE4", 2, 11),
            createItem("CODE4", 8, 20),
            createItem("CODE2", 1, 42)
        );
        
        Map<String, Item> aggregatedItems = itemList
            .stream()
            .collect(Collectors.groupingBy(
                Item::getCode,
                Collectors.reducing(new Item(), (aggregatedItem, item) -> {
                    int aggregatedQuantity = aggregatedItem.getQuantity();
                    double aggregatedAmount = aggregatedItem.getAmount();
                    
                    aggregatedItem.setQuantity(aggregatedQuantity + item.getQuantity());
                    aggregatedItem.setAmount(aggregatedAmount + item.getAmount());
                    
                    return aggregatedItem;
                })
            ));
        
        System.out.println("Map total size: " + aggregatedItems.size()); // expected 4
        System.out.println();
        aggregatedItems.forEach((key, value) -> {
            System.out.println("key: " + key);
            System.out.println("value - quantity: " + value.getQuantity() + " - amount: " + value.getAmount());
            System.out.println();
        });
    }
    
    private static Item createItem(String code, int quantity, double amount) {
        Item item = new Item();
        item.setCode(code);
        item.setQuantity(quantity);
        item.setAmount(amount);
        return item;
    }
}

class Item {
    private String code;
    private int quantity;
    private double amount;
    
    public Item() {
        quantity = 0;
        amount = 0.0;
    }
    
    public String getCode() { return code; }
    public int getQuantity() { return quantity; }
    public double getAmount() { return amount; }
    
    public void setCode(String code) { this.code = code; }
    public void setQuantity(int quantity) { this.quantity = quantity; }
    public void setAmount(double amount) { this.amount = amount; }
}

以下是输出结果。

Map total size: 4

key: CODE2
value - quantity: 21 - amount: 157.0

key: CODE1
value - quantity: 21 - amount: 157.0

key: CODE4
value - quantity: 21 - amount: 157.0

key: CODE3
value - quantity: 21 - amount: 157.0
3个回答

3

可变规约与不可变规约

在这种情况下,Collectors.reducing() 不是正确的工具,因为它适用于不可变规约,即执行折叠操作,在每个规约步骤中都会创建一个新的不可变对象。

但是,您需要更改提供作为标识的对象的状态,而不是在每个规约步骤中生成一个新对象。

因此,您会得到错误的结果,因为标识对象只会在每个线程中创建一次。这个Item的单个实例用于累加,并且它的引用最终出现在映射的每个值中。

您可以在Stream API文档中找到更详细的信息,特别是在以下部分:ReductionMutable Reduction

以下是一个简短的引用,解释了Stream.reduce()如何工作(Collectors.reducing()背后的机制相同):

累加器函数接受部分结果和下一个元素,并生成一个新的部分结果

使用可变规约

可以通过在累积映射到的时生成Item的新实例来解决该问题,但更高效的方法是改用可变规约

为此,您可以通过静态方法Collector.of()实现自定义收集器:

Map<String, Item> aggregatedItems = itemList.stream()
    .collect(Collectors.groupingBy(
        Item::getCode,
        Collector.of(
            Item::new,   // mutable container of the collector
            Item::merge, // accumulator - defines how stream data should be accumulated
            Item::merge  // combiner - mergin the two containers while executing stream in parallel
        )
    ));

为了方便起见,您可以使用merge()方法来累积两个项目的属性。这将允许避免在累加器组合器中重复相同的逻辑,并使收集器实现简洁且易于阅读。

public class Item {
    private String code;
    private int quantity;
    private double amount;
    
    // getters, constructor, etc.
    
    public Item merge(Item other) {
        this.quantity += other.quantity;
        this.amount += other.amount;
        return this;
    }
}

1
你不能修改Collectors.reducing的输入参数。 new Item()只会执行一次,所有你的规约操作将共享同一个“聚合实例”。换句话说:map将包含相同的值实例4次(你可以通过使用System.identityHashCode()或通过比较引用相等性来轻松检查:aggregatedItems.get("CODE1") == aggregatedItems.get("CODE2"))。
相反,返回一个新的结果实例:
        final Map<String, Item> aggregatedItems = itemList
            .stream()
            .collect(Collectors.groupingBy(
                Item::getCode,
                Collectors.reducing(new Item(), (item1, item2) -> {
                    final Item reduced = new Item();
                    reduced.setQuantity(item1.getQuantity() + item2.getQuantity());
                    reduced.setAmount(item1.getAmount() + item2.getAmount());
                    return reduced;
                })
            ));

输出:

Map total size: 4

key: CODE2
value - quantity: 5 - amount: 64.0

key: CODE1
value - quantity: 1 - amount: 12.0

key: CODE4
value - quantity: 10 - amount: 31.0

key: CODE3
value - quantity: 5 - amount: 50.0

1

您正在使用 reducing,这意味着它假定您不会改变传入的累加器。 reducing 不会为每个新组创建新的 Item,并期望您在 lambda 中创建新的 Item 并将其返回,如下所示:

// this works as expected
.collect(Collectors.groupingBy(
    Item::getCode,
    Collectors.reducing(new Item(), (item1, item2) -> createItem(
        item1.getCode(),
        item1.getQuantity() + item2.getQuantity(),
        item1.getAmount() + item2.getAmount()
    ))
));

如果您使用的是不可变对象(如数字或字符串),那么这非常适合。

由于您在代码中没有创建新的Item,因此reducing会继续重用同一实例,导致您看到的行为。

如果您想要改变对象,可以使用Collector.of以线程安全的方式进行可变缩减:

.collect(Collectors.groupingBy(
    Item::getCode,
    Collector.of(Item::new, (aggregatedItem, item) -> {
        int aggregatedQuantity = aggregatedItem.getQuantity();
        double aggregatedAmount = aggregatedItem.getAmount();

        aggregatedItem.setQuantity(aggregatedQuantity + item.getQuantity());
        aggregatedItem.setAmount(aggregatedAmount + item.getAmount());
    }, (item1, item2) -> createItem(
        item1.getCode(),
        item1.getQuantity() + item2.getQuantity(),
        item1.getAmount() + item2.getAmount()
    ))
));

请注意,现在您将引用传递给Item的构造函数,即创建新Item的方法,而不仅仅是单个new Item()。此外,您还提供了第三个参数,即组合器,告诉收集器如何从两个现有项创建一个新项,如果此收集器在并发情况下使用,则会使用它。(有关组合器的更多信息,请参见此处Collector.ofCollectors.reducing之间的对比与Stream.reduceStream.collect之间的对比相同。请在此处了解更多信息。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接