Pyspark如何加载压缩的snappy文件

6

我使用python-snappy压缩了一个文件并将其放入我的hdfs存储中。现在我想这样读取它,但是我得到了以下追溯信息。我找不到如何读取文件以便我可以处理它的示例。我可以很好地读取文本文件(未压缩)版本。我应该使用sc.sequenceFile吗?谢谢!

I first compressed the file and pushed it to hdfs

python-snappy -m snappy -c gene_regions.vcf gene_regions.vcf.snappy
hdfs dfs -put gene_regions.vcf.snappy /

I then added the following to spark-env.sh
export SPARK_EXECUTOR_MEMORY=16G                                                
export HADOOP_HOME=/usr/local/hadoop                                            

export JAVA_LIBRARY_PATH=$JAVA_LIBRARY_PATH:$HADOOP_HOME/lib/native             
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$HADOOP_HOME/lib/native                 
export SPARK_LIBRARY_PATH=$SPARK_LIBRARY_PATH:$HADOOP_HOME/lib/native           
export SPARK_CLASSPATH=$SPARK_CLASSPATH:$HADOOP_HOME/lib/lib/snappy-java-1.1.1.8-SNAPSHOT.jar

I then launch my spark master and slave and finally my ipython notebook where I am executing the code below.

a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()

ValueError Traceback (most recent call last) in () ----> 1 a_file.first()

/home/user/Software/spark-1.3.0-bin-hadoop2.4/python/pyspark/rdd.pyc in first(self) 1244 if rs: 1245 return rs[0] -> 1246 raise ValueError("RDD is empty") 1247 1248 def isEmpty(self):

数值错误:RDD为空

Working code (uncompressed) text file
a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf")
a_file.first()

output: u'##fileformat=VCFv4.1'


请更完善您的问题。同时,请提供更多相关代码(例如 - 如何保存文件)。 - Mark Segal
5个回答

3
问题在于python-snappy与Hadoop的snappy编解码不兼容,而Spark在读取数据时会使用Hadoop的snappy编解码,因此当遇到".snappy"后缀时会出现问题。它们基于相同的底层算法,但互不兼容,你不能用其中一个进行压缩并用另一个进行解压缩。
你可以通过在Spark或Hadoop中使用snappy将数据写入,或者让Spark将数据读取为二进制块,然后手动调用python-snappy进行解压缩(请参见binaryFiles http://spark.apache.org/docs/latest/api/python/pyspark.html)。二进制块方法有点脆弱,因为它需要为每个输入文件将整个文件放入内存中。但如果你的数据足够小,那么这种方法也可行。

谢谢Patrick,这让我很有感触。我读了更多关于Hadoop的snappy编解码器,它似乎是用于在将所有内容缩小之前从mapper生成的中间文件。是否有命令行实用程序可以使用Hadoop snappy编解码器压缩我的文本文件,然后将它们推送到hdfs存储?我基本上有大约10,000个5000万行文本文件。看起来这可能会起作用...https://github.com/kubo/snzip - Levi Pierce
这已经过时了,python-snappy支持hadoop-snappy,尽管不是很清楚。 - Jeroen

2

现在接受的答案已经过时了。你可以使用python-snappy来压缩hadoop-snappy,但是文档几乎不存在。 例如:

import snappy
with open('test.json.snappy', 'wb') as out_file:
    data=json.dumps({'test':'somevalue','test2':'somevalue2'}).encode('utf-8')
    compressor = snappy.hadoop_snappy.StreamCompressor()
    compressed = compressor.compress(data)
    out_file.write(compressed)

您也可以使用命令行,其中选项更加直接,使用 -t hadoop_snappy 标志。示例:
echo "{'test':'somevalue','test2':'somevalue2'}" | python -m snappy -t hadoop_snappy -c - test.json.snappy

1

我不确定我的文件具体使用了哪种snappy编解码器,但spark.read.text对我而言没有任何问题。


0

好的,我找到了一个解决方案!

按照以下步骤构建... https://github.com/liancheng/snappy-utils 在Ubuntu 14.10上,我需要安装gcc-4.4才能成功构建,这是我在这里遇到错误时的评论 https://code.google.com/p/hadoop-snappy/issues/detail?id=9

现在我可以在命令行中使用snappy压缩文本文件。

snappy -c gene_regions.vcf -o gene_regions.vcf.snappy

将其转储到HDFS中

hdfs dfs -put gene_regions.vcf.snappy

然后在pyspark中加载它!

a_file = sc.textFile("hdfs://master:54310/gene_regions.vcf.snappy")
a_file.first()

看这里!vcf 的头文件...

u'##fileformat=VCFv4.1'

0

spark.read.format("parquet").load(path)


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接