反应式编程:IObservable 接口接收的最新值

8

我知道以下是阻塞调用,会返回可观察序列中的第一个值:

var result = myObservable.First();

根据我使用的主题类型不同,会产生不同的影响:

  • Subject - First() 将会阻塞,直到下一个 OnNext() 被调用,这意味着最终这将是最新的值
  • BehaviorSubject - First() 将会阻塞,直到至少有一个值通过 OnNext() 推送,因为 BehaviorSubject 跟踪最后一个值,这将是最新的值
  • ReplaySubject - First() 将会阻塞,直到至少有一个值通过 OnNext() 推送,但是在许多项已经通过 OnNext() 推送的情况下,它将是其缓冲区中的第一个而不是最后一个

现在我正在尝试找到一种一致的方法来获取最后一个值,无论使用哪个基础 Observable。

有任何想法吗?


顺便说一下,我相信BehaviorSubject不会阻塞 - 它将返回在创建时提供的默认值,直到新值被推送,然后它将返回新值。 - Lee Oades
2个回答

4

根据你之前关于 Rx 的问题,我认为你想要这个:

var rootSubject = new ReplaySubject<Types>();
var firstSubject = rootSubject.Where(x => x == Types.First);
var secondSubject = rootSubject.Where(x => x == Types.Second);
var thirdSubject = rootSubject.Where(x => x == Types.Third);
var forthSubject = rootSubject.Where(x => x == Types.Fourth);

var mergedSubject = Observable
              .Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
        .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))    
        .Replay();

mergedSubject.Connect();

rootSubject.OnNext(Types.First);
rootSubject.OnNext(Types.Second);

var result = mergedSubject.First();

rootSubject.OnNext(Types.Third);
rootSubject.OnNext(Types.Fourth);

Console.WriteLine(String.Format("result - {0}", result));

现在不管使用什么样的Subject,它们都会返回"result - First"。

如果你想在调用mergedSubject.First()之前获取最新的值,你需要使用Replay(1):

var mergedSubject = Observable
                .Merge(firstSubject, secondSubject, thirdSubject, forthSubject)
                .Timeout(TimeSpan.FromSeconds(2), Observable.Return(Types.Error))    
                .Replay(1);

在这种情况下,所有主题类型都将返回“result - Second”。

3
听起来你正在寻找的是“重放”(在最新版本中,它与使用ReplaySubjectMulticast功能上等效):
IObservable<int> source = SomeHotObservable();

IConnectableObservable<int> sourceWithOneBufferedValue = source.Replay(1);

IDisposable subscription = sourceWithOneBufferedValue.Connect();

source.OnNext(5);

sourceWithOneBufferedValue.Subscribe(x => Console.WriteLine(x));

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接