Scala将WrappedArray或Array[Any]转换为Array[String]

5

我一直在尝试将RDD转换为数据框。为此,需要定义类型而不是使用Any。我正在使用Spark MLLib PrefixSpan,其中包含freqSequence.sequence。我从包含Session_IDs、views和purchases的数据框开始,它们都是字符串数组:

viewsPurchasesGrouped: org.apache.spark.sql.DataFrame =
  [session_id: decimal(29,0), view_product_ids: array[string], purchase_product_ids: array[string]]

我会计算频繁模式并将它们放入数据帧中,以便我可以将它们写入Hive表。

val viewsPurchasesRddString = viewsPurchasesGrouped.map( row => Array(Array(row(1)), Array(row(2)) ))

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.001)
  .setMaxPatternLength(2)

val model = prefixSpan.run(viewsPurchasesRddString)

val freqSequencesRdd = sc.parallelize(model.freqSequences.collect())

case class FreqSequences(views: Array[String], purchases: Array[String], support: Long)

val viewsPurchasesDf = freqSequencesRdd.map( fs =>
  {   
  val views = fs.sequence(0)(0)
  val purchases = fs.sequence(1)(0)
  val freq = fs.freq
  FreqSequences(views, purchases, freq)
  }
)
viewsPurchasesDf.toDF() // optional

当我尝试运行这段代码时,"views"和"purchases"变成了"Any"而不是"Array[String]"。我已经拼命尝试将它们转换过来,但最好的结果是"Array[Any]"。我认为我需要将内容映射到一个字符串上,例如我尝试了这个:如何在WrappedArray中获取元素:Dataset.select("x").collect()的结果? 和这个:如何将WrappedArray [WrappedArray [Float]] 转换为 Array [Array [Float]] 在spark(scala)中 以及数千个其他的Stackoverflow问题...
我真的不知道该如何解决这个问题了。我猜我可能已经对初始的dataframe / RDD 进行了太多的转换,但是我无法理解在哪里。
2个回答

3

我认为问题在于您有一个DataFrame,它没有保留任何静态类型信息。当您从Row中取出一个项目时,您必须明确告诉它您希望得到的类型。

根据您提供的信息推断,尚未经过测试:

import scala.collection.mutable.WrappedArray

val viewsPurchasesRddString = viewsPurchasesGrouped.map( row =>
  Array(
    Array(row.getAs[WrappedArray[String]](1).toArray), 
    Array(row.getAs[WrappedArray[String]](2).toArray)
  )
)

1
感谢您的回答!在尝试时,我得到了以下错误信息:viewsPurchasesRddString: org.apache.spark.rdd.RDD [Array[Array[Array[String]]]] = MapPartitionsRDD[1801] at map at <console>: 197 prefixSpan:org.apache.spark.mllib.fpm.PrefixSpan = org.apache.spark.mllib.fpm.PrefixSpan@13b756c2 org.apache.spark.SparkException:由于阶段失败而中止作业:第1272.0阶段中的任务28已失败4次,最近一次失败:任务28.3在阶段1272.0中(...):java.lang.ClassCastException:scala.collection.mutable.WrappedArray$ofRef无法转换为[Ljava.lang.String; - Kora K
不幸的是,即使使用了导入,我仍然得到相同的错误。 - Kora K

3
我解决了这个问题。供参考,以下方法可行:
val viewsPurchasesRddString = viewsPurchasesGrouped.map( row =>
  Array(
  row.getSeq[Long](1).toArray, 
  row.getSeq[Long](2).toArray
  )
)

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.001)
  .setMaxPatternLength(2)

val model = prefixSpan.run(viewsPurchasesRddString)

case class FreqSequences(views: Long, purchases: Long, frequence: Long)

val ps_frequences = model.freqSequences.filter(fs => fs.sequence.length > 1).map( fs =>
    {   
    val views = fs.sequence(0)(0)
    val purchases = fs.sequence(1)(0)
    val freq = fs.freq
    FreqSequences(views, purchases, freq)
    }
)

ps_frequences.toDF()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接