如何在PySpark中读取Avro文件

17

我正在使用 Python 编写 Spark 作业。不过,我需要读取大量的 avro 文件。

这个 是在 Spark 示例文件夹中找到的最接近的解决方案。不过,你需要使用 spark-submit 提交这个 Python 脚本。在 spark-submit 的命令行中,你可以指定 driver-class,在那种情况下,所有的 avrokey、avrovalue 类将被定位。

avro_rdd = sc.newAPIHadoopFile(
        path,
        "org.apache.avro.mapreduce.AvroKeyInputFormat",
        "org.apache.avro.mapred.AvroKey",
        "org.apache.hadoop.io.NullWritable",
        keyConverter="org.apache.spark.examples.pythonconverters.AvroWrapperToJavaConverter",
        conf=conf)
在我的情况下,我需要在Python脚本中运行所有内容,我尝试创建一个环境变量来包含jar文件,希望通过这种方式Python可以将其添加到路径中,但是很明显并没有成功,它给我报了一个意料之外的类错误。
os.environ['SPARK_SUBMIT_CLASSPATH'] = "/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/spark/examples/lib/spark-examples_2.10-1.0.0-cdh5.1.0.jar"

有人可以帮我在一个Python脚本中如何读取Avro文件吗?

4个回答

16

Spark >= 2.4.0

您可以使用内置的Avro支持。该API与spark-avro包向后兼容,并添加了一些功能(最显著的是from_avro/to_avro函数)。

请注意,该模块未与标准Spark二进制文件捆绑在一起,必须使用spark.jars.packages或等效机制进行包含。

另请参见Pyspark 2.4.0,在Python中从Kafka读取流数据并读取Avro

Spark < 2.4.0

您可以使用spark-avro库。首先让我们创建一个示例数据集:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter

schema_string ='''{"namespace": "example.avro",
 "type": "record",
 "name": "KeyValue",
 "fields": [
     {"name": "key", "type": "string"},
     {"name": "value",  "type": ["int", "null"]}
 ]
}'''

schema = avro.schema.parse(schema_string)

with open("kv.avro", "w") as f, DataFileWriter(f, DatumWriter(), schema) as wrt:
    wrt.append({"key": "foo", "value": -1})
    wrt.append({"key": "bar", "value": 1})

使用 spark-csv 读取它就像这样简单:

df = sqlContext.read.format("com.databricks.spark.avro").load("kv.avro")
df.show()

## +---+-----+
## |key|value|
## +---+-----+
## |foo|   -1|
## |bar|    1|
## +---+-----+ 

2
请提供一个使用 pysparkfrom_avro 的示例,可以吗? - Soumendra
3
如果我理解正确的话,在 PySpark 2.4.x 中似乎还没有内置的from_avroto_avro函数。根据此处的“@since”标签,看起来这些函数将在 PySpark 3.0 中添加。链接 - mattjw

6

前一种解决方案需要安装第三方Java依赖项,这并不是大多数Python开发人员所喜欢的。但是,如果您只想使用给定模式解析Avro文件,则实际上不需要外部库。您可以直接读取二进制文件,并使用您喜欢的Python Avro包进行解析。

例如,以下是使用fastavro加载Avro文件的方法:

from io import BytesIO
import fastavro

schema = {
    ...
}

rdd = sc.binaryFiles("/path/to/dataset/*.avro")\
    .flatMap(lambda args: fastavro.reader(BytesIO(args[1]), reader_schema=schema))

print(rdd.collect())

2

我们可以将Avro文件的数据读入Spark DataFrame中。请参考link和下面的代码,使用PySpark读取Avro文件。

df = spark.read.format("avro").load("<avro_file_location>")

2

对于Spark < 2.4.0,PySpark可以通过使用JAR“com.databricks.spark.avro”和Python的“subprocess”模块来读取avro文件及其相应的模式(.avsc)创建数据帧,而无需任何外部Python模块。

以下是解决方案:

avsc_location = hdfs://user/test/test.avsc
avro_location = hdfs://user/test/test.avro

#use subprocess module
import subproccess as SP

load_avsc_file = SP.Popen(["hdfs", "dfs", "-cat", avsc_location], stdout=SP.PIPE, stderr=SP.PIPE)
(avsc_file_output, avsc_file_error) = load_avsc_file.communicate()

avro_df = spark.read.format("com.databricks.spark.avro").option("avroSchema", avsc_file_output).load(avro_location)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接