RxJava多线程与Realm - 从错误的线程访问Realm

8

背景

我在我的应用程序中使用 Realm。当数据加载完毕后,会进行密集处理,因此处理过程发生在后台线程。

所使用的编码模式是工作单元模式,而 Realm 仅存在于 DataManager 下的存储库中。这里的想法是每个存储库可以有不同的数据库/文件存储解决方案。

我尝试的方法

以下是类似于我在 FooRespository 类中所拥有代码的示例。

这里的想法是获取一个 Realm 实例,用于查询感兴趣的对象的 Realm,将它们返回并关闭 Realm 实例。请注意,这是同步的,并且最后将对象从 Realm 复制到未管理的状态。

public Observable<List<Foo>> getFoosById(List<String> fooIds) {

    Realm realm = Realm.getInstance(fooRealmConfiguration);

    RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);

    for(String id : fooIds) {

        findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id);
        findFoosByIdQuery.or();
    }

    return findFoosByIdQuery
            .findAll()
            .asObservable()
            .doOnUnsubscribe(realm::close)
            .filter(RealmResults::isLoaded)
            .flatMap(foos -> Observable.just(new ArrayList<>(realm.copyFromRealm(foos))));
}

这段代码随后会与通过RxJava进行的重型处理代码一起使用:

dataManager.getFoosById(foo)
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);

阅读文档后,我相信上述内容应该可以工作,因为它不是异步的,因此不需要循环线程。我在同一线程内获取了realm实例,因此它没有在线程之间传递,对象也没有被传递。
问题是,当执行上述操作时,我得到了“从错误的线程访问Realm。只能在创建它们的线程上访问Realm对象。”这似乎不正确。我唯一能想到的是,Realm实例池正在为我提供使用主线程创建的另一个进程中创建的现有实例。

你是指要对我发布的整个链进行 flatmap,还是只针对第一部分? - Graham Smith
我不是专家,但我认为您需要从相同的线程中获取Realm对象,该线程是 Schedulers.io 对吗?可能像这样: .flatMap(this::processtheFoosInALongRunningProcess) .subscribeOn(Schedulers.io()) //也可以是 Schedulers.computation() 等等 .subscribe(tileChannelSubscriber);``` 语法方面不太确定。 - wint
或者这个是否有效?只是为了看看我的假设是否正确 :P .findFoosByIdQuery.equalTo(Foo.FOO_ID_FIELD_NAME, id); .findAll() .asObservable() .flatMap(this::processtheFoosInALongRunningProcess) .subscribeOn(Schedulers.io()) //可能是 Schedulers.computation() 等 .subscribe(tileChannelSubscriber);``` - wint
@AssortedTrailmix 错了,copyFromRealm 会将 RealmObject 从底层 Realm 中分离出来,而非受线程限制的未受管控副本。 - EpicPandaForce
@AssortedTrailmix 啊,是的,我想这有点新。正如我们所讨论的,这会破坏Realms的零拷贝和一致性保证,但同时也能够支持目前Realm不支持的架构模式。是的,没错。 - EpicPandaForce
显示剩余4条评论
2个回答

3

Kay so

return findFoosByIdQuery
        .findAll()
        .asObservable()

这是在UI线程上发生的,因为最初你是从那里调用它的。
.subscribeOn(Schedulers.io())

然后你会在 Schedulers.io() 上对它们进行调试。

不是同一个线程!

虽然我不喜欢从零拷贝数据库中 复制 的方法,但由于滥用了 realmResults.asObservable() 导致你当前的方法存在许多问题,因此这里提供了一个提示,告诉你代码应该是什么样子的:

public Observable<List<Foo>> getFoosById(List<String> fooIds) {
   return Observable.defer(() -> {
       try(Realm realm = Realm.getInstance(fooRealmConfiguration)) { //try-finally also works
           RealmQuery<Foo> findFoosByIdQuery = realm.where(Foo.class);
           for(String id : fooIds) {
               findFoosByIdQuery.equalTo(FooFields.ID, id);
               findFoosByIdQuery.or(); // please guarantee this works?
           }
           RealmResults<Foo> results = findFoosByIdQuery.findAll();
           return Observable.just(realm.copyFromRealm(results));
       }
   }).subscribeOn(Schedulers.io());
}

顺便提一下,值得注意的是,另一种解决方案行不通,因为asObservable()会在非循环后台线程上崩溃。只是说一下。 - EpicPandaForce
1
在我的文章中有一个例子 https://medium.com/@Zhuinden/how-to-use-realm-for-android-like-a-champ-and-how-to-tell-if-youre-doing-it-wrong-ac4f66b7f149#.ll9uqyowi 当你到达那里时,你会看到它的(简短的答案是,在looper线程上,也就是UI线程上,它会自动为其添加一个变更监听器来通知您,并在您取消订阅时将其删除)。 - EpicPandaForce
一旦您分离了 RealmObjects,您就失去了自动更新并复制了所有元素,但您可以在不同线程之间随意传递它们。 - EpicPandaForce
由于我自己不使用 Realm,所以我的解决方案可能不像 @EpicPandaForce 所提到的那样有效。你的异常原因仍然是线程混合。只有一个评论/问题:为什么要在 getFoosById() 中添加 subscribeOn()?这不是最好留给客户端自行决定吗? - Wolfram Rittmeyer
1
@Wolfram 我想这是正确的,subscribeOn 不是必需的,只是为了确保它不在前台线程上运行(但是,是的,你的代码将无法工作,在 asObservable 中会崩溃)。 - EpicPandaForce
显示剩余4条评论

2
请注意,您正在创建实例,该实例位于所有RxJava处理管道之外。因此,在调用getFoosById()时,它在主线程(或您所在的任何线程)上运行。
仅因为该方法返回一个Observable并不意味着它在另一个线程上运行。只有由getFoosById()方法的最后一个语句创建的Observable的处理管道才在正确的线程上运行(过滤器filter()flatMap()以及调用者执行的所有处理)。
因此,您必须确保在使用Schedulers.io()的线程上已经完成了对getFoosById()的调用。
实现此目的的一种方法是使用Observable.defer()
Observable.defer(() -> dataManager.getFoosById(foo))
            .flatMap(this::processtheFoosInALongRunningProcess)
            .subscribeOn(Schedulers.io()) //could be Schedulers.computation() etc
            .subscribe(tileChannelSubscriber);

1
沃尔夫拉姆,很高兴你看到了这个。我明天早上会尝试。这段代码可以轻松添加到我的DataManager中,它位于调用者和存储库之间。 - Graham Smith

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接