在Spark中将流式数据集附加到批处理数据集

4
我们在Spark中有一个使用案例,需要将历史数据从数据库加载到Spark,并继续添加新的流式数据到Spark,然后我们可以对整个最新数据集进行分析。
据我所知,无论是Spark SQL还是Spark Streaming都无法将历史数据和流式数据相结合。然后我发现了Spark 2.0中的Structured Streaming似乎就是为解决这个问题而构建的。但经过一些实验,我仍然无法搞清楚。以下是我的代码:
SparkSession spark = SparkSession
        .builder()
        .config(conf)
        .getOrCreate();

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

// Load historical data from MongoDB
JavaMongoRDD<Document> mongordd = MongoSpark.load(jsc);


// Create typed dataset with customized schema
JavaRDD<JavaRecordForSingleTick> rdd = mongordd.flatMap(new FlatMapFunction<Document, JavaRecordForSingleTick>() {...});
Dataset<Row> df = spark.sqlContext().createDataFrame(rdd, JavaRecordForSingleTick.class);
Dataset<JavaRecordForSingleTick> df1 = df.as(ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));


// ds listens to a streaming data source
Dataset<Row> ds = spark.readStream()
        .format("socket")
        .option("host", "127.0.0.1")
        .option("port", 11111)
        .load();

// Create the typed dataset with customized schema
Dataset<JavaRecordForSingleTick> ds1 = ds
        .as(Encoders.STRING())
        .flatMap(new FlatMapFunction<String, JavaRecordForSingleTick>() {
    @Override
    public Iterator<JavaRecordForSingleTick> call(String str) throws Exception {
    ...
    }
}, ExpressionEncoder.javaBean(JavaRecordForSingleTick.class));


// ds1 and df1 have the same schema. ds1 gets data from the streaming data source, df1 is the dataset with historical data

ds1 = ds1.union(df1);
StreamingQuery query = ds1.writeStream().format("console").start();
query.awaitTermination();

当我将两个数据集union()起来时,出现了错误信息"org.apache.spark.sql.AnalysisException: Union between streaming and batch DataFrames/Datasets is not supported;"。请问有人能帮忙解决吗?是我方向错了吗?


Spark 2.0中的Structured Streaming处于Alpha版本 - 许多功能尚未得到支持。我想知道是否可以改用有状态流处理。在有状态流处理中,您可以使用历史数据引导状态,然后以所需方式附加流数据。有关详细信息,请参见此Databrick博客文章 - Glennie Helles Sindholt
@GlennieHellesSindholt Hi Glennie,感谢您的建议。我认为mapWithState()最适合用于用新的流数据替换/更新当前状态(键值对)。在我的用例中,我的RDD没有键值对,并且不需要更新旧数据。使用mapWithState()是否过度了一些? - Xiao Tan
我同意如果没有任何形式的聚合,mapWithState不是显而易见的选择,但是如果你不需要历史数据,为什么要在流中保留它呢? - Glennie Helles Sindholt
1个回答

1

就支持这种功能而言,我不能代表MongoDB的火花连接器发言,在Google上也似乎没有太多相关信息。但是,Spark数据库生态系统中有其他数据库可以实现此功能。我在另一个答案中介绍了大部分Spark数据库生态系统中的内容。虽然我不知道哪个数据库可以轻松地实现您正在寻找的功能,但我知道SnappyDataMemSQL都在该列表中。但是,您可能需要将数据转换为关系形式。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接