领域模型:与干净架构和RxJava2一起使用

4
一些背景信息,我试图将一些清晰的架构应用于我的项目之一,并且我在我的存储库的(Realm)磁盘实现方面遇到了问题。我有一个存储库,根据某些条件(缓存)从不同的数据存储中提取一些数据。这是理论,问题出现在将所有这些与UseCases和RxJava2混合时。
首先,我从Realm获取对象列表,然后手动创建一个Observable。但是,预期的是subscribe在不同的线程上执行,因此realm最终会崩溃...(第二个代码块)
这是我用来创建Observables的代码(从抽象类DiskStoreBase):
Observable<List<T>> createListFrom(final List<T> list) {
    return Observable.create(new ObservableOnSubscribe<List<T>>() {
        @Override
        public void subscribe(ObservableEmitter<List<T>> emitter) throws Exception {
            if (list != null) {
                emitter.onNext(list);
                emitter.onComplete();
            } else {
                emitter.onError(new ExceptionCacheNotFound());
            }
        }
    });
}

我该如何处理这种情况?

DiskStoreForZone更多代码:

@Override
public Observable<List<ResponseZone>> entityList() {
    Realm realm = Realm.getDefaultInstance();
    List<ResponseZone> result = realm.where(ResponseZone.class).findAll();
    return createListFrom(result);
}

具体的崩溃情况:

E/REALM_JNI: jni: ThrowingException 8, Realm accessed from incorrect thread.
E/REALM_JNI: Exception has been thrown: Realm accessed from incorrect thread.

请问您能否分享崩溃日志。 - Gaurav Singh
@Gaurav 好的,已完成。 - iroyo
它不起作用是因为尽管使用了Rx,但您的数据层不是响应式的。 - EpicPandaForce
@EpicPandaForce 嗯,好的,你能详细说明一下吗?我使用远程DataStore时,我使用Retrofit的Observable变体,而在Realm中,我尝试使用内置的Rx机制,但是...我无法将其与代码的其他部分连接起来。你所说的“不具有响应性”是什么意思? - iroyo
2
它不能工作是因为你正在不同于调用entityList()的线程上进行观察。暂时地,你需要避免这样做或使用copyFromRealm()将Realm数据复制到Java内存中。 - Christian Melchior
1个回答

15

由于您的数据层不具备响应式特性,因此它无法正常工作,尽管使用了 Rx。

Realm 本质上是一个具有响应式数据源,其管理的对象也是可变的(由 Realm 原地更新)并且线程限制的(只能在打开 Realm 的同一线程上访问)。

要使您的代码起作用,您需要从 Realm 中复制数据。

@Override
public Single<List<ResponseZone>> entityList() {
    return Single.fromCallable(() -> {
       try(Realm realm = Realm.getDefaultInstance()) {
           return realm.copyFromRealm(realm.where(ResponseZone.class).findAll());
       }
    });
}

我冒昧地将您的Single表示为Single,考虑到它不是Observable,它不会监听变化,只有1个事件,即列表本身。因此,通过ObservableEmitter发送它没有意义,因为它不会发出事件。

因此,这就是我说的原因:您的数据层不具有响应性。您没有监听变化。您只是直接获取数据,并且永远不会收到任何更改的通知;尽管使用了Rx。


我在画图板中画了一些图片来说明我的观点。(蓝色表示的是副作用)

clean-architecture non-reactive

在您的情况下,您调用一次性操作从多个数据源(缓存、本地、远程)检索数据。一旦您获取它,您就不会监听变化;如果您在一个地方编辑数据并在另一个地方,更新的唯一方法是“强制缓存手动检索新数据”;对于此,您必须知道您在其他地方修改了数据。为此,您需要一种直接调用回调或发送消息/事件(通知更改)的方法。

因此,在某种程度上,您必须创建一个缓存失效通知事件。如果您监听它,解决方案可能再次具有响应性。除了您手动执行此操作。

----------------------------------------------------------------------

考虑到Realm已经是一种响应式数据源(类似于用于SQLite的SQLBrite),它能够通过提供更改通知来“使您的缓存失效”。

事实上,如果您的本地数据源是唯一的数据源,并且从网络中的任何写入都是您监听的更改,则您的“缓存”可以写为replay(1).publish().refCount()(重复最新数据以供新订阅者,使用新数据替换数据进行评估),这是RxReplayingShare

clean architecture reactive

使用从处理程序线程的循环器创建的调度程序,您可以在后台线程上监听Realm中的更改,创建一种响应式数据源,返回最新的未管理副本,您可以在线程之间传递它们(尽管直接映射到不可变领域模型比copyFromRealm()更受欢迎,如果您选择这条路线-路线是清晰的架构)。

return io.reactivex.Observable.create(new ObservableOnSubscribe<List<ResponseZone>>() {
    @Override
    public void subscribe(ObservableEmitter<List<ResponseZone>> emitter)
            throws Exception {
        final Realm observableRealm = Realm.getDefaultInstance();
        final RealmResults<ResponseZone> results = observableRealm.where(ResponseZone.class).findAllAsync();
        final RealmChangeListener<RealmResults<ResponseZone>> listener = results -> {
            if(!emitter.isDisposed()) {
                if(results.isValid() && results.isLoaded()) {
                    emitter.onNext(observableRealm.copyFromRealm(results));
                }
            }
        };

        emitter.setDisposable(Disposables.fromRunnable(() -> {
            if(results.isValid()) {
                results.removeChangeListener(listener);
            }
            observableRealm.close();
        }));
        results.addChangeListener(listener);
        // initial value will be handled by async query
    }
}).subscribeOn(looperScheduler).unsubscribeOn(looperScheduler);

Looper调度程序的获取方式为

    handlerThread = new HandlerThread("LOOPER_SCHEDULER");
    handlerThread.start();
    synchronized(handlerThread) {
        looperScheduler  = AndroidSchedulers.from(handlerThread.getLooper());
    }

这就是如何使用Realm创建响应式的清洁架构。


补充:

如果您打算在Realm上实际实施Clean Architecture,那么需要使用LooperScheduler。这是因为默认情况下,Realm鼓励您将数据对象用作域模型,并提供惰性加载的线程本地视图,当更新时会原地变异;但是Clean Architecture则建议您使用不可变的域模型(与数据层无关)。因此,如果您想要创建响应式的清洁架构,在任何时候当Realm发生更改时都要从后台线程复制,则需要一个looper scheduler(或在后台线程上观察,但要从刷新的Realm上进行复制Schedulers.io())。

通常情况下,您会希望使用RealmObjects作为域模型,并依赖于惰性计算。在这种情况下,您不使用copyFromRealm(),也不将RealmResults映射到其他内容;但可以将其公开为FlowableLiveData

您可以在这里阅读相关内容。


嘿,感谢您的回答。它确实有帮助,但我很好奇为什么要使用looper调度程序。我想知道是否一些默认的调度程序,比如io调度程序也可以胜任? - ocross
3
只有当您打算在Realm上实际执行Clean Architecture时,才需要使用LooperScheduler。这是因为Realm鼓励您将数据对象用作域模型,并提供延迟加载的本地线程视图作为好处,在更新时就地进行修改;但是Clean Architecture要求您使用不可变的域模型(独立于数据层)。因此,如果您想创建反应性干净架构,在每次Realm更改时从后台线程复制,则需要一个looper调度程序。 - EpicPandaForce

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接