合并多个JavaRDD

3

我尝试合并多个JavaRDD,但只得到了两个已合并的,请有人好心帮忙。我一直在努力解决这个问题,但总体上我想能够获取多个集合并使用sqlContext创建一个组,并打印出所有结果。

以下是我的代码:

  JavaRDD<AppLog> logs =  mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppa_logs").union(
                              mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.fav_logs").union(
                                mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.pps_logs").union(
                                  mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.dd_logs").union(
                                    mapCollection(sc, "mongodb://hadoopUser:Pocup1ne9@localhost:27017/hbdata.ppt_logs")
                                  )
                                )
                              )
                          );


public JavaRDD<AppLog> mapCollection(JavaSparkContext sc ,String uri){

  Configuration mongodbConfig = new Configuration();
  mongodbConfig.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
  mongodbConfig.set("mongo.input.uri", uri);

  JavaPairRDD<Object, BSONObject> documents = sc.newAPIHadoopRDD(
      mongodbConfig,            // Configuration
      MongoInputFormat.class,   // InputFormat: read from a live cluster.
      Object.class,             // Key class
      BSONObject.class          // Value class
    );

    return documents.map(

      new Function<Tuple2<Object, BSONObject>, AppLog>() {

          public AppLog call(final Tuple2<Object, BSONObject> tuple) {
              AppLog log = new AppLog();
              BSONObject header =
                (BSONObject) tuple._2();

              log.setTarget((String) header.get("target"));
              log.setAction((String) header.get("action"));

              return log;
          }
      }
    );
}

// 打印集合 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

(说明:该段代码为Java语言,用于Spark中的SQL操作。)
    DataFrame logsSchema = sqlContext.createDataFrame(logs, AppLog.class);
    logsSchema.registerTempTable("logs");

    DataFrame groupedMessages = sqlContext.sql(
      "select * from logs");
      // "select target, action, Count(*) from logs group by target, action");

      // "SELECT to, body FROM messages WHERE to = \"eric.bass@enron.com\"");



    groupedMessages.show();

    logsSchema.printSchema();

你如何识别只有两个RDD被合并?第二个问题:为什么你以递归的方式调用union(我知道它不是递归执行),而不是采用函数式写法?我的意思是rdd1.union(rdd2).union(rdd3)等等。 union的返回类型应该是一个RDD。在你的写作风格中-> mapCollection(sth1, sth2).union(mapCollection(sth1,sth2))等等。 - hasan
嗨,我使用了sqlcontext来打印rdd,但在运行代码后只显示了两个rdd。我已经更新了我的问题,这样你就可以看到我如何打印数据了。我使用了递归式的代码风格,因为我尝试了很多种方法,但都没有成功,这是我最接近成功的方式。有什么建议吗? - D.Asare
对我来说看起来是正确的。唯一让我困惑的是递归写作风格。也许你可以加载每个log rdd,稍后进行连接,然后打印计数或每个rdd的第一个元素,以查看它们是否为空。函数式写作风格将带来更好的可读性,这只是一个建议。 - hasan
1个回答

3
如果您想合并多个 JavaRDDs,只需使用 sc.union(rdd1,rdd2,..) 而不是 rdd1.union(rdd2).union(rdd3)
此外,请查看这个链接:RDD.union vs SparkContex.union

谢谢帮忙。看起来我仍然只得到了前两个RDD的连接,不太确定为什么。 - D.Asare
请有人来打我一巴掌。我注意到在我打印值的底部写着“仅显示前20行”。我已经在这上面工作了6-7个小时。 - D.Asare

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接