我知道默认的TTL设置为无限(非正数)。但是,如果我们需要在存储中保留数据最长2天,我们可以使用RocksDBConfigSetter接口实现覆盖吗?也就是说,options.setWalTtlSeconds(172800)?还是会与Kafka流内部发生冲突?
我知道默认的TTL设置为无限(非正数)。但是,如果我们需要在存储中保留数据最长2天,我们可以使用RocksDBConfigSetter接口实现覆盖吗?也就是说,options.setWalTtlSeconds(172800)?还是会与Kafka流内部发生冲突?
stream.groupByKey().windowedBy(...).reduce(...)
,其中TimeWindow
为1毫秒,"dummy" reduce只返回键的最新值。windowedBy()
函数,设置了1小时的时间窗口。但是当我检查磁盘使用情况时,/tmp/kafka-streams
文件夹不断增加。从5G到24小时内的20G。Rocksdb会多久删除旧数据?谢谢!
我的代码像这样.windowedBy(TimeWindows.of(Duration.ofMinutes(60)).grace(Duration.ZERO)) .reduce((event1, event2) -> event2)
- thinktwicereduce(..., Materialized.as(null).withRetention(...))
进行配置。 - Matthias J. Sax