这里的问题与在Job中使用的avro.Schema类的不可序列化相关。当您尝试从map函数中的代码引用模式对象时,会抛出异常。
例如,如果您尝试执行以下操作,则会收到“任务不可序列化”异常:
val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
// reference to the schema object declared outside
val record = new GenericData.Record(schema)
})
你可以通过在函数块内创建模式的新实例来使所有内容正常工作:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.map(t => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
val record = new GenericData.Record(innserSchema)
...
})
为了避免在处理每条记录时都解析Avro架构,更好的解决方案是在分区级别解析架构。以下方法也可行:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.mapPartitions(tuples => {
// create a new Schema object
val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
tuples.map(t => {
val record = new GenericData.Record(innserSchema)
...
// this closure will be bundled together with the outer one
// (no serialization issues)
})
})
上述代码只要提供可移植的jsonSchema文件引用即可正常工作,由于map函数将被多个远程执行程序运行,因此引用应该指向HDFS中的文件,或者与JAR应用程序打包在一起(在后一种情况下,您将使用类加载器功能以获取其内容)。
对于那些尝试在Spark中使用Avro的用户,请注意仍然存在一些未解决的编译问题,并且您必须在Maven POM中使用以下导入:
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro-mapred</artifactId>
<version>1.7.7</version>
<classifier>hadoop2</classifier>
<dependency>
请注意
"hadoop2"
分类器。您可以在
https://issues.apache.org/jira/browse/SPARK-3039 上跟踪此问题。