有很多很棒的文章介绍了RxJava,但几乎没有一篇文章用真实的例子来解释概念。
所以我基本上理解 RxJava Subject 的概念就像一条管道,既是 observable
也是 observer
。
但是我不知道这个RxJava Subject在Android开发领域中的真实用途是什么。你能详细说明一下吗?
onBackPressed()
方法进行。onBackPressed()
方法中发出一个事件即可。import android.app.DownloadManager;
import android.content.Context;
import android.webkit.MimeTypeMap;
import java.io.File;
import io.reactivex.Observable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.PublishSubject;
public class ReportDownloadManager {
private final DownloadManager platformDownloadManager;
public ReportDownloadManager(Context context) {
this.platformDownloadManager = (DownloadManager) context.getSystemService(Context.DOWNLOAD_SERVICE);
}
public Observable<Object> download(final File file, DownloadAction downloadAction) {
final PublishSubject<Object> subject = PublishSubject.create();
downloadAction.execute(file.getName())
.subscribe(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
platformDownloadManager.addCompletedDownload(file.getName(), "No description", false,
MimeTypeMap.getSingleton().getMimeTypeFromExtension("pdf"), file.getAbsolutePath(),
file.length(), true);
subject.onNext(new Object());
subject.onComplete();
}
}, new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
subject.onError(throwable);
}
});
return subject;
}
interface DownloadAction {
Observable<Object> execute(String fileAbsolutePath);
}
}
Scheduler scheduler = RxHelper.scheduler(vertx.getOrCreateContext());
Observable.just(callAnotherObservable)
.subscribe(item -> System.out.println(item)
public Observable<String> callAnotherObservable(Scheduler scheduler, ){
Subject subject = ReplaySubject.create(1);
Observable.interval(100,TimeUnit.MILLISECONDS)
.map(i->"item to be passed to the other observable")
.subscribe(subject);
return subject.observeOn(scheduler).first();//Here we wait for the first emission of the interval Observable.
}
如您所见,我们在此使用 subject.first() 等待 interval observable 的第一个发射,该 observable 在另一个线程中运行。
如果您想查看更多“hotObservables”的示例https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/connectable/HotObservable.java