如何在Spark中对RDD进行排序并限制结果数量?

10

我有一个Foo类的RDD:class Foo( name : String, createDate : Date )。 我想要另一个RDD,其中包含比原RDD日期早10%的Foo对象。 我的第一想法是按照createDate进行排序,并通过0.1 * count来限制,但是没有limit函数。

你有什么想法吗?

1个回答

16

假设Foo是一个类似于这样的案例类:

import java.sql.Date
case class Foo(name: String, createDate: java.sql.Date)
  1. 使用普通的RDD:

  2. import org.apache.spark.rdd.RDD
    import scala.math.Ordering
    
    val rdd: RDD[Foo] = sc
      .parallelize(Seq(
        ("a", "2015-01-03"), ("b", "2014-11-04"), ("a", "2016-08-10"),
        ("a", "2013-11-11"), ("a", "2015-06-19"), ("a", "2009-11-23")))
      .toDF("name", "createDate")
      .withColumn("createDate", $"createDate".cast("date"))
      .as[Foo].rdd
    
    rdd.cache()
    val  n = scala.math.ceil(0.1 * rdd.count).toInt
    
    • 数据适合驱动器内存:

      • 且您需要的比例相对较小

    rdd.takeOrdered(n)(Ordering.by[Foo, Long](_.createDate.getTime))
    // Array[Foo] = Array(Foo(a,2009-11-23))
    
  3. 您想要的分数相对较大:

    rdd.sortBy(_.createDate.getTime).take(n)
    
  4. 否则

    rdd
      .sortBy(_.createDate.getTime)
      .zipWithIndex
      .filter{case (_, idx) => idx < n}
      .keys
    
  5. 使用DataFrame(注意 - 这实际上在性能方面并不是最优的,因为存在限制行为)。

  6. import org.apache.spark.sql.Row
    
    val topN = rdd.toDF.orderBy($"createDate").limit(n)
    topN.show
    
    // +----+----------+
    // |name|createDate|
    // +----+----------+
    // |   a|2009-11-23|
    // +----+----------+
    
    
    // Optionally recreate RDD[Foo]
    topN.map{case Row(name: String, date: Date) => Foo(name, date)} 
    

2
你好zero323,能否快速告诉我为什么DataFrame在limit操作上的性能不佳?在实现方面与RDD中的top有何区别?@zero333 - Xinwei Liu
@XinweiLiu 我已经回答了你的问题,希望可以解释清楚正在发生的事情。 - zero323
1
非常好的回答@zero323。但我仍然有和刘新伟一样的问题。为什么df.limit()很慢? - guilhermecgs
1
@zero323 请详细说明为什么 df.limit(n) 操作很慢。谢谢。 - BushMinusZero
我认为在这种情况下使用.zipWithIndex不安全。也许zipWithUniqueId更安全?哦,还有为什么limit很慢? - Wonay
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接