I have a Spark Structured Streaming job that reads offsets from a Kafka topic and writes the records to an Aerospike database. I am currently making this job production-ready and implementing a SparkListener.
While reading the documentation, I came across this example:
StreamingQuery query = wordCounts.writeStream()
    .outputMode("complete")
    .format("console")
    .start();

query.awaitTermination();
After this code is executed, the streaming computation will have
started in the background. The query object is a handle to that active
streaming query, and we have decided to wait for the termination of
the query using awaitTermination() to prevent the process from exiting
while the query is active.
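The behavior the quoted docs describe can be sketched in plain Java, with no Spark involved: `start()` launches the work on a background thread and returns immediately, and `awaitTermination()` blocks the caller so the driver process does not exit while that work is still running. In the sketch below, `Thread.join()` stands in for `awaitTermination()`; the class and method names are my own, not Spark's.

```java
import java.util.ArrayList;
import java.util.List;

public class AwaitSketch {
    // Simulates StreamingQuery.start(): kicks off background processing
    // and immediately returns a handle to it.
    static Thread startQuery(List<String> log) {
        Thread query = new Thread(() -> {
            for (int batch = 0; batch < 3; batch++) {
                log.add("processed batch " + batch);   // stands in for writing a micro-batch
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        query.start();
        return query;
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> log = new ArrayList<>();
        Thread query = startQuery(log); // like start(): non-blocking
        query.join();                   // like awaitTermination(): block until the work stops
        System.out.println(log);
    }
}
```

If `main` returned right after `startQuery` instead of joining, the JVM could exit before any batch was processed, which is exactly what `awaitTermination()` prevents in the Spark example.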
I understand that it makes the process wait for the query to terminate instead of exiting while the query is still active.
What exactly does that mean in practice? Does it help avoid losing data written by the query?
And how does it help when the query writes millions of records per day?
For reference, my code is fairly simple:
dataset.writeStream()
    .option("startingOffsets", "earliest")
    .outputMode(OutputMode.Append())
    .format("console")
    .foreach(sink)
    .trigger(Trigger.ProcessingTime(triggerInterval))
    .option("checkpointLocation", checkpointLocation)
    .start();
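For comparison, applying the pattern from the quoted docs to this job would mean keeping the returned StreamingQuery handle and blocking on it. This is only a sketch built from my snippet above (`dataset`, `sink`, `triggerInterval`, and `checkpointLocation` are defined elsewhere in my code):

StreamingQuery query = dataset.writeStream()
    .outputMode(OutputMode.Append())
    .foreach(sink)
    .trigger(Trigger.ProcessingTime(triggerInterval))
    .option("checkpointLocation", checkpointLocation)
    .start();

// Block the driver so the JVM does not exit while the query is active.
query.awaitTermination();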