将Java流按谓词拆分为流的流

5

我有数百个大型(6GB)的gzip压缩日志文件,我正在使用GZIPInputStream读取并希望解析。假设每个文件都具有以下格式:

Start of log entry 1
    ...some log details
    ...some log details
    ...some log details
Start of log entry 2
    ...some log details
    ...some log details
    ...some log details
Start of log entry 3
    ...some log details
    ...some log details
    ...some log details

我正在通过 BufferedReader.lines() 按行流式传输gzip文件内容。 流看起来像:

[
    "Start of log entry 1",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
    "Start of log entry 2",
    "    ...some log details",
    "    ...some log details",
    "    ...some log details",
]

每个日志条目的开头都可以通过谓词进行识别:line -> line.startsWith("Start of log entry")。我想根据此谓词将此Stream<String>转换为Stream<Stream<String>>。每个“子流”应在谓词为真时开始,并在谓词为假时收集行,直到下一次谓词为真,这表示该子流的结束和下一个子流的开始。结果应如下所示:
[
    [
        "Start of log entry 1",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 2",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
    [
        "Start of log entry 3",
        "    ...some log details",
        "    ...some log details",
        "    ...some log details",
    ],
]

从那里,我可以将每个子流通过new LogEntry(Stream<String> logLines)映射,以便将相关的日志行聚合到LogEntry对象中。

以下是大致的实现方式:

import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

import static java.lang.System.out;

class Untitled {
    static final String input = 
        "Start of log entry 1\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 2\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "Start of log entry 3\n" +
        "    ...some log details\n" +
        "    ...some log details\n" +
        "    ...some log details";

    static final Predicate<String> isLogEntryStart = line -> line.startsWith("Start of log entry"); 

    public static void main(String[] args) throws Exception {
        try (ByteArrayInputStream gzipInputStream
        = new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)); // mock for fileInputStream based gzipInputStream
             InputStreamReader inputStreamReader = new InputStreamReader( gzipInputStream ); 
             BufferedReader reader = new BufferedReader( inputStreamReader )) {

            reader.lines()
                .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                .map(LogEntry::new)
                .forEach(out::println);
        }
    }
}

约束条件:我有数百个这样的大文件需要处理,可以并行处理(但每个文件仅有一个顺序流),将它们全部加载到内存中(例如通过将它们存储为List<String> lines)是不可行的。
任何帮助都将不胜感激!

1
听起来像是 StreamEx 的工作。 - shmosel
@shmosel 很有趣,我会研究一下!你有没有想过这个 API 可能叫什么名字?我试了一些关键词,比如“partition”、“slice”、“chunk”和“separated”,但都没有结果。 - Alexander
也许你可以使用 collapse() 方法,并配合一个谓词 (line1, line2) -> line1.startsWith(...) && !line2.startsWith(...) - shmosel
我建议使用Spring Integration来并行处理多个文件。我曾经使用它在不同的核心中并行处理50个大小为4-5 GB的文件。https://stackoverflow.com/questions/31819189/move-file-after-successful-ftp-transfer-using-java-dsl - Harish
尝试(使用BufferedReader bufferedReader = new BufferedReader(inputStreamReader)){ String line; boolean logStart = false; while ((line = bufferedReader.readLine()) != null) { if (line.startsWith("Start of log entry")) { logStart = true; } if (!logStart) { } } } catch (Exception ex) { } - Harish
显示剩余4条评论
2个回答

3

Frederico的回答可能是解决这个问题最好的方式。跟随他最后提到关于自定义Spliterator的想法,我将添加一个适应于类似问题的答案的改编版本,在那里我建议使用自定义迭代器创建分块流。这种方法也适用于其他不是由输入读取器创建的流。

public class StreamSplitter<T>
    implements Iterator<Stream<T>>
{
    private Iterator<T>  incoming;
    private Predicate<T> startOfNewEntry;
    private T            nextLine;

    public static <T> Stream<Stream<T>> streamOf(Stream<T> incoming, Predicate<T> startOfNewEntry)
    {
        Iterable<Stream<T>> iterable = () -> new StreamSplitter<>(incoming, startOfNewEntry);
        return StreamSupport.stream(iterable.spliterator(), false);
    }

    private StreamSplitter(Stream<T> stream, Predicate<T> startOfNewEntry)
    {
        this.incoming = stream.iterator();
        this.startOfNewEntry = startOfNewEntry;
        if (incoming.hasNext())
            nextLine = incoming.next();
    }

    @Override
    public boolean hasNext()
    {
        return nextLine != null;
    }

    @Override
    public Stream<T> next()
    {
        List<T> nextEntrysLines = new ArrayList<>();
        do
        {
            nextEntrysLines.add(nextLine);
        } while (incoming.hasNext()
                 && !startOfNewEntry.test((nextLine = incoming.next())));

        if (!startOfNewEntry.test(nextLine)) // incoming does not have next
            nextLine = null;

        return nextEntrysLines.stream();
    }
}

示例

public static void main(String[] args)
{
    Stream<String> flat = Stream.of("Start of log entry 1",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 2",
                                    "    ...some log details",
                                    "    ...some log details",
                                    "Start of log entry 3",
                                    "    ...some log details",
                                    "    ...some log details");

    StreamSplitter.streamOf(flat, line -> line.matches("Start of log entry.*"))
                  .forEach(logEntry -> {
                      System.out.println("------------------");
                      logEntry.forEach(System.out::println);
                  });
}

// Output
// ------------------
// Start of log entry 1
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 2
//     ...some log details
//     ...some log details
// ------------------
// Start of log entry 3
//     ...some log details
//     ...some log details

迭代器总是向前看一行。 只要该行是新条目的开头,它将在流中包装先前的条目并返回其作为next。 工厂方法streamOf将此迭代器转换为流,以便在上面的示例中使用。
我已将拆分条件从正则表达式更改为Predicate,因此您可以借助多个正则表达式、if条件等指定更复杂的条件。
请注意,我仅对上述示例数据进行了测试,因此不知道它如何处理更复杂、错误或空输入。

2
我认为主要问题在于你按行阅读并试图从这些行创建一个LogEntry实例,而不是按块阅读(可能涵盖多行)。
为此,您可以使用{{link1:Scanner.findAll}}(自Java 9以来可用)和适当的正则表达式:
String input =
        "Start of log entry 1\n"        +
        "    ...some log details 1.1\n" +
        "    ...some log details 1.2\n" +
        "    ...some log details 1.3\n" +
        "Start of log entry 2\n"        +
        "    ...some log details 2.1\n" +
        "    ...some log details 2.2\n" +
        "    ...some log details 2.3\n" +
        "Start of log entry 3\n"        +
        "    ...some log details 3.1\n" +
        "    ...some log details 3.2\n" +
        "    ...some log details 3.3";

try (ByteArrayInputStream gzip = 
         new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
     InputStreamReader reader = new InputStreamReader(gzip);
     Scanner scanner = new Scanner(reader)) {

    String START = "Start of log entry \\d+";
    Pattern pattern = Pattern.compile(
            START + "(?<=" + START + ").*?(?=" + START + "|$)", 
            Pattern.DOTALL);

    scanner.findAll(pattern)
            .map(MatchResult::group)
            .map(s -> s.split("\\R"))
            .map(LogEntry::new)
            .forEach(System.out::println);

} catch (IOException e) {
    throw new UncheckedIOException(e);
}

所以,这是通过在Scanner实例中懒惰地查找匹配项来工作的。 Scanner.findAll返回一个Stream ,而MatchResult.group()返回匹配的字符串。然后,我们通过换行符(\\ R)拆分此字符串。这将返回一个String [],其中数组的每个元素都是每行。然后,假设LogEntry有一个接受String []参数的构造函数,我们将每个这些数组转换为LogEntry实例。最后,假设LogEntry具有重写的toString()方法,我们正在将每个LogEntry实例打印到输出中。
值得一提的是,当在流上调用forEach时,Scanner开始其工作。
一个重要的地方是我们正在使用的正则表达式来匹配输入中的日志条目。我不是正则表达式领域的专家,所以我几乎确定这里有很大的改进空间。首先,我们使用Pattern.DOTALL,以便.不仅匹配常见字符,还匹配换行符。然后,实际的正则表达式是什么。它的想法是匹配并消耗Start of log entry \\d+,然后对Start of log entry \\d+进行向后查找,然后以非贪婪的方式(这是.*?部分)从输入中消耗字符,最后向前查找以检查是否有另一个Start of log entry \\d+的出现或者是否已经到达了输入的结尾。如果您想深入了解这个主题,请参考这篇关于正则表达式的神奇文章
如果你不使用 Java 9+,我不知道有任何类似的替代方法。但是,你可以创建一个自定义的 Spliterator 包装由 BufferedReader.lines() 返回的流中的 Spliterator,并将所需的解析行为添加到其中。然后,你需要从这个 Spliterator 创建一个新的 Stream。这绝非易事...

1
不错,这看起来像是一个好方法。日志条目的开头各不相同,不太容易匹配,但我会尝试一下并回报! - Alexander
很好的发现,这种方法应该能够使许多输入处理变得更简单!对于这种特定情况来说,它可能比自定义(Spl)迭代器更合适。唯一的优势是能够处理来自读取器以外的其他来源的流。 - Malte Hartwig

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接