如果第一个IObservable为空,请切换到另一个IObservable。

7
我正在编写一个函数,用于检索有关某个主题的新闻,并通过IObservable返回值反馈这些新闻。
然而,我有几个新闻来源。我不想使用Merge将这些来源组合成一个。相反,我想按优先级对它们进行排序 -
1. 当我的函数被调用时,将查询第一个新闻源(它会生成表示该源的IObservable)。 2. 如果该新闻源的IObservable在未返回任何结果的情况下完成,则查询下一个新闻源。 3. 如果第二个源在未返回结果的情况下完成,则查询最后一个新闻源。 4. 将整个行为封装到可观察对象中,以便我可以将其返回给用户。
这种行为是我可以使用内置的Rx扩展方法来实现,还是需要实现自定义类来处理?我该如何处理这两种情况?

如果您能帮忙重新表述这个问题,我将不胜感激--我已经尽力了,但这个问题很难用语言描述清楚。 - David Pfeffer
如果源A返回结果然后完成,那么会继续执行源B吗? - JerKimball
另外,“新闻查询器”调用的签名是什么?它是否已经返回IObservable - JerKimball
@JerKimball 如果A返回结果,我们不关心B或C。新闻查询调用都会返回IObservable - David Pfeffer
进一步澄清,如果 A 不返回结果而 B 返回 结果,我们不关心 C。 - David Pfeffer
在我看来,查询 IObservable 没有太多意义。查询意味着“我提出一个问题并期望立即得到答案”。IObservable 只接受订阅。它会在自己选择的时间回调给其订阅者。它是主动推送者,而不是您可以按照自己喜好拉取的被动对象。当与 IObservable 交互时,您是观察者,是被动的一方! - Theodor Zoulias
5个回答

4

我认为接受的答案不理想,因为它使用了SubjectDo,而且仍然订阅第二序列,即使第一序列是空的。如果第二个可观察对象调用任何非平凡操作,后者可能会带来很大问题。相反,我提出了以下解决方案:

public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> @this, IObservable<T> switchTo)
{
    if (@this == null) throw new ArgumentNullException(nameof(@this));
    if (switchTo == null) throw new ArgumentNullException(nameof(switchTo));
    return Observable.Create<T>(obs =>
    {
        var source = @this.Replay(1);
        var switched = source.Any().SelectMany(any => any ? Observable.Empty<T>() : switchTo);
        return new CompositeDisposable(source.Concat(switched).Subscribe(obs), source.Connect());
    });
}

名称SwitchIfEmpty现有的RxJava实现一致。这里正在讨论将一些RxJava操作符纳入RxNET中。
我相信自定义的IObservable实现比我的更有效率。您可以在ReactiveX成员akarnokd编写的此处找到它。它也可以在NuGet上获得。

1

编辑自原帖:

我采用了这个答案,但将其转换为扩展方法--

/// <summary> Returns the elements of the first sequence, or the values in the second sequence if the first sequence is empty. </summary>
/// <param name="first"> The first sequence. </param>
/// <param name="second"> The second sequence. </param>
/// <typeparam name="T"> The type of elements in the sequence. </typeparam>
/// <returns> The <see cref="IObservable{T}"/> sequence. </returns>
public static IObservable<T> DefaultIfEmpty<T>(this IObservable<T> first, IObservable<T> second)
{
    var signal = new AsyncSubject<Unit>();
    var source1 = first.Do(item => { signal.OnNext(Unit.Default); signal.OnCompleted(); });
    var source2 = second.TakeUntil(signal);

    return source1.Concat(source2); // if source2 is cold, it won't invoke it until source1 is completed
}

原始答案:

这可能会起作用。

var signal1 = new AsyncSubject<Unit>();
var signal2 = new AsyncSubject<Unit>();
var source1 = a.Do(item => { signal1.onNext(Unit.Default); signal1.onCompleted(); });
var source2 = b.Do(item => { signal2.onNext(Unit.Default); signal2.onCompleted(); })).TakeUntil(signal1);
var source3 = c.TakeUntil(signal2.Merge(signal1));

return Observable.Concat(source1, source2, source3);

编辑:哎呀,需要一个单独的信号来表示第二个来源,而第三个不需要任何信号。

编辑2:哎呀...打错字了。我习惯了RxJs :)

附注:还有一些不那么RX-y的方法可以做到这一点,可能需要输入更少的内容:

var gotResult = false;
var source1 = a();
var source2 = Observable.Defer(() => return gotResult ? Observable.Empty<T>() : b());
var source3 = Observable.Defer(() => return gotResult ? Observable.Empty<T>() : c());
return Observable.Concat(source1, source2, source3).Do(_ => gotResult = true;);

如果您的调用返回冷式可观察者(Cold Observables),那么请使用这些调用替换'a'、'b'和'c'。如果它们返回热式可观察者(Hot Observables)(希望不会发生!),那么请使用变量Observable.Defer(() => callToNewSource())进行替换。 - Brandon
抱歉,完全错过了那些变量。我确实有热可观测对象,因为它们实际上是已转换的“Task”。 - David Pfeffer
1
是的,那么 Observable.Defer(() => yourMethodThatReturnsTask().ToObservable()) 可能是最好的选择。 - Brandon
对于第一个代码片段,TakeUntil不会导致第三个可观察对象被订阅吗?并且它会一直读取它,直到A或B接收到一个项目,这可能比C返回结果所需的时间更长,是吗? - David Pfeffer
我认为你不应该使用Do来做这个...感觉不对...它是用于副作用的。 - Richard Anthony Freeman-Hein
显示剩余3条评论

1

听起来你可以使用普通的Amb查询。

编辑:根据评论,Amb行不通 - 尝试这个:

public static IObservable<T> SwitchIfEmpty<T>(
     this IObservable<T> first, 
     Func<IObservable<T>> second)
{
    return first.IsEmpty().FirstOrDefault() ? second() : first;
}

测试装置:
static Random r = new Random();
public IObservable<string> GetSource(string sourceName)
{
    Console.WriteLine("Source {0} invoked", sourceName);
    return r.Next(0, 10) < 5 
        ? Observable.Empty<string>() 
        : Observable.Return("Article from " + sourceName);
}

void Main()
{
    var query = GetSource("A")
        .SwitchIfEmpty(() => GetSource("B"))
        .SwitchIfEmpty(() => GetSource("C"));

    using(query.Subscribe(Console.WriteLine))
    {
        Console.ReadLine();
    }           
}

一些示例运行:

Source A invoked
Article from A

Source A invoked
Source B invoked
Article from B

Source A invoked
Source B invoked
Source C invoked
Article from C

编辑:

我想你可以将其概括为这样:

public static IObservable<T> SwitchIf<T>(
    this IObservable<T> first, 
    Func<IObservable<T>, IObservable<bool>> predicate, 
    Func<IObservable<T>> second)
{
    return predicate(first).FirstOrDefault() 
        ? second() 
        : first;
}

“Amb” 将同时开始观察所有三个,而我只想在第一个没有结果的情况下才开始观察第二个。 - David Pfeffer
啊,我明白了 - 如果不需要调用获取函数,你想要跳过它;好的,让我考虑一下,我会进行编辑。 - JerKimball
这很有前途,但它需要一个阻塞调用FirstOrDefault(),在我的情况下可能需要几秒钟。 - David Pfeffer
@DavidPfeffer 没错,确实有这个问题 - 实际上,我又想到了一个新的想法...它与之前完全不同,所以我会写一个新的答案。 - JerKimball
1
这需要订阅 first 并等待两次结果。如果你知道它是一个 AsyncSubjectTask,那么可能没问题,但否则...可能不是个好主意。 - Brandon

1

另一种方法-与其他方法相比相当激进,因此我将提供一个新的答案:

这里是带有各种有趣的调试行的版本:

public static IObservable<T> FirstWithValues<T>(this IEnumerable<IObservable<T>> sources)
{
    return Observable.Create<T>(obs =>
    {
        // these are neat - if you set it's .Disposable field, and it already
        // had one in there, it'll auto-dispose it
        SerialDisposable disp = new SerialDisposable();
        // this will trigger our exit condition
        bool hadValues = false;
        // start on the first source (assumed to be in order of importance)
        var sourceWalker = sources.GetEnumerator();
        sourceWalker.MoveNext();

        IObserver<T> checker = null;
        checker = Observer.Create<T>(v => 
            {
                // Hey, we got a value - pass to the "real" observer and note we 
                // got values on the current source
                Console.WriteLine("Got value on source:" + v.ToString());
                hadValues = true;
                obs.OnNext(v);
            },
            ex => {
                // pass any errors immediately back to the real observer
                Console.WriteLine("Error on source, passing to observer");
                obs.OnError(ex);
            },
            () => {
                // A source completed; if it generated any values, we're done;                    
                if(hadValues)
                {
                    Console.WriteLine("Source completed, had values, so ending");
                    obs.OnCompleted();
                }
                // Otherwise, we need to check the next source in line...
                else
                {
                    Console.WriteLine("Source completed, no values, so moving to next source");
                    sourceWalker.MoveNext();
                    disp.Disposable = sourceWalker.Current.Subscribe(checker);
                }
            });
        // kick it off by subscribing our..."walker?" to the first source
        disp.Disposable = sourceWalker.Current.Subscribe(checker);
        return disp.Disposable;
    });
}

使用方法:

var query = new[]
{
    Observable.Defer(() => GetSource("A")), 
    Observable.Defer(() => GetSource("B")), 
    Observable.Defer(() => GetSource("C")), 
}.FirstWithValues();

输出:

Source A invoked
Got value on source:Article from A
Article from A
Source completed, had values, so ending

Source A invoked
Source completed, no values, so moving to next source
Source B invoked
Got value on source:Article from B
Article from B
Source completed, had values, so ending

Source A invoked
Source completed, no values, so moving to next source
Source B invoked
Source completed, no values, so moving to next source
Source C invoked
Got value on source:Article from C
Article from C
Source completed, had values, so ending

0

这里是JerKimball的SwitchIfEmpty运算符的非阻塞版本。

/// <summary>Returns the elements of the first sequence, or the elements of the
/// second sequence if the first sequence is empty.</summary>
public static IObservable<T> SwitchIfEmpty<T>(this IObservable<T> first,
    IObservable<T> second)
{
    return Observable.Defer(() =>
    {
        bool isEmpty = true;
        return first
            .Do(_ => isEmpty = false)
            .Concat(Observable.If(() => isEmpty, second));
    });
}

这里有一个接受多个序列并返回第一个非空序列元素的相同运算符版本:

/// <summary>Returns the elements of the first non-empty sequence.</summary>
public static IObservable<T> SwitchIfEmpty<T>(params IObservable<T>[] sequences)
{
    return Observable.Defer(() =>
    {
        bool isEmpty = true;
        return sequences
            .Select(s => s.Do(_ => isEmpty = false))
            .Select(s => Observable.If(() => isEmpty, s))
            .Concat();
    });
}

Observable.Defer 操作符用于防止多个订阅共享相同的 bool isEmpty 状态(有关更多信息,请参见 此处)。


注意,在当前版本的 Rx 库(5.0.0)中,Concat 运算符表现异常。我的建议是在 Concat 问题得到解决之前,使用等效的 Merge(1) 运算符。这里的 1maxConcurrent 参数的值。将此参数设置为 1 表示无并发。 - Theodor Zoulias

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接