reduceByKey Spark 维护顺序

3
我的输入数据集长这样:
id1, 10, v1
id2, 9, v2
id2, 34, v3
id1, 6, v4
id1, 12, v5
id2, 2, v6

我希望你能够输出内容

id1; 6,v4 | 10,v1 | 12,v5
id2; 2,v6 | 9,v2 | 34,v3

这是这样的:

id1: array[num(i),value(i)] where num(i) should be sorted

我尝试过以下方法:
  • 将id和第二列作为键,使用sortByKey排序。但由于它是一个字符串,所以排序并不像整数那样进行,而是按照字符串的方式。

  • 将第二列作为键,使用sortByKey排序,然后在值中获取id、键和第二列。使用reduceByKey时,顺序不会被保留。即使使用groupByKey也无法保持顺序。实际上这是预期的。

如果有任何帮助,将不胜感激。
1个回答

8

由于您没有提供有关输入类型的任何信息,我假设它是 RDD[(String, Int, String)]:

val rdd = sc.parallelize(
    ("id1", 10, "v1") :: ("id2", 9, "v2") ::
    ("id2", 34, "v3") :: ("id1", 6, "v4") :: 
    ("id1", 12, "v5") :: ("id2", 2, "v6") :: Nil)

rdd
  .map{case (id, x, y) => (id, (x, y))}
  .groupByKey
  .mapValues(iter => iter.toList.sortBy(_._1))
  .sortByKey() // Optional if you want id1 before id2

编辑:

要获得您在评论中描述的输出,您可以将传递给mapValues的函数替换为类似以下内容的函数:

def process(iter: Iterable[(Int, String)]): String = {
  iter.toList
      .sortBy(_._1)
      .map{case (x, y) => s"$x,$y"}
      .mkString("|")
}

非常感谢 @zero323 - user2200660
非常感谢@zero323。结果rdd是""RDD[(String, List[(Int, String)])]""。您能告诉我如何将此RDD转换为""RDD[(String, String)]""吗?所以基本上您的输出是""(id1,List((6,v4), (10,v1), (12,v5)))"",但我需要""(id1; 6,v4 | 10,v1 | 12,v5)""。 - user2200660

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接