将两个类型为Array[string]的列合并为一个新的Array[string]列

9

我有一个Spark SQL DataFrame,其中有两列,每个条目都是字符串数组。

val  ngramDataFrame = Seq(
  (Seq("curious", "bought", "20"), Seq("iwa", "was", "asj"))
).toDF("filtered_words", "ngrams_array")

我希望将每一行中的数组合并成一个新列中的单个数组。我的代码如下:
def concat_array(firstarray: Array[String], 
                 secondarray: Array[String]) : Array[String] = 
                                     { (firstarray ++ secondarray).toArray }
val concatUDF = udf(concat_array _)
val concatFrame = ngramDataFrame.withColumn("full_array", concatUDF($"filtered_words", $"ngrams_array"))

我可以成功地在两个数组上使用concat_array函数。但是当我运行以上代码时,会出现以下异常:

org.apache.spark.SparkException: 由于阶段失败而中止作业:第16.0阶段的任务0.0失败了1次,最近一次失败:在第16.0阶段(TID 12,localhost)中丢失任务0.0:org.apache.spark.SparkException:未能执行用户定义的函数(anonfun$1:(array,array)=> array)at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)at org.apache.spark.scheduler.Task.run(Task.scala:86)at org.apache.spark.executor.Executor $ TaskRunner.run(Executor.scala:274)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)at java.util.concurrent.ThreadPoolExecutor $ Worker.run(ThreadPoolExecutor.java:624)at java.lang.Thread.run(Thread.java:748)Caused by:java.lang.ClassCastException:scala.collection.mutable.WrappedArray $ ofRef无法转换为[Ljava.lang.String; at $line80.$read $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ iw $$ anonfun $ 1.apply(:76)... 13 more Driver stacktrace:

2个回答

15

如果您希望保留重复项,在Spark 2.4或更高版本中可以使用concat

ngramDataFrame.withColumn(
  "full_array", concat($"filtered_words", $"ngrams_array")
).show
+--------------------+---------------+--------------------+
|      filtered_words|   ngrams_array|          full_array|
+--------------------+---------------+--------------------+
|[curious, bought,...|[iwa, was, asj]|[curious, bought,...|
+--------------------+---------------+--------------------+

或者 array_union(如果你想去重):

ngramDataFrame.withColumn(
  "full_array",
   array_union($"filtered_words", $"ngrams_array")
)

这些函数也可以由其他高阶函数组合而成,例如

ngramDataFrame.withColumn(
   "full_array",
   flatten(array($"filtered_words", $"ngrams_array"))
)

有重复的,和

ngramDataFrame.withColumn(
   "full_array",
   array_distinct(flatten(array($"filtered_words", $"ngrams_array")))
)

另外需要注意的是,在处理 ArrayType 列时,不应该使用 WrappedArray。而应该使用保证的接口 Seq。因此,udf 应该使用以下参数签名的函数:

without.

(Seq[String], Seq[String]) => Seq[String]
请参考SQL编程指南了解详情。

4

Arjun,你创建的udf存在错误。当你传递数组类型的列时,数据类型不是Array[String],而是WrappedArray[String]。下面我将粘贴修改后的udf及其输出结果。

val SparkCtxt = new SparkContext(sparkConf)

val sqlContext = new SQLContext(SparkCtxt)

import sqlContext.implicits

import org.apache.spark.sql.functions._
val temp=SparkCtxt.parallelize(Seq(Row(Array("String1","String2"),Array("String3","String4"))))
val df= sqlContext.createDataFrame(temp,
  StructType(List(
    StructField("Col1",ArrayType(StringType),true),
    StructField("Col2",ArrayType(StringType),true)
  )
  )    )

def concat_array(firstarray: mutable.WrappedArray[String],
                 secondarray: mutable.WrappedArray[String]) : mutable.WrappedArray[String] =
{
 (firstarray ++ secondarray)
}
val concatUDF = udf(concat_array _)
val df2=df.withColumn("udftest",concatUDF(df.col("Col1"), df.col("Col2")))
df2.select("udftest").foreach(each=>{println("***********")
println(each(0))})
df2.show(true)

输出:

+------------------+------------------+--------------------+
|              Col1|              Col2|             udftest|
+------------------+------------------+--------------------+
|[String1, String2]|[String3, String4]|[String1, String2...|
+------------------+------------------+--------------------+

包含String1、String2、String3和String4的数组


谢谢@sai!这解决了我的问题。我不知道那种类型。在你的数据集中,你将列定义为ArrayType,所以它们为什么会被转换为WrappedArray?此外,刚开始我遇到了一个错误,所以对于任何其他可能遇到这个问题的人来说,首先:导入collection.mutable._ - Arjun Mishra
@ArjunMishra 当您打印数据框的模式时,它将显示数据类型为包装数组而不是数组类型。数据框将对数组应用包装器类,以赋予其额外的功能。WrappedArray将一个数组包装起来,使其具有额外的功能。它还有一堆类型,而数组只扩展了可序列化和可克隆性,这允许将数组包装起来,以便在需要某些通用集合类型(如Seq)的地方使用。 - sai pradeep kumar kotha

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接