Apache Flink的BZip2压缩输入

7
1个回答

6

在Apache Flink中,可以读取以下格式的压缩文件:

org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.Lz4Codec
org.apache.hadoop.io.compress.SnappyCodec

从包名可以看出,Flink使用Hadoop的InputFormats来实现这一点。

以下是使用Flink Scala API读取gz文件的示例:

(至少需要Flink 0.8.1)

def main(args: Array[String]) {

  val env = ExecutionEnvironment.getExecutionEnvironment
  val job = new JobConf()
  val hadoopInput = new TextInputFormat()
  FileInputFormat.addInputPath(job, new Path("/home/robert/Downloads/cawiki-20140407-all-titles.gz"))
  val lines = env.createHadoopInput(hadoopInput, classOf[LongWritable], classOf[Text], job)

  lines.print

  env.execute("Read gz files")
}

Apache Flink目前只内置支持.deflate文件的压缩。添加更多压缩编解码器很容易,但尚未完成。

在Flink中使用HadoopInputFormats不会造成任何性能损失。Flink已经内置了对Hadoop的Writable类型的序列化支持。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接