我有一个IObservable<IObservable<T>>
,其中每个内部的IObservable<T>
是一系列值后面跟随一个OnCompleted
事件的流。
我想将其转换为IObservable<IEnumerable<T>>
,即由任何未完成的内部流的最新值组成的流。每当一个内部流产生新值(或内部流过期)时,它应该生成一个新的IEnumerable<T>
。
可以用大理石图表来展示它(我希望这足够全面):
input ---.----.---.----------------
| | '-f-----g-|
| 'd------e---------|
'a--b----c-----|
result ---a--b-b--c-c-c-e-e-e---[]-
d d d e f g
f f
([]
是一个空的 IEnumerable<T>
,-|
表示 OnCompleted)
你可以看到它有点类似于 CombineLatest
操作。
我一直在尝试使用 Join
和 GroupJoin
,但都没有成功,但我感到这几乎肯定是正确的方向。
我希望在此操作符中尽可能少地使用状态。
更新
我已经更新了这个问题,不仅包括单值序列 - 产生的 IObservable<IEnumerable<T>>
应该仅包括每个序列的最新值 - 如果一个序列没有产生值,则不应包括它。