响应式扩展同步订阅

4

有人能帮我实现对 IObserver 的同步订阅吗?这样调用方法将会在订阅完成前一直阻塞。

示例:

发布者

public static class Publisher {
public static IObservable<string> NonBlocking()
    {
        return Observable.Create<string>(
            observable =>
            {
                Task.Run(() =>
                {
                    observable.OnNext("a");
                    Thread.Sleep(1000);
                    observable.OnNext("b");
                    Thread.Sleep(1000);
                    observable.OnCompleted();
                    Thread.Sleep(1000);
                });

                return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
            });
    }

}

Subscriber

public static class Subscriber{
public static bool Subscribe()
    {
        Publisher.NonBlocking().Subscribe((s) =>
        {
            Debug.WriteLine(s);
        }, () =>
        {
            Debug.WriteLine("Complete");
        });
        // This will currently return true before the subscription is complete
        // I want to block and not Return until the Subscriber is Complete
        return true;
    }

}

1个回答

6
你需要使用 System.Reactive.Threading.Task 来实现这个功能:
将你的可观察对象转化为一个任务...
var source = Publisher.NonBlocking()
    .Do(
        (s) => Debug.WriteLines(x),
        ()  => Debug.WriteLine("Completed")
    )
    .LastOrDefault()
    .ToTask();

Do(...).Subscribe()Subscribe(...)类似。因此,Do只是添加了一些副作用。

LastOrDefault存在的原因是由于ToTask创建的Task仅等待来自源Observable的第一个项目,如果没有产生任何项目,则会失败(抛出异常)。因此,LastOrDefault有效地导致Task等待源已完成,无论它产生什么。

因此,在我们拥有任务之后,只需等待即可:

task.Wait(); // blocking

或者使用async/await:

await task; // non-blocking

编辑:

Cory Nelson提出了一个很好的观点:

在最新版本的C#和Visual Studio中,你实际上可以await一个IObservable<T>。这是一个非常酷的功能,但它的工作方式略有不同于等待Task

当你等待一个任务时,它会导致任务运行。如果等待多次单个任务实例,则该任务仅会执行一次。Observables略有不同。你可以将observable视为具有多个返回值的异步函数...每次订阅observable时,observable/函数都会执行。因此,这两段代码具有不同的含义:

等待Observable:

// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source; // Subscribe
await source; // Subscribe

通过任务等待可观察对象:

// Console.WriteLine will be invoked once.
var source = Observable.Return(0).Do(Console.WriteLine);
var task = source.ToTask();
await task; // Subscribe
await task; // Just yield the task's result.

因此,等待Observable的本质工作方式如下:
// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source.ToTask(); // Subscribe
await source.ToTask(); // Subscribe

然而,在Xamarin Studio中,await observable语法不起作用(截至撰写本文时)。如果您正在使用Xamarin Studio,强烈建议您在最后可能的时刻使用ToTask来模拟Visual Studio的await observable语法的行为。


太棒了,我不知道'.Do(..)'。看起来还有一个LastOrDefaultAsync()方法存在,所以我可以从那里做一个.Wait()。 - Lukie
2
你可以直接使用 await 关键字等待一个 IObservable<>。它将返回序列中的最后一项。 - Cory Nelson
1
@Lukie,在使用LastOrDefaultAsync之前,你可能需要查看我的编辑。过早地将Observable转换为Task可能会对代码的可重用性产生深远的影响。话虽如此,如果你在最后一刻使用LastOrDefaultAsync,那可能是你最好的选择。 - cwharris

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接