从Spark DataFrame/RDD中获取前N个项目

4

我的要求是从数据框中获取前N个项目。

我有这个数据框:

val df = List(
      ("MA", "USA"),
      ("MA", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("OH", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("NY", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA"),
      ("CT", "USA")).toDF("value", "country")

我能够将它映射到一个 RDD[((Int, String), Long)] 中 colValCount: 读取: ((colIdx, value), count)

((0,CT),5)
((0,MA),2)
((0,OH),4)
((0,NY),6)
((1,USA),17)

现在,我需要获取每个列索引的前2个数据项。所以我的预期输出是这样的:
RDD[((Int, String), Long)]

((0,CT),5)
((0,NY),6)
((1,USA),17)

我尝试在DataFrame中使用freqItems api,但是速度很慢。

欢迎您提出任何建议。


1
我认为你需要一些 sort()limit() 的组合,但老实说我不明白你如何得到你的输出。 - pault
4个回答

3

例如:

import org.apache.spark.sql.functions._

df.select(lit(0).alias("index"), $"value")
   .union(df.select(lit(1), $"country"))
   .groupBy($"index", $"value")
   .count
  .orderBy($"count".desc)
  .limit(3)
  .show

// +-----+-----+-----+
// |index|value|count|
// +-----+-----+-----+
// |    1|  USA|   17|
// |    0|   NY|    6|
// |    0|   CT|    5|
// +-----+-----+-----+

where:

df.select(lit(0).alias("index"), $"value")
  .union(df.select(lit(1), $"country"))

创建一个两列的 DataFrame:
// +-----+-----+
// |index|value|
// +-----+-----+
// |    0|   MA|
// |    0|   MA|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   OH|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   NY|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    0|   CT|
// |    1|  USA|
// |    1|  USA|
// |    1|  USA|
// +-----+-----+

如果您想为每一列明确指定两个值:
import org.apache.spark.sql.DataFrame

def topN(df: DataFrame, key: String, n: Int)  = {
   df.select(
        lit(df.columns.indexOf(key)).alias("index"), 
        col(key).alias("value"))
     .groupBy("index", "value")
     .count
     .orderBy($"count")
     .limit(n)
}

topN(df, "value", 2).union(topN(df, "country", 2)).show
// +-----+-----+-----+ 
// |index|value|count|
// +-----+-----+-----+
// |    0|   MA|    2|
// |    0|   OH|    4|
// |    1|  USA|   17|
// +-----+-----+-----+

所以就像pault said的那样 - 只是 "sort()和limit()的某种组合"。

一些 orderBy()limit() 的组合...基本上就是我说的 :-) - pault
@pault 是的,我猜要么是这样,要么是在这里使用groupByagg的某种组合,偶尔再加上窗口函数 :-) - Alper t. Turker
3
.orderBy($"count".desc).limit(3)在这种情况下可以得到指定的结果,但在一般情况下它不能给出每个列索引的前两个项目。 - Kirk Broadhurst
谢谢,让我试一下! - Sam
@user8371915 这个可行,但处理TB级别的数据还是比较慢,因为我还在处理超过170列的数据。性能与之前提到的freqItems API相当。处理整个数据需要大约一个多小时的时间。 - Sam
这个过程一段时间后会抛出GC异常:Exception in thread "dispatcher-event-loop-39" java.lang.OutOfMemoryError: GC overhead limit exceeded - Sam

3

最简单的方法是使用自然窗口函数,可以通过编写SQL语句来实现。Spark自带SQL语法,而SQL是解决此问题的强大和表达性极佳的工具。

将您的数据框注册为临时表,然后在其上进行分组和窗口操作。

spark.sql("""SELECT idx, value, ROW_NUMBER() OVER (PARTITION BY idx ORDER BY c DESC) as r 
             FROM (
               SELECT idx, value, COUNT(*) as c 
               FROM (SELECT 0 as idx, value FROM df UNION ALL SELECT 1, country FROM df) 
               GROUP BY idx, value) 
             HAVING r <= 2""").show()

我想看看在没有迭代或循环的情况下,过程化/scala方法是否允许您执行窗口函数。 我不知道Spark API中是否支持这种方法。

顺便说一句,如果您有任意数量的要包含的列,那么您可以使用列列表动态轻松生成内部部分(SELECT 0 as idx, value ... UNION ALL SELECT 1, country等)。


谢谢,我尝试使用窗口函数时出现了GC开销的问题。我也会尝试您的解决方案。 - Sam

2

假如您有一个RDD:

val rdd =
  sc.parallelize(
    List(
      ((0, "CT"), 5),
      ((0, "MA"), 2),
      ((0, "OH"), 4),
      ((0, "NY"), 6),
      ((1, "USA"), 17)
    ))

rdd.filter(_._1._1 == 0).sortBy(-_._2).take(2).foreach(println)
> ((0,NY),6)
> ((0,CT),5)
rdd.filter(_._1._1 == 1).sortBy(-_._2).take(2).foreach(println)
> ((1,USA),17)

我们首先获取给定列索引的项目(.filter(_._1._1 == 0))。然后我们按递减顺序对项目进行排序(.sortBy(-_._2))。最后,我们最多取前两个元素(.take(2)),如果记录数少于2,则只取1个元素。

0

您可以使用Sparkz中定义的此辅助函数映射每个单独的分区,然后将它们组合在一起:

package sparkz.utils

import scala.reflect.ClassTag

object TopElements {
  def topN[T: ClassTag](elems: Iterable[T])(scoreFunc: T => Double, n: Int): List[T] =
    elems.foldLeft((Set.empty[(T, Double)], Double.MaxValue)) {
      case (accumulator@(topElems, minScore), elem) =>
        val score = scoreFunc(elem)
        if (topElems.size < n)
          (topElems + (elem -> score), math.min(minScore, score))
        else if (score > minScore) {
          val newTopElems = topElems - topElems.minBy(_._2) + (elem -> score)
          (newTopElems, newTopElems.map(_._2).min)
        }
        else accumulator
    }
      ._1.toList.sortBy(_._2).reverse.map(_._1)
}

源代码: https://github.com/gm-spacagna/sparkz/blob/master/src/main/scala/sparkz/utils/TopN.scala


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接