Android:使用Retrofit轮询服务器

7
我正在开发一款Android上的双人游戏。游戏是轮流进行的,所以玩家1等待玩家2输入,反之亦然。我有一个Web服务器,在其中使用Slim框架运行API。在客户端上,我使用Retrofit。因此,在客户端上,我想每隔X秒轮询我的Web服务器(我知道这不是最好的方法),检查是否有来自玩家2的输入,如果有,则更改UI(游戏板)。
在处理Retrofit时,我遇到了RxJava。我的问题是如何确定是否需要使用RxJava?如果需要,是否有任何使用Retrofit进行轮询的非常简单的示例?(因为我只发送了几个键/值对)如果不需要,那么该如何使用Retrofit呢?
我在这里找到了this线程,但它并没有帮助我,因为我仍然不知道是否需要Retrofit + RxJava,或者是否有更简单的方法?

你不一定需要使用RxJava,但它可以很好地与Retrofit配合使用。 - njzk2
那么,我如何在不使用RxJava的情况下使用Retrofit实现轮询呢? - mrp
2个回答

15

假设你为Retrofit定义的接口包含以下方法:

public Observable<GameState> loadGameState(@Query("id") String gameId);

Retrofit的方法可以通过以下三种方式之一定义:

1.) 简单同步方法:

public GameState loadGameState(@Query("id") String gameId);

2.) 接受 Callback 以进行异步处理的函数:

public void loadGameState(@Query("id") String gameId, Callback<GameState> callback);

3.) 返回一个 rxjava Observable 的方法,请查看上文。如果您要将 Retrofit 与 rxjava 结合使用,那么使用此版本是最合理的选择。

这样,您就可以像这样直接使用 Observable 发出单个请求:


mApiService.loadGameState(mGameId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {

    @Override
    public void onNext(GameState gameState) {
        // use the current game state here
    }

    // onError and onCompleted are also here
});
如果您想使用timer()interval()的版本来反复轮询服务器,可以提供"pulse"。
Observable.timer(0, 2000, TimeUnit.MILLISECONDS)
.flatMap(mApiService.loadGameState(mGameId))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {

    @Override
    public void onNext(GameState gameState) {
        // use the current game state here
    }

    // onError and onCompleted are also here
}).

需要注意的是,在这里我使用了flatMap而不是map。这是因为loadGameState(mGameId)的返回值本身就是一个Observable

但是,在您的更新中使用的版本也应该可以工作:

Observable.interval(2, TimeUnit.SECONDS, Schedulers.io())
.map(tick -> Api.ReceiveGameTurn())
.doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
.retry()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(sub);

也就是说,如果像我上面的 1.) 那样定义 ReceiveGameTurn() 同步方式,你需要使用 map 而不是 flatMap

在两种情况下,你的 SubscriberonNext 方法会每两秒钟调用一次,并从服务器获取最新的游戏状态。你可以在 subscribe() 前插入 take(1) 将其限制为单个项目,以便一个接一个地处理它们。

然而,关于第一种版本:单个网络错误首先会被传递到 onError,然后 Observable 将停止发出任何更多的项目,使得你的 Subscriber 变得无用且没有输入(记住,onError 只能被调用一次)。为了避免这种情况发生,你可以使用 rxjava 的任何 onError* 方法将故障“重定向”到 onNext。

例如:

Observable.timer(0, 2000, TimeUnit.MILLISECONDS)
.flatMap(new Func1<Long, Observable<GameState>>(){

    @Override
    public Observable<GameState> call(Long tick) {
        return mApiService.loadGameState(mGameId)
        .doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
        .onErrorResumeNext(new Func1<Throwable, Observable<GameState>(){
            @Override
            public Observable<GameState> call(Throwable throwable) {
                return Observable.emtpy());
            }
        });
    }
})
.filter(/* check if it is a valid new game state */)
.take(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {

    @Override
    public void onNext(GameState gameState) {
        // use the current game state here
    }

    // onError and onCompleted are also here
}).

每2秒钟执行以下步骤: * 使用Retrofit从服务器获取当前游戏状态 * 过滤出无效状态 * 取第一个有效状态 * 取消订阅

如果出现错误: * 它将在doOnNext中打印错误消息 * 否则忽略错误: onErrorResumeNext 将“消耗”onError-事件(即您的SubscriberonError不会被调用)并替换为空(Observable.empty())。

关于第二个版本,如果出现网络错误,retry将立即重新订阅间隔 - 并且因为interval在订阅时立即发出第一个整数,下一个请求也将立即发送,而不是您可能想要的3秒后...

最后注意:如果您的游戏状态非常大,您还可以首先轮询服务器以询问是否有新状态可用,并仅在回答肯定的情况下重新加载新游戏状态。

如果需要更详细的示例,请提出要求。

更新:我已经重写了文章并添加了更多信息。

更新2:我已经添加了使用onErrorResumeNext进行完整错误处理的示例。


我稍后需要查看错误处理。但是我们不是使用 doOnError 来处理错误吗? - mrp
这取决于你所说的“处理”是什么意思。doOnError只是在错误通过Observable管道传递时执行某些操作。但我认为它并不是真正意义上解决错误的地方。默认情况下,错误可以在onError中处理,但这也会完全终止Observable。还有其他选项来应对错误,保持订阅的完整性:发出其他东西(onErrorResumeNext),retry等。 - david.mihola

1
感谢您,我最终以类似的方式制作出来了,基于我在问题中提到的post。以下是我的代码:
Subscriber sub =  new Subscriber<Long>() {
        @Override
        public void onNext(Long _EmittedNumber)
        {
            GameTurn Turn =  Api.ReceiveGameTurn(mGameInfo.GetGameID(), mGameInfo.GetPlayerOneID());
            Log.d("Polling", "onNext: GameID - " + Turn.GetGameID());
        }

        @Override
        public void onCompleted() {
            Log.d("Polling", "Completed!");
        }

        @Override
        public void onError(Throwable e) {
            Log.d("Polling", "Error: " + e);
        }
    };

    Observable.interval(3, TimeUnit.SECONDS, Schedulers.io())
            // .map(tick -> Api.ReceiveGameTurn())
            // .doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
            .retry()
            .subscribe(sub);

现在的问题是,当我得到一个肯定的答案(一个GameTurn)时,我需要终止发射。我了解到takeUntil方法,我需要传递另一个Observable,它会发出一些东西,一旦触发轮询的终止。但我不确定如何实现这一点。根据您的解决方案,您的API方法返回一个Observable,就像Retrofit网站上显示的那样。也许这就是解决方案?那么它将如何工作?
更新: 我考虑了@david.miholas的建议,并尝试了他的重试和过滤建议。下面是游戏初始化的代码。轮询应该完全相同:Player1开始新游戏->轮询对手,Player2加入游戏->服务器向Player1发送对手的ID->轮询终止。
    Subscriber sub =  new Subscriber<String>() {
        @Override
        public void onNext(String _SearchOpponentResult) {}

        @Override
        public void onCompleted() {
            Log.d("Polling", "Completed!");
        }

        @Override
        public void onError(Throwable e) {
            Log.d("Polling", "Error: " + e);
        }
    };

    Observable.interval(3, TimeUnit.SECONDS, Schedulers.io())
            .map(tick -> mApiService.SearchForOpponent(mGameInfo.GetGameID()))
            .doOnError(err -> Log.e("Polling", "Error retrieving messages: " + err))
            .retry()
            .filter(new Func1<String, Boolean>()
            {
                @Override
                public Boolean call(String _SearchOpponentResult)
                {
                    Boolean OpponentExists;
                    if (_SearchOpponentResult != "0")
                    {
                        Log.e("Polling", "Filter " + _SearchOpponentResult);
                        OpponentExists = true;
                    }
                    else
                    {
                        OpponentExists = false;
                    }
                    return OpponentExists;

                }
            })
            .take(1)
            .subscribe(sub);

排放是正确的,但我在每次排放时都会收到此日志消息:
E/Polling﹕ Error retrieving messages: java.lang.NullPointerException

显然,doOnError在每次发射时都会触发。通常情况下,我会在每次发射时得到一些Retrofit调试日志,这意味着mApiService.SearchForOpponent不会被调用。我做错了什么?


也许您可以更详细地解释一下您想要实现的目标:起初我以为您想在程序运行期间每两秒轮询服务器,但现在您说您想在从服务器获得成功响应后终止项目的发射... 您当然可以在 subscribe 前插入 take(1) 来限制发射到一个项目,但您也可以看看 retryWhen - 也许这更接近您实际想要的... - david.mihola
这是一个回合制游戏,因此玩家等待/轮询服务器以获取更改。当另一个玩家设置输入时,他发送自己的回合,并将其存储在数据库中。然后第一个玩家会收到一个带有回合信息的正面MySQL结果,这需要用于更新UI并终止轮询,因为已经接收到了信息。官方Rx wiki说:“如果源Observable发出错误,请将该错误传递给另一个Observable以确定是否要重新订阅源”,但这对我目前没有帮助。 - mrp
我不确定我理解你的问题:你每3秒钟从doOnError获取一条日志消息,但是你没有看到任何Retrofit日志消息?如果是这样,你确定Retrofit日志记录已启用吗?无论之后在doOnError中发生什么,你都应该看到一些Retrofit输出。你可以尝试删除“retry”,并查看传递给你的“Subscriber”中的“onError”的“Throwable”。 - david.mihola
终于搞定了。是我的问题,创建新游戏时mGameInfo没有正确设置,真是太糟糕了!所以上面的代码可以工作!虽然对我来说它仍然更像是一种解决方法而不是解决方案,take(1)filterretry听起来有点混乱,而不是为周期性发射设置终止条件,但我现在会使用它,非常感谢! - mrp
不,它实际上保留了间隔。在我的(微不足道的)情况下,这不会成为问题,我只需要重复发射。 - mrp
显示剩余6条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接