如何在事件中根据条件完成Rx Observable

6

我有一个事件,我无法控制它,但它会提供给我一些数据。事件参数看起来像这样:

class MyEventArg {
  bool IsLastItem {get;}
  Data DataItem {get;}
}

我使用 Rx 来将此事件转换为 IObservable。但是如果 IsLastItem 为 true,我希望完成这个可观察对象。
有没有更优雅的想法?一种方法是通过一个 subject 来传递数据,以便更好地控制它,从而在条件出现时设置 OnComplete 事件...
3个回答

9
如果您想包括最后一个元素,可以将仅包含最后一个元素的流与常规流结合起来,然后使用TakeWhile组合。下面是一个简单的控制台应用程序来证明这一点:
var subject = new List<string>
{                            
"test",
"last"
}.ToObservable();

var my = subject
            .Where(x => x == "last").Take(1)
            .Merge(subject.TakeWhile(x => x != "last"));

my.Subscribe(
    o => Console.WriteLine("On Next: " + o), 
    () => Console.WriteLine("Completed"));

Console.ReadLine();

这将打印:

On Next: test
On Next: last
Completed

更新 如果底层Observable没有实际完成,会出现抑制OnCompleted消息的错误。我更正了代码以确保调用OnCompleted

如果您想避免为冷观察者多次订阅底层序列,则可以像这样重构代码:

var my = subject.Publish(p => p
            .Where(x => x == "last").Take(1)
            .Merge(p.TakeWhile(x => x != "last")));

不错!我花了几秒钟才看懂它是怎么做的。或许最好写成:subject.TakeWhile(x => x != "last").Merge(subject.Where(x => x == "last").Take(1)); 这个冷的可观察对象技巧非常有用。 - lukebuehler

3
public static IObservable<TSource> TakeWhileInclusive<TSource>(
        this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return Observable
        .Create<TSource>(o => source.Subscribe(x =>
                                                   {
                                                       o.OnNext(x);
                                                       if (!predicate(x))
                                                           o.OnCompleted();
                                                   },
                                               o.OnError,
                                               o.OnCompleted
                                  ));
}

2
你是否正在寻找这样的东西?
IObservable<MyEventArg> result =
    myEventArgObservable.TakeWhile(arg => !arg.IsLastItem);

哇,那看起来简单又好用。你知道如果谓词为真,Observable会完成吗? - lukebuehler
是的,如果您不希望它通知OnCompleted(),您可以简单地使用Where(arg => !arg.IsLastItem)。 - Christoph
2
是的,看起来Observable已经完成了,但现在我的问题是我将无法接收到最后一个项目... - lukebuehler

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接