Spark SQL如何读取压缩的csv文件?

5
我已经尝试使用API spark.read.csv 读取扩展名为bzgzip 的压缩CSV文件。它可以正常工作。但是在源代码中,我没有找到任何选项参数来声明codec类型。
即使在此链接中,也只有写入方面的codec设置。是否有人能告诉我或者给出源代码路径,展示Spark 2.x版本如何处理压缩的CSV文件呢?

1
请注意,Spark将使用单个任务读取压缩的CSV文件,而不是在读取未压缩的CSV文件时并行化读取多个任务。 - Chris
2个回答

4

所有与文本相关的数据源,包括CSVDataSource,都使用Hadoop文件API来处理文件(这也是Spark Core的RDD中使用的方法)。

您可以在readFile中找到相关行,该行导致HadoopFileLinesReader,其包含以下行:

val fileSplit = new FileSplit(
  new Path(new URI(file.filePath)),
  file.start,
  file.length,
  // TODO: Implement Locality
  Array.empty)

这使用了Hadoop的org.apache.hadoop.fs.Path,处理底层文件的压缩。


经过快速搜索,我找到了处理压缩的Hadoop属性,即mapreduce.output.fileoutputformat.compress

这引导我查找Spark SQL的CompressionCodecs,该配置具有以下压缩配置:

"none" -> null,
"uncompressed" -> null,
"bzip2" -> classOf[BZip2Codec].getName,
"deflate" -> classOf[DeflateCodec].getName,
"gzip" -> classOf[GzipCodec].getName,
"lz4" -> classOf[Lz4Codec].getName,
"snappy" -> classOf[SnappyCodec].getName)

在下面的代码中,您可以找到setCodecConfiguration,它使用了“我们”的选项。
  def setCodecConfiguration(conf: Configuration, codec: String): Unit = {
    if (codec != null) {
      conf.set("mapreduce.output.fileoutputformat.compress", "true")
      conf.set("mapreduce.output.fileoutputformat.compress.type", CompressionType.BLOCK.toString)
      conf.set("mapreduce.output.fileoutputformat.compress.codec", codec)
      conf.set("mapreduce.map.output.compress", "true")
      conf.set("mapreduce.map.output.compress.codec", codec)
    } else {
      // This infers the option `compression` is set to `uncompressed` or `none`.
      conf.set("mapreduce.output.fileoutputformat.compress", "false")
      conf.set("mapreduce.map.output.compress", "false")
    }
  }

另一种方法getCodecClassName用于解析JSONCSVtext格式的compression选项。

谢谢,伙计。我查看了Path包文件,但仍然有点困惑。如果您能提供更多细节,比如Path包的哪个部分处理压缩,那就太好了。再次感谢。 - G_cy
在Spark SQL的代码中添加了一些额外的链接,涉及到压缩。由于我对Hadoop的源代码一无所知,所以我将把探索它留给你作为家庭练习。 - Jacek Laskowski
2
非常感谢您的耐心和友善。我仔细阅读了getCodecClassName代码,并沿着链路进行了追踪。我发现这段代码只在写入端被调用,而在读取端没有找到使用情况。我认为这项工作可能是由文件系统完成的,但并没有找到证据支持这一点。 - G_cy
1
你所引用的所有部分都涉及到写文件。因此,所有提到的选项都在它们的名称中带有“输出”。问题是关于读取文件的。 - mvherweg
2
有趣的信息,但像一些评论者指出的那样,它只涉及写入方面,而不是读取方面。这个答案没有展示Spark在读取时如何选择编解码器的相关内部细节,但至少它演示了如何指定自定义读取编解码器。 - Nick Chammas

3

您不需要为了让spark 2.x版本读取gz压缩的csvtsv文件而做任何特殊处理。以下代码经过spark 2.0.2测试。

val options= Map("sep" -> ",")
val csvRDD = spark.read.options(options).csv("file.csv.gz")

我已经为制表符分隔的gz文件执行类似操作。
val options= Map("sep" -> "\t")
val csvRDD = spark.read.options(options).csv("file.tsv.gz")

您还可以通过指定文件夹来读取多个 .gz 文件以及未压缩的文件的组合。

 val csvRDD = spark.read.options(options).csv("/users/mithun/tsvfilelocation/")

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接