我一直在尝试将RDD转换为数据框。为此,需要定义类型而不是使用Any。我正在使用Spark MLLib PrefixSpan,其中包含freqSequence.sequence。我从包含Session_IDs、views和purchases的数据框开始,它们都是字符串数组:
viewsPurchasesGrouped: org.apache.spark.sql.DataFrame =
[session_id: decimal(29,0), view_product_ids: array[string], purchase_product_ids: array[string]]
我会计算频繁模式并将它们放入数据帧中,以便我可以将它们写入Hive表。
val viewsPurchasesRddString = viewsPurchasesGrouped.map( row => Array(Array(row(1)), Array(row(2)) ))
val prefixSpan = new PrefixSpan()
.setMinSupport(0.001)
.setMaxPatternLength(2)
val model = prefixSpan.run(viewsPurchasesRddString)
val freqSequencesRdd = sc.parallelize(model.freqSequences.collect())
case class FreqSequences(views: Array[String], purchases: Array[String], support: Long)
val viewsPurchasesDf = freqSequencesRdd.map( fs =>
{
val views = fs.sequence(0)(0)
val purchases = fs.sequence(1)(0)
val freq = fs.freq
FreqSequences(views, purchases, freq)
}
)
viewsPurchasesDf.toDF() // optional
当我尝试运行这段代码时,"views"和"purchases"变成了"Any"而不是"Array[String]"。我已经拼命尝试将它们转换过来,但最好的结果是"Array[Any]"。我认为我需要将内容映射到一个字符串上,例如我尝试了这个:如何在WrappedArray中获取元素:Dataset.select("x").collect()的结果? 和这个:如何将WrappedArray [WrappedArray [Float]] 转换为 Array [Array [Float]] 在spark(scala)中 以及数千个其他的Stackoverflow问题...
我真的不知道该如何解决这个问题了。我猜我可能已经对初始的dataframe / RDD 进行了太多的转换,但是我无法理解在哪里。
viewsPurchasesRddString: org.apache.spark.rdd.RDD [Array[Array[Array[String]]]] = MapPartitionsRDD[1801] at map at <console>: 197 prefixSpan:org.apache.spark.mllib.fpm.PrefixSpan = org.apache.spark.mllib.fpm.PrefixSpan@13b756c2 org.apache.spark.SparkException:由于阶段失败而中止作业:第1272.0阶段中的任务28已失败4次,最近一次失败:任务28.3在阶段1272.0中(...):java.lang.ClassCastException:scala.collection.mutable.WrappedArray$ofRef无法转换为[Ljava.lang.String;
- Kora K