To test out stream processing and Flink, I have given myself a seemingly simple problem. My data stream consists of x and y coordinates of a particle, together with the time t at which the position was recorded. My objective is to annotate this data with the velocity of the particular particle. So the stream might look something like this:

<timestamp:Long> <particle_id:String> <x:Double> <y:Double>
1612103771212 p1 0.0 0.0
1612103771212 p2 0.0 0.0
1612103771213 p1 0.1 0.1
1612103771213 p2 -0.1 -0.1
1612103771214 p1 0.1 0.2
1612103771214 p2 -0.1 -0.2
1612103771215 p1 0.2 0.2
1612103771215 p2 -0.2 -0.2
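Ignoring the streaming machinery for a moment, the annotation I am after is just the finite difference between consecutive samples of the same particle. A minimal plain-Scala sketch (the case-class and method names here are only for illustration):

```scala
object VelocityDemo {
  case class Pos(t: Long, name: String, x: Double, y: Double)
  case class PosVel(t: Long, name: String, x: Double, y: Double, vx: Double, vy: Double)

  // Given the positions of one particle, sorted by time, annotate every
  // position after the first with the finite-difference velocity
  // (units per millisecond, since t is in ms).
  def annotate(positions: Seq[Pos]): Seq[PosVel] =
    positions.sliding(2).collect { case Seq(prev, cur) =>
      val dt = (cur.t - prev.t).toDouble
      PosVel(cur.t, cur.name, cur.x, cur.y, (cur.x - prev.x) / dt, (cur.y - prev.y) / dt)
    }.toSeq

  def main(args: Array[String]): Unit = {
    val p1 = Seq(
      Pos(1612103771212L, "p1", 0.0, 0.0),
      Pos(1612103771213L, "p1", 0.1, 0.1),
      Pos(1612103771214L, "p1", 0.1, 0.2),
      Pos(1612103771215L, "p1", 0.2, 0.2))
    annotate(p1).foreach(println)
  }
}
```

The hard part, of course, is doing this on an unbounded, out-of-order stream, which is what the rest of this question is about.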
Now the events are not guaranteed to arrive in order; for example, 1612103771213 p2 -0.1 -0.1 may arrive, say, 10 ms before 1612103771212 p2 0.0 0.0 does. For simplicity, it can be assumed that any late data will arrive within 100 ms of the earlier data.
I admit that I am new to stream processing and Flink, so this may be a stupid question with an obvious answer, but I am currently stumped as to how to achieve my objective.
EDIT

Following David's answer, I tried sorting the DataStream with the Flink Table API, using nc -lk 9999 as a text socket stream. The problem is that nothing is printed to the console until the socket stream is closed. Here is the Scala code I wrote:
package processor

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{EnvironmentSettings, FieldExpression, WithOperations}
import org.apache.flink.util.Collector

import java.time.Duration

object AnnotateJob {

  val OUT_OF_ORDER_NESS = 100

  def main(args: Array[String]) {
    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val bSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(env, bSettings)

    env.setParallelism(1)

    // Obtain the input data by connecting to the socket. Here you want to connect to the local 9999 port.
    val text = env.socketTextStream("localhost", 9999)

    val objStream = text
      .filter(_.nonEmpty)
      .map(new ParticleMapFunction)

    val posStream = objStream
      .assignTimestampsAndWatermarks(
        WatermarkStrategy
          .forBoundedOutOfOrderness[ParticlePos](Duration.ofMillis(OUT_OF_ORDER_NESS))
          .withTimestampAssigner(new SerializableTimestampAssigner[ParticlePos] {
            override def extractTimestamp(t: ParticlePos, l: Long): Long = t.t
          })
      )

    val tablePos = tableEnv.fromDataStream(posStream, $"t".rowtime() as "et", $"t", $"name", $"x", $"y")
    tableEnv.createTemporaryView("pos", tablePos)
    val sorted = tableEnv.sqlQuery("SELECT t, name, x, y FROM pos ORDER BY et ASC")
    val sortedPosStream = tableEnv.toAppendStream[ParticlePos](sorted)

    // sortedPosStream.keyBy(pos => pos.name).process(new ValAnnotator)
    sortedPosStream.print()

    // execute program
    env.execute()
  }

  case class ParticlePos(t: Long, name: String, x: Double, y: Double) extends Serializable

  case class ParticlePosVal(t: Long, name: String, x: Double, y: Double,
                            var vx: Double = 0.0, var vy: Double = 0.0) extends Serializable

  class ParticleMapFunction extends MapFunction[String, ParticlePos] {
    override def map(t: String): ParticlePos = {
      // split on whitespace ("\\W+" would also split on '-' and '.', breaking
      // negative and fractional coordinates)
      val parts = t.split("\\s+")
      ParticlePos(parts(0).toLong, parts(1), parts(2).toDouble, parts(3).toDouble)
    }
  }
}
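For completeness, the ValAnnotator referenced in the commented-out keyBy(...).process(...) line is something I imagined roughly as below. This is an untested sketch, meant to sit inside AnnotateJob (so it sees ParticlePos and ParticlePosVal), and it assumes the stream it processes is already sorted by event time and keyed by particle name:

```scala
// Hypothetical ValAnnotator: keeps the previous position of each particle in
// keyed ValueState and emits the current position annotated with the
// finite-difference velocity (units per millisecond, since t is in ms).
class ValAnnotator extends KeyedProcessFunction[String, ParticlePos, ParticlePosVal] {

  private var prev: ValueState[ParticlePos] = _

  override def open(parameters: Configuration): Unit = {
    prev = getRuntimeContext.getState(
      new ValueStateDescriptor[ParticlePos]("prev-pos", classOf[ParticlePos]))
  }

  override def processElement(
      pos: ParticlePos,
      ctx: KeyedProcessFunction[String, ParticlePos, ParticlePosVal]#Context,
      out: Collector[ParticlePosVal]): Unit = {
    val last = prev.value()
    if (last == null) {
      // first sample of this particle: velocity stays at the default 0.0
      out.collect(ParticlePosVal(pos.t, pos.name, pos.x, pos.y))
    } else {
      val dt = (pos.t - last.t).toDouble
      out.collect(ParticlePosVal(pos.t, pos.name, pos.x, pos.y,
        (pos.x - last.x) / dt, (pos.y - last.y) / dt))
    }
    prev.update(pos)
  }
}
```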