我有几个结构化流配置在一个Spark会话中。我需要知道每个流中读取和写入了多少记录。
例如,我有以下两个流:
1. read-s3 -> transform -> write-s3 2. read-s3 -> transform -> write-db
我知道可以使用SparkListener().onTaskEnd()来获取信息,但此时我没有查询名称,而taskEnd.taskMetrics().outputMetrics().recordsWritten()始终为0,因此这不是一个选项。
另一种方法是在dataset.map()中使用累加器进行增量计算。但这只能得到要写入的记录数(如果sink未失败)。
除此之外,我尝试使用StreamingQueryListener(我用它来获取numInputRows),但我找不到任何关于写入记录数量的指标。
是否有可能获取此类指标?
例如,我有以下两个流:
1. read-s3 -> transform -> write-s3 2. read-s3 -> transform -> write-db
我知道可以使用SparkListener().onTaskEnd()来获取信息,但此时我没有查询名称,而taskEnd.taskMetrics().outputMetrics().recordsWritten()始终为0,因此这不是一个选项。
另一种方法是在dataset.map()中使用累加器进行增量计算。但这只能得到要写入的记录数(如果sink未失败)。
除此之外,我尝试使用StreamingQueryListener(我用它来获取numInputRows),但我找不到任何关于写入记录数量的指标。
是否有可能获取此类指标?