使用Java 8处理和拆分大文件

4
我刚开始使用NIO包处理文件,对Java 8不太熟悉。我需要帮助处理大文件,每个文件的行数在100,000到1,000,000之间,将每一行转换为特定格式并将格式化后的行写入新文件。生成的新文件每个最多只能包含100,000行。所以:
  • 如果我有一个500,000行的文件需要处理,则必须将这些行转换并分布到5个新文件中。
  • 如果我有一个745,000行的文件需要处理,则必须将这些行转换并打印到8个新文件中。

我很难想出一个能有效利用Java 8新功能的方法。我已经确定了基于大文件行数生成的新文件数量,并创建了这些新的空文件:

Path largFile = Path.get("path\to\file");
long recordCount = Files.lines(file).count();
int maxRecordOfNewFiles = 100000;
int numberOfNewFiles =  1;
if (recordCount > maxRecordOfNewFiles) {
    numberOfNewFiles = Math.toIntExact(recordCount / maxRecordOfNewFiles);
if (Math.toIntExact(recordCount % maxRecordOfNewFiles) > 0) {
    numberOfNewFiles ++;
}
}

IntStream.rangeClosed(1, numberOfNewFiles).forEach((i) 
    -> {
        try {
            Path newFile = Paths.get("path\to\newFiles\newFile1.txt");
                        Files.createFile(cdpFile);
         } catch (IOException iOex) {
         }
        });

但是,当我通过Files.lines(largeFile).forEach(())来遍历largeFile的行时,我不知道如何格式化前10万行,并确定新文件的第一行并将其打印在该文件上,然后将第二批10万行写入第二个新文件,以此类推。

任何帮助都将不胜感激。 :)


你可以逐行阅读和格式化文本,直到达到100k,然后保存新的“part”文件。 - kazbeel
@WoozyCoder,你好! :) 这基本上就是我在帖子的最后部分所说的。哈哈。我的问题是如何使用Java 8来实现这一点?代码片段会非常有帮助。 :) - BubeyReb
你可以使用 List<String> 来存储格式化后的数据(虽然我不太清楚你的意思)。当达到100k或文件结尾时,创建part文件,保存包含在 ArrayList<String> 中的数据,并继续处理下一批行。这里的问题是,你已经尝试过什么了? - kazbeel
@WoozyCoder 尝试了你的想法,对于较小的文件可以正常工作。不确定在处理大文件时会如何表现。我还将尝试 Tunaki 和 Jatin 的答案,以测试哪种想法更快速和内存效率更高,因为我将在生产规模环境中使用它。 :) - BubeyReb
2个回答

3
当你开始考虑批处理时,我认为你应该考虑使用一个专门的框架。你可能想处理重新启动、调度... Spring Batch 在这方面非常出色,并且已经提供了你所需要的:MultiResourceItemWriter,可将数据按最大行数写入多个文件;FlatFileItemReader 从文件读取数据。
在这种情况下,你需要循环遍历输入文件的每一行,并将每行进行转换后写入多个输出文件。
一种方法是创建一个输入文件行的流,对每行进行映射并将其发送到自定义的写入器。这个自定义的写入器将实现在达到每个文件的最大行数时切换写入器的逻辑。
在以下代码中,MyWriter 打开一个向文件的 BufferedWriter。当达到 maxLines(其倍数)时,此写入器将关闭并打开另一个写入器,增加 currentFile。这样,对于读取器来说,我们的写入操作就是透明的。
public static void main(String[] args) throws IOException {
    try (
        MyWriter writer = new MyWriter(10);
        Stream<String> lines = Files.lines(Paths.get("path/to/file"));
    ) {
        lines.map(l -> /* do transformation here */ l).forEach(writer::write);
    }
}

private static class MyWriter implements AutoCloseable {

    private long count = 0, currentFile = 1, maxLines = 0;
    private BufferedWriter bw = null;

    public MyWriter(long maxLines) {
        this.maxLines = maxLines;
    }

    public void write(String line) {
        try {
            if (count % maxLines == 0) {
                close();
                bw = Files.newBufferedWriter(Paths.get("path/to/newFiles/newFile" + currentFile++ + ".txt"));
            }
            bw.write(line);
            bw.newLine();
            count++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() throws IOException {
        if (bw != null) bw.close();
    }
}

谢谢,@Tunaki! :) 我会尝试你的答案,并更新情况。 :) 我有些犹豫是否使用另一个框架,因为我想尝试一下Java 8单独解决问题的效果。您认为在内存和速度方面,使用Spring Batch比使用Java的本机库更有效吗? :) - BubeyReb
@BubeyReb,这并不是效率的问题,Spring Batch仅提供了批处理过程的预构建工具(您可以在此处阅读有关其信息),例如事务管理、作业处理统计、作业重启、跳过和资源管理...因此,如果您需要这些内容(并且打算编写多个批次),可以记住它;)。对于这个特定的简单示例来说,这绝对太多了。 - Tunaki

2

根据我的理解,一个简单的方法是:

BufferedReader buff = new BufferedReader(new FileReader(new File("H:\\Docs\\log.txt")));
Pair<Integer, BufferedWriter> ans = buff.lines().reduce(new Pair<Integer, BufferedWriter>(0, null), (count, line) -> {
    try {
        BufferedWriter w;
        if (count.getKey() % 1000 == 0) {
            if (count.getValue() != null) count.getValue().close();
            w = new BufferedWriter(new FileWriter(new File("f" + count.getKey() + ".txt")));
        } else w = count.getValue();
        w.write(line + "\n"); //do something
        return new Pair<>(count.getKey() + 1, w);
    } catch (IOException e) {
        throw new UncheckedIOException(e);
    }
}, (x, y) -> {
    throw new RuntimeException("Not supproted");
});
ans.getValue().close();

嗨@Jatin!Java 8让我感到困惑的是它的复杂性,以及对像我这样的一些开发人员来说,它使代码变得非常难以阅读。看了你的答案,有点难以理解你的代码正在尝试实现什么。 <br/> 尝试研究.reduce()函数,但更加困惑了。 :( 也尝试运行你的代码,但BufferedReader不接受FileReader作为有效的构造函数参数。BufferedWriter也是如此。 <br/> ans.getValue().close()的目的也很令人困惑。也许您可以尝试分解您的答案? - BubeyReb
@BubeyReb 这段代码编译和运行都很好。BufferedReader确实可以将Reader作为参数。因此,FileReader是有效的构造函数参数。同样的道理也适用于BufferedWriter,它需要Writer。所以这个想法是:对于每一行,如果计数没有超过1000,就使用可用的writer将其写入。如果超过了,则重置计数,创建一个新的writer并继续进行,并关闭前一个writer。不断重复这个过程。 现在,在最后,关闭剩余的writer。 - Jatin
顺便提一下,您还需要导入javafx.util.Pair。 当然您也可以使用其他的Pair类。 我需要一个元组,这个感觉挺好。 - Jatin

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接