禁用 Flowable/Observable 的缓冲区

4

我遇到了一个问题,就是通过combineLatest操作符得到的组合中始终只能得到最新的值。

我有两个热流(a、b),它们以高频率生成事件(每个事件间隔为100ms):

Flowable<OrderBook> flowA = sourceA.getObservableOrderBook(value);
Flowable<OrderBook> flowB = sourceB.getObservableOrderBook(value);

结合使用 combineLatest,这需要近300毫秒才能完成其工作。

Flowable<OrderBookCouple> combined = Flowable.combineLatest(flowA, flowB,        OrderBookCouple::new).observeOn(Schedulers.newThread());
combined.subscribe((bookCouple) -> {
                System.out.println("A timestamp: " + bookCouple.aOrderBook.getTimeStamp());
                System.out.println("B timestamp: " + bookCouple.bOrderBook.getTimeStamp());
                Thread.sleep(300);
            }

在执行合并器一次后,我想要处理生成的最后一个组合事件,也就是(lastA,lastB)。

合并流的默认行为是将所有事件的组合缓存在自己的缓冲区中,因此合并流接收到的组合事件非常旧,而且时间间隔越来越大。

我应该如何更改我的代码以禁用此缓冲区并始终接收最后一个组合?

1个回答

5
你可以在 flowAflowB 上应用 onBackpressureLatest ,并使用重载的 combineLatest ,该方法让你可以指定预取量。
Flowable.combineLatest(
    Arrays.asList(flowA.onBackpressureLatest(), flowB.onBackpressureLatest()),
    a -> new OrderBookCouple((OrderBook)a[0], (OrderBook)a[1]),
    1
)
.onBackpressureLatest()
.observeOn(Schedulers.newThread(), false, 1)

很不幸,没有同时使用BiFunction和bufferSize的重载方法,因此您必须将数组元素转换为类型。

编辑

应用第二个onBackpressureLatest并在observeOn上限制缓冲区大小可以让您更接近所需的模式,尽管combineLatest不适用于这种情况。您可能需要一些多样本操作符。


我已经在这里使用了一个简化的示例尝试了您的解决方案,似乎仍然存在缓冲问题。代码可在此处找到:链接 - Luca Ambrosini
2
更新了我的回答。 - akarnokd
这个已经可以了,我已经相应地更新了代码片段。谢谢。 - Luca Ambrosini

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接