立即获取 Observable 的最新值并立即发出它

34
我希望获取给定Observable的最新值并在调用时立即发射它。以下面的代码为例:
return Observable.just(myObservable.last())
    .flatMap(myObservable1 -> {
        return myObservable1;
    })
    .map(o -> o.x) // Here I want to end up with a T object instead of Observable<T> object

这样做是行不通的,因为flatMap会发出myObservable1,而myObservable1必须要发出才能到达map。我不知道是否有可能做到这一点。有没有人知道如何实现这个目标?谢谢。

你在谈论哪个可观察对象?你所说的“latest”是什么意思? - akarnokd
myObservable 是一个热可观察对象,它会在不规则的时间间隔内发出例如 "1"、"2"、"3" 等值。我想要做的是,在到达 return 指令时能够获取 myObservable 的最新值(在本例中为 "3")。 - E-Kami
我的可观察对象是热的还是冷的? - yevt
1
似乎你还在以命令式(而非响应式)的方式思考。你能提供更大的视角吗?关键是通常情况下,您会基于其他可观察对象创建可观察对象。您可以基于myObservable创建lastObservable,它将被阻塞直到myObservable结束,然后发出最后一个值。除了像subscribe这样的端点之外,没有任何方法可以从这些流“管道”中走出去。 - yevt
1
我曾在RxJava Google Group上问过类似的问题 - 也许Ben在那里的回答对你有帮助:https://groups.google.com/forum/#!searchin/rxjava/mihola/rxjava/I4CxPoU5rpE/v44DIj3Q2RAJ - david.mihola
startsWith 是默认值,而 replay(1) + connect 不起作用吗? - Vincent D.
3个回答

36

last() 方法在这里没有任何帮助,因为它等待 Observable 终止才给出最后一个发出的项目。

假设您无法控制正在发出数据的 Observable,可以简单地创建一个 BehaviorSubject 并订阅该主题以侦听发出所需数据的 Observable,然后订阅创建的 Subject。由于 Subject 既是 Observable 又是 Subscriber,因此您将得到所需内容。

我认为(现在没有时间检查)您可能需要手动取消订阅原始 Observable,因为一旦 BehaviorSubject 的所有订阅者都取消订阅,它不会自动取消订阅。

类似以下代码:

BehaviorSubject subject = new BehaviorSubject();
hotObservable.subscribe(subject);
subject.subscribe(thing -> {
    // Here just after subscribing 
    // you will receive the last emitted item, if there was any.
    // You can also always supply the first item to the behavior subject
});

http://reactivex.io/RxJava/javadoc/rx/subjects/BehaviorSubject.html


3
在RxJava中,subscriber.onXXX是异步调用的。这意味着,如果你的Observable在新线程中发出项目,你在返回之前永远无法得到最后一个项目,除非你阻塞线程并等待该项。但是,如果Observable同步地发出项目,而且你没有通过subscribeOn和observOn更改它的线程,就像下面的代码一样:
Observable.just(1,2,3).subscribe();

在这种情况下,你可以通过以下方式获取最后一项:
Integer getLast(Observable<Integer> o){
    final int[] ret = new int[1];
    Observable.last().subscribe(i -> ret[0] = i);
    return ret[0];
}

这样做是不好的想法。RxJava更希望你通过它来进行异步工作。


1
你实际想要做的是将异步任务转换为同步任务。有几种方法可以实现,每种方法都有其优缺点:
- 使用toBlocking() - 这意味着该线程将被阻塞,直到流完成,为了仅获取一个项目,只需使用first(),因为它在传递一个项目后就会完成。 假设你的整个流是Observable<T> getData(); 那么立即获取最后一个值的方法如下所示:

public T getLastItem(){ return getData().toBlocking().first(); }

请不要使用last(),因为它将等待流完成,然后才发出最后一个项目。

如果您的流是一个网络请求,并且它还没有获取任何项目,则会阻塞您的线程!因此,仅在确定立即可用项目时使用它(或者如果您确实需要阻塞...)。
另一个选择是简单地缓存最后一次结果,类似于这样:
getData().subscribe(t-> cachedT = t;) //某处代码中将保存最后提供的项目 public T getLastItem(){ return cachedT; }
如果在您请求它时尚未发送任何项目,则会获得null或您设置的任何初始值。这种方法的问题是如果在两个不同的线程中使用,订阅阶段可能会发生在获取之后,从而引发竞争条件。

我在这里遇到了代码问题;getLastItem应用于哪个类,getData()函数的细节是什么? - TahoeWolverine
GetData只是强调你正在使用的流。 你可以用你正在使用的observable替换它,比如在原始示例中使用myObservableObservable<T> myObservable;然后myObservable.toBlocking().first()将在第一个项目到达时给你第一个项目。 - ndori

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接