使用Java Streams从文本文件中每次读取X行?

4

我有一个以换行符结尾的“纯文本文件”。由于某些原因,我需要每次读取和解析这个文本文件中的4(X为通用)行。

我想使用Java流来完成此任务,并且我知道可以这样将文件转换为流:

try (Stream<String> stream = Files.lines(Paths.get("file.txt""))) {
    stream.forEach(System.out::println);
} catch (IOException e) {
    e.printStackTrace();
}

但是我该如何使用Java的Stream API将文件分成4个连续行的组?


2
这通常被称为“分块”,虽然我不知道在Java流中有一个简单且标准的方法来实现它。 - yshavit
5个回答

4
有一种方法可以使用标准Java 8 Stream API将文件内容分割并处理为n大小的块。您可以使用Collectors.groupingBy()将文件内容分成块,您可以将它们收集为Collection<List<String>>或者在收集所有行时应用一些处理(例如,您可以将它们连接到单个字符串中)。
请看以下示例:
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class ReadFileWithStream {

    public static void main(String[] args) throws IOException {
        // Path to a file to read
        final Path path = Paths.get(ReadFileWithStream.class.getResource("/input.txt")‌​.toURI());
        final AtomicInteger counter = new AtomicInteger(0);
        // Size of a chunk
        final int size = 4;

        final Collection<List<String>> partitioned = Files.lines(path)
                .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / size))
                .values();

        partitioned.forEach(System.out::println);
    }
}

我的输入文件包含一些数字(每行一个),当我运行以下代码时,我会得到类似于:

[0, 0, 0, 2]
[0, -3, 2, 0]
[1, -3, -8, 0]
[2, -12, -11, -11]
[-8, -1, -8, 0]
[2, -1, 2, -1]
... and so on

Collectors.groupingBy() 还允许我使用不同的下游收集器。默认情况下会使用 Collectors.toList(),因此我的结果被累积到一个 List<String> 中,并且我得到一个 Collection<List<String>> 作为最终结果。

假设我想读取 4 个元素并对每个元素中的所有数字求和。在这种情况下,我将使用 Collectors.summingInt() 作为我的下游函数,返回的结果是 Collection<Integer>

final Collection<Integer> partitioned = Files.lines(path)
        .collect(Collectors.groupingBy(it -> counter.getAndIncrement() / size, Collectors.summingInt(Integer::valueOf)))
        .values();

输出:

2
-1
-10
-32
-17
2
-11
-49
... and so on
Collectors.groupingBy()返回一个按照特定键分组的值的map。因此,最后我们调用Map.values()来获取此map中包含的值的集合。

希望这能帮到你。


这确实可以工作,但有更好的方法 https://dev59.com/s6jja4cB1Zd3GeqP6ico#48216421,而且没有副作用... - Eugene
Paths.get(ReadFileWithStream.class.getClassLoader().getResource("input.txt").getPath()) 应该改为 Paths.get(ReadFileWithStream.class.getResource("/input.txt").toURI()) - Holger

4

这是一个使用 java.util.Scanner 的任务。在Java 9中,您可以简单地使用

try(Scanner s = new Scanner(PATH)) {
    s.findAll("(.*\\R){1,4}")
     .map(mr -> Arrays.asList(mr.group().split("\\R")))
     .forEach(System.out::println);
}

对于Java 8,你可以使用这个答案中所述的findAll的回溯。在为该方法添加一个import static后,你可以像下面这样使用它:

try(Scanner s = new Scanner(PATH)) {
    findAll(s, Pattern.compile("(.*\\R){1,4}"))
        .map(mr -> Arrays.asList(mr.group().split("\\R")))
        .forEach(System.out::println);
}

请注意,匹配操作的结果是一个包含最多四行(最后一行可能少于四行)的单个字符串。如果这对您的后续操作合适,您可以跳过将该字符串分割成单独的行的步骤。
您甚至可以使用MatchResult的属性对块进行更复杂的处理,例如:
try(Scanner s = new Scanner(PATH)) {
    findAll(s, Pattern.compile("(.*)\\R(?:(.*)\\R)?(?:(.*)\\R)?(?:(.*)\\R)?"))
        .flatMap(mr -> IntStream.rangeClosed(1, 4)
                           .mapToObj(ix -> mr.group(ix)==null? null: ix+": "+mr.group(ix)))
        .filter(Objects::nonNull)
        .forEach(System.out::println);
}

这最后一段代码...嗯,它刚刚被添加到我们的代码库中(稍作修改),但是非常棒! - Eugene

3
这里有一个简单的方法,使用Guava的Iterators.partition方法:
try (Stream<String> stream = Files.lines(Paths.get("file.txt""))) {

    Iterator<List<String>> iterator = Iterators.partition(stream.iterator(), 4);

    // iterator.next() returns each chunk as a List<String>

} catch (IOException e) {
    // handle exception properly
}

这仅适用于顺序处理,但如果您正在从磁盘读取文件,则我几乎想象不出并行处理的任何好处...


编辑:如果您希望,可以将其转换为流而不是使用迭代器:

Stream<List<String>> targetStream = StreamSupport.stream(
      Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
      false);

2

如果您想坚持使用流式处理,我认为唯一的解决方案是编写自己的定制收集器。虽然它不是为此目的而设计的,但您可以利用它。

private static final class CustomCollector {

    private List<String> list = new ArrayList<>();

    private List<String> acumulateList = new ArrayList<>();

    public void accept(String str) {
        acumulateList.add(str);
        if (acumulateList.size() == 4) { // acumulate 4 strings
            String collect = String.join("", acumulateList);
            // I just joined them in on string, you can do whatever you want
            list.add(collect);
            acumulateList = new ArrayList<>();
        }
    }

    public CustomCollector combine(CustomCollector other) {
        throw new UnsupportedOperationException("Parallel Stream not supported");
    }

    public List<String> finish() {
        if(!acumulateList.isEmpty()) {
            list.add(String.join("", acumulateList));
        }
        return list;
    }

    public static Collector<String, ?, List<String>> collector() {
        return Collector.of(CustomCollector::new, CustomCollector::accept, CustomCollector::combine, CustomCollector::finish);
    }
}

并这样使用它:
stream.collect(CustomCollector.collector());

2
一个稍微好一点的名字可能是PartitioningByCollector或类似的东西...同时似乎StringBuilder(或者自Java 9以来的普通连接)比acumulateList更适合。 - Eugene
2
Streams 的真正美妙之处在于您可以真正计算并行处理... 对于这种情况,您可以 ;) 参见此链接 https://dev59.com/zKLia4cB1Zd3GeqPgkeb#44357446 - Eugene
@FedericoPeraltaSchaffner 我承认我刚刚重新阅读了一遍以再次理解它))) 然后,我们都认为某些东西很聪明,直到看到这个:https://dev59.com/s6jja4cB1Zd3GeqP6ico#48225443 :) - Eugene
@FedericoPeraltaSchaffner 还有一个坦白 - 这个想法不是我自己的。实际上这是 Tagir Valeev 的。在 Java-9 中,他添加了这个 (left, right) -> { if (left.size() < right.size()) { right.addAll(left); return right; } else { left.addAll(right); return left; } } 看起来并不起眼,也不是很聪明,但它确实是一种非常有用的技巧。 - Eugene
@FedericoPeraltaSchaffner目前还在IntelliJ工作,但因为在Streams API和StreamEx方面做出了巨大的贡献,被授予了提交者权限。如果你能理解俄语,他在Stream API方面有一些非常棒的演讲... - Eugene

2

如果您愿意使用RxJava,您可以使用其buffer功能:

Stream<String> stream = Files.lines(Paths.get("file.txt"))

Observable.fromIterable(stream::iterator)
          .buffer(4)                      // Observable<List<String>>
          .map(x -> String.join(", ", x)) // Observable<String>
          .forEach(System.out::println);

buffer 创建一个 Observable,它会将元素收集到一定大小的列表中。在上面的示例中,我添加了另一个转换器通过 map 使列表更易于打印,但您可以根据需要转换 Observable。例如,如果您有一个方法 processChunk,它以 List<String> 作为参数并返回一个 String,您可以执行以下操作:

Observable<String> fileObs =
    Observable.fromIterable(stream::iterator)
              .buffer(4)
              .map(x -> processChunk(x));

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接