RxJava在concat map后更改线程

4

大家好,我在进行 RxJava 和 SQLite 的开发时遇到了一些死锁问题。我的问题是:

  1. 我在一个线程上开始一个事务
  2. 调用 Web 服务并将一些内容保存到数据库中
  3. 连接映射到另一个可观察的函数
  4. 尝试写入其他内容到数据库——> 得到死锁

以下是我的代码:

//define a scheduler for managing transaction in the same thread
private Scheduler mScheduler = Schedulers.from(Executors.newSingleThreadExecutor());


Observable.just(null)
            /* Go to known thread to open db transaction */
            .observeOn(mScheduler)
            .doOnNext(o -> myStore.startTransaction())
            /* Do some treatments that change thread */
            .someWebServiceCallWithRetrofit()
            /* Return to known thread to save items in db */
            .observeOn(mScheduler)
            .flatMap(items -> saveItems(items))
            .subscribe();

public Observable<Node> saveItems(List<Item> items) {
    Observable.from(items)
            .doOnNext(item -> myStore.saveItem(item)) //write into the database OK
            .concatMap(tab -> saveSubItems(item));
}

public Observable<Node> saveSubItems(Item item) {
    return Observable.from(item.getSubItems())
            .doOnNext(subItem -> myStore.saveSubItems(subItem)) //DEADLOCK thread is different
}

突然间RxJava为什么要更改线程呢?即使我已经指定了要在自己的调度程序上观察。我通过在saveSubItem之前添加另一个observeOn来进行了一次粗略修复,但这可能不是正确的解决方案。
我知道当使用retrofit调用Web服务时,响应会被转发到新线程(这就是为什么我创建了自己的调度程序以返回我启动sql事务的线程)。但是,我真的不明白RxJava如何管理线程。
非常感谢您的帮助。
2个回答

1

副作用操作符(如flatMap)会在调用它的线程上同步执行。可以尝试以下代码:

Observable.just(null)            
          .doOnNext(o -> myStore.startTransaction())
          .subscribeOn(mScheduler)      // Go to known thread to open db transaction 
            /* Do some treatments that change thread */
          .someWebServiceCallWithRetrofit()                      
          .flatMap(items -> saveItems(items))
          .subscribeOn(mScheduler) // Return to known thread to save items in db
          .observeOn(mScheduler) // Irrelevant since we don't observe anything
          .subscribe();

-1
根据我的了解,doOnNext方法在不同的线程中调用,而它之前的代码是在其余序列之外异步运行的。
例如:您可以进行多个REST调用,将其保存到数据库中,并在doOnNext(...)内部通知视图/Presenter/Controller进度。您可以在保存到数据库之前或之后执行此操作。 我建议您使用"flatMapping"代码。
因此,如果myStore.saveSubItems返回结果,saveItems方法将如下所示:
public Observable<Node> saveSubItems(Item item) {
return Observable.from(item.getSubItems())
        .flatMap(subItem -> myStore.saveSubItems(subItem))
}

使用“flatMapping”可以保证操作在与前一个序列相同的线程上运行,而序列会在flaMap函数结束后继续。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接