I have a Spark SQL DataFrame with two columns, where each entry is an array of strings:
val ngramDataFrame = Seq(
(Seq("curious", "bought", "20"), Seq("iwa", "was", "asj"))
).toDF("filtered_words", "ngrams_array")
I want to merge the two arrays in each row into a single array in a new column. My code is as follows:
def concat_array(firstarray: Array[String],
                 secondarray: Array[String]): Array[String] =
  (firstarray ++ secondarray).toArray
val concatUDF = udf(concat_array _)
val concatFrame = ngramDataFrame.withColumn("full_array", concatUDF($"filtered_words", $"ngrams_array"))
I can successfully call the concat_array function on two plain arrays. However, when I run the code above, I get the following exception:
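To confirm the function itself is fine outside Spark, here is a minimal standalone check (a sketch; the object name ConcatCheck is just for illustration and is not part of my Spark code):

```scala
// Standalone check: the function concatenates plain Scala arrays as expected,
// so the failure only appears when it runs as a Spark UDF.
object ConcatCheck {
  def concat_array(firstarray: Array[String],
                   secondarray: Array[String]): Array[String] =
    (firstarray ++ secondarray).toArray

  def main(args: Array[String]): Unit = {
    val merged = concat_array(Array("curious", "bought", "20"),
                              Array("iwa", "was", "asj"))
    println(merged.mkString(","))  // prints: curious,bought,20,iwa,was,asj
  }
}
```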
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 16.0 failed 1 times, most recent failure: Lost task 0.0 in stage 16.0 (TID 12, localhost): org.apache.spark.SparkException: Failed to execute user defined function(anonfun$1: (array, array) => array)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
    at org.apache.spark.scheduler.Task.run(Task.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String;
    at $line80.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(:76)
    ... 13 more
Driver stacktrace: