如何在使用Spark模式读取CSV时删除格式不正确的行?

10

我在使用Spark DataSet加载CSV文件时,更喜欢清晰地指定模式。但我发现有一些行不符合我的模式。一个列应该是double类型,但有些行是非数字值。是否可以轻松地从DataSet中过滤掉所有不符合我的模式的行?

val schema = StructType(StructField("col", DataTypes.DoubleType) :: Nil)
val ds = spark.read.format("csv").option("delimiter", "\t").schema(schema).load("f.csv")

f.csv:

a
1.0

我希望能够轻松地从我的数据集中过滤出"a"。谢谢!

2个回答

12
如果您正在阅读一个CSV文件,并想要删除与架构不匹配的行,您可以通过添加选项mode作为DROPMALFORMED来实现。
输入数据。
a,1.0
b,2.2
c,xyz
d,4.5
e,asfsdfsdf
f,3.1
Schema
val schema = StructType(Seq(
  StructField("key", StringType, false),
  StructField("value", DoubleType, false)
))

使用 schemaoption 读取 csv 文件

  val df = spark.read.schema(schema)
    .option("mode", "DROPMALFORMED")
    .csv("/path to csv file ")

输出:

+-----+-----+
|key  |value|
+-----+-----+
|hello|1.0  |
|hi   |2.2  |
|how  |3.1  |
|you  |4.5  |
+-----+-----+

你可以在这里获取更多有关 spark-csv 的详细信息。

希望这能有所帮助!


谢谢!你们两个的回答都是正确的。所以我设置了第一个答案为采纳的答案。但是非常感谢你们宝贵的回答! - Zhe Hou
如果我想要用于审计目的,如何恢复损坏的记录。例如,我想创建一个包含损坏记录的新数据框。 - Abhi
确切地说,格式错误的记录应该被报告给源系统/供应商,这是非常基本的功能,Spark中是否有这样的功能? - kensai
一个需要注意的地方是,你会悄悄地丢失你的输入数据。 - Ricardo Mutti

9

.option("mode", "DROPMALFORMED") 应该能完成工作。

mode (默认值为 PERMISSIVE):允许在解析期间处理损坏记录的模式。

  • PERMISSIVE: 当遇到损坏记录时,将其他字段设置为 null,并将畸形字符串放入由 columnNameOfCorruptRecord 配置的新字段中。当用户设置模式时,它会为额外的字段设置 null 值。

  • DROPMALFORMED: 忽略整个损坏的记录。

  • FAILFAST: 在遇到损坏的记录时抛出异常。


1
这个答案不是和另一个一样吗? - koiralo
2
当CSV加载器的结果实际上是两个分开的数据帧对象时,一个已解析,另一个已损坏,这将更好。 - kensai

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接