如何根据另一个值过滤Observable?

3

我试图找到一种方法来基于另一个可观测对象的值对其进行筛选。例如,假设我们只想接收x和y时间之间的事件,是否可以根据计时器的值来筛选可观测对象?

2个回答

4

有几种方法。如果没有一些代码,很难知道哪种方法最好。

通过CombineLatest(始终监听并根据最新值进行过滤):

var astream = ...;
var bstream = ...;
var filtered = Observable.CombineLatest(astream, bstream, (a, b) => new { a, b })
          .Where(v => v.b >= x && v.b <= y)
          .Select(v => v.a); // Alas sometimes you will get duplicate a's.

通过选择和切换(仅在bstream满足某些条件时才侦听astream):

var astream = ...;
var bstream = ...;
var filtered = bstream
    .Select(b => (b >= x && b <= y) ? astream : Observable.Empty<T>())
    .Switch();

我非常喜欢 Select + Switch 的想法! - James World
非常感谢,我也非常喜欢选择+切换的想法!我想投票,但是声望不够。 - user2269972
所以我尝试了选择+切换的想法,但遇到了一个问题,我认为切换方法忽略了最新的值,只是等待下一个。解释一下:` var fil = input.map(function(x) { return (x % 3) == 0; }); fil.map(function(x) { if(x) { return input; } else { return Rx.Observable.empty(); } }).switch().subscribe(function(x) { print(x); })'这段代码在输入递增整数时不会打印任何内容? - user2269972
针对您刚才描述的情况,为什么不直接使用 filter 呢?input.filter(function (i) { return (i % 3) == 0; }) - Brandon

1
Brandon所提到的,有许多方法可以组合事件流。
使用Observable.Join的反应式连接是一个非常通用的工具,但Rx内置的运算符中有相当一部分是以一种支持基于另一个流进行过滤的方式组合流的。
我真的很喜欢Brandon的Select + Switch技术(+1); 我已经把它存储起来以备将来使用!
这里有一种直接解决源流筛选开始和结束时间问题的方法。 它比Select + Switch具有一些优点,包括:
  • 它避免了对源流的每个事件检查过滤条件。
  • 它避免需要一个“空”流。
  • 它只订阅一次源。
  • 它在达到结束时间时立即发送OnCompleted(),而不会像源流一样持续很长时间。
它实际上归结为Observable.Window运算符的特定重载,但我会逐步解释它。
基本思路是通过应用在开始时间打开并在结束时间关闭的窗口来过滤源流。
首先,让我们创建一个示例源流(xs),它发出一秒钟的脉冲,以及一个开始时间和结束时间:
var xs = Observable.Interval(TimeSpan.FromSeconds(1));

var startTime = DateTime.Now + TimeSpan.FromSeconds(5);    
var endTime = DateTime.Now + TimeSpan.FromSeconds(8);

为了简洁起见,我没有检查startTime是否在endTime之前。现在我们创建一个流来打开窗口,和一个流来关闭窗口:

var start = Observable.Timer(startTime);
var end = Observable.Timer(endTime);

最后,使用Observable.Window过滤源流。此运算符的输出是一系列流(IObservable<IObservable<T>>) - 每个子流都是一个新窗口。
我们将使用的重载接受一个流,其事件标记了新窗口的开启,并提供一个工厂函数来提供给定触发窗口开启事件的关闭流。
使用我们的计时器流,我们知道在开始时间会创建一个窗口,并在结束时间关闭。
我们使用Observable.Merge展开流的流:
var filtered = xs.Window(start, _ => end).Merge();

如果我们这样订阅:
filtered.Subscribe(Console.WriteLine);

我们得到了如下输出,正如预期所示:


4
5
6

同样的,解决这个问题的方法有很多种,并不仅限于使用 Window。例如,您还可以轻松地扩展此解决方案以支持多窗口(通过使用打开时间流和关闭时间流工厂)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接