Spark: 写入 Avro 文件

17

我在Spark中,有一个来自Avro文件的RDD。现在我想对该RDD进行一些转换,并将其保存为Avro文件:

val job = new Job(new Configuration())
AvroJob.setOutputKeySchema(job, getOutputSchema(inputSchema))

rdd.map(elem => (new SparkAvroKey(doTransformation(elem._1)), elem._2))
   .saveAsNewAPIHadoopFile(outputPath, 
  classOf[AvroKey[GenericRecord]], 
  classOf[org.apache.hadoop.io.NullWritable], 
  classOf[AvroKeyOutputFormat[GenericRecord]], 
  job.getConfiguration)

运行Spark时出现“Schema $ recordSchema不可序列化”的错误。

如果我取消注释.map调用(仅使用rdd.saveAsNewAPIHadoopFile),则调用成功。

我在这里做错了什么?

有什么想法吗?


请提供异常堆栈跟踪信息,同时Spark、Hadoop和Avro的版本号也可能会有所帮助。 - Wildfire
请原谅我的幼稚。我可以问一下这个工作在做什么吗?看起来像是一个MapReduce任务?如果我们使用Spark编写输出,为什么还需要MapReduce任务? - lucky_start_izumi
3个回答

5

这里的问题与在Job中使用的avro.Schema类的不可序列化相关。当您尝试从map函数中的代码引用模式对象时,会抛出异常。

例如,如果您尝试执行以下操作,则会收到“任务不可序列化”异常:

val schema = new Schema.Parser().parse(new File(jsonSchema))
...
rdd.map(t => {
  // reference to the schema object declared outside
  val record = new GenericData.Record(schema)
})

你可以通过在函数块内创建模式的新实例来使所有内容正常工作:
val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.map(t => {
  // create a new Schema object
  val innserSchema = new Schema.Parser().parse(new File(jsonSchema))
  val record = new GenericData.Record(innserSchema)
  ...
})

为了避免在处理每条记录时都解析Avro架构,更好的解决方案是在分区级别解析架构。以下方法也可行:

val schema = new Schema.Parser().parse(new File(jsonSchema))
// The schema above should not be used in closures, it's for other purposes
...
rdd.mapPartitions(tuples => {
  // create a new Schema object
  val innserSchema = new Schema.Parser().parse(new File(jsonSchema))

  tuples.map(t => {
    val record = new GenericData.Record(innserSchema)
    ...
    // this closure will be bundled together with the outer one 
    // (no serialization issues)
  })
})

上述代码只要提供可移植的jsonSchema文件引用即可正常工作,由于map函数将被多个远程执行程序运行,因此引用应该指向HDFS中的文件,或者与JAR应用程序打包在一起(在后一种情况下,您将使用类加载器功能以获取其内容)。
对于那些尝试在Spark中使用Avro的用户,请注意仍然存在一些未解决的编译问题,并且您必须在Maven POM中使用以下导入:
<dependency>
  <groupId>org.apache.avro</groupId>
  <artifactId>avro-mapred</artifactId>
  <version>1.7.7</version>
  <classifier>hadoop2</classifier>
<dependency>

请注意 "hadoop2" 分类器。您可以在 https://issues.apache.org/jira/browse/SPARK-3039 上跟踪此问题。

当我们的map函数内部没有外部依赖时,这种方法运行良好。有没有办法使模式可序列化? - Prince Bhatti

2
Spark使用的默认序列化程序是Java序列化。因此,对于所有java类型,它将尝试使用Java序列化进行序列化。AvroKey不可序列化,因此您会遇到错误。
您可以使用KryoSerializer,或插入自定义序列化程序(如Avro)。您可以在这里阅读有关序列化的更多信息:http://spark-project.org/docs/latest/tuning.html 您还可以将对象包装在外部化对象中。例如,可以检查SparkFlumeEvent,该事件在此处包装了AvroFlumeEvent:https://github.com/apache/spark/blob/master/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala

0

使用Dataframe和Databricks库创建Avro非常简单。

dataframe.write.format("com.databricks.spark.avro").avro($hdfs_path)

在您的情况下,输入是avro,因此它将与模式相关联,因此您可以直接将avro读入数据框架中,在转换完成后,您可以使用上述代码将其写入avro。
要将avro读入数据框架中:
Spark 1.6
val dataframe = sqlContext.read.avro($hdfs_path) OR val dataframe = sqlContext.read.format("com.databricks.spark.avro").load($hdfs_path)
Spark 2.1
val dataframe = sparkSession.read.avro($hdfs_path) OR val dataframe = sparkSession.read.format("com.databricks.spark.avro").load($hdfs_path)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接