如何动态定义流式数据集的模式以写入CSV?

3

我有一个从Kafka读取的流数据集,想要写入CSV文件。

case class Event(map: Map[String,String])
def decodeEvent(arrByte: Array[Byte]): Event = ...//some implementation
val eventDataset: Dataset[Event] = spark
  .readStream
  .format("kafka")
  .load()
  .select("value")
  .as[Array[Byte]]
  .map(decodeEvent)

Event包含一个Map[String,String],为了将其写入CSV文件中,需要一些模式。

假设所有字段都是String类型,因此我尝试了来自Spark Repo的示例。

val columns = List("year","month","date","topic","field1","field2")
val schema = new StructType() //Prepare schema programmatically
columns.foreach { field => schema.add(field, "string") }
val rowRdd = eventDataset.rdd.map { event => Row.fromSeq(
     columns.map(c => event.getOrElse(c, "")
)}
val df = spark.sqlContext.createDataFrame(rowRdd, schema)

以下内容在"eventDataset.rdd"这一行会在运行时报错:

原因:org.apache.spark.sql.AnalysisException错误:带有流式数据源的查询必须使用writeStream.start()执行;

下面的代码无法正常工作,因为'.map'拥有List[String]而非Tuple类型:

eventDataset.map(event => columns.map(c => event.getOrElse(c,""))
.toDF(columns:_*)

有没有使用编程架构和结构化流数据集来实现这一目标的方法?
1个回答

2
我会采用更简单的方法:
import org.apache.spark.sql.functions._

eventDataset.select(columns.map(
  c => coalesce($"map".getItem(c), lit("")).alias(c)
): _*).writeStream.format("csv").start(path)

但是如果你想要更接近当前解决方案,可以跳过RDD转换。

import org.apache.spark.sql.catalyst.encoders.RowEncoder

eventDataset.rdd.map(event =>
  Row.fromSeq(columns.map(c => event.getOrElse(c,"")))
)(RowEncoder(schema)).writeStream.format("csv").start(path)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接