在处理Retrofit时,我遇到了RxJava。我的问题是如何确定是否需要使用RxJava?如果需要,是否有任何使用Retrofit进行轮询的非常简单的示例?(因为我只发送了几个键/值对)如果不需要,那么该如何使用Retrofit呢?
我在这里找到了this线程,但它并没有帮助我,因为我仍然不知道是否需要Retrofit + RxJava,或者是否有更简单的方法?
假设你为Retrofit定义的接口包含以下方法:
public Observable<GameState> loadGameState(@Query("id") String gameId);
Retrofit的方法可以通过以下三种方式之一定义:
1.) 简单同步方法:
public GameState loadGameState(@Query("id") String gameId);
2.) 接受 Callback
以进行异步处理的函数:
public void loadGameState(@Query("id") String gameId, Callback<GameState> callback);
3.) 返回一个 rxjava Observable
的方法,请查看上文。如果您要将 Retrofit 与 rxjava 结合使用,那么使用此版本是最合理的选择。
这样,您就可以像这样直接使用 Observable 发出单个请求:
mApiService.loadGameState(mGameId)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {
@Override
public void onNext(GameState gameState) {
// use the current game state here
}
// onError and onCompleted are also here
});
如果您想使用timer()
或interval()
的版本来反复轮询服务器,可以提供"pulse"。Observable.timer(0, 2000, TimeUnit.MILLISECONDS)
.flatMap(mApiService.loadGameState(mGameId))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {
@Override
public void onNext(GameState gameState) {
// use the current game state here
}
// onError and onCompleted are also here
}).
需要注意的是,在这里我使用了flatMap
而不是map
。这是因为loadGameState(mGameId)
的返回值本身就是一个Observable
。
但是,在您的更新中使用的版本也应该可以工作:
Observable.interval(2, TimeUnit.SECONDS, Schedulers.io())
.map(tick -> Api.ReceiveGameTurn())
.doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
.retry()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(sub);
也就是说,如果像我上面的 1.) 那样定义 ReceiveGameTurn()
同步方式,你需要使用 map
而不是 flatMap
。
在两种情况下,你的 Subscriber
的 onNext
方法会每两秒钟调用一次,并从服务器获取最新的游戏状态。你可以在 subscribe()
前插入 take(1)
将其限制为单个项目,以便一个接一个地处理它们。
然而,关于第一种版本:单个网络错误首先会被传递到 onError
,然后 Observable 将停止发出任何更多的项目,使得你的 Subscriber 变得无用且没有输入(记住,onError
只能被调用一次)。为了避免这种情况发生,你可以使用 rxjava 的任何 onError*
方法将故障“重定向”到 onNext。
例如:
Observable.timer(0, 2000, TimeUnit.MILLISECONDS)
.flatMap(new Func1<Long, Observable<GameState>>(){
@Override
public Observable<GameState> call(Long tick) {
return mApiService.loadGameState(mGameId)
.doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
.onErrorResumeNext(new Func1<Throwable, Observable<GameState>(){
@Override
public Observable<GameState> call(Throwable throwable) {
return Observable.emtpy());
}
});
}
})
.filter(/* check if it is a valid new game state */)
.take(1)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<GameState>() {
@Override
public void onNext(GameState gameState) {
// use the current game state here
}
// onError and onCompleted are also here
}).
每2秒钟执行以下步骤: * 使用Retrofit从服务器获取当前游戏状态 * 过滤出无效状态 * 取第一个有效状态 * 取消订阅
如果出现错误:
* 它将在doOnNext
中打印错误消息
* 否则忽略错误: onErrorResumeNext
将“消耗”onError
-事件(即您的Subscriber
的onError
不会被调用)并替换为空(Observable.empty()
)。
关于第二个版本,如果出现网络错误,retry
将立即重新订阅间隔 - 并且因为interval
在订阅时立即发出第一个整数,下一个请求也将立即发送,而不是您可能想要的3秒后...
最后注意:如果您的游戏状态非常大,您还可以首先轮询服务器以询问是否有新状态可用,并仅在回答肯定的情况下重新加载新游戏状态。
如果需要更详细的示例,请提出要求。
更新:我已经重写了文章并添加了更多信息。
更新2:我已经添加了使用onErrorResumeNext
进行完整错误处理的示例。
doOnError
来处理错误吗? - mrpdoOnError
只是在错误通过Observable管道传递时执行某些操作。但我认为它并不是真正意义上解决错误的地方。默认情况下,错误可以在onError
中处理,但这也会完全终止Observable。还有其他选项来应对错误,保持订阅的完整性:发出其他东西(onErrorResumeNext
),retry
等。 - david.miholaSubscriber sub = new Subscriber<Long>() {
@Override
public void onNext(Long _EmittedNumber)
{
GameTurn Turn = Api.ReceiveGameTurn(mGameInfo.GetGameID(), mGameInfo.GetPlayerOneID());
Log.d("Polling", "onNext: GameID - " + Turn.GetGameID());
}
@Override
public void onCompleted() {
Log.d("Polling", "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d("Polling", "Error: " + e);
}
};
Observable.interval(3, TimeUnit.SECONDS, Schedulers.io())
// .map(tick -> Api.ReceiveGameTurn())
// .doOnError(err -> Log.e("Polling", "Error retrieving messages" + err))
.retry()
.subscribe(sub);
GameTurn
)时,我需要终止发射。我了解到takeUntil
方法,我需要传递另一个Observable
,它会发出一些东西,一旦触发轮询的终止。但我不确定如何实现这一点。根据您的解决方案,您的API方法返回一个Observable
,就像Retrofit网站上显示的那样。也许这就是解决方案?那么它将如何工作? Subscriber sub = new Subscriber<String>() {
@Override
public void onNext(String _SearchOpponentResult) {}
@Override
public void onCompleted() {
Log.d("Polling", "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d("Polling", "Error: " + e);
}
};
Observable.interval(3, TimeUnit.SECONDS, Schedulers.io())
.map(tick -> mApiService.SearchForOpponent(mGameInfo.GetGameID()))
.doOnError(err -> Log.e("Polling", "Error retrieving messages: " + err))
.retry()
.filter(new Func1<String, Boolean>()
{
@Override
public Boolean call(String _SearchOpponentResult)
{
Boolean OpponentExists;
if (_SearchOpponentResult != "0")
{
Log.e("Polling", "Filter " + _SearchOpponentResult);
OpponentExists = true;
}
else
{
OpponentExists = false;
}
return OpponentExists;
}
})
.take(1)
.subscribe(sub);
E/Polling﹕ Error retrieving messages: java.lang.NullPointerException
显然,doOnError
在每次发射时都会触发。通常情况下,我会在每次发射时得到一些Retrofit调试日志,这意味着mApiService.SearchForOpponent
不会被调用。我做错了什么?
subscribe
前插入 take(1)
来限制发射到一个项目,但您也可以看看 retryWhen
- 也许这更接近您实际想要的... - david.miholamGameInfo
没有正确设置,真是太糟糕了!所以上面的代码可以工作!虽然对我来说它仍然更像是一种解决方法而不是解决方案,take(1)
、filter
、retry
听起来有点混乱,而不是为周期性发射设置终止条件,但我现在会使用它,非常感谢! - mrp