我有一个使用bzip2压缩的维基百科转储文件(从http://dumps.wikimedia.org/enwiki/下载),但我不想解压它:我想在解压时进行处理。 我知道可以在普通Java中完成此操作(例如参见Java - Read BZ2 file and uncompress/parse on the fly),但我想知道如何在Apache Flink中实现?我可能需要类似于https://github.com/whym/wikihadoop但针对Flink而非Hadoop的东西。
在Apache Flink中,可以读取以下格式的压缩文件:
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec
从包名可以看出,Flink使用Hadoop的InputFormats来实现这一点。
以下是使用Flink Scala API读取gz文件的示例:
(至少需要Flink 0.8.1)
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val job = new JobConf()
val hadoopInput = new TextInputFormat()
FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)
lines.print
env.execute("Read gz files")
}
Apache Flink目前只内置支持.deflate文件的压缩。添加更多压缩编解码器很容易,但尚未完成。
在Flink中使用HadoopInputFormats不会造成任何性能损失。Flink已经内置了对Hadoop的Writable
类型的序列化支持。