在Activity生命周期中,处理RxJava/RxAndroid订阅的正确方法是什么?

18

我刚开始学习RxJava/RxAndroid。我想避免上下文泄漏,所以我创建了一个类似于BaseFragment的基础片段:

public abstract class BaseFragment extends Fragment {

    protected CompositeSubscription compositeSubscription = new CompositeSubscription();

    @Override
    public void onDestroy() {
        super.onDestroy();

        compositeSubscription.unsubscribe();
    } 
} 

在扩展BaseFragment的我的片段中,我正在执行以下操作:

protected void fetchNewerObjects(){
        if(!areNewerObjectsFetching()){ //if it is not already fetching newer objects

            Runtime.getRuntime().gc();//clean out memory if possible

            fetchNewObjectsSubscription = Observable
                .just(new Object1())
                .map(new Func1<Object1, Object2>() {
                    @Override
                    public Object2 call(Object1 obj1) {
                        //do bg stuff
                        return obj2;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object2>() {
                    @Override
                    public void onCompleted() {
                        compositeSubscription.remove(fetchNewObjectsSubscription);
                        fetchNewObjectsSubscription = null;
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(ArrayList<NewsFeedObject> newsFeedObjects) {
                        //do stuff
                    }
                });

        //add subscription to composite subscription so it can be unsubscribed onDestroy()
        compositeSubscription.add(fetchNewObjectsSubscription);
    }
}

protected boolean areNewerObjectsFetching(){
    if(fetchNewObjectsSubscription == null || fetchNewObjectsSubscription.isUnsubscribed()){ //if its either null or is in a finished status
        return false;
    }
    return true;
}

因此,我想问两个问题:

  1. 我在onDestroy()上取消订阅,这会阻止上下文泄漏吗?

  2. 我是否正确地跟踪观察器是否正在“运行”,并在完成后将订阅设置为null并检查其是否为空?

2个回答

12
  1. 是的,它会停止,但你也应该在 onError 中将订阅设置为 null (或者在出现错误后,你将不再加载项目)。

    同时,请不要忘记片段可以停止,但不会被销毁(例如在返回堆栈中),你可能不希望在这种情况下观察任何内容。如果你将 unsubscribeonDestroy 移动到 onStop,请不要忘记在每次创建视图时在 onCreateView 中初始化 compositeSubscription(因为在 CompositeSubscription 被取消订阅后,你就不能再在其中添加订阅了)。

  2. 是的,正确的。但我认为可以省略 compositeSubscription.remove,因为你已经检查了是否为 null。


如果我在onStop中取消订阅,而它从未更新我的视图,那么我是否需要在onResume中重新运行fetchNewObjects()? - Sree
2
还有,为什么是onCreate?你是不是想说onCreateView? - Sree
我更喜欢在 onCreate 中执行,因为它在 onCreateView 之前被调用,所以 compositeSubscription 在使用前已经准备就绪。 - marwinXXII
2
但根据文档,它永远不会触发onCreate()。它从onDestroyView()到onCreateView()。http://developer.android.com/intl/zh-cn/guide/components/fragments.html - Sree
抱歉,在onCreateView中,我更新了答案。关于onStop - 取消订阅“停止”处理可观察对象,所以如果数据尚未加载,是的,您可能需要再次加载数据。 - marwinXXII

6
您不需要第三方库来管理Activity的生命周期。请尝试以下代码:

您不需要任何第三方库来管理Activity生命周期。请尝试以下代码:

public class LifecycleBinder {

    public static <R> Observable.Transformer<R, R> subscribeUtilEvent(final Activity target, LifecycleEvent event) {
        final Application app = target.getApplication();
        final PublishSubject<LifecycleEvent> publishSubject = PublishSubject.create();
        final Application.ActivityLifecycleCallbacks callbacks = new Application.ActivityLifecycleCallbacks() {
            @Override
            public void onActivityCreated(Activity activity, Bundle savedInstanceState) {

            }

            @Override
            public void onActivityStarted(Activity activity) {

            }

            @Override
            public void onActivityResumed(Activity activity) {

            }

            @Override
            public void onActivityPaused(Activity activity) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_PAUSED);
            }

            @Override
            public void onActivityStopped(Activity activity) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_STOPPED);
            }

            @Override
            public void onActivitySaveInstanceState(Activity activity, Bundle outState) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_SAVE_INSTANCE_STATE);
            }

            @Override
            public void onActivityDestroyed(Activity activity) {
                if (activity == target)
                    publishSubject.onNext(LifecycleEvent.ON_DESTROYED);
            }
        };

        app.registerActivityLifecycleCallbacks(callbacks);
        return subscribeUtilEvent(publishSubject, event, new Action0() {
            @Override
            public void call() {
                app.unregisterActivityLifecycleCallbacks(callbacks);
            }
        });
    }

    public static <R> Observable.Transformer<R, R> subscribeUtilEvent(final Fragment target, LifecycleEvent event) {
        final FragmentManager manager = target.getFragmentManager();
        if (manager == null) {
            throw new NullPointerException("fragment manager is null!");
        }

        final PublishSubject<LifecycleEvent> publishSubject = PublishSubject.create();
        final FragmentManager.FragmentLifecycleCallbacks callbacks = manager.new FragmentLifecycleCallbacks() {

            @Override
            public void onFragmentPreAttached(FragmentManager fm, Fragment f, Context context) {
            }

            @Override
            public void onFragmentAttached(FragmentManager fm, Fragment f, Context context) {
            }

            @Override
            public void onFragmentCreated(FragmentManager fm, Fragment f, Bundle savedInstanceState) {
            }

            @Override
            public void onFragmentActivityCreated(FragmentManager fm, Fragment f, Bundle savedInstanceState) {
            }

            @Override
            public void onFragmentViewCreated(FragmentManager fm, Fragment f, View v, Bundle savedInstanceState) {
            }

            @Override
            public void onFragmentStarted(FragmentManager fm, Fragment f) {
            }

            @Override
            public void onFragmentResumed(FragmentManager fm, Fragment f) {
            }

            @Override
            public void onFragmentPaused(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_PAUSED);
            }

            @Override
            public void onFragmentStopped(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_STOPPED);
            }

            @Override
            public void onFragmentSaveInstanceState(FragmentManager fm, Fragment f, Bundle outState) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_SAVE_INSTANCE_STATE);
            }

            @Override
            public void onFragmentViewDestroyed(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_VIEW_DESTORYED);
            }

            @Override
            public void onFragmentDestroyed(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_DESTROYED);
            }

            @Override
            public void onFragmentDetached(FragmentManager fm, Fragment f) {
                if (f == target)
                    publishSubject.onNext(LifecycleEvent.ON_DESTROYED);
            }
        };
        manager.registerFragmentLifecycleCallbacks(callbacks, true);

        return subscribeUtilEvent(publishSubject, event, new Action0() {
            @Override
            public void call() {
                manager.unregisterFragmentLifecycleCallbacks(callbacks);
            }
        });
    }

    private static <R, T> Observable.Transformer<R, R> subscribeUtilEvent(final Observable<T> source, final T event, final Action0 doOnComplete) {
        return new Observable.Transformer<R, R>() {
            @Override
            public Observable<R> call(Observable<R> rObservable) {
                return rObservable.takeUntil(takeUntilEvent(source, event)).doOnCompleted(doOnComplete);
            }
        };
    }

    private static <T> Observable<T> takeUntilEvent(final Observable<T> src, final T event) {
        return src.takeFirst(new Func1<T, Boolean>() {
            @Override
            public Boolean call(T lifecycleEvent) {
                return lifecycleEvent.equals(event);
            }
        });
    }
}

生命周期事件:
public enum LifecycleEvent {
    ON_PAUSED,
    ON_STOPPED,
    ON_SAVE_INSTANCE_STATE,
    ON_DESTROYED,
    ON_VIEW_DESTORYED,
    ON_DETACHED,
}

使用方法:

myObservable
   .compose(LifecycleBinder.subscribeUtilEvent(this, LifecycleEvent.ON_DESTROYED))
   .subscribe();

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接