如何在一天内计算从 Kafka 主题中获取的消息数量?

4

我正在从Kafka主题中获取数据,并将其存储在Deltalake(parquet)格式中。我想要找出特定一天获取的消息数量

我的思路: 我打算使用Spark读取以parquet格式存储的数据所在的目录,并对特定一天的带有“.parquet”扩展名的文件应用计数。这将返回一个计数,但我并不确定这是否是正确的方法。

这种方式正确吗?还有其他方法可以计算从Kafka主题获取的特定一天(或持续时间)的消息数量吗?

3个回答

0
您可以利用Delta Lake提供的“时间旅行”功能。
在您的情况下,您可以这样做:
// define location of delta table
val deltaPath = "file:///tmp/delta/table"

// travel back in time to the start and end of the day using the option 'timestampAsOf'
val countStart = spark.read.format("delta").option("timestampAsOf", "2021-04-19 00:00:00").load(deltaPath).count()
val countEnd = spark.read.format("delta").option("timestampAsOf", "2021-04-19 23:59:59").load(deltaPath).count()

// print out the number of messages stored in Delta Table within one day
println(countEnd - countStart)

请查看查询表的旧快照(时间旅行)的文档。


0

我们从主题中消费的消息不仅具有键值,还具有其他信息,例如时间戳

这可以用于跟踪消费者流程。

时间戳 时间戳由代理或生产者根据主题配置更新。如果主题配置的时间戳类型是CREATE_TIME,则代理将使用生产者记录中的时间戳,而如果主题配置为LOG_APPEND_TIME,则在附加记录时代理将使用代理本地时间覆盖时间戳。

  1. 因此,如果您在任何地方存储时间戳,您可以很好地跟踪每天或每小时的消息速率。

  2. 另一种方法是使用一些Kafka仪表板,如Confluent Control Center(许可价格)或Grafana(免费),或任何其他工具来跟踪消息流。

  3. 在我们的情况下,当我们消费消息并存储或处理消息时,我们还会将消息的元数据路由到Elastic Search,并且我们可以通过Kibana进行可视化。


0

另一种检索信息的方法,而不是计算两个版本之间的行数,是使用Delta表历史记录。这样做有几个优点-您不需要读取整个数据集,还可以考虑更新和删除,例如如果您正在执行MERGE操作(无法通过比较不同版本上的.count来完成,因为更新将替换实际值,或者删除该行)。

例如,对于仅追加的情况,以下代码将计算由正常append操作编写的所有插入行的数量(对于其他事情,如MERGE / UPDATE / DELETE,我们可能需要查看其他指标):

from delta.tables import *

df = DeltaTable.forName(spark, "ml_versioning.airbnb").history()\
  .filter("timestamp > 'begin_of_day' and timestamp < 'end_of_day'")\
  .selectExpr("cast(nvl(element_at(operationMetrics, 'numOutputRows'), '0') as long) as rows")\
  .groupBy().sum()

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接