Apache Flink 会话支持

3
我正在调查Apache Flink Streaming,以用于我们的ETL和机器学习平台。我还没想清楚如何将事件流入“会话”中。更具体地说:所有事件都包含一个会话ID,为了丰富数据,我需要将属于同一会话的所有事件分组在一起。请注意,事件是持续流入的(因此在进行groupBy之后没有批处理支持)。
可能的解决方案之一是维护一个LRU缓存会话,并将所有传入的事件按其关联会话排序。然后,在每个会话X分钟不活动后,可以从缓存中“关闭”或删除该会话。问题是如何在多租户系统中处理此缓存; Flink是否具有分布式缓存的概念,或者它是否包含某种智能负载均衡器,其中事件被定向到网格中的同一分区?
更一般地说:使用流API建立会话支持的最佳方法(用例和陷阱)是什么?这真的可能吗?如何处理重放流?(即从特定时间点开始,事件流入不完整会话的事件(即在时间点之前的事件))
期待您的反馈、想法和/或指引。
提前感谢。
2个回答

5
我创建了一个示例,可能非常接近您需要的内容:https://gist.github.com/aljoscha/91b6422114eac814479f 我使用了一个 Tuple2 来模拟数据。 Integer 是会话 ID,而 String 是一些字段,我们将其用作分区键(partition)。
我建议您先查看 main() 方法,这里可以看到程序的流程。其他部分是自定义窗口定义 SessionWindow、窗口分配器和 SessionTrigger。这基本上实现了您建议的缓存概念。基于分配的窗口和键,窗口被保留在缓冲区中。一旦触发器触发,我们处理窗口并清空其中的内容。
当触发器接收到一个元素时,它会注册一个未来 10 秒的计时器。如果在此时间窗口内没有新元素到达,则触发器将触发。如果在此时间窗口内有新元素到达,它将注册一个新的计时器,这将替换旧计时器,因为触发器同一时间只能有一个活动计时器。
此外,它使用称为“处理时间窗口”的方法。这也可以更改为根据元素的“事件时间”(即时间戳)进行触发。

谢谢 @aljoscha - 那个例子非常有帮助。 - dpeacock
我一直在尝试那个例子,但似乎无法让会话触发器在基于事件时间的窗口中起作用(尽管对于处理时间窗口来说运行良好)。我将分叉你的gist - 这是相同的代码,但触发器注册+处理事件时间,并且我从输入中提取了时间戳。当运行时,所有内容都会进入同一个窗口(使用flink 1.0-SNAPSHOT)- 有任何想法为什么会发生这种情况吗? - dpeacock
我已经成功使用基于事件时间的窗口,通过使用EventTimeSourceFunction和collectWithTimestamp而不是assignTimestamps。 - dpeacock
我也应该使用assignTimestamps。你能否发布一下你的示例代码,看看它为什么不起作用? - aljoscha

0
会话可以使用EventTimeSessionWindows从事件流中提取。它将所有一起出现的事件组合成一个会话窗口,直到它们之间的间隔大于指定的值。如果流包含许多会话(可以通过每个事件中的sessionId进行标识),则应首先按会话ID分组,以便为每个会话单独保留会话窗口。
在下面的代码示例中,形式为的事件
case class Event(
    createdat: Timestamp,
    session: String
)

被转换为

case class SessionEvent(
    sessionId: String,
    start: Instant,
    end: Instant,
    `type`: String
)

SessionEvent 是在最后20分钟内没有事件(sessionTimeout)时发出的。

// Apache Flink 1.1.4 with Kafka connector

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer09, FlinkKafkaProducer09}
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector

object Main extends App {

  val sessionTimeout = Time minutes 20
  val kafkaCluster = "localhost:9092"
  val inputTopic = "events"
  val outputTopic = "sessions"

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val properties = new Properties
  properties.setProperty("bootstrap.servers", kafkaCluster)
  properties.setProperty("group.id", "sessions")

  val consumer = new FlinkKafkaConsumer09[String](inputTopic, new SimpleStringSchema, properties)
  val producer = new FlinkKafkaProducer09[String](kafkaCluster, outputTopic, new SimpleStringSchema)

  val stream =
    env
      .addSource(consumer)
      .map(Formats.readEvent _)
      .keyBy(_.session)
      .window(ProcessingTimeSessionWindows withGap sessionTimeout)
      .apply[SessionEvent] {
        (key: String, window: TimeWindow, values: Iterable[Event], out: Collector[SessionEvent]) ⇒
          val session =
            SessionEvent(
              key,
              values.head.createdat.toInstant,
              values.last.createdat.toInstant,
              "end"
            )
          out.collect(session)
      }
      .map(Formats.writeSessionEvent _)
      .addSink(producer)

  env.execute("sessions")
}

在提供代码之前,请先提供一些关于您的答案的描述。 - ravthiru

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接