RxJava-缓存直到发射空闲一段时间?

5
我经常遇到一种模式,但我不太确定如何有效地解决它。 基本上,如果我有一个Observable持有一个昂贵的项目T,我不希望每次使用它时都重新构建该T项目,或者将其映射到数千个不同的其他observable中,这将导致它被构建1000次。
因此,我开始使用replay()将其缓存一段时间,但理想情况下,我希望在发射空闲一段时间后清除缓存。 有没有运算符或某个转换器可以实现这一点?
public final class ActionManager {

    private final Observable<ImmutableList<Action>> actionMap;

    private ActionManager() { 
        this.actionMap = Observable.defer(() -> buildExpensiveList()).replay(10, TimeUnit.SECONDS).autoConnect();
    }

    //this method could get called thousands of times
    //I don't want to rebuild the map for every call

    public Observable<Action> forItem(Item item) { 
        actionMap.map(l -> //get Action for item);
    }


}

更新

尝试将此实现到Transformer / Operator组合中。这里有什么我做错了吗?

   public static <T> Transformer<T,T> recacheOnIdle(long time, TimeUnit timeUnit) { 

        return obs -> obs.timeout(time, timeUnit).lift(new Operator<T,T>() {

            private volatile T cachedItem;
            private volatile boolean isCurrent = false;

            @Override
            public Subscriber<? super T> call(Subscriber<? super T> s) {
                return new Subscriber<T>(s) {
                    @Override
                    public void onCompleted() {
                         if(!s.isUnsubscribed()) {
                              s.onCompleted();
                         }
                    }
                    @Override
                    public void onError(Throwable e) {
                        if(!s.isUnsubscribed()) {
                            if (e instanceof TimeoutException) { 
                                isCurrent = false;
                                cachedItem = null;
                            } else { 
                                s.onError(e);
                            }
                        }
                    }

                    @Override
                    public void onNext(T t) {
                        if(!s.isUnsubscribed()) { 
                            if (!isCurrent) { 
                                cachedItem = t;
                            }
                            s.onNext(cachedItem);
                        }
                    } 
                };
            }

        });
    }

当时间过去后,重放不会清除缓存吗? - Reut Sharabani
是的,但我想在没有发射活动一段时间后清除缓存,而不是一个固定的窗口。 - tmn
2个回答

4
您可以使用超时运算符可连接的Observable(以持有并同步多个订阅者)

镜像源Observable,但如果在没有发出任何已发出项的特定时间段内,则发出错误通知

这样,您就可以通过重新缓存昂贵的项来响应引发的错误。假设这是一个“罕见”的情况:

// if no emissions are made for a period of 3 seconds - will call onError
observableWithCache.timeout(3000, TimeUnit.MILLISECONDS).subscribe(new Subscriber<SomeObject>() {

    public void onCompleted() {

    }

    public void onError(Throwable arg0) {
        doClearCache(); // make sure to re-subscribe with timeout
    }

    public void onNext(SomeObject item) {
        System.out.println("Got item: " + item); // you can ignore this
    }
});

请注意,onError不会取消原始的可观测对象,如图所示:

enter image description here

但是你可以在一段时间内没有发出任何事件时做出反应。

太棒了。我正在尝试实现一个Transformer/Operator组合来完成这个任务,并且我已经在上面更新了我的帖子。我对操作符不是很熟悉,当错误向上传递时,有些东西似乎并不完全正常。 - tmn

0

看看我为rxjava2创建的gist,它是一个自定义转换器,将在一定时间内保留缓存值。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接