RxJava的回退可观察对象

13

我正在寻找一种更好的方法,在使用RxJava时实现一个简单的Observable回退系统来处理空结果。我的想法是,如果一个本地查询返回了零项数据,那么应该执行一个备用查询(可以是网络调用或其他内容)。目前,我的代码如下:

Observable.create(new Observable.OnSubscribe<Object>() {
  @Override
  public void call(Subscriber<? super Object> subscriber) {
     List<Object> results = queryLocalDatabase();
     if (results.isEmpty()) {
       subscriber.onError(new Throwable("Empty results"));
     } else {
       // Process results...
     }
  }
}).onErrorResumeNext(/* Fallback Observable goes here */);

虽然这样做可以达到效果,但是对于一个空的结果集抛出异常并不是很合理。我注意到有一些条件操作符可用,例如isEmpty,但它似乎不能满足我的需求。例如,使用isEmpty...

localObservable = Observable.create(new Observable.OnSubscribe<Object>() {
  @Override
  public void call(Subscriber<? super Object> subscriber) {
     List<Object> results = queryLocalDatabase();
     for (Object obj : results) {
       // Process object
       subscriber.onNext(object);           
     }
     subscriber.onCompleted();
  }
});

localObservable.isEmpty()
  .switchMap(new Func1<Boolean, Observable<? extends Object>>() {
    @Override
    public Observable<? extends Object> call(Boolean isEmpty) {
      if (isEmpty) {
        // Return fallback Observable.
        return fallbackObservable;
      }

      // Return original Observable, which would mean running the local query again... Not desired.
      return localObservable;
    }
  });

这几乎把我带到了我想去的地方,但如果有非空项,localObservable 将似乎运行两次,这是无法接受的。

6个回答

13

使用switchIfEmpty操作符。

以下是使用示例:

Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        // return no item
        //subscriber.onNext(...);
        System.out.println("Generating nothing :)");
        subscriber.onCompleted();
    }
}).switchIfEmpty(Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        System.out.println("Generating item");
        subscriber.onNext("item");
        subscriber.onCompleted();
    }
})).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(String s) {
        System.out.println("onNext: " + s);
    }
});

使用lambda简化:

Observable.create(subscriber -> {
    // return no item
    //subscriber.onNext(...);
    System.out.println("Generating nothing :)");
    subscriber.onCompleted();
}).switchIfEmpty(Observable.create(subscriber -> {
    System.out.println("Generating item");
    subscriber.onNext("item");
    subscriber.onCompleted();
})).subscribe(
    s -> System.out.println("onNext: " + s),
    throwable -> System.out.println("onError"),
    () -> System.out.println("onCompleted")
);

1
这也是一个不错的方法,但 RxAndroid(Android 上的 RxJava 端口)奇怪地不支持此方法... 我认为使用转换器可能是更健壮/通用的解决方案。 - Alex Fu

3

我相信解决这个问题还有更优雅的方法,但在这种情况下,您可以简单地使用flatMap。由于您已经处理了从queryLocalDatabase()方法返回的List<Object>,因此您可以像这样做。

Observable.create(new Observable.OnSubscribe<List<String>>() {
    @Override
    public void call(Subscriber<? super List<String>> subscriber) {
        List<String> results = queryLocalDatabase();
        subscriber.onNext(results);
        subscriber.onCompleted();
    }

    private List<String> queryLocalDatabase() {
        return Arrays.asList();
    }

}).flatMap(new Func1<List<String>, Observable<String>>() {
    @Override
    public Observable<String> call(List<String> list) {
        if (list.isEmpty()) {
            return getFallbackObservable();
        } else {
            return Observable.from(list);
        }
    }

    private Observable<String> getFallbackObservable() {
        return Observable.from("3", "4");
    }

}).subscribe(new Observer<String>() {
    @Override
    public void onCompleted() {
        System.out.println("onCompleted");
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("onError");
    }

    @Override
    public void onNext(String s) {
        System.out.println("onNext: " + s);
    }
});

为了这段代码的目的,我已经将Object替换为String。


这是一个有趣的方法。我喜欢它比我的任何解决方案都要好。 - Alex Fu
我很高兴你喜欢它。希望能够看到其他解决该问题的方案。 - Miguel

0

经过一些研究和查看其他人的回复,我相信使用Transformer是最强大的解决方案,就像这样...

Observable.from(Collections.emptyList())
  .compose(new Observable.Transformer<List<Object>, List<Object>>() {
    @Override
    public Observable<List<Object> call(Observable<List<Object>> source) {
      boolean isEmpty = observable.isEmpty().toBlocking().first();
      if (isEmpty) {
        return backupObservable();
      } else {
        return source;
      }
    }
  });

0

你也可以将你的Observable连接起来:

Observable<Integer> o1 = Observable.empty();
Observable<Integer> o2 = Observable.empty();
Observable<Integer> o3 = Observable.just(3);
Observable<Integer> o4 = Observable.just(4);
Observable<Integer> os = Observable.concat(o1, o2, o3, o4);
Integer v = os.toBlocking().first();  // returns 3

0
我找到了一个更优雅的解决方案来解决这个问题。
Observable<String> cachedObservable =
        Observable.from(Collections.<String>emptyList()) // Swap this line with the below one to test the different cases
        //Observable.from(Arrays.asList("1", "2"))
                .cache();

cachedObservable.isEmpty().switchMap(isEmpty -> {
    if (isEmpty)
        return Observable.from("3", "4");
    else
        return cachedObservable;
}).subscribe(System.out::println);

这里的关键竞争者是cache,它会重放原始发出的值。

请记住有关cache的javadoc。

当您使用缓存观察者时,您牺牲了从源取消订阅的能力,因此请注意不要在发出无限或非常大量的项目并将使用内存的Observable上使用此Observer。


0

另一种变体。在我的情况下,我需要首先尝试一个缓存条目,如果不存在,则懒惰地尝试另一个:

public void test() {
    String cacheRowKey = "row";
    Observable<String> = cacheLookup(cacheRowKey, "newVersion").switchIfEmpty(
        Observable.defer(() -> cacheLookup(cacheRowKey, "legacyVersion").switchIfEmpty(
        Observable.defer(() -> onMiss(cacheRowKey;
}

private Observable<String> cacheLookup(String key, String version) {
    // Delegate to the real cache - will return effectively
    // Observable.just(xxx) or Observable.empty()
    ...
}

private Observable<String> onMiss(String key) {
    // do the expensive lookup to then populate the cache
    ...
}
  1. 首先尝试在缓存中查找键和“新”版本
  2. 如果缓存未命中,则再次尝试缓存以获取“旧”版本
  3. 如果“新”和“旧”版本的缓存都未命中,则执行昂贵的查找(可能是为了填充缓存)。

所有这些操作都是按需惰性完成的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接