NullPointerException when using spark-csv with DataFrames

While reading the spark-csv README, there is some sample Java code like the following:

import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.*;

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(
    new StructField("year", IntegerType, true), 
    new StructField("make", StringType, true),
    new StructField("model", StringType, true),
    new StructField("comment", StringType, true),
    new StructField("blank", StringType, true));

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");

It doesn't compile as-is, so after some massaging I got it to compile by changing the incorrect FooType syntax to DataTypes.FooType and passing the StructFields as a new StructField[]; the compiler then demanded a fourth argument, metadata, in the StructField constructor, but I had a hard time finding documentation on what it means (the javadocs describe its use cases, but not how to decide what to pass in when constructing a StructField). With the following code, it now runs until it hits any side-effecting method such as collect():

JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new SQLContext(sc);

// Read features.
System.out.println("Reading features from " + args[0]);
StructType featuresSchema = new StructType(new StructField[] {
    new StructField("case_id", DataTypes.StringType, false, null), 
    new StructField("foo", DataTypes.DoubleType, false, null)
});
DataFrame features = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(featuresSchema)
    .load(args[0]);
for (Row r : features.collect()) {
  System.out.println("Row: " + r);
}

I got the following exception:

Exception in thread "main" java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202)
  at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
  at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65)
  at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:56)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:59)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:127)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:121)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
  at scala.collection.SetLike$class.map(SetLike.scala:93)
  at scala.collection.AbstractSet.map(Set.scala:47)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114)
  at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
  at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
...

Any idea what's going wrong?
2 Answers

Looks like the README is quite out of date and the Java examples need some heavy editing. I tracked down the JIRA that actually added the metadata field; it points at a default Map.empty value being used in the Scala case, and whoever wrote the docs probably just translated the Scala directly into Java, even though the input parameter lacks the same default value there.
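
For context, here is a small self-contained Java illustration of the mismatch; the Scala signature in the comment is paraphrased from the Spark 1.5 sources, so treat the exact declaration as approximate:

import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;

public class StructFieldArgs {
    public static void main(String[] args) {
        // Scala declares roughly:
        //   case class StructField(name: String, dataType: DataType,
        //                          nullable: Boolean = true,
        //                          metadata: Metadata = Metadata.empty)
        // Scala callers may omit metadata; Java callers cannot, so the README's
        // three-argument style has no matching constructor:
        //   new StructField("year", DataTypes.IntegerType, true);  // does not compile

        // Passing null compiles, but plants the NullPointerException seen above,
        // because Spark later calls metadata.hashCode() without a null check:
        StructField landmine = new StructField("year", DataTypes.IntegerType, true, null);

        // The safe choice when there is no metadata to attach:
        StructField safe = new StructField("year", DataTypes.IntegerType, true, Metadata.empty());
        System.out.println(safe);
    }
}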

In the code of the Spark SQL 1.5 branch we can see that it references metadata.hashCode() without any check, and that is what causes the NullPointerException. The existence of the Metadata.empty() method, together with the discussion about using empty maps as the default in Scala, seems to imply that the correct approach is simply to pass Metadata.empty() if you don't care. The revised example should be:

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(new StructField[] {
    new StructField("year", DataTypes.IntegerType, true, Metadata.empty()), 
    new StructField("make", DataTypes.StringType, true, Metadata.empty()),
    new StructField("model", DataTypes.StringType, true, Metadata.empty()),
    new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
    new StructField("blank", DataTypes.StringType, true, Metadata.empty())
});

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(customSchema)
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");

Sent a pull request to spark-csv to fix the docs. - Dennis Huo
I think you need to change the .option("inferSchema", "true") in your answer to .schema(customSchema). - Zahiro Mor
Ah, indeed you're right, good catch; I only had my eyes on the line specifying the customSchema definition, trying to make minimal changes to the upstream example. Sent another pull request to fix that in the docs too, and edited the answer to match. - Dennis Huo

I ran into the same exception as well. I solved it by providing the metadata.

So, change the code to:

StructType customSchema = new StructType(new StructField[] {
    new StructField("year", DataTypes.IntegerType, true, Metadata.empty()),
    new StructField("make", DataTypes.StringType, true, Metadata.empty()),
    new StructField("model", DataTypes.StringType, true, Metadata.empty()),
    new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
    new StructField("blank", DataTypes.StringType, true, Metadata.empty())
});

That will resolve the problem.

