RxJava + Realm从错误的线程访问

4

我在读写Realm时遇到了这个异常

06-19 09:49:26.352 11404-11404/****** E/ContentValues: loadData: OnError Realm访问错误的线程。Realm对象只能在创建它们的线程上访问。 java.lang.IllegalStateException: Realm访问错误的线程。Realm对象只能在创建它们的线程上访问。 at io.realm.BaseRealm.checkIfValid(BaseRealm.java:385) at io.realm.RealmResults.isLoaded(RealmResults.java:115) at io.realm.OrderedRealmCollectionImpl.size(OrderedRealmCollectionImpl.java:307) at io.realm.RealmResults.size(RealmResults.java:60) at java.util.AbstractCollection.isEmpty(AbstractCollection.java:86) at /****** .lambda$loadData$0(SplashPresenter.java:42) at /****** $$Lambda$1.test(Unknown Source) at io.reactivex.internal.operators.observable.ObservableFilter$FilterObserver.onNext(ObservableFilter.java:45) at io.reactivex.observers.SerializedObserver.onNext(SerializedObserver.java:111) at io.reactivex.internal.operators.observable.ObservableDelay$DelayObserver$1.run(ObservableDelay.java:84) at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59) at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51) at java.util.concurrent.FutureTask.run(FutureTask.java:237) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:272) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1133) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:607) at java.lang.Thread.run(Thread.java:761)

这是代码:

  mSubscribe = Observable.just(readData())
            .delay(DELAY, TimeUnit.SECONDS)
            .filter(value -> !value.isEmpty())
            .switchIfEmpty(createRequest())
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()).subscribe(data -> {
                getView().hideLoading();
                writeData(data);
            }, 
           (throwable -> {
            }));

读取数据

  private List<CategoryModel> readData() {
    Realm defaultInstance = Realm.getDefaultInstance();
    List<CategoryModel> title = defaultInstance.where(CategoryModel.class).findAllSorted("title");

    defaultInstance.close();
    return title;
}

写入数据

private void writeData(List<CategoryModel> categoryModels) {

        try {
            Realm defaultInstance = Realm.getDefaultInstance();
            defaultInstance.executeTransactionAsync(realm -> realm.insertOrUpdate(categoryModels));
            defaultInstance.close();
        } finally {
            getView().notifyActivity(categoryModels);
        }
    }

我该如何使用正确的线程来遵循这个逻辑?

删除 .observeOn(AndroidSchedulers.mainThread()) 应该可以解决问题,并且只要读取操作也在写入事务内部发生,这种方法应该是可靠的。 - EpicPandaForce
顺便提一下,阅读 https://realm.io/docs/java/latest/#using-a-realm-across-threads。 - EpicPandaForce
似乎没有主线程,因为我正在调用视图来处理加载。android.view.ViewRootImpl$CalledFromWrongThreadException: 只有创建视图层次结构的原始线程才能触摸其视图。 - rafaelasguerra
然后它应该是subscribeOn(AndroidSchedulers.mainThread()).observeOn(AndroidSchedulers.mainThread()); - EpicPandaForce
你的悬赏即将到期,伙计。 - EpicPandaForce
4个回答

6

在跨线程使用Realm时唯一的规则是要记住,不能在不同线程之间传递Realm、RealmObject或RealmResults实例

当您想从不同的线程访问相同的数据时,应该简单地获取一个新的Realm实例(即Realm.getDefaultInstance()),并通过查询获取您的对象(然后在线程结束时关闭Realm)。

对象将映射到相同的磁盘上的数据,并且可以从任何线程读取和写入!您还可以使用realm.executeTransactionAsync()在后台线程上运行代码,例如this


我刚刚更新了帖子。它仍然存在相同的问题。 - rafaelasguerra
很奇怪,但是返回异常的那一行是: .filter(value -> !value.isEmpty()) - rafaelasguerra
“value”在这里是什么意思?它是来自readData()函数的List<CategoryModel>吗? - Ajeet Choudhary
是的,我的方法readData返回List<CategoryModel>。CategoryModel继承自RealmObject。 - rafaelasguerra
请尝试直接在readData()中使用,而不是value.isEmpty(),这样可以在非UI线程中工作,并在UI线程中获取数据。 - Ajeet Choudhary
让我们在聊天室继续这个讨论。http://chat.stackoverflow.com/rooms/147033/discussion-between-rguerra-and-ajeet-choudhary - rafaelasguerra

2

如何使用正确的线程来遵循这个逻辑?

不要试图在UI线程上读取 Schedulers.io()(毕竟,Realm提供了自动更新的懒加载代理视图,在UI线程上为您的数据提供更改通知)。


因此,应该这样做:

 mSubscribe = Observable.just(readData())
        .delay(DELAY, TimeUnit.SECONDS)
        .filter(value -> !value.isEmpty())
        .switchIfEmpty(createRequest())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread()).subscribe(data -> {
            getView().hideLoading();
            writeData(data);
        }, 
       (throwable -> {
        }));

private List<CategoryModel> readData() {
    Realm defaultInstance = Realm.getDefaultInstance();
    List<CategoryModel> title = defaultInstance.where(CategoryModel.class).findAllSorted("title");

    defaultInstance.close();
    return title;
}

private void writeData(List<CategoryModel> categoryModels) {
    try {
        Realm defaultInstance = Realm.getDefaultInstance();
        defaultInstance.executeTransactionAsync(realm -> realm.insertOrUpdate(categoryModels));
        defaultInstance.close();
    } finally {
        getView().notifyActivity(categoryModels);
    }
}

您应该拥有类似以下的内容:

private Observable<List<CategoryModel>> readData() { // Flowable with LATEST might be better.
    return io.reactivex.Observable.create(new ObservableOnSubscribe<List<CategoryModel>>() {
        @Override
        public void subscribe(ObservableEmitter<List<CategoryModel>> emitter)
                throws Exception {
            final Realm observableRealm = Realm.getDefaultInstance();
            final RealmResults<CategoryModel> results = observableRealm.where(CategoryModel.class).findAllSortedAsync("title");
            final RealmChangeListener<RealmResults<CategoryModel>> listener = results -> {
                if(!emitter.isDisposed() && results.isLoaded()) {
                    emitter.onNext(results);
                }
            };

            emitter.setDisposable(Disposables.fromRunnable(() -> {
                if(results.isValid()) {
                    results.removeChangeListener(listener);
                }
                observableRealm.close();
            }));
            results.addChangeListener(listener);
        }
    }).subscribeOn(AndroidSchedulers.mainThread())
            .unsubscribeOn(AndroidSchedulers.mainThread());
}

private void setSubscription() {
    mSubscribe = readData()
            .doOnNext((list) -> {
                if(list.isEmpty()) {
                    Single.fromCallable(() -> this::createRequest)
                            .subscribeOn(Schedulers.io())
                            .subscribe((data) -> {
                                writeData(data);
                            });
                }
            }).subscribe(data -> {
                if(!data.isEmpty()) {
                    getView().hideLoading();
                    getView().notifyActivity(data);
                }
            }, throwable -> {
                throwable.printStackTrace();
            });
}

private void writeData(List<CategoryModel> categoryModels) {
    try(Realm r = Realm.getDefaultInstance()) {
        r.executeTransaction(realm -> realm.insertOrUpdate(categoryModels));
    }
}

void unsubscribe() {
    mSubscribe.dispose();
    mSubscribe = null;
}

如果我没有搞错的话,按照这种方式,您最终将拥有此处此处描述的响应式数据层,但不需要映射整个结果的额外开销。
编辑:自Realm 4.0以来,可以直接将RealmResults公开为Flowable(在UI线程或后台循环线程上)。
public Flowable<List<MyObject>> getLiveResults() {
    try(Realm realm = Realm.getDefaultInstance()) {
        return realm.where(MyObject.class) 
                    .findAllAsync()
                    .asFlowable()
                    .filter(RealmResults::isLoaded);
    }
}

@Radu 取决于你遇到了什么问题。 - EpicPandaForce
我遇到了这段代码产生的绝对垃圾问题。Realm与线程不兼容,因此也与Rxjava不兼容。使用GSON自动反序列化的对象不能传递到UI线程,因为它们是Realm对象...绝对垃圾。我必须创建两个POJO,一个用于Realm,另一个用于UI线程和网络请求... - Radu
GSON反序列化的对象应该在后台线程中保存到Realm中,而RealmResults + RealmChangeListener(或作为Flowable公开的RealmResults)将接收新状态。没有必要直接将GSON响应发送到UI线程。也没有必要手动处理线程之间的传递,因为RealmResults已经在内部管理了这一点,所以您只需要订阅它即可。你看过文档了吗? - EpicPandaForce
我不知道你在说什么。我在RxJava 2内部使用GSON来反序列化我的POJOs,然后在UI线程上接收它们并希望使用它们。这会导致问题,因为在后台线程上创建的Realm对象无法在UI线程上访问。根据文档... https://dev59.com/21sX5IYBdhLWcg3wHcTA https://gist.github.com/cmelchior/ddac8efd018123a1e53a - Radu

1
你需要从领域对象中提取所需数据到POJO,并使用map运算符发出POJO,以便视图对象可以使用Android主线程上的pojo更新来自领域的数据。

-1

你只能在事务中或者在读/写这些对象的线程中操作Realm对象。在你的情况下,你从readData方法获取了一个RealmResult,并使用RxJava切换了线程,导致了异常。使用copyFromRealm从realm中获取数据,它将返回纯对象而不是realm对象。


1
显然,我们懂英语。如何解决这个问题才是关键! - Idee

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接