RxJava订阅阻塞可观察对象

10
我希望实现以下目标:
String result = myObservable.toBlocking().first();

即,它就像一个普通的函数调用。但是这从来没有发生过,因为你需要订阅它,而我不知道如何去做。如果我订阅它,结果将在另一个作用域中,代码会非常丑陋,因为我只能像普通可观察对象一样获取结果,所以将其转换为阻塞可观察对象没有任何意义。

1个回答

9

它实际上按照你想要的方式工作:

    Observable<String> myObservable = Observable.just("firstValue", "secondValue");
    String result = myObservable.toBlocking().first();
    System.out.println(result); // ---> "firstValue"

在幕后,调用BlockingObservable.first()会为您订阅:
private T blockForSingle(final Observable<? extends T> observable) {
    final AtomicReference<T> returnItem = new AtomicReference<T>();
    final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
    final CountDownLatch latch = new CountDownLatch(1);

    @SuppressWarnings("unchecked")
    Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
        @Override
        public void onCompleted() {
            latch.countDown();
        }

        @Override
        public void onError(final Throwable e) {
            returnException.set(e);
            latch.countDown();
        }

        @Override
        public void onNext(final T item) {
            returnItem.set(item);
        }
    });
    BlockingUtils.awaitForComplete(latch, subscription);

    if (returnException.get() != null) {
        Exceptions.propagate(returnException.get());
    }

    return returnItem.get();
}
更新: 使用BehaviourSubject加上toBlocking()没有任何意义。请注意,它既是Observable也是Observer,因此必须在某个地方调用myObservable.onNext("value")。如果通过调用toBlocking()阻塞线程,除非myObservable在其他线程中可用并调用了onNext(),否则您将被阻塞。

例如,这是使用`BehaviourSubject的正常用法:

  // observer will receive the "one", "two" and "three" events, but not "zero"
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.onNext("zero");
  subject.onNext("one");
  subject.subscribe(observer);
  subject.onNext("two");
  subject.onNext("three");

有趣的是,我忘记了这实际上是一个BehaviorSubject.. 这样做有什么区别吗?它会无限期地阻塞整个线程。 - breakline

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接