我遇到了一个问题,就是通过combineLatest操作符得到的组合中始终只能得到最新的值。
我有两个热流(a、b),它们以高频率生成事件(每个事件间隔为100ms):
Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value);
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value);
结合使用 combineLatest,这需要近300毫秒才能完成其工作。
Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB, OrderBookCouple::new).observeOn(Schedulers.newThread());
combined.subscribe((bookCouple) -> {
System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp());
System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp());
Thread.sleep(300);
}
在执行合并器一次后,我想要处理生成的最后一个组合事件,也就是(lastA,lastB)。
合并流的默认行为是将所有事件的组合缓存在自己的缓冲区中,因此合并流接收到的组合事件非常旧,而且时间间隔越来越大。
我应该如何更改我的代码以禁用此缓冲区并始终接收最后一个组合?