使用Java 8创建复杂的自定义收集器

8

我有一串对象,我想以以下方式收集它们。

假设我们正在处理论坛帖子:

class Post {
    private Date time;
    private Data data
}

我希望创建一个列表,按时间段分组文章。如果在X分钟内没有文章,则创建新的分组

class PostsGroup{
    List<Post> posts = new ArrayList<> ();
}

我希望获得一个包含按时间间隔分组的帖子List<PostGroups>

例如:10分钟的时间间隔。

帖子:

[{time:x, data:{}}, {time:x + 3, data:{}} , {time:x + 12, data:{}, {time:x + 45, data:{}}}]

我想获取一个“文章组”列表:

[
 {posts : [{time:x, data:{}}, {time:x + 3, data:{}}, {time:x + 12, data:{}]]},
{posts : [{time:x + 45, data:{}]}
]
  • 请注意第一个持续到X + 22。然后在X + 45收到了一个新的帖子

这是否可能?


1
您可以通过调用Day.getTime()将日期时间转换为long类型,它会返回当前时间的毫秒数。您还可以将10分钟转换为毫秒,即10601000。现在,在Day.getTime()上放置一些比较逻辑,以查找下一个10分钟的条目。 - Harbeer Kadian
3个回答

3
使用我的StreamEx库中的groupRuns方法,可以轻松解决这个问题。该方法链接为:groupRuns 。请参考StreamEx库。
long MAX_INTERVAL = TimeUnit.MINUTES.toMillis(10);
StreamEx.of(posts)
        .groupRuns((p1, p2) -> p2.time.getTime() - p1.time.getTime() <= MAX_INTERVAL)
        .map(PostsGroup::new)
        .toList();

我假设你有一个构造函数

class PostsGroup {
    private List<Post> posts;

    public PostsGroup(List<Post> posts) {
        this.posts = posts;
    }
}
StreamEx.groupRuns 方法接受一个 BiPredicate,应用于两个相邻的输入元素,并返回 true 如果它们必须分组在一起。此方法创建流列表,每个列表表示组。此方法是惰性的,并且与并行流很好地配合使用。

0

你需要在流条目之间保留状态,并编写一个分组分类器。像这样的东西是一个很好的开始。

class Post {

    private final long time;
    private final String data;

    public Post(long time, String data) {
        this.time = time;
        this.data = data;
    }

    @Override
    public String toString() {
        return "Post{" + "time=" + time + ", data=" + data + '}';
    }

}

public void test() {
    System.out.println("Hello");
    long t = 0;
    List<Post> posts = Arrays.asList(
            new Post(t, "One"),
            new Post(t + 1000, "Two"),
            new Post(t + 10000, "Three")
    );
    // Group every 5 seconds.
    Map<Long, List<Post>> gouped = posts
            .stream()
            .collect(Collectors.groupingBy(new ClassifyByTimeBetween(5000)));
    gouped.entrySet().stream().forEach((e) -> {
        System.out.println(e.getKey() + " -> " + e.getValue());
    });

}

class ClassifyByTimeBetween implements Function<Post, Long> {

    final long delay;
    long currentGroupBy = -1;
    long lastDateSeen = -1;

    public ClassifyByTimeBetween(long delay) {
        this.delay = delay;
    }

    @Override
    public Long apply(Post p) {
        if (lastDateSeen >= 0) {
            if (p.time > lastDateSeen + delay) {
                // Grab this one.
                currentGroupBy = p.time;
            }
        } else {
            // First time - start there.
            currentGroupBy = p.time;
        }
        lastDateSeen = p.time;
        return currentGroupBy;
    }

}

0

由于没有人提供符合原始问题陈述所需的自定义收集器解决方案,因此这里提供了一个基于提供的时间间隔对Post对象进行分组的收集器实现

问题中提到的Date类已经过时,不建议在新项目中使用。因此,将使用LocalDateTime代替。

帖子和帖子组

为了测试目的,我使用了作为Java 16 记录实现的Post(如果您将其替换为类,则整体解决方案将完全符合Java 8):

public record Post(LocalDateTime dateTime) {}

此外,我增强了PostGroup对象。我的想法是它应该能够决定是否将提供的Post添加到帖子列表中,或者根据信息专家原则拒绝它(简而言之:所有与数据的操作都应该发生在该数据所属的类内部)。
为了实现这个功能,添加了两个额外的字段:interval 类型为 Duration,来自 java.time 包,用于表示一个 最早的帖子最新的帖子 之间的最大 时间间隔,以及类型为 LocalDateTimeintervalBound 字段。该字段在第一个 帖子 添加后进行初始化,并将在方法 isWithinInterval() 内部使用,以检查所提供的 帖子 是否适合于 时间间隔
public class PostsGroup {
    private Duration interval;
    private LocalDateTime intervalBound;
    private List<Post> posts = new ArrayList<>();
    
    public PostsGroup(Duration interval) {
        this.interval = interval;
    }
    
    public boolean tryAdd(Post post) {
        if (posts.isEmpty()) {
            intervalBound = post.dateTime().plus(interval);
            return posts.add(post);
        } else if (isWithinInterval(post)) {
            return posts.add(post);
        }
        return false;
    }
    
    public boolean isWithinInterval(Post post) {
        return post.dateTime().isBefore(intervalBound);
    }
    
    @Override
    public String toString() {
        return "PostsGroup{" + posts + '}';
    }
}

我有两个假设:

  • 来源中所有帖子都按时间排序(如果不是,请在收集结果之前介绍sorted()操作);
  • 需要将帖子收集到最少的组中,因此不可能将此任务拆分并并行执行。

构建自定义收集器

我们可以通过使用静态方法Collector.of()的某个版本或定义实现Collector接口的类来创建一个自定义收集器

创建自定义收集器时必须提供这些参数

  • 供应商 Supplier<A> 的目的是提供一个可变容器来存储流中的元素。在这种情况下,ArrayDeque(作为Deque接口的实现)将成为一个方便访问最近添加的元素(即最新的PostGroup)的容器

  • 累加器 BiConsumer<A,T> 定义了如何将元素添加到供应商提供的容器中。对于此任务,我们需要提供逻辑,以允许确定来自流的下一个元素(即下一个Post)是否应该进入在Deque中的最后一个PostGroup,或者需要为其分配新的PostGroup

  • 组合器 BinaryOperator<A> combiner() 建立了一条规则,用于合并在并行执行流时获得的两个容器。由于该操作被视为不可并行化,组合器被实现为在并行执行时抛出AssertionError

  • 完成函数 Function<A,R> 的目的是通过转换可变容器来生成最终结果。下面代码中的完成函数将包含结果的deque转换为一个不可变列表

注意:Java 16方法toList()finisher函数中使用,对于Java 8,可以用collect(Collectors.toUnmodifiableList())collect(Collectors.toList())替换。

  • 特征允许提供额外的信息,例如Collector.Characteristics.UNORDERED,在这种情况下表示在并行执行时产生的减少部分结果的顺序不重要。在这种情况下,collector不需要任何特征。

下面的方法负责根据提供的interval生成collector

public static Collector<Post, ?, List<PostsGroup>> groupPostsByInterval(Duration interval) {
    
    return Collector.of(
        ArrayDeque::new,
        (Deque<PostsGroup> deque, Post post) -> {
            if (deque.isEmpty() || !deque.getLast().tryAdd(post)) { // if no groups have been created yet or if adding the post into the most recent group fails
                PostsGroup postsGroup = new PostsGroup(interval);
                postsGroup.tryAdd(post);
                deque.addLast(postsGroup);
            }
        },
        (Deque<PostsGroup> left, Deque<PostsGroup> right) -> { throw new AssertionError("should not be used in parallel"); },
        (Deque<PostsGroup> deque) -> deque.stream().collect(Collectors.collectingAndThen(Collectors.toUnmodifiableList())));
}

main() - 演示

public static void main(String[] args) {
    List<Post> posts =
        List.of(new Post(LocalDateTime.of(2022,4,28,15,0)),
                new Post(LocalDateTime.of(2022,4,28,15,3)),
                new Post(LocalDateTime.of(2022,4,28,15,5)),
                new Post(LocalDateTime.of(2022,4,28,15,8)),
                new Post(LocalDateTime.of(2022,4,28,15,12)),
                new Post(LocalDateTime.of(2022,4,28,15,15)),
                new Post(LocalDateTime.of(2022,4,28,15,18)),
                new Post(LocalDateTime.of(2022,4,28,15,27)),
                new Post(LocalDateTime.of(2022,4,28,15,48)),
                new Post(LocalDateTime.of(2022,4,28,15,54)));
    
    Duration interval = Duration.ofMinutes(10);

    List<PostsGroup> postsGroups = posts.stream()
        .collect(groupPostsByInterval(interval));
    
    postsGroups.forEach(System.out::println);
}

输出:

PostsGroup{[Post[dateTime=2022-04-28T15:00], Post[dateTime=2022-04-28T15:03], Post[dateTime=2022-04-28T15:05], Post[dateTime=2022-04-28T15:08]]}
PostsGroup{[Post[dateTime=2022-04-28T15:12], Post[dateTime=2022-04-28T15:15], Post[dateTime=2022-04-28T15:18]]}
PostsGroup{[Post[dateTime=2022-04-28T15:27]]}
PostsGroup{[Post[dateTime=2022-04-28T15:48], Post[dateTime=2022-04-28T15:54]]}

您也可以尝试一下这个在线演示


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接