Rx中用于回调的Observable

11

我正在寻找一种优雅的方法,使用Rx从普通回调委托创建一个Observable,类似于Observable.FromEventPattern的方式?

比如说,我正在封装Win32 EnumWindows API,该API会回调我提供的EnumWindowsProc

我知道我可以为此回调创建一个临时的C#事件适配器,并将其传递给FromEventPattern。此外,我可能还可以手动实现IObservable,以便它能从我的EnumWindowsProc回调中调用IObserver.OnNext

我是否忽略了一种将回调封装在Rx中的现有模式?


1
或者你可以简单地使用一个Subject并在回调中调用OnNext - Dirk
@Dirk,有趣的问题,谢谢。那么当没有更多项时,是先使用Subject.OnNext 再使用 Subject.OnComplete 吗? - avo
1
是的,Subject 实现了 IObservableIObserver。调用 OnNext/OnError/OnCompleted 方法向 Subject 的订阅者发出这些命令。它们作为一种从非 Rx 到 Rx 代码的网关。 - Dirk
@Dirk,谢谢,也许你应该把它发布为答案。 - avo
2个回答

10

您可以使用Subject<T>,从命令式编程世界进入Rx的函数式世界。

Subject<T>实现了IObservable<T>IObserver<T>接口,所以你可以调用它的OnNextOnErrorOnCompleted方法通知订阅者。

如果想将Subject<T>暴露为属性,则应使用.AsObservable()。这样可以隐藏IObservable<T>实际上是Subject<T>的事实。这使得像((Subject<string>) obj.Event).OnNext("Foo")这样的写法变得不可能。


这可能是正确的方法。在接受之前,我想听听别人的意见。 - avo
1
@avo 这确实是一个简单的解决方案,但有些人不喜欢使用 Subject,因为这是 Rx 中为数不多的非功能性东西之一。我认为它有一些好的用途,但如果有另一种方法 - 比如 FromEventPattern - 那么我会选择使用它。 - Dirk

5
请注意,像在EnumWindows中使用的回调函数与Rx略有不同。具体来说,回调可以通过其返回值向调用方进行通信,但Rx观察者无法这样做。此外,回调可以接收多个参数,但Rx观察者只接收单个值。因此,您需要将多个参数包装成单个对象。
考虑到这一点,使用Observable.Create作为Subject的替代方法。这样,只有在实际存在观察者时才注册回调,并在该观察者取消订阅时注销它。
对于您使用的同步API的示例,您可能会像这样做。请注意,在此示例中,实际上没有办法在流中间取消注册回调,因为所有操作都在我们能够返回取消订阅可处理对象之前同步发生。
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
    return Observable.Create<Foo>(observer =>
    {
        FooApi.enumerate(arg1, arg2, e =>
        {
            observer.OnNext(new Foo(e));
            return true;
        });

        // In your case, FooApi.enumerate is actually synchronous
        // so when we get to this line of code, we know
        // the stream is complete.
        observer.OnCompleted();
        return Disposable.Empty;
    });
}

// Usage
WrapFooApi("a", "b").Take(1).Subscribe(...); // only takes first item

我们可以通过引入一些异步性来解决无法提前停止的问题,这将给观察者时间获取一个可处理的对象以通知您。我们可以使用CreateAsync来获取一个CancellationToken,当观察者取消订阅时,它将被取消。并且我们可以在Task.Run中运行FooApi代码:
public static IObservable<Foo> WrapFooApi(string arg1, string arg2)
{
    return Observable.CreateAsync<Foo>(async (observer, ct) =>
    {
        await Task.Run(() => FooApi.register_callback(arg1, arg2, e =>
        {
            observer.OnNext(e);

            // Returning false will stop the enumeration
            return !ct.IsCancellationRequested;
        }));
        observer.OnCompleted();
    });
}

在传统的异步回调API中,您需要在某个时刻注册并在另一个时刻取消注册。代码可能会像这样:
public static IObservable<Foo> WrapFooApi(string args)
{
    return Observable.Create<Foo>(observer =>
    {
        FooToken token = default(FooToken);
        var unsubscribe = Disposable.Create(() => FooApi.Unregister(token));
        token = FooApi.Register(args, e =>
        {
            observer.OnNext(new Foo(e));
        });

        return unsubscribe;
    });
}

1
在这种特定情况下,我认为您不能使用任何异步API来完成此操作 - EnumWindows回调是从调用者同步调用的(即想象EnumWindows API只是使用for循环并为每个项目调用回调)。 - Ana Betts

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接