How can I retry a Retrofit call on an HTTP error (401) when using RxJava?

3

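My Android application currently uses Retrofit (2.4.0) and RxJava (2.1.16) to make its web service calls.

I am using Google Sign-In for user authentication.

I want my Retrofit calls to detect HTTP 401 (Unauthorized), attempt a silent login with Google Sign-In, and then retry the Retrofit call.

My Retrofit calls look like this: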

@Headers(HEADER_ACCEPT_JSON)
@GET("resources")
Observable<Response<String>> getResources(@Header(HEADER_AUTHORIZATION) @NonNull final String authenticationToken, @QueryMap(encoded = true) @NonNull Map<String, Object> queryMap);



API_SERVICE.getResources(Login.getAuthorizationToken(), id)
                .subscribeOn(Schedulers.io())
                .subscribe(Network::manageResource, Network::handle);

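From what I have found by searching, retry/retryWhen are only triggered when an error occurs in the RxJava chain, and an HTTP 401 does not cause one.

As a newcomer to RxJava, how can I detect the HTTP 401 code and...

a). perform a silent Google Sign-In login

b). retry my API call once the silent login has completed?

UPDATE

The following code gets me closer to an answer: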

@Headers(HEADER_ACCEPT_JSON)
@GET("resources")
Single<Response<String>> getResources(@Header(HEADER_AUTHORIZATION) @NonNull final String authenticationToken, @QueryMap(encoded = true) @NonNull Map<String, Object> queryMap);



API_SERVICE.getResources(Login.getAuthorizationToken(), id)
        .subscribeOn(Schedulers.io())
        .flatMap(new Function<Response<Article>, SingleSource<Response<Article>>>() {
            @Override
            public SingleSource<Response<Article>> apply(final Response<Article> response) {
                Log.d(TAG, "apply() called with: response = [" + response + "]");
                if (response.isSuccessful()) {
                    return Single.just(response);
                } else {
                    return Single.error(new RuntimeException());
                }
            }
        })
        .retryWhen(errors -> errors.take(1).flatMap(new Function<Throwable, Publisher<?>>() {
            @Override
            public Publisher<?> apply(final Throwable throwable) {
                Log.d(TAG, "apply() called with: throwable = [" + throwable + "]");
                Login.loginSilently().subscribe();

                return Flowable.just("DELAY").delay(10, TimeUnit.SECONDS);
            }
        }))
        .subscribe(Network::manageResource, Network::handle);

I don't like the Flowable.just("DELAY").delay() call, and although I now catch the exception and am able to log in silently, I still get this exception:

09-10 16:39:29.878 7651-7718/research.android E/Network: handle: 
    java.util.NoSuchElementException
        at io.reactivex.internal.operators.flowable.FlowableSingleSingle$SingleElementSubscriber.onComplete(FlowableSingleSingle.java:116)
        at io.reactivex.subscribers.SerializedSubscriber.onComplete(SerializedSubscriber.java:168)
        at io.reactivex.internal.operators.flowable.FlowableRepeatWhen$WhenReceiver.onComplete(FlowableRepeatWhen.java:119)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drainLoop(FlowableFlatMap.java:426)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$MergeSubscriber.drain(FlowableFlatMap.java:366)
        at io.reactivex.internal.operators.flowable.FlowableFlatMap$InnerSubscriber.onComplete(FlowableFlatMap.java:673)
        at io.reactivex.subscribers.SerializedSubscriber.onComplete(SerializedSubscriber.java:168)
        at io.reactivex.internal.operators.flowable.FlowableDelay$DelaySubscriber$OnComplete.run(FlowableDelay.java:139)
        at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:66)
        at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:57)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:301)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1162)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:636)
        at java.lang.Thread.run(Thread.java:764)
09-10 16:39:29.878 7651-7678/research.android  D/OkHttp: <-- HTTP FAILED: java.io.IOException: Canceled

How can I make retryWhen wait for the silent login to complete?
Also, what is causing the NoSuchElementException?

3 Answers

2
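As far as I remember, if your error code is > 300, onError() is called with a Throwable argument. You can cast it to HttpException to get the error code returned by the server, and then call another function to perform the "silent call".

In my tests, onError() was not called when I received an HTTP 401. - Hector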
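
The comment is consistent with how Retrofit's RxJava 2 adapter treats the declared return type: with Observable<Response<T>>, as in the question, every HTTP response (including a 401) arrives in onNext, while a method declared to return the body type directly gets an HttpException in onError for non-2xx responses. Either way, the flow being asked for is the same: detect the 401, finish the silent login, then repeat the call once. The following is a minimal, library-free sketch of that flow in plain Java. The names callWithRetry, fakeApi and the token strings are hypothetical stand-ins, not Retrofit or Google Sign-In APIs.

```java
import java.util.function.Function;
import java.util.function.Supplier;

/** Library-free model of "retry once after a silent login when the server answers 401". */
class RetryOn401Sketch {

    /**
     * call:         hypothetical stand-in for the Retrofit call (token in, HTTP status out)
     * currentToken: supplies the token used for the first attempt
     * silentLogin:  hypothetical blocking login that returns a fresh token
     */
    static int callWithRetry(Function<String, Integer> call,
                             Supplier<String> currentToken,
                             Supplier<String> silentLogin) {
        int status = call.apply(currentToken.get());
        if (status != 401) {
            return status; // success or an unrelated error: no login, no retry
        }
        String freshToken = silentLogin.get(); // returns only after the login has finished
        return call.apply(freshToken);         // exactly one retry, whatever it returns
    }

    public static void main(String[] args) {
        // Fake server: only the token "fresh" is accepted.
        Function<String, Integer> fakeApi = token -> "fresh".equals(token) ? 200 : 401;
        int status = callWithRetry(fakeApi, () -> "stale", () -> "fresh");
        System.out.println(status); // prints 200
    }
}
```

Carried over to the RxJava chain, this suggests two things, both hedged since Login.loginSilently() is not shown in the question: the retryWhen handler can return the login's own stream instead of a fixed delay, so the resubscription happens only when the login emits, and the call should be created lazily (for example inside Single.defer) so that Login.getAuthorizationToken() is read again on the retry rather than once when the chain is assembled. The NoSuchElementException appears to come from errors.take(1): once the handler stream completes, the Single completes without a value.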

0

When you initialize the client:

Retrofit.Builder()
    .baseUrl(baseUrl)
    .client(createClient())
    .addConverterFactory(GsonConverterFactory.create())
    .addCallAdapterFactory(ApiHandler(Schedulers.io()))
    .build()

The error handler:

class ApiHandler(scheduler: Scheduler) : CallAdapter.Factory() {

private val original: RxJava2CallAdapterFactory
        = RxJava2CallAdapterFactory.createWithScheduler(scheduler)

override fun get(returnType: Type, annotations: Array<Annotation>, retrofit: Retrofit): CallAdapter<*, *>?
        = original.get(returnType, annotations, retrofit)?.let { Wrapper(it) }

private class Wrapper<R>(private val wrapped: CallAdapter<R, *>) : CallAdapter<R, Any> {
    override fun adapt(call: Call<R>?): Any? {
        call ?: return null
        val result = wrapped.adapt(call)

        return when (result) {
            is Maybe<*>      -> result.onErrorResumeNext(Function { Maybe.error(wrap(it)) })
            is Single<*>     -> result.onErrorResumeNext { Single.error(wrap(it)) }
            is Completable   -> result.onErrorResumeNext { Completable.error(wrap(it)) }
            is Flowable<*>   -> result.onErrorResumeNext(Function { Flowable.error(wrap(it)) })
            is Observable<*> -> result.onErrorResumeNext(Function { Observable.error(wrap(it)) })

            else             -> result
        }
    }

    override fun responseType(): Type = wrapped.responseType()

    private fun wrap(throwable: Throwable) = when (throwable) {
        is HttpException          -> {
            val exception = ApiException.http(throwable)
            toLog("ex - ${exception.message}")
            exception
        } // We had non-200 http error
        is JsonSyntaxException    -> ApiException.parse(throwable) // We had json parsing error
        is SocketTimeoutException -> ApiException.timeout(throwable) // A network error happened
        is IOException            -> ApiException.network(throwable) // A network error happened

        else                      -> ApiException.unknown(throwable) // We don't know what happened. We need to simply convert to an unknown error
    }
}
}
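
The order of the branches in wrap matters: SocketTimeoutException is a subclass of IOException, so the timeout check has to come before the general network check or it would never match. A small plain-Java sketch of the same classification, assuming a hypothetical ErrorKind enum in place of ApiError and leaving out the Retrofit and Gson exception types:

```java
import java.io.IOException;
import java.net.SocketTimeoutException;

class ErrorClassifierSketch {

    /** Hypothetical stand-in for the answer's ApiError constants. */
    enum ErrorKind { TIMEOUT, NETWORK, UNKNOWN }

    static ErrorKind classify(Throwable t) {
        // Most specific type first: SocketTimeoutException is also an IOException.
        if (t instanceof SocketTimeoutException) return ErrorKind.TIMEOUT;
        if (t instanceof IOException) return ErrorKind.NETWORK;
        return ErrorKind.UNKNOWN;
    }

    public static void main(String[] args) {
        System.out.println(classify(new SocketTimeoutException("slow")));   // prints TIMEOUT
        System.out.println(classify(new IOException("no route")));          // prints NETWORK
        System.out.println(classify(new IllegalStateException("bug")));     // prints UNKNOWN
    }
}
```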

The API exception class:

class ApiException internal constructor(message: String,
                                    /** Response object containing status code, headers, body, etc.  */
                                    val response: ErrorResponse?,
                                    /** The event kind which triggered this error.  */
                                    @ApiError val error: Int,
                                    exception: Throwable?) : RuntimeException(message, exception) {

companion object {
    fun http(exception: HttpException): ApiException {
        val response = exception.response()
        var errorResponse: ErrorResponse? = null
        val message = if (response == null) {
            if (exception.message().isEmpty()) exception.code().toString() else exception.message()

        } else {

            // here you can check error code and throw needed exception

            val errorBody = response.errorBody()?.string().toString()
            if (errorBody.isNotEmpty()) {
                toLog("ApiException: $errorBody")
            }
            try {
                errorResponse = GsonBuilder().create().fromJson(errorBody, ErrorResponse::class.java)
                errorResponse?.toString() ?: errorBody

            } catch (e: Exception) {
                e.printStackTrace()
                response.raw().message()
            }
        }
        return ApiException(message, errorResponse, ApiError.HTTP, exception)
    }

    fun network(exception: IOException): ApiException {
        return ApiException(exception.message ?: "network", null, ApiError.NETWORK, exception)
    }

    fun parse(exception: JsonSyntaxException): ApiException {
        return ApiException(exception.message ?: "parse", null, ApiError.CONVERSION, exception)
    }

    fun unknown(exception: Throwable): ApiException {
        return ApiException(exception.message ?: "unknown", null, ApiError.UNKNOWN, exception)
    }

    fun timeout(exception: SocketTimeoutException): ApiException {
        return ApiException("Connection timed out", null, ApiError.TIMEOUT_EXCEPTION, exception)
    }
}
}

When making the request:

yourRequest.compose { observable ->
                observable.retryWhen { flow ->
                    flow.ofType(ApiException::class.java).flatMap {
                        when {
                            it.error == ApiError.TIMEOUT_EXCEPTION -> Flowable.empty<T>()
                            it.error == ApiError.NETWORK           -> getSnackBarFlowable().flatMap { if (it) Flowable.just(it) else Flowable.empty<T>() }

                            else                                   -> Flowable.error(it)
                        }
                    }
                }
            }.subscribe({}, {})

getSnackBarFlowable() lives in the fragment. You could use some other approach instead.

fun getSnackBarFlowable(): Flowable<Boolean> = Flowable.create({ subscriber ->
    if (view == null) {
        subscriber.onNext(false)

    } else {
        val snackBar = Snackbar.make(activity!!.currentFocus, R.string.error_connection_fail, Snackbar.LENGTH_INDEFINITE)
        snackBar.setAction("Retry") { subscriber.onNext(true) }
        snackBar.show()
    }
}, LATEST)

I know, that's a lot of code. But this solution has been very helpful to me across different projects.


0

To handle the 401 Unauthorized error, try adding an AuthInterceptor to your OkHttpClient.

BasicAuthInterceptor interceptorAuth = new BasicAuthInterceptor(yourToken);

OkHttpClient client = new OkHttpClient.Builder()
        .addInterceptor(interceptorAuth)
        .build();

builder.client(client);

If your authToken has expired or is invalid, try to get a new one.

import java.io.IOException;

import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

public class BasicAuthInterceptor implements Interceptor {

    private String yourToken;

    public BasicAuthInterceptor(String token) {
        this.yourToken = token;
    }

    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        Request authenticatedRequest = request.newBuilder()
                .header("Authorization", String.format("token %s", yourToken))
                .build();

        Response response = chain.proceed(authenticatedRequest);

        if (response.code() == 401) {
            // The first response must be closed before the chain is used again.
            response.close();

            // getNewToken() is not shown in this answer; it has to return a fresh token synchronously.
            Request modifiedRequest = request.newBuilder()
                    .header("Authorization", String.format("token %s", getNewToken()))
                    .build();
            response = chain.proceed(modifiedRequest);
        }

        return response;
    }
}
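
One thing the interceptor above does not do is remember the refreshed token, so each later request would first fail with the stale one; several parallel requests could also each trigger their own refresh. A minimal plain-Java sketch of a token holder that addresses both, assuming a hypothetical refresher callback that performs the blocking login (TokenStoreSketch and its methods are illustrative names, not part of OkHttp):

```java
import java.util.function.Supplier;

class TokenStoreSketch {
    private final Supplier<String> refresher; // hypothetical blocking "get a new token" call
    private String token;

    TokenStoreSketch(String initialToken, Supplier<String> refresher) {
        this.token = initialToken;
        this.refresher = refresher;
    }

    /** Token to attach to the next request. */
    synchronized String current() {
        return token;
    }

    /**
     * Called after a 401. Refreshes only if the rejected token is still the current one;
     * otherwise another thread has already refreshed and its result is reused.
     */
    synchronized String refreshIfStale(String rejectedToken) {
        if (token.equals(rejectedToken)) {
            token = refresher.get();
        }
        return token;
    }
}
```

An interceptor built on this would read current() for the first attempt and, on a 401, pass that same value to refreshIfStale() before retrying once.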
