如何使用Observable或async/await包装异步回调?

4

我正在使用一个与回调函数一起工作的网络API。基本上,我需要在这个第三方库中使用一堆方法调用,看起来像这样:

void SendNetworkRequest(string requestType, Action<Response> callback)

我发现代码变得有点混乱,因为所有依赖于第三方API的网络资源的方法都需要自己实现回调。例如,在我的主场景中,我可能想要获取玩家信息,我的代码看起来会像这样:

void InitMainScene()
{
       _networkHelper.SendNetworkRequest("GetPlayersInfo",OnPlayerInfoResponse);
}

void OnPlayerInfoResponse(Response response)
{
     _playerInfo = response.Info;
}

我最近开始学习RX,并在我的代码中广泛使用它。我已经阅读了一些有关async/await的知识。我进行了很多实验,特别是与RX一起尝试使用Observable.FromAsync(),但无法使其正常工作。

我错过了什么?如何使用RX或async/await编写更清洁且不需要回调的代码?以下是我要寻找的伪代码:

        void InitMainScene()
        {
            _playerInfo = Overservable.DoMagicThing<Response>( () => {
_networkHelper.SendNetworkRequest("GetPlayersInfo",(r) =>{return r;}); });
        }
3个回答

2

以下是如何使用 Rx 实现。我将 Response 替换为 string 仅为了编译和测试,但很容易切换回去。

public IObservable<string> SendNetworkRequestObservable(string requestType)
{
    return Observable.Create<string>(observer =>  
    {
        SendNetworkRequest(requestType, s => observer.OnNext(s));
        observer.OnCompleted();
        return Disposable.Empty;
    });
}

// Define other methods and classes here
public void SendNetworkRequest(string requestType, Action<string> callback)
{
    callback(requestType); // implementation for testing purposes
}

如果你曾经返回过 Disposable.Empty,那么你可能做错了什么。 - Enigmativity
谢谢@Schlomo!这对我很有帮助!我会尝试使用它并更好地理解Enigmativity所描述的Disposable.Empty的含义。 - user3010678
@Enigmativity,如果你愿意的话可以详细解释一下。我不同意:在这种情况下没有什么需要处理的,但是订阅需要一个可处理的对象。这正是为什么存在Disposable.Empty的完美例子。 - Shlomo
@Shlomo - 我认为 Disposable.Empty 有点像 catch (Exception ex) - 它是一种反模式。通常很容易只需进行几个 .OnNext 调用并返回 Disposable.Empty - 这总是导致同步订阅调用。我认为避免使用 Disposable.Empty 是一个好习惯。不过,你说得对,这可能是它存在的完美例子。 - Enigmativity
@Shlomo - 我有点像 catch (Exception ex) 这样看待 Disposable.Empty - 它有点像是一种反模式。通常可以很容易地进行一些 .OnNext 调用并返回 Disposable.Empty - 这始终会导致同步订阅调用。我认为避免使用 Disposable.Empty 是一个好的做法。不过你说得对,这可能正是它存在的完美例子。 - Enigmativity

1

我不确定 RX。但是,您可以像这样将基于回调的 SendNetworkRequest 转换为可等待任务:

void SendNetworkRequest(string requestType, Action<Response> callback)
        {
            // This code by third party, this is here just for testing
            if (callback != null)
            { 
                var result = new Response();
                callback(result);
            }
        }

        Task<Response> SendNetworkRequestAsync(string requestType)
        {
            return Task.Run(() =>
            {
                var t = new TaskCompletionSource<Response>();

                SendNetworkRequest(requestType, s => t.TrySetResult(s));

                return t.Task;
            });
        }

现在,您可以更自然地使用async/await消耗SendNetworkRequestAsync。

非常感谢!这太棒了!我今天会试着用一下。我有一个问题 - 在我的情况下,你推荐使用这个语义还是Overservable?顺便说一句 - 我还没有使用过async/await,事实上,我正在Unity3d中进行此操作,它仅作为实验功能支持.net 4.6。 - user3010678
如果你正在处理一系列事件,那么RX非常棒。在你的情况下,你调用SendNetworkRequest然后等待任务完成,就这样。因此,在这种情况下,异步/等待比Rx更容易和更合理。干净而简单。 - Dan Nguyen

1
这里是我的 Rx 实现方法。
NetworkHelper 类中添加如下方法:
public IObservable<Response> SendNetworkRequestObservable(string requestType)
{
    return
        Observable
            .Create<Response>(observer =>
            {
                Action<Response> callback = null;
                var callbackQuery =
                    Observable
                        .FromEvent<Response>(h => callback += h, h => callback -= h)
                        .Take(1);
                var subscription = callbackQuery.Subscribe(observer);
                this.SendNetworkRequest(requestType, callback);
                return subscription;
            });
}

这将创建一个可观察对象,它在内部使用Observable.FromEvent(...).Take(1)来创建基于对SendNetworkRequest方法传递的Action<Response>回调期望单个调用的可观察对象。
唯一的问题是调用会在当前线程上直到完成。如果您想要在后台线程上运行此代码,但将结果返回到当前线程,则可以执行以下操作:
public IObservable<Response> SendNetworkRequestObservable(string requestType)
{
    return
        Observable
            .Start(() =>
                Observable
                    .Create<Response>(observer =>
                    {
                        Action<Response> callback = null;
                        var callbackQuery =
                            Observable
                                .FromEvent<Response>(h => callback += h, h => callback -= h)
                                .Take(1);
                        var subscription = callbackQuery.Subscribe(observer);
                        this.SendNetworkRequest(requestType, callback);
                        return subscription;
                    }))
            .Merge();
}

无论哪种方式,你都可以这样调用它:
var _playerInfo =
    _networkHelper
        .SendNetworkRequestObservable("GetPlayersInfo")
        .Wait();

仅作为附注:你的DoMagicThing是同步调用。在Rx中,这是一个.Wait()调用,但最好只在必要时使用普通的.Subscribe


根据评论,您可以使用 async 来完成此操作:
async void InitMainScene()
{
    _playerInfo = await _networkHelper.SendNetworkRequestObservable("GetPlayersInfo");
}

非常感谢您提供的示例。我已经花了一些时间尝试在我的游戏中使其工作...但似乎.Wait()方法导致Unity冻结然后崩溃。我需要深入挖掘原因 - 即使下面的async/await示例也是如此。非常感谢您的帮助!我想问您 - 您认为async/await对于这个问题是更好的解决方案,还是您认为您的解决方案对于这种特定情况更清晰?由于我对两者都不熟悉,很难知道在哪种情况下使用哪种方法。再次感谢! - user3010678
@user3010678 - 你不应该使用.Wait()。你应该使用常规的.Subscribe(...)调用,或者更好的方法是使用async,让调用看起来像这样:var _playerInfo = await _networkHelper.SendNetworkRequestObservable("GetPlayersInfo");。这适用于 observables 并返回生成的最后一个值(如果有多个)。 - Enigmativity

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接