如何在数据流中计算独特单词数?

11

在 Flink Streaming 中,有没有一种方法可以统计数据流中唯一单词的数量?结果应该是一个数字流,不断增加。

1个回答

8
您可以通过存储您已经看到的所有单词来解决问题。 有了这个知识,您可以过滤掉所有重复的单词。 剩余的可以通过具有并行性1的map运算符进行计数。 以下代码片段正是如此。
val env = StreamExecutionEnvironment.getExecutionEnvironment

val inputStream = env.fromElements("foo", "bar", "foobar", "bar", "barfoo", "foobar", "foo", "fo")

// filter words out which we have already seen
val uniqueWords = inputStream.keyBy(x => x).filterWithState{
  (word, seenWordsState: Option[Set[String]]) => seenWordsState match {
    case None => (true, Some(HashSet(word)))
    case Some(seenWords) => (!seenWords.contains(word), Some(seenWords + word))
  }
}

// count the number of incoming (first seen) words
val numberUniqueWords = uniqueWords.keyBy(x => 0).mapWithState{
  (word, counterState: Option[Int]) =>
    counterState match {
      case None => (1, Some(1))
      case Some(counter) => (counter + 1, Some(counter + 1))
    }
}.setParallelism(1)

numberUniqueWords.print();

env.execute()

如果传入的流是“无限”的,并且在filterWithState中的字符串集变得太大,它会导致OOM或性能下降吗? - Maxim
1
如果您使用支持离线的状态后端,则不需要担心此问题。RocksDBStateBackend 就是这样一种状态后端。如果您使用内存状态后端,则必须定期清除状态,否则可能会遇到 OOM 问题。 - Till Rohrmann
还有一个问题,我理解在使用RocksDBStateBackend后端的情况下,保存/恢复操作的复杂度是O(N),其中N是集合中元素的数量,即此后端是否始终保存/恢复Set的所有元素或仅更改的元素? - Maxim
2
此实现使用ValueState抽象,该抽象始终保存/恢复完整的Set。但是,可能还可以使用ListState抽象使检查点增量化。 - Till Rohrmann
嗨Till,filterWithState只在1.1版本中可用吗?我在Flink 1.0.0中找不到它。 - Jun
filterWithState 方法自 Flink 1.0.0 版本起成为 Scala API 的一部分。然而,它仅适用于 KeyedStreams。这意味着您必须先在流上调用 keyBy 方法。 - Till Rohrmann

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接