RxJava和Retrofit2:NetworkOnMainThreadException

18

我意识到我正在主线程上使用 subscribeOn()/observeOn()。 我可以向 subscribeOn() 传递哪些选项集? 我可以向 observeOn() 传递哪些选项集?

12-17 21:36:09.154 20550-20550/rx.test D/MainActivity2: [onCreate]
12-17 21:36:09.231 20550-20550/rx.test D/MainActivity2: starting up observable...
12-17 21:36:09.256 20550-20550/rx.test D/MainActivity2: [onError] 
12-17 21:36:09.256 20550-20550/rx.test W/System.err: android.os.NetworkOnMainThreadException

GovService.java

import java.util.List;
import retrofit.Call;
import retrofit.http.GET;
import rx.Observable;

public interface GovService {
    @GET("/txt2lrn/sat/index_1.json")
    Observable<MyTest> getOneTestRx();
}

MyTest.java

public class MyTest {
    private String name, url;
    private int num;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getUrl() {
        return url;
    }

    public void setUrl(String url) {
        this.url = url;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }

    @Override
    public String toString() {
        return "Name: " + this.name + ", num: " + this.num + ", url: " + this.url;
    }
}

MainActivity2.java

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.support.v7.widget.DefaultItemAnimator;
import android.support.v7.widget.LinearLayoutManager;
import android.support.v7.widget.RecyclerView;
import android.support.v7.widget.Toolbar;
import android.util.Log;

import retrofit.GsonConverterFactory;
import retrofit.Retrofit;
import retrofit.RxJavaCallAdapterFactory;
import rx.Observable;
import rx.Subscriber;
import rx.android.schedulers.AndroidSchedulers;
import rx.schedulers.Schedulers;

public class MainActivity2 extends AppCompatActivity {
    private final String TAG = getClass().getSimpleName();

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        Log.d(TAG, "[onCreate]");
        setContentView(R.layout.activity_main);
        Toolbar toolbar = (Toolbar) findViewById(R.id.toolbar);
        setSupportActionBar(toolbar);
        RecyclerView mRV = (RecyclerView) findViewById(R.id.rv);
        mRV.setLayoutManager(new LinearLayoutManager(this));// setup LayoutManager
        mRV.setItemAnimator(new DefaultItemAnimator());// setup ItemAnimator

        // setup retrofit
        Retrofit retrofit = new Retrofit.Builder()
                .baseUrl("http://goanuj.freeshell.org")
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                .build();
        GovService service = retrofit.create(GovService.class);

        Log.d(TAG, "starting up observable...");
        Observable<MyTest> o = service.getOneTestRx();
        o.subscribeOn(Schedulers.io());
        o.observeOn(AndroidSchedulers.mainThread());
        o.subscribe(new Subscriber<MyTest>() {
            @Override
            public void onCompleted() {
                Log.d(TAG, "[onCompleted] ");
            }

            @Override
            public void onError(Throwable t) {
                Log.d(TAG, "[onError] ");
                t.printStackTrace();
            }

            @Override
            public void onNext(MyTest m) {
                Log.d(TAG, "[onNext] " + m.toString());
            }
        });
    }
}

尝试使用.subscribeOn(Schedulers.newThread())。这将在新线程中执行Observable。 - M D
3个回答

38
将代码的最后一部分重写为:
service.getOneTestRx()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<MyTest>() {
        @Override
        public void onCompleted() {
            Log.d(TAG, "[onCompleted] ");
        }

        @Override
        public void onError(Throwable t) {
            Log.d(TAG, "[onError] ");
            t.printStackTrace();
        }

        @Override
        public void onNext(MyTest m) {
            Log.d(TAG, "[onNext] " + m.toString());
        }
    });

@akarnokd的重要提示:

需要提醒的是,必须像这样链接调用,因为Observable不是建造者模式(您修改现有对象的设置的模式)


3
值得一提的是,由于Observable不是建造者模式(可以修改现有对象的设置),所以需要像这样链接调用。 - akarnokd
5
解释也值得一提。为什么OP应该这样重写,为什么你的变体是正确的?我不是为自己问。一个好的答案应该有这个。 - Artem Novikov
1
有没有用 RxJava 2 的想法? - Incinerator

8

您应该调用 Observable.unsubscribeOn(Schedulers.io()),在http请求结束时,retrofit 将取消订阅。

retrofit-rxjava-adapterRxJavaCallAdapterFactory中进行了如下操作。

它的行为就像这样。

if (!subscriber.isUnsubscribed()) {
    subscriber.onCompleted();
}

但是当 subscriber 是一个 SafeSubscriber 时,最终会调用 unsubscribe 方法。

我在我的应用程序中遇到了这个问题。

完整代码:

o.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .unsubscribeOn(Schedulers.io());

1
是的,在这里也是这个解决方案:https://github.com/square/retrofit/issues/1328 - 这似乎将在 retrofit 2 的发布版本中修复,在 okhttp 2.7 发布后(它已经发布了)。 - ahmedre
2
@ahmedre他们永远不会...“这不是Retrofit或OkHttp的问题”。 - Inoy

0

在你的Retrofit构建器中添加RxJavaCallAdapterFactory.createAsync()将解决这个问题并创建异步可观察对象。

Retrofit retrofit = new Retrofit.Builder()
                       .client(client) 
                       .baseUrl(httpsUrl) 
                       .setLogLevel(LogLevel.FULL)
                       .addCallAdapterFactory(RxJavaCallAdapterFactory.createAsync()) 
                       .addConverterFactory(GsonConverterFactory.create()) 
                       .build();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接