Java中SparkContext的parallelize调用示例

3
我正在学习Spark,并尝试实现map函数的简单示例时遇到问题。问题出在新版本Spark中“parallelize”的定义上。有人可以分享如何使用它的示例吗?因为以下方式会报错,提示参数不足。
Spark版本:2.3.2 Java版本:1.8
SparkSession session = SparkSession.builder().appName("Compute Square of Numbers").config("spark.master","local").getOrCreate();
SparkContext context = session.sparkContext();
List<Integer> seqNumList = IntStream.rangeClosed(10, 20).boxed().collect(Collectors.toList());
JavaRDD<Integer> numRDD = context.parallelize(seqNumList, 2);

编译时错误信息:该方法需要3个参数。 我不知道第三个参数应该是什么样的?根据文档,它应该是
scala.reflect.ClassTag<T>

但是如何定义或使用它呢?

请不要建议使用JavaSparkContext,因为我想知道如何使用通用的SparkContext来使这种方法工作。

参考:https://spark.apache.org/docs/2.2.1/api/java/org/apache/spark/SparkContext.html#parallelize-scala.collection.Seq-int-scala.reflect.ClassTag-


有什么问题吗?还是我们要猜测一下? - UninformedUser
你有读过JavaDoc吗?这个方法需要三个参数:https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/SparkContext.html#parallelize-scala.collection.Seq-int-scala.reflect.ClassTag- - UninformedUser
1
scala.reflect.ClassTag$.MODULE$.apply(Integer.class); - UninformedUser
并且它还需要一个Scala序列:JavaConversions.asScalaBuffer(seqNumList) - 老实说,我不知道你为什么要在Java中调用这个方法...这很奇怪。 - UninformedUser
@AKSW 感谢您的 ClassTag 评论。这正是我所需要的。我知道这不是最佳方式,但我想了解如何使用 SparkContext 进行并行化处理,因为这是访问 Spark 处理的新统一方式。或者您建议仍然回退到 JavaSparkContext 来完成这些任务吗? - Saurabh Mishra
@AKSW 请更新问题并提供更多细节。我正在添加最终的工作代码细节,请尽管提出更好的实现方式。 - Saurabh Mishra
2个回答

3

这里是最终对我有用的代码。虽然不是实现结果的最佳方式,但对于我来说是探索API的一种方式。

SparkSession session = SparkSession.builder().appName("计算数字的平方") .config("spark.master", "local").getOrCreate();

SparkContext context = session.sparkContext();

List<Integer> seqNumList = IntStream.rangeClosed(10, 20).boxed().collect(Collectors.toList());


RDD<Integer> numRDD = context
        .parallelize(JavaConverters.asScalaIteratorConverter(seqNumList.iterator()).asScala()
                .toSeq(), 2, scala.reflect.ClassTag$.MODULE$.apply(Integer.class));


numRDD.toJavaRDD().foreach(x -> System.out.println(x));
session.stop();

0

如果您不想使用 sparkConext 提供额外的两个参数,您也可以使用 JavaSparkContext.parallelize(),它只需要一个输入列表:

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
final RDD<Integer> rdd = jsc.parallelize(seqNumList).map(num -> {
    // your implementation
}).rdd();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接