定期更新的静态数据集结构化流处理

9
将流数据与静态数据集合并是结构化流的一个很好的特性。但是每个批处理都会从数据源刷新数据集。由于这些数据源并不总是那么动态,为了提高性能,缓存静态数据集以供一定时间(或批次数)使用是可行的。 在指定的时间或批次数之后,重新加载数据集,否则从缓存中检索。
在Spark Streaming中,我通过缓存数据集并在指定的批处理运行数量之后取消持久化来实现此目的,但出于某种原因,在结构化流中这种方式不再起作用。
有没有建议如何使用结构化流实现这一点?

1
你能展示一下使用Structured Streaming的代码吗?我想到了使用mapGroupsWithState来处理刷新静态数据集的自定义逻辑。 - Paul Leclercq
你对mapGroupsWithState有什么想法?我在想这是否是一种有效的方式,因为mapGroupsWithState是在记录级别上操作的(而不是整个dataframe/dataset)。在一个批次中,mapGroupsWithState可能会运行多次。使用Spark Streaming时,我通过使用批次计数器并在计数器达到阈值时刷新(缓存/持久化)数据集来管理此问题。但是,在结构化流处理中,批次计数器不再起作用了(我猜增量器只运行一次,而不是在每个批次上运行)。 - Chris
1
关于mapGroupsWithState,你说得对。我会创建一个自定义的sink,以便在每个X批次(基于batchId)的addBatch方法中刷新我的数据集。希望这个例子能够帮到你:https://github.com/polomarcus/Spark-Structured-Streaming-Examples/blob/master/src/main/scala/cassandra/StreamSinkProvider/CassandraSink.scala#L39 - Paul Leclercq
有趣的例子!但是自定义Sink是唯一的方法吗?Sink是终止点,刷新静态数据集也需要在Sink中进行数据处理(例如与静态数据集的连接、过滤等)。也许您想在连接静态数据集后使用mapsGroupsWithState添加状态。我猜如果您还想进行数据处理,Sink并不是最好的地方。 - Chris
1个回答

0

我已经为另一个问题开发了一个解决方案Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically,这个解决方案可能也有助于解决你的问题:

你可以利用Structured Streaming提供的流调度功能来触发静态Dataframe的刷新(取消持久化 -> 加载 -> 持久化)。具体步骤如下:

  1. 首先加载静态Dataframe并保存为var
  2. 定义一个刷新静态Dataframe的方法
  3. 使用一个"Rate"流,在所需的时间间隔(例如1小时)触发刷新操作
  4. 读取实际的流数据,并与静态Dataframe进行连接操作
  5. 在Rate流中使用foreachBatch接收器调用刷新方法

以下代码在Spark 3.0.1、Scala 2.12.10和Delta 0.7.0上运行良好。

  // 1. Load the staticDataframe initially and keep as `var`
  var staticDf = spark.read.format("delta").load(deltaPath)
  staticDf.persist()

  //  2. Define a method that refreshes the static Dataframe
  def foreachBatchMethod[T](batchDf: Dataset[T], batchId: Long) = {
    staticDf.unpersist()
    staticDf = spark.read.format("delta").load(deltaPath)
    staticDf.persist()
    println(s"${Calendar.getInstance().getTime}: Refreshing static Dataframe from DeltaLake")
  }

  // 3. Use a "Rate" Stream that gets triggered at the required interval (e.g. 1 hour)
  val staticRefreshStream = spark.readStream
    .format("rate")
    .option("rowsPerSecond", 1)
    .option("numPartitions", 1)
    .load()
    .selectExpr("CAST(value as LONG) as trigger")
    .as[Long]

  // 4. Read actual streaming data and perform join operation with static Dataframe
  // As an example I used Kafka as a streaming source
  val streamingDf = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "test")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(value AS STRING) as id", "offset as streamingField")

  val joinDf = streamingDf.join(staticDf, "id")

  val query = joinDf.writeStream
    .format("console")
    .option("truncate", false)
    .option("checkpointLocation", "/path/to/sparkCheckpoint")
    .start()

  // 5. Within that Rate Stream have a `foreachBatch` sink that calls refresher method
  staticRefreshStream.writeStream
    .outputMode("append")
    .foreachBatch(foreachBatchMethod[Long] _)
    .queryName("RefreshStream")
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .start()

为了有一个完整的例子,Delta表应该按照下面的方式创建:
  val deltaPath = "file:///tmp/delta/table"

  import spark.implicits._
  val df = Seq(
    (1L, "static1"),
    (2L, "static2")
  ).toDF("id", "deltaField")

  df.write
    .mode(SaveMode.Overwrite)
    .format("delta")
    .save(deltaPath)

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接