Java 8一次流式处理,多个映射。

5
假设我有一个巨大的 Web 服务器日志文件,它无法放入内存。我需要将此文件流式传输到 MapReduce 方法并保存到数据库中。我使用 Java 8 Stream API 完成此操作。例如,我在 MapReduce 过程之后得到一个列表,如按客户端、按 IP 和按内容消耗量。但是,我的需求不像给出的示例那样简单。由于我无法共享代码,我只想提供基本示例。
通过 Java 8 Stream API,我想一次性读取文件,在流式传输文件的同时,以并行或顺序方式同时获取 3 个列表。但并行方式更好。有没有办法做到这一点?

4
仅使用自定义收集器才能实现。Java-12提出了一个类似于“BiCollector”的实现建议(名称尚未确定),但绝对不是“TriCollector”。 - Eugene
您可以查看这个问题。在我的答案中,我使用了一个自定义的分割器来包装流。它可以并行工作(如果您不需要消费者同步运行,则可以简化它),但我不知道它是否可以轻松应用于缩减操作。 - Malte Hartwig
你需要注意的是,文件中的行通常很难并行处理。因此,如果我没记错的话,内部缓冲区最初会有1024行,然后增加到2048行,然后是3072行,以此类推...因此,如果您的文件小于1024行,则并行处理比顺序处理要差得多。 - Eugene
实际上,我正在从NoSQL数据库中获取源代码。我只是想保持示例简单。谢谢你的回复@Eugene - Yılmaz
1
有一个关于未来JDK的提案,用于这种情况 - 请参见http://marxsoftware.blogspot.com/2018/08/jdk-12-merging-collectors-naming-challenge.html上的广泛讨论。我认为你会在那里找到一些类似于你想要的代码(如果我没记错的话,它将流式传输到2个收集器中,但我相信你可以进一步扩展)。 - Brian Agnew
2个回答

7

通常情况下,通过自定义Collector,与标准API以外的任何收集都非常容易。在您的情况下,同时收集3个列表(只是一个可以编译的小例子,因为您也不能分享代码):

private static <T> Collector<T, ?, List<List<T>>> to3Lists() {
    class Acc {

        List<T> left = new ArrayList<>();

        List<T> middle = new ArrayList<>();

        List<T> right = new ArrayList<>();

        List<List<T>> list = Arrays.asList(left, middle, right);

        void add(T elem) {
            // obviously do whatever you want here
            left.add(elem);
            middle.add(elem);
            right.add(elem);
        }

        Acc merge(Acc other) {

            left.addAll(other.left);
            middle.addAll(other.middle);
            right.addAll(other.right);

            return this;
        }

        public List<List<T>> finisher() {
            return list;
        }

    }
    return Collector.of(Acc::new, Acc::add, Acc::merge, Acc::finisher);
}

并且可以通过以下方式使用:

Stream.of(1, 2, 3)
      .collect(to3Lists());

显然,这个自定义收集器没有任何有用的功能,只是一个示例,展示了你如何使用它。

谢谢您的回复。我尝试了一下,但是对于我来说很难实现这个功能。您能帮我吗?我应该编写什么样的自定义类来传递给这个方法?Stream.of(1, 2, 3).collect(Collectors.toMap(g-> { return null; }, v -> { return null; }, (t, u) -> { return t; } )); - Yılmaz
@Yılmaz 如果你能帮我,我会帮助你... 你写的那段代码几乎没有意义。 - Eugene
我创建了一个 Github 项目。这是链接 https://github.com/ftylmz1/java-stream-multiple-grouping - Yılmaz

4
我已经根据您的情况修改了这个问题的答案。自定义Spliterator将会把流分成多个按不同属性收集的流。
@SafeVarargs
public static <T> long streamForked(Stream<T> source, Consumer<Stream<T>>... consumers)
{
    return StreamSupport.stream(new ForkingSpliterator<>(source, consumers), false).count();
}

public static class ForkingSpliterator<T>
    extends AbstractSpliterator<T>
{
    private Spliterator<T>         sourceSpliterator;

    private List<BlockingQueue<T>> queues = new ArrayList<>();

    private boolean                sourceDone;

    @SafeVarargs
    private ForkingSpliterator(Stream<T> source, Consumer<Stream<T>>... consumers)
    {
        super(Long.MAX_VALUE, 0);

        sourceSpliterator = source.spliterator();

        for (Consumer<Stream<T>> fork : consumers)
        {
            LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
            queues.add(queue);
            new Thread(() -> fork.accept(StreamSupport.stream(new ForkedConsumer(queue), false))).start();
        }
    }

    @Override
    public boolean tryAdvance(Consumer<? super T> action)
    {
        sourceDone = !sourceSpliterator.tryAdvance(t -> queues.forEach(queue -> queue.offer(t)));
        return !sourceDone;
    }

    private class ForkedConsumer
        extends AbstractSpliterator<T>
    {
        private BlockingQueue<T> queue;

        private ForkedConsumer(BlockingQueue<T> queue)
        {
            super(Long.MAX_VALUE, 0);
            this.queue = queue;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action)
        {
            while (queue.peek() == null)
            {
                if (sourceDone)
                {
                    // element is null, and there won't be no more, so "terminate" this sub stream
                    return false;
                }
            }

            // push to consumer pipeline
            action.accept(queue.poll());

            return true;
        }
    }
}

您可以按照以下方式使用它:
streamForked(Stream.of(new Row("content1", "client1", "location1", 1),
                       new Row("content2", "client1", "location1", 2),
                       new Row("content1", "client1", "location2", 3),
                       new Row("content2", "client2", "location2", 4),
                       new Row("content1", "client2", "location2", 5)),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                                           Collectors.groupingBy(Row::getContent,
                                                                                                 Collectors.summingInt(Row::getConsumption))))),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getClient,
                                                                           Collectors.groupingBy(Row::getLocation,
                                                                                                 Collectors.summingInt(Row::getConsumption))))),
             rows -> System.out.println(rows.collect(Collectors.groupingBy(Row::getContent,
                                                                           Collectors.groupingBy(Row::getLocation,
                                                                                                 Collectors.summingInt(Row::getConsumption))))));

// Output
// {client2={location2=9}, client1={location1=3, location2=3}}
// {client2={content2=4, content1=5}, client1={content2=2, content1=4}}
// {content2={location1=2, location2=4}, content1={location1=1, location2=8}}

请注意,您可以对流副本做任何您想要的事情。根据您的示例,我使用了一个堆叠的groupingBy收集器来按两个属性分组行,然后对int属性求和。因此结果将是一个Map<String, Map<String, Integer>>。但您也可以将其用于其他场景:
rows -> System.out.println(rows.count())
rows -> rows.forEach(row -> System.out.println(row))
rows -> System.out.println(rows.anyMatch(row -> row.getConsumption() > 3))

我尝试了你的方法。我感觉我们在同一个页面上。你的代码超出了我的Java技能范围。我创建了一个GitHub项目。请你能否检查一下我的项目?https://github.com/ftylmz1/java-stream-multiple-grouping - Yılmaz
@Yılmaz,我现在已经更新了我的代码,因为我明白你想要做什么。你应该可以复制粘贴它(可能需要从类中删除一些“static”修饰符,我把它们都写在一个文件中)。 - Malte Hartwig
非常感谢你的回答,@Malte Hartwig。我已经实现并测试了你的代码。它对我有效。使用方法正如我所期望的那样。再次感谢你 :) - Yılmaz
@MalteHartwig +1,我甚至还没来得及打开那个GitHub项目,但你做得很好! - Eugene
@MalteHartwig,我该如何使streamForked方法与三个mapreduce方法同步?我的意思是,在完成这三个作业后,streamForked方法继续执行下一行。 - Yılmaz
你的意思是这三个分组处理一行,只有当它们全部完成后,才继续下一行吗?如果是这样,请按照我的答案开头的链接查看类似问题的解决方法。在那里,你会发现如何通过让所有流访问同一个队列来使消费流等待彼此。 - Malte Hartwig

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接