交叉连接运行时错误:使用CROSS JOIN语法允许在这些关系之间进行笛卡尔积。

4
我有以下可以编译的函数。
  def compare(dbo: Dataset[Cols], ods: Dataset[Cols]) = {
    val j = dbo.crossJoin(ods)
    // Tried val j = dbo.joinWith(ods, func.expr("true")) too
    j.take(5).foreach(r => println(r)) 
  }

但是在提交到Spark时出现了运行时错误。

连接条件缺失或微不足道。(如果使用joinWith而不是crossJoin,请使用CROSS JOIN语法允许这些关系之间的笛卡尔积。);在org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1067);在org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$21.applyOrElse(Optimizer.scala:1064);在org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268);在org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:268);在org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70);在org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:267);在org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273);在org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273);在org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307);在org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188);在org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305);在org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273);在org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273);在org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:273);在org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:307);在org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:188);在org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:305);在org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:273);在org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:257);在org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1064);在org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts.apply(Optimizer.scala:1049);在org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85);在org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82);在scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:57);在scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:66);在scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:35);在org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82);在org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74);在scala.collection.immutable.List.foreach(List.scala:381);在org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74);在org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78);在org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78);在org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:84);在org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:80);在org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:89);在org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:89);在org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2814);在org.apache.spark.sql.Dataset.head(Dataset.scala:2127);在org.apache.spark.sql.Dataset.take(Dataset.scala:2342);在MappingPoint$.compare(MappingPoint.scala:43);在MappingPoint$.main(MappingPoint.scala:33);在MappingPoint.main(MappingPoint.scala);在sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method);在sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source);在sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source);在java.lang.reflect.Method

为什么不使用Spark 2.2呢?在这个版本中,许多问题都得到了很大的改善。有没有坚持使用2.1.1的理由?至少试试新版本,看看是否可以解决问题。我认为2.2可能会有所改进(因为代码已经改变)。 - Jacek Laskowski
我想我几个月前安装了2.1.1版(当时最新版本?),并没有升级它。 - ca9163d9
没有Spark的安装,但可以将其定义为库依赖项或在PATH中。是时候升级了。至少你会知道我们是否正在与已经修复的问题作斗争(这样你就不会浪费时间)。 - Jacek Laskowski
1
尝试使用Scala 2.11.8和2.2.0版本,仍然出现相同的错误。 - ca9163d9
2个回答

9

0
以下对我有效。我简化了Cols case class,这样我就不必输入那么多,但我认为它是你所尝试的东西。
我使用的是Spark 2.1.1:
case class Cols (
    A: Int,
    B: String
)

val dbo: Dataset[Cols] = spark.createDataset(
    Seq[Cols](
        Cols(1, "One"),
        Cols(2, "Two")
    )
)
val ods: Dataset[Cols] = spark.createDataset(
    Seq[Cols](
        Cols(3, "Three"),
        Cols(4, "Four")
    )
)

val cartesian: Dataset[(Cols,Cols)] = dbo.crossJoin(ods).map {
    case Row(lA: Int, lB: String, rA: Int, rB: String) => (Cols(lA, lB), Cols(rA, rB))
}
val result: Dataset[Int] = cartesian.map {
    case (l: Cols, r: Cols) => 0
}

只要 Cols 元素少于11个,你应该没问题。否则,在 crossJoin 之后尝试对 >22 个元素进行模式匹配时可能会遇到问题。
在我看来,你提交给 Spark 的内容似乎仍在使用 joinWith 行,而 Spark 显然会尝试检测和防止笛卡尔积的发生。

我能否总是使用 dbo.joinWith(ods, func.lit(true)) 来避免模式匹配中超过22个元素的问题?顺便说一下,dbo.joinWith(ods, func.lit(true)) 已经返回了类型为 Dataset[(Cols,Cols)] 的值,因此在您的 cartesian 代码中不需要进行额外的映射。但我仍然遇到了错误。也许有一些系统设置可以允许交叉连接? - ca9163d9

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接