如何在Spark中打印特定RDD分区的元素?

12
如何只打印特定分区(例如第5个)的元素?
val distData = sc.parallelize(1 to 50, 10)
3个回答

11

使用 Spark/Scala:

val data = 1 to 50
val distData = sc.parallelize(data,10)
distData.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) =>it.toList.map(x => if (index ==5) {println(x)}).iterator).collect

生成:

26
27
28
29
30

它的 Python 等效代码是什么? - anwartheravian
不确定为什么,但当我尝试在spark-shell终端中执行上述命令时,输出只是一组空元素。有什么想法吗? - Saurabh Mishra
1
明白了。最后一条语句不应该简单地打印(x),这将不会将任何内容返回到输出RDD,因此在控制台上不会产生任何输出。相反,你可以简单地使用x : distData.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => it.toList.map(x => if (index == 5) {x}).iterator).collect。 - Saurabh Mishra
1
上面的答案输出将会完全由这行代码产生。试一下 distData.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) =>it.toList.map(x => if (index ==5) {x} else "S").iterator).filter(x=>x!="S").collect.foreach(x=>print(x+" ")) - Krish

4
假设您只是为了测试目的而这样做,那么请使用glom()。请参阅Spark文档:https://spark.apache.org/docs/1.6.0/api/python/pyspark.html#pyspark.RDD.glom
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> rdd.glom().collect()
[[1, 2], [3, 4]]
>>> rdd.glom().collect()[1]
[3, 4]

编辑:Scala示例:

scala> val distData = sc.parallelize(1 to 50, 10)
scala> distData.glom().collect()(4)
res2: Array[Int] = Array(21, 22, 23, 24, 25)

2

您可以使用计数器来对foreachPartition() API进行操作,以实现此目的。

以下是一个Java程序,用于打印每个分区的内容 JavaSparkContext context = new JavaSparkContext(conf);

    JavaRDD<Integer> myArray = context.parallelize(Arrays.asList(1,2,3,4,5,6,7,8,9));
    JavaRDD<Integer> partitionedArray = myArray.repartition(2);

    System.out.println("partitioned array size is " + partitionedArray.count());
    partitionedArray.foreachPartition(new VoidFunction<Iterator<Integer>>() {

        public void call(Iterator<Integer> arg0) throws Exception {

            while(arg0.hasNext()) {
                System.out.println(arg0.next());
            }

        }
    });

我已经尝试过'foreachPartition()',但它没有打印任何元素。 - Arnav

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接