I have hundreds of large (6 GB) gzipped log files that I'm reading via GZIPInputStream
and want to parse. Suppose each file has the following format:
Start of log entry 1
...some log details
...some log details
...some log details
Start of log entry 2
...some log details
...some log details
...some log details
Start of log entry 3
...some log details
...some log details
...some log details
I'm streaming the contents of each gzipped file line by line via BufferedReader.lines().
The stream looks like:
[
"Start of log entry 1",
" ...some log details",
" ...some log details",
" ...some log details",
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
"Start of log entry 3",
" ...some log details",
" ...some log details",
" ...some log details",
]
The start of each log entry can be identified with the predicate:
line -> line.startsWith("Start of log entry")
I would like to transform this Stream<String> into a Stream<Stream<String>>
based on that predicate. Each "substream" should begin when the predicate is true and collect lines while the predicate is false, until the predicate is true again, which marks the end of that substream and the start of the next. The result should look like:
[
[
"Start of log entry 1",
" ...some log details",
" ...some log details",
" ...some log details",
],
[
"Start of log entry 2",
" ...some log details",
" ...some log details",
" ...some log details",
],
[
"Start of log entry 3",
" ...some log details",
" ...some log details",
" ...some log details",
],
]
From there, I can map each substream through new LogEntry(Stream<String> logLines)
to aggregate the related log lines into a LogEntry object.
Here is roughly what that would look like:
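(LogEntry is my own class, not shown above; a minimal, purely illustrative sketch of what it might look like, with the header/details split being my own assumption, is:)

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Minimal sketch of the assumed LogEntry class: it materializes only the
// handful of lines belonging to ONE entry, never the whole file.
class LogEntry {
    private final String header;        // the "Start of log entry N" line
    private final List<String> details; // the detail lines that follow it

    LogEntry(Stream<String> logLines) {
        List<String> lines = logLines.collect(Collectors.toList());
        this.header = lines.get(0);
        this.details = lines.subList(1, lines.size());
    }

    @Override
    public String toString() {
        return header + " (" + details.size() + " detail lines)";
    }
}
```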
import java.io.*;
import java.nio.charset.*;
import java.util.*;
import java.util.function.*;
import java.util.stream.*;

import static java.lang.System.out;

class Untitled {

    static final String input =
        "Start of log entry 1\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        "Start of log entry 2\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        "Start of log entry 3\n" +
        " ...some log details\n" +
        " ...some log details\n" +
        " ...some log details";

    static final Predicate<String> isLogEntryStart =
        line -> line.startsWith("Start of log entry");

    public static void main(String[] args) throws Exception {
        // Mock for the FileInputStream-based GZIPInputStream
        try (ByteArrayInputStream gzipInputStream =
                 new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8));
             InputStreamReader inputStreamReader = new InputStreamReader(gzipInputStream);
             BufferedReader reader = new BufferedReader(inputStreamReader)) {

            reader.lines()
                  .splitByPredicate(isLogEntryStart) // <--- What witchcraft should go here?
                  .map(LogEntry::new)
                  .forEach(out::println);
        }
    }
}
Constraints: I have hundreds of these large files to process, possibly in parallel across files (but each file yields only a single sequential stream), and loading them entirely into memory (e.g. by storing them as
List<String> lines
) is not feasible. Any help would be much appreciated!
Take a look at StreamEx's collapse() method, paired with a predicate like (line1, line2) -> line1.startsWith(...) && !line2.startsWith(...). – shmosel