Converting JavaDStream<String> to JavaRDD<String>

4

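I have a JavaDStream that receives data from an external source, and I am trying to integrate Spark Streaming with Spark SQL. A JavaDStream is made up of JavaRDDs, and I can only call applySchema() when I have a JavaRDD. How can I convert the JavaDStream to a JavaRDD? I know Scala has functions for this and that it is easier there, but I need a solution in Java.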

3 Answers

5

Thanks. That helped me a lot! - Navin Ahmed

1
You first need to access each RDD in the DStream using foreachRDD, as shown below:
javaDStream.foreachRDD(rdd -> {
    // collect() brings every record of this batch to the driver
    rdd.collect().forEach(record -> {
        ...
    });
});

0

Hope this helps with converting a JavaDStream to a JavaRDD!

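    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.apache.spark.streaming.api.java.JavaDStream;

    // stream is assumed to be a DStream of Kafka ConsumerRecord<String, String> objects
    JavaDStream<String> lines = stream.map(ConsumerRecord::value);

    // Each micro-batch arrives as a JavaRDD<String>; convert it to a JavaRDD<Row>
    lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
        @Override
        public void call(JavaRDD<String> rdd) {
            JavaRDD<Row> rowRDD = rdd.map(new Function<String, Row>() {
                @Override
                public Row call(String msg) {
                    return RowFactory.create(msg);
                }
            });
            // Create the schema: a single nullable string column named "value"
            StructType schema = DataTypes.createStructType(new StructField[] {
                    DataTypes.createStructField("value", DataTypes.StringType, true)});
            // Get the Spark 2.0 session (JavaSparkSessionSingleton is a helper class not shown in this answer)
            SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
            Dataset<Row> msgDataFrame = spark.createDataFrame(rowRDD, schema);
            msgDataFrame.show();
        }
    });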
