使 RxJava 异步任务线程安全化

3
在我的应用中,我有一个服务来跟踪用户位置,然后使用 RxJava 将其发送到服务器。如果请求成功,我会收到插入的 ID,然后可以从本地数据库中删除它们。
以下是需要执行的步骤:
  1. 查询要发送的点
  2. 如果不为空,则将数据库中收集到的所有点都发送出去
  3. 如果请求成功,则从数据库中删除已发布的点
我的问题是,在上一个任务结束之前,我很快调用了该 observable,因此服务器接收到重复的点(两个请求的相同点)。我需要在单个线程上执行 Observable 以避免在上一个任务结束之前进行另一个任务的查询。我创建了一个 Looper 线程,但仍然发送重复内容,我不知道为什么。现在似乎服务器请求等待结束后才执行下一个请求,但在下一个请求中,它仍然发送相同的点!
    final StoreChangeEvent finalEvent = event;
    Observable
            .defer(() -> Observable.just(database.getAllPoints()))
            .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                    .map(result -> deletePoint(result))
                    .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                    .doOnCompleted(() -> emitStoreChange(finalEvent))
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribeOn(AndroidSchedulers.from(backgroundLooper)))
            .subscribe();

看起来database.getAllPoints()被过早调用了...我应该添加一个.blocking()吗?

假设我有5个点要发布到服务器(A,B,C,D)

  1. 我查询数据库并将A-B-C-D发送到服务器
  2. 我从设备接收到另一个点(E)
  3. 我查询数据库并发送(A-B-C-D-E)
  4. 我从第一个请求成功响应后,从本地数据库中删除A-B-C-D
  5. 我从第二个请求成功响应后,从本地数据库中删除A-B-C-D-E

结果:由于发送了相同的点,A-B-C-D在服务器数据库中出现了两次


1
不太清楚你的意思。您可能会订阅多次,并希望确保它们按顺序执行? - Artem Novikov
是的,只有在前一个Observable完成后才会查询数据库并发送该点...我在问题底部添加了详细信息。 - Jaythaking
1个回答

2

选项1 - Subject

您可以尝试使用Subject - 一种既充当观察者又充当可观察对象的东西。

订阅一次并在其他地方有新事件时通知它(onNext())。订阅者随后处理事件。

如果您将从不同线程调用notifyNewEvent(),则我使用了SerializedSubject,否则您可以使用BehaviourSubject

SerializedSubject<StoreChangeEvent, StoreChangeEvent> subject = new SerializedSubject(BehaviorSubject.create())

public void initialize() {
    // Since you access your incoming event from doOnCompleted,
    // need this extra flatMap function so that you can access your event
    // outside rx java chain.
    subject.flatMap(new Func1() {
        @Override
        public Observable call(StoreChangeEvent event) {
            return Observable
                    .just(database.getAllPoints())
                    .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                            .map(result -> deletePoint(result))
                            .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                            .doOnCompleted(() -> emitStoreChange(finalEvent)));
        }
    })
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(AndroidSchedulers.from(backgroundLooper))
            .subscribe();
}

public void notifyNewEvent(StoreChangeEvent event) {
    subject.onNext(event);
}

选项2 - 执行器

如果您不需要访问UI,为什么还要使用subjects、observeOn和subscribeOn呢?创建一个只有一个线程的执行器(所有任务都会依次执行),并将任务提交到其中,利用RxJava的有用功能。

ExecutorService executorService = Executors.newSingleThreadExecutor();

public void notifyNewEvent(final StoreChangeEvent event) {
    executorService.execute(new Runnable() {
        public void run() {
            Observable.just(database.getAllPoints())
                // Blocking, so that the thread doesn't exit
                // and blocks on subscribe() till completion.
                .toBlocking() 
                .flatMap(pointsList -> (pointsList.isEmpty()) ? Observable.empty() : amazonRetrofit.postAmazonPoints(pointsList)
                    .map(result -> deletePoint(result))
                    .doOnError(error -> emitStoreChange(new ErrorMessageEvent(error.getMessage())))
                    .doOnCompleted(() -> emitStoreChange(finalEvent)))
                .subscribe();
        }
    });
}

传入的事件并不来自.doOnComplete()。它来自我拥有的某个调度器类,当从GPS检索到位置点时会触发该事件。抱歉,由于我的代码不够清晰,可能造成了误解。 - Jaythaking
当然可以,在doOnComplete中访问它。如果没有将其保存在某个地方,您将无法访问它,因为在链中它已经丢失了(转换)。 - Artem Novikov
我必须承认,我有点迷失了…… 我尝试了两种方法,但都失败了。因此,一旦请求完成,我必须通知视图并访问UI…… 在我的架构中,我使用StoreEventChange来通知视图。 - Jaythaking
第一种选项会出现什么错误? - Artem Novikov
finalEvent 变量在任何地方都找不到...而且,我不明白它是如何工作的以及 initialize() 的目的,你有时间进行私聊吗? - Jaythaking

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接