我试图找到一种方法来基于另一个可观测对象的值对其进行筛选。例如,假设我们只想接收x和y时间之间的事件,是否可以根据计时器的值来筛选可观测对象?
我试图找到一种方法来基于另一个可观测对象的值对其进行筛选。例如,假设我们只想接收x和y时间之间的事件,是否可以根据计时器的值来筛选可观测对象?
有几种方法。如果没有一些代码,很难知道哪种方法最好。
通过CombineLatest(始终监听并根据最新值进行过滤):
var astream = ...;
var bstream = ...;
var filtered = Observable.CombineLatest(astream, bstream, (a, b) => new { a, b })
.Where(v => v.b >= x && v.b <= y)
.Select(v => v.a); // Alas sometimes you will get duplicate a's.
通过选择和切换(仅在bstream
满足某些条件时才侦听astream
):
var astream = ...;
var bstream = ...;
var filtered = bstream
.Select(b => (b >= x && b <= y) ? astream : Observable.Empty<T>())
.Switch();
Observable.Join
的反应式连接是一个非常通用的工具,但Rx内置的运算符中有相当一部分是以一种支持基于另一个流进行过滤的方式组合流的。Select
+ Switch
技术(+1); 我已经把它存储起来以备将来使用!Select
+ Switch
具有一些优点,包括:
OnCompleted()
,而不会像源流一样持续很长时间。Observable.Window
运算符的特定重载,但我会逐步解释它。var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var startTime = DateTime.Now + TimeSpan.FromSeconds(5);
var endTime = DateTime.Now + TimeSpan.FromSeconds(8);
为了简洁起见,我没有检查startTime
是否在endTime
之前。现在我们创建一个流来打开窗口,和一个流来关闭窗口:
var start = Observable.Timer(startTime);
var end = Observable.Timer(endTime);
Observable.Window
过滤源流。此运算符的输出是一系列流(IObservable<IObservable<T>>
) - 每个子流都是一个新窗口。Observable.Merge
展开流的流:var filtered = xs.Window(start, _ => end).Merge();
filtered.Subscribe(Console.WriteLine);
。
4
5
6
同样的,解决这个问题的方法有很多种,并不仅限于使用 Window
。例如,您还可以轻松地扩展此解决方案以支持多窗口(通过使用打开时间流和关闭时间流工厂)。
Select
+Switch
的想法! - James Worldfil.map(function(x) { if(x) { return input; } else { return Rx.Observable.empty(); } }).switch().subscribe(function(x) { print(x); })
'这段代码在输入递增整数时不会打印任何内容? - user2269972filter
呢?input.filter(function (i) { return (i % 3) == 0; })
- Brandon