RxJava Subject的真实世界应用案例

11

有很多很棒的文章介绍了RxJava,但几乎没有一篇文章用真实的例子来解释概念。

所以我基本上理解 RxJava Subject 的概念就像一条管道,既是 observable 也是 observer

但是我不知道这个RxJava Subject在Android开发领域中的真实用途是什么。你能详细说明一下吗?

4个回答

1
主题在编程中有很多“现实世界”的应用,特别是当你逐渐将代码库从命令式转换为反应式风格时。它可以作为这两个世界之间的桥梁,使你能够用非反应式代码影响流中的内容。
但是,正如你所要求的那样,这里有一个例子。最近我正在实现用户尝试从活动返回时的自定义行为。RxJava为我提供了非常优雅的解决方案,因此我需要组合一系列与用户想要返回对应的事件流。我故意避免使用“按返回按钮”这个短语,因为在代码库中有几个地方我可以模拟返回情况,而且它总是通过onBackPressed()方法进行。
将其转化为单个流需要进行大量的重构,目前并没有预算。但我不想放弃RxJava的解决方案,因为它可以使代码更加简洁。使用BehaviorSubject给出了答案,因为我只需要在onBackPressed()方法中发出一个事件即可。

1
我正在为一个Android应用程序构建一个通用的ReportDownloadManager,其中我们需要使用Observer来消耗Observable并将文件下载并存储在本地。成功或失败下载的事件需要由管理器处理,但还需要向使用此DownloadManager的Activities/Services公开Observable。我认为这是使用Subject的一个很好的用例,既可以消耗初始Observable,又可以为客户端Observable生成事件。
import android.app.DownloadManager;
import android.content.Context;
import android.webkit.MimeTypeMap;

import java.io.File;

import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;

public class ReportDownloadManager {

    private final DownloadManager platformDownloadManager;

    public ReportDownloadManager(Context context) {
        this.platformDownloadManager = (DownloadManager) context.getSystemService(Context.DOWNLOAD_SERVICE);
    }

    public Observable<Object> download(final File file, DownloadAction downloadAction) {
        final PublishSubject<Object> subject = PublishSubject.create();
        downloadAction.execute(file.getName())
                .subscribe(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) throws Exception {
                        platformDownloadManager.addCompletedDownload(file.getName(), "No description", false,
                                MimeTypeMap.getSingleton().getMimeTypeFromExtension("pdf"), file.getAbsolutePath(),
                                file.length(), true);
                        subject.onNext(new Object());
                        subject.onComplete();
                    }
                }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        subject.onError(throwable);
                    }
                });
        return subject;
    }

    interface DownloadAction {
        Observable<Object> execute(String fileAbsolutePath);
    }

}

1
在我的情况下,是因为我有一个Observable在等待另一个Observable发出的项目,而这个Observable是异步的,因为它是Interval。
Scheduler scheduler = RxHelper.scheduler(vertx.getOrCreateContext());


Observable.just(callAnotherObservable)
          .subscribe(item -> System.out.println(item)

public Observable<String> callAnotherObservable(Scheduler scheduler, ){
          Subject subject = ReplaySubject.create(1);
          Observable.interval(100,TimeUnit.MILLISECONDS)
          .map(i->"item to be passed to the other observable")
          .subscribe(subject);
          return subject.observeOn(scheduler).first();//Here we wait for the first emission of the interval Observable.

}

如您所见,我们在此使用 subject.first() 等待 interval observable 的第一个发射,该 observable 在另一个线程中运行。

如果您想查看更多“hotObservables”的示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/connectable/HotObservable.java


0
如果您需要像我在图表中展示的这样的案例。

enter image description here


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接