为什么在创建DataFrame时,Spark会推断二进制而不是Array[Byte]?

4
原则上,我有一个由“Name”和“Values”字段组成的DataFrame。第一个字段是String类型,而第二个字段是Array[Byte]类型。
我想对这个DataFrame中的每条记录应用任何函数,使用UDF并创建一个新列。当“Values”是Array[Int]类型时,这个方法完美地工作。然而,当它是Array[Byte]类型时,会出现以下错误:
org.apache.spark.sql.AnalysisException: cannot resolve 'UDF(Values)' due to data type mismatch: argument 1 requires array<tinyint> type, however, '`Values`' is of binary type.;;
'Project [Name#47, Values#48, UDF(Values#48) AS TwoTimes#56]
+- Project [_1#44 AS Name#47, _2#45 AS Values#48]
+- SerializeFromObject [staticinvoke(class 
org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._1, true) AS _1#44, assertnotnull(assertnotnull(input[0, scala.Tuple2, true]))._2 AS _2#45]
  +- ExternalRDD [obj#43]

完整代码如下:
scala> val df1 = spark.sparkContext.parallelize(Seq(("one", Array[Byte](1, 2, 3, 4, 5)), ("two", Array[Byte](6, 7, 8, 9, 10)))).toDF("Name", "Values")
df1: org.apache.spark.sql.DataFrame = [Name: string, Values: binary]

scala> df1.show
+----+----------------+
|Name|          Values|
+----+----------------+
| one|[01 02 03 04 05]|
| two|[06 07 08 09 0A]|
+----+----------------+

scala> val twice = udf { (values: Seq[Byte]) =>
   |     val result = Array.ofDim[Byte](values.length)
   |     for (i <- values.indices)
   |         result(i) = (2 * values(i).toInt).toByte
   |     result
   | }
twice: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,BinaryType,Some(List(ArrayType(ByteType,false))))

scala> val df2 = df1.withColumn("TwoTimes", twice('Values))

我了解这种错误是由于错误的数据类型导致的(期望是Array[Byte],但它发现了一个Binary),但我不明白为什么Spark会将我的Array[Byte]推断为Binary。能否有人向我解释一下呢?
如果我必须使用Binary而不是Array[Byte],我应该如何在我的UDF中处理它?
我澄清一下,我的原始UDF没有使用简单的for循环。我了解在这个例子中,这可以被map方法替换。
1个回答

5
在Spark中,Array[Byte] 被表示为 BinaryType。从文档中我们可以看到:

public class BinaryType extends DataType
代表Array[Byte]值的数据类型。请使用单例DataTypes.BinaryType。

因此,Array[Byte]Binary 是相同的,但是这些和 Seq[Byte] 之间有一些差异,导致出现错误。
要解决此问题,只需在udf中将 Seq[Byte] 替换为 Array[Byte] 即可。
val twice = udf { (values: Array[Byte]) =>
  val result = Array.ofDim[Byte](values.length)
  for (i <- values.indices)
    result(i) = (2 * values(i).toInt).toByte
  result
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接