RxAndroid和Retrofit:无法创建用于io.reactivex.Observable<retrofit2.Response<okhttp3.ResponseBody>>的调用适配器

17
我正在尝试使用rxJava,rxAndroid,Retrofit2和OkHTTP3从URL端点下载文件。我的代码无法为“Observable >”创建调用适配器。由于这些方法对我来说是新的,因此我认为我错过了重要的概念。非常感谢您提供任何方向或观点。

FATAL EXCEPTION: 主要 进程:com.example.khe11e.rxdownloadfile,PID:14130 java.lang.IllegalArgumentException:无法为io.reactivex.Observable>创建调用适配器 RetrofitInterface.downloadFileByUrlRx方法中 在retrofit2.ServiceMethod$Builder.methodError(ServiceMethod.java:720)处 在retrofit2.ServiceMethod$Builder.createCallAdapter(ServiceMethod.java:234)处 在retrofit2.ServiceMethod$Builder.build(ServiceMethod.java:160)处 在retrofit2.Retrofit.loadServiceMethod(Retrofit.java:166)处 在retrofit2.Retrofit$1.invoke(Retrofit.java:145)处 在java.lang.reflect.Proxy.invoke(Proxy.java:393)处 在$ Proxy0.downloadFileByUrlRx(未知来源)处 在com.example.khe11e.rxdownloadfile.MainActivity.downloadImage(MainActivity.java:46)处 在com.example.khe11e.rxdownloadfile.MainActivity $ 1.onClick(MainActivity.java:39)处 在android.view.View.performClick(View.java:5207)处 在android.view.View $ PerformClick.run(View.java:21168)处 在android.os.Handler.handleCallback(Handler.java:746)处 在android.os.Handler.dispatchMessage(Handler.java:95)处 在android.os.Looper.loop(Looper.java:148)处 在android.app.ActivityThread.main(ActivityThread.java:5491)处 在java.lang.reflect.Method.invoke(Native Method)处 在com.android.internal.os.ZygoteInit $ MethodAndArgsCaller.run(ZygoteInit.java:728)处 在com.android.internal.os.ZygoteInit.main(ZygoteInit.java:618)处 原因是:java.lang.IllegalArgumentException:无法找到io.reactivex.Observable>的调用适配器。 已尝试: *retrofit2.adapter.rxjava.RxJavaCallAdapterFactory *retrofit2.ExecutorCallAdapterFactory 在retrofit2.Retrofit.nextCallAdapter(Retrofit.java:237)处 在retrofit2.Retrofit.callAdapter(Retrofit.java:201)处 在retrofit2.ServiceMethod$Builder.createCallAdapter(ServiceMethod.java:232)处 ... 16 more

build.gradle:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.0.4'
compile 'com.squareup.retrofit2:retrofit:2.1.0'
compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'

RetrofitInterface.java:

package com.example.khe11e.rxdownloadfile;
import io.reactivex.Observable;
import okhttp3.ResponseBody;
import retrofit2.Call;
import retrofit2.Response;
import retrofit2.http.GET;
import retrofit2.http.Streaming;
import retrofit2.http.Url;

public interface RetrofitInterface {
    // Retrofit 2 GET request for rxjava
    @Streaming
    @GET
    Observable<Response<ResponseBody>> downloadFileByUrlRx(@Url String fileUrl);
}

MainActivity.java:

package com.example.khe11e.rxdownloadfile;

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import java.io.File;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import okhttp3.OkHttpClient;
import okhttp3.ResponseBody;
import okio.BufferedSink;
import okio.Okio;
import retrofit2.Response;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava.RxJavaCallAdapterFactory;

public class MainActivity extends AppCompatActivity {

Button downloadImgBtn;

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);
    downloadImgBtn = (Button) findViewById(R.id.downloadImgBtn);
    downloadImgBtn.setOnClickListener(new View.OnClickListener() {
        @Override
        public void onClick(View v) {
            downloadImage();
        }
    });
}

public void downloadImage(){
    RetrofitInterface downloadService = createService(RetrofitInterface.class, "https://www.nasa.gov/");
    downloadService.downloadFileByUrlRx("sites/default/files/iss_1.jpg")
            .flatMap(processResponse())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(handleResult());
}

public <T> T createService(Class<T> serviceClass, String baseUrl){
    Retrofit retrofit = new Retrofit.Builder()
            .baseUrl(baseUrl)
            .client(new OkHttpClient.Builder().build())
            .addCallAdapterFactory(RxJavaCallAdapterFactory.create()).build();
    return retrofit.create(serviceClass);
}

public Function<Response<ResponseBody>, Observable<File>> processResponse(){
    return new Function<Response<ResponseBody>, Observable<File>>() {
        @Override
        public Observable<File> apply(Response<ResponseBody> responseBodyResponse) throws Exception {
            return saveToDiskRx(responseBodyResponse);
        }
    };
}

private Observable<File> saveToDiskRx(final Response<ResponseBody> response){
    return Observable.create(new ObservableOnSubscribe<File>() {
        @Override
        public void subscribe(ObservableEmitter<File> subscriber) throws Exception {
            String header = response.headers().get("Content-Disposition");
            String filename = header.replace("attachment; filename=", "");
            new File("/data/data/" + getPackageName() + "/images").mkdir();
            File destinationFile = new File("/data/data/" + getPackageName() + "/images/" + filename);

            BufferedSink bufferedSink = Okio.buffer(Okio.sink(destinationFile));
            bufferedSink.writeAll(response.body().source());
            bufferedSink.close();

            subscriber.onNext(destinationFile);
            subscriber.onComplete();
        }
    });
}

private Observer<File> handleResult(){
    return new Observer<File>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d("OnSubscribe", "OnSubscribe");
        }

        @Override
        public void onNext(File file) {
            Log.d("OnNext", "File downloaded to " + file.getAbsolutePath());
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
            Log.d("Error", "Error " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Log.d("OnComplete", "onCompleted");
        }
    };
}
}

我尝试着按照这里提到的方式添加Call,代码如下:

Call<Observable<Response<ResponseBody>>> downloadFileByUrlRx(@Url String fileUrl);

然而,这会导致flatMap函数出现问题,因为它找不到符号方法flatMap(Function< Response< ResponseBody>,Observable< File>>)。
3个回答

53

您正在使用RxJava1适配器来进行Retrofit通信,建议改用RxJava2版本。

//compile 'com.squareup.retrofit2:adapter-rxjava:2.1.0'
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

更新

从Retrofit版本2.2.0开始,有了一个官方的RxJava2调用适配器:

compile 'com.squareup.retrofit2:retrofit:2.2.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'

47

更新:2020年5月

RxJava 3的新适配器

implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'
implementation 'io.reactivex.rxjava3:rxjava:3.0.0'
implementation 'com.squareup.retrofit2:adapter-rxjava3:2.9.0'

使用RxJava3CallAdapterFactory.create()代替RxJava2CallAdapterFactory.create()

compile 'io.reactivex.rxjava2:rxandroid:2.1.1'
compile 'io.reactivex.rxjava2:rxjava:2.2.10'
compile 'com.squareup.retrofit2:retrofit:2.6.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.6.0'

同时,Retrofit要求至少使用Java 7或Android 2.3版本

------------------------------------------

对于新手(2017年7月):

你肯定弄错了库的版本。

我一直在使用最新版本的RXAndroid 2.0.1。

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.0'
compile 'com.squareup.retrofit2:retrofit:2.3.0'
compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0'

1)确保你的Retrofit和Retrofit-RxJava适配器版本一致。

2)使用compile

compile 'com.squareup.retrofit2:adapter-rxjava2:2.3.0' 

compile 'com.squareup.retrofit2:adapter-rxjava:2.3.0'

在构建Retrofit时,使用RxJava2CallAdapterFactory.create()替代RxJavaCallAdapterFactory.create()


4
在构建Retrofit时,请使用RxJava2CallAdapterFactory.create()而不是RxJavaCallAdapterFactory.create() - eC Droid
1
谢谢您的留言。我已经更新了答案。 - Zumry Mohamed
1
请使用 io.reactivex.schedulers.Schedulers 替代 rx.schedulers.Schedulers - Vishal Pandey
1
适配器已经于今天合并到了com.squareup.retrofit2:adapter-rxjava3:2.9.0中。 - AdamHurwitz
看起来现在正常了!谢谢。我刚刚同步了所有更新的依赖项。 - Subhojit Halder
显示剩余4条评论

-5

由于您使用响应头,因此请在所有地方将ResponseBody替换为Object


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接