Spark ML indexer cannot resolve DataFrame column name with dots?


I have a DataFrame with a column named a.b. When I specify a.b as the input column name for a StringIndexer, an AnalysisException is thrown with the message "cannot resolve 'a.b' given input columns a.b". I'm using Spark 1.6.0.

I know that older versions of Spark may have had problems with dots in column names, but in more recent versions, back-quotes (backticks) can be used to reference such columns in the Spark shell and in SQL queries. For instance, that's the resolution to another question, "How to escape column names with hyphen in Spark SQL". Some of these issues were reported in SPARK-6898, "Special chars in column names is broken", but that was resolved back in 1.4.0.
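For example (a minimal sketch, not from the original post; myTable is a hypothetical temp-table name), back-quoting resolves the dotted column both in plain SQL and through the DataFrame API:

df.registerTempTable("myTable");
sqlContext.sql("SELECT `a.b` FROM myTable").show(); // SQL with backticks works
df.select(df.col("`a.b`")).show();                  // so does the DataFrame API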

Here's a minimal example and stacktrace:

public class SparkMLDotColumn {
    public static void main(String[] args) {
        // Get the contexts
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("test")
                .set("spark.ui.enabled", "false"); // http://permalink.gmane.org/gmane.comp.lang.scala.spark.user/21385
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sparkContext);

        // Create a schema with a single string column named "a.b"
        StructType schema = new StructType(new StructField[] {
                DataTypes.createStructField("a.b", DataTypes.StringType, false)
        });

        // Create an empty RDD and DataFrame
        JavaRDD<Row> rdd = sparkContext.parallelize(Collections.emptyList());
        DataFrame df = sqlContext.createDataFrame(rdd, schema);

        StringIndexer indexer = new StringIndexer()
            .setInputCol("a.b")
            .setOutputCol("a.b_index");
        df = indexer.fit(df).transform(df);
    }
}

Now, it's worth trying the same example with a back-quoted column name, because we get some odd results. Here's an example with the same schema, but this time the frame contains data. Before attempting any indexing, we copy the column named a.b to a column named a_b. That requires backticks, and it works without a problem. Then we index the a_b column, which also works fine. But when we try to index the a.b column using backticks, something very strange happens: there is no error, but there is also no result:

public class SparkMLDotColumn {
    public static void main(String[] args) {
        // Get the contexts
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("test")
                .set("spark.ui.enabled", "false");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sparkContext);

        // Create a schema with a single string column named "a.b"
        StructType schema = new StructType(new StructField[] {
                DataTypes.createStructField("a.b", DataTypes.StringType, false)
        });

        // Create an RDD with a couple of rows and a DataFrame
        List<Row> rows = Arrays.asList(RowFactory.create("foo"), RowFactory.create("bar")); 
        JavaRDD<Row> rdd = sparkContext.parallelize(rows);
        DataFrame df = sqlContext.createDataFrame(rdd, schema);

        df = df.withColumn("a_b", df.col("`a.b`"));

        StringIndexer indexer0 = new StringIndexer();
        indexer0.setInputCol("a_b");
        indexer0.setOutputCol("a_bIndex");
        df = indexer0.fit(df).transform(df);

        StringIndexer indexer1 = new StringIndexer();
        indexer1.setInputCol("`a.b`");
        indexer1.setOutputCol("abIndex");
        df = indexer1.fit(df).transform(df);

        df.show();
    }
}
+---+---+--------+
|a.b|a_b|a_bIndex|  // where's the abIndex column?
+---+---+--------+
|foo|foo|     0.0|
|bar|bar|     1.0|
+---+---+--------+

Stacktrace from the first example:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'a.b' given input columns a.b;
    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:60)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:318)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:316)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:316)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:117)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:121)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:121)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:125)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:125)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:57)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
    at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
    at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
    at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:84)
    at SparkMLDotColumn.main(SparkMLDotColumn.java:38)

The .set("spark.ui.enabled", "false"); in the configuration disables the Jersey bundled with Spark; see http://permalink.gmane.org/gmane.comp.lang.scala.spark.user/21385. - Joshua Taylor
Digging a bit deeper, this isn't specific to ML: df.select(df.col("a.b")) fails in the same way. The documentation for df.col mentions the ability to reach into nested columns, but the resolveQuoted method should make df.col("`a.b`") work, and it does return a Column. Yet the indexing still doesn't work. - Joshua Taylor
While I'd still welcome an answer here, this looks like a bug, so I've opened SPARK-12965. - Joshua Taylor
Hey, did you ever solve this? - mshikher
@mshikher I never found a solution. - Joshua Taylor
1 Answer


I ran into the same problem on Spark 2.1. I ended up creating a function that "validifies" all column names by replacing every dot. Scala implementation:

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

def validifyColumnnames[T](df : Dataset[T], spark : SparkSession) : DataFrame = {
   val newColumnNames = ArrayBuffer[String]()
   for(oldCol <- df.columns) {
      newColumnNames +=  oldCol.replaceAll("\\.","") // append
   }
   val newColumnNamesB = spark.sparkContext.broadcast(newColumnNames.toArray)
   df.toDF(newColumnNamesB.value : _*)
}

Sorry, this is probably not the answer you were hoping for, but it was too long for a comment.
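For a Java caller like the examples in the question, a roughly equivalent sketch (my own, not from the answer) would select every column under a dot-free alias before fitting the indexer; the a_b / a_b_index names below simply mirror the question's schema:

// Hypothetical Java sketch: alias every column to a dot-free name, then index.
String[] names = df.columns();
Column[] renamed = new Column[names.length];
for (int i = 0; i < names.length; i++) {
    // Back-quote the original name so the dot is treated literally, then alias it.
    renamed[i] = df.col("`" + names[i] + "`").as(names[i].replace(".", "_"));
}
DataFrame sanitized = df.select(renamed);

StringIndexer indexer = new StringIndexer()
        .setInputCol("a_b")            // sanitized name of "a.b"
        .setOutputCol("a_b_index");
sanitized = indexer.fit(sanitized).transform(sanitized);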


Yes, in my application I ended up "sanitizing" the column names and keeping track of an "original to sanitized" mapping. It's not especially satisfying, but it's robust and it works. I'd love to see the underlying issue fixed, but it's been a year now, so who knows whether that will happen. - Joshua Taylor
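A tiny sketch of that "sanitize and remember" bookkeeping (hypothetical, in Java to match the question):

// Hypothetical: map each original column name to its dot-free replacement.
Map<String, String> originalToSanitized = new LinkedHashMap<>();
for (String name : df.columns()) {
    originalToSanitized.put(name, name.replace(".", "_"));
}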
