How to avoid NumberFormatException: null errors in Spark

I have a general question that stems from a specific exception I ran into.
I'm using Spark 1.6 on Dataproc to query data. I need to pull one full day of data (~10,000 files) from two logs and then run some transformations.
However, my data may (or may not) contain some bad records. After the full-day query failed to complete, I tried hours 00-09 and got no errors. Hours 10-19 threw the exception. Narrowing it down hour by hour, I found the bad data is in hour 10; hours 11 and 12 are fine.
Basically, my code is:
val imps = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "true")
    .load("gs://logs.xxxx.com/2016/03/14/xxxxx/imps/2016-03-14-10*")
    .select("C0", "C18", "C7", "C9", "C33", "C29", "C63")
    .registerTempTable("imps")

val conv = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "true")
    .load("gs://logs.xxxx.com/2016/03/14/xxxxx/conv/2016-03-14-10*")
    .select("C0", "C18", "C7", "C9", "C33", "C29", "C65")
    .registerTempTable("conversions")

val ff = sqlContext.sql(
      "select * from (select * from imps) A inner join (select * from conversions) B " +
      "on A.C0 = B.C0 and A.C7 = B.C7 and A.C18 = B.C18")
    .coalesce(16)
    .write.format("com.databricks.spark.csv")
    .save("gs://xxxx-spark-results/newSparkResults/Plara2.6Mar14_10_1/")

{over-simplified}

The error message I get is:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 38 in stage 130.0 failed 4 times, most recent failure: Lost task 38.3 in stage 130.0 (TID 88495, plara26-0317-0001-sw-v8oc.c.xxxxx-analytics.internal): java.lang.NumberFormatException: null
    at java.lang.Integer.parseInt(Integer.java:542)
    at java.lang.Integer.parseInt(Integer.java:615)
    at scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229)
    at scala.collection.immutable.StringOps.toInt(StringOps.scala:31)
    at com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:53)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:181)
    at com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$6.apply(CsvRelation.scala:162)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:511)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:686)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:95)
    at org.apache.spark.sql.execution.aggregate.TungstenAggregate$$anonfun$doExecute$1$$anonfun$2.apply(TungstenAggregate.scala:86)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$20.apply(RDD.scala:710)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

My question is: how can I implement exception handling with spark-csv? I could convert the DataFrame to an RDD and handle it there, but it seems there must be a better way... Has anyone solved a similar problem?

Update: after setting the inferSchema option to false, I was able to read my data successfully. That way the fields are read as strings, so the cast to Int is of course never attempted. I'm still looking for a robust way to catch the exception... - Zahiro Mor
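For the robust handling the update is still looking for, the spark-csv package itself documents a "mode" parse option (PERMISSIVE, DROPMALFORMED, FAILFAST). Below is a minimal sketch in Scala, reusing the question's read call; note that how completely DROPMALFORMED covers cast failures can depend on the spark-csv version, so it is worth testing against the known-bad hour-10 data rather than taking it as a guarantee:

val imps = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .option("inferSchema", "true")
    // Per the spark-csv docs, DROPMALFORMED drops lines whose tokens do not
    // match the schema instead of failing the task; FAILFAST would instead
    // abort on the first malformed line.
    .option("mode", "DROPMALFORMED")
    .load("gs://logs.xxxx.com/2016/03/14/xxxxx/imps/2016-03-14-10*")
    .select("C0", "C18", "C7", "C9", "C33", "C29", "C63")
    .registerTempTable("imps")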
1 Answer


This happens because automatic schema inference is not safe: it can be thrown off by invalid data in the input files.

It can also produce a different DataFrame schema depending on which input files are read.
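One way to take inference out of the picture entirely is to declare the schema by hand, which also keeps it identical no matter which files are read. A minimal sketch in Scala follows; the column names are borrowed from the question, but the types are pure assumptions since the real log layout isn't shown:

import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// With an explicit schema there is no inference pass over the input at all.
// One StructField is needed per column in the file; the types below are
// hypothetical placeholders.
val customSchema = StructType(Seq(
    StructField("C0", StringType, nullable = true),
    StructField("C7", StringType, nullable = true),
    StructField("C18", IntegerType, nullable = true)
))

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .schema(customSchema)
    .load("gs://logs.xxxx.com/2016/03/14/xxxxx/imps/2016-03-14-10*")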

To see what goes wrong with inference, suppose we have a CSV file containing floats plus one string:

0.018
0.095
0.000
'hoi'
0.000
0.093
0.012

When we read it into a DataFrame with inferSchema, like this:

>>> df = spark.read.format('csv').option('inferSchema', True).load('./test_csv.dat')
>>> df.show()
+-----+
|  _c0|
+-----+
|0.018|
|0.095|
|0.000|
|'hoi'|
|0.000|
|0.093|
|0.012|
+-----+

then the type is not inferred correctly:

>>> df.schema
StructType(List(StructField(_c0,StringType,true)))

You can work around this by casting the column manually, for example:

>>> from pyspark.sql.types import FloatType
>>> df = df.withColumn('val_float', df._c0.cast(FloatType())).select('val_float')
>>> df.show()
+---------+
|val_float|
+---------+
|    0.018|
|    0.095|
|      0.0|
|     null|
|      0.0|
|    0.093|
|    0.012|
+---------+

>>> df.schema
StructType(List(StructField(val_float,FloatType,true)))
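If you also want to see which rows were bad, rather than just nulling them out, the same cast can be used to split the data. Here is a sketch of that pattern in Scala, matching the question's code; the path is a placeholder, and the column name C0 assumes spark-csv's default naming:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.FloatType

// Read everything as strings (no inferSchema), then cast explicitly:
// values that fail the cast become null instead of killing the task.
val raw = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "false")
    .load("gs://some-bucket/test_csv.dat")   // placeholder path
    .withColumn("val_float", col("C0").cast(FloatType))

val good = raw.filter(col("val_float").isNotNull)  // clean rows to process
val bad  = raw.filter(col("val_float").isNull)     // quarantined rows; note this
                                                   // also catches rows that were
                                                   // empty to begin with

That way the job completes regardless of what's in hour 10, and the bad rows can be inspected or written out separately for later.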
