Flink: 按键进行有状态流处理

4
我有一串数据,例如带有ID的JSON记录。
我想处理这些数据,使得所有具有相同键的记录被同一个有状态任务处理。
我该怎么做?
1个回答

4
这可以通过对KeyedStream使用有状态的操作符来完成。 KeyedStream将所有记录根据键进行分区,并确保具有相同键的所有记录都进入同一个操作符实例并与相同状态交互。
在代码中,看起来像这样:
val stream: DataStream[(String, Long)] = ???
val sumByKey: DataStream[(String, Long)] = stream
  .keyBy(_._1) // key on the first attribute
  .map(new SumMapper())

class SumMapper extends RichMapFunction[(String, Long), (String, Long)] {

  var sumState: ValueState[Long] = _

  override def open(config: Configuration) {
    // configure state
    val sumDesc: ValueStateDescriptor[Long] =
      new ValueStateDescriptor[Long]("sum", classOf[Long])
    sumState = getRuntimeContext.getState(sumDesc)
  }

  override def map(in: (String, Long)): (String, Long) = {
    val sum = sumState.value() // get current sum from state
    val newSum = sum + in._2   // compute new sum
    sumState.update(newSum)    // update state
    (in._1, newSum)            // emit result
  }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接