Spark独立模式:如何压缩写入到HDFS的Spark输出

22

与我其他的问题有关,但是不同:

someMap.saveAsTextFile("hdfs://HOST:PORT/out")

如果我将RDD保存到HDFS中,如何告诉Spark使用gzip压缩输出?在Hadoop中,可以进行以下设置

mapred.output.compress = true

并选择压缩算法

mapred.output.compression.codec = <<classname of compression codec>>

我该如何在Spark中实现这个?这样做是否有效?

编辑:使用spark-0.7.2版本。

4个回答

21

saveAsTextFile 方法还有一个可选参数,可以用来指定编解码器的类。因此,在你的示例中,如果要使用 gzip,应该像这样:

saveAsTextFile(outputPath, classOf[gzip.GzipCodec])
someMap.saveAsTextFile("hdfs://HOST:PORT/out", classOf[GzipCodec])

更新

因为你正在使用0.7.2,你可能能够通过在启动时设置的配置选项来移植压缩代码。我不确定这是否会完全起作用,但你需要从以下内容进行更改:

conf.setCompressMapOutput(true)
conf.set("mapred.output.compress", "true")
conf.setMapOutputCompressorClass(c)
conf.set("mapred.output.compression.codec", c.getCanonicalName)
conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)

变成这样:

System.setProperty("spark.hadoop.mapred.output.compress", "true")
System.setProperty("spark.hadoop.mapred.output.compression.codec", "true")
System.setProperty("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
System.setProperty("spark.hadoop.mapred.output.compression.type", "BLOCK")
如果您成功了,将您的配置发布出来可能对其他人也有所帮助。

1
我看到它在最新的spark-0.8.0中。因为这是一个相当重要的功能,所以必须拉取它。 - ptikobj
啊,那就说得通了。我一直在使用主分支,而不是0.7.2版本。 - Noah
我已经测试了你的第二个片段(System.setProperty(...) [...]),并且它在0.7.2上立即生效。谢谢 :) - ptikobj
1
@noah 你设置了 spark.hadoop.mapred.output.compression.codec 两次,除非我漏掉了什么,否则这是多余的。 - sw1nn
是否可以在spark-defaults.xml中以类似的方式设置这些参数,以便每个作业都可以使用它?我尝试将设置复制到spark-defaults.xml中,但好像没有被采用。 - nikk
显示剩余3条评论

2

将gzipped文件保存到HDFS或Amazon S3目录系统的另一种方法是使用saveAsHadoopFile方法。

someMap是RDD[(K,V)],如果您把someMap当作RDD[V],那么您可以调用someMap.map(line=>(line, "")来使用saveAsHadoopFile方法。

import org.apache.hadoop.io.compress.GzipCodec

someMap.saveAsHadoopFile(output_folder_path, classOf[String], classOf[String], classOf[MultipleTextOutputFormat[String, String]], classOf[GzipCodec])

是否可以在spark-defaults.xml中以类似的方式设置这些参数,以便每个作业都可以使用它?我尝试将设置复制到spark-defaults.xml中,但好像没有被采用。 - nikk

1

对于较新的Spark版本,请在您的spark-defaults.xml文件中执行以下操作。(mapred已弃用)。

<property>
    <name>mapreduce.output.fileoutputformat.compress</name>
    <value>true</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.codec</name>
    <value>GzipCodec</value>
</property>
<property>
    <name>mapreduce.output.fileoutputformat.compress.type</name>
    <value>BLOCK</value>
</property>

0
这是一种最简单/最快的压缩方法,适用于几乎所有版本的Spark。
import org.apache.hadoop.io.SequenceFile.CompressionType

 /**
   * Set compression configurations to Hadoop `Configuration`.
   * `codec` should be a full class path
   */
  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
    if (codec != null) {
      conf.set("mapreduce.output.fileoutputformat.compress", "true")
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString) // "BLOCK" as string
      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
      conf.set("mapreduce.map.output.compress", "true")
      conf.set("mapreduce.map.output.compress.codec", codec)
    } else {
      // This infers the option `compression` is set to `uncompressed` or `none`.
      conf.set("mapreduce.output.fileoutputformat.compress", "false")
      conf.set("mapreduce.map.output.compress", "false")
    }
  }

其中confspark.sparkContext.hadoopConfiguration

codec参数的字符串选项在上述方法中为

 1.none 
 2.uncompressed 
 3.bzip2 
 4.deflate 
 5.gzip 
 6.lz4 
 7.snappy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接