".switchIfEmpty() 被急切地评估的目的是什么?"

6
即使我的流不为空,后备流也总是会被创建?这样做的目的是什么?这极不符合惯用法。
另一方面,onErrorResume会被惰性地评估。
请问有人能解释一下为什么switchIsEmpty要急切地评估吗?
下面是代码:
  public static void main(String[] args) {
    Mono<Integer> m = Mono.just(1);
    m.flatMap(a -> Mono.delay(Duration.ofMillis(5000)).flatMap(p -> Mono.empty()))
        .switchIfEmpty(getFallback())
        .doOnNext(a -> System.out.println(a))
        .block();
  }

  private static Mono<Integer> getFallback() {
    System.out.println("In Here");
    return Mono.just(5);
  }

输出结果为:

In Here (printed immediately)
5 (after 5s)

你可能犯了在返回switchIfEmpty要使用的流之前进行初始化的错误。请提供你遇到问题的代码。 - akarnokd
@akarnokd,我已经添加了代码。谢谢。 - Prashant Pandey
如果在 main 中只写 getFallback(); 而没有使用 switchIfEmpty 和其他结构,会发生什么?为什么? - akarnokd
该方法在主线程上执行吗?看起来我漏掉了一些基础知识。 - Prashant Pandey
3个回答

19
在这里,您需要理解的是汇编时间和订阅时间之间的区别。 汇编时间是指通过构建操作器链来创建管道的时间。此时,您的发布者尚未订阅,您需要以某种命令方式思考。 订阅时间是指通过订阅触发执行并开始数据流经过您的管道的时间。这是您需要以回调、lambda、惰性执行等方式反应式地思考的时候。
有关详细信息,请参见Simon Baslé的精彩文章
正如@akarnokd在他的答案中提到的那样,由于getFallback()方法不是定义为lambda而只是常规方法调用,因此在汇编时间以命令方式调用它。
您可以通过以下方法之一实现真正的惰性: 1、您可以使用Mono.fromCallable并将日志放在lambda内部:
public static void main(String[] args) {
    Mono<Integer> m = Mono.just(1);

    m.flatMap(a -> Mono.delay(Duration.ofMillis(5000)).flatMap(p -> Mono.empty()))
     .switchIfEmpty(getFallback())
     .doOnNext(a -> System.out.println(a))
     .block();
}

private static Mono<Integer> getFallback() {
    System.out.println("Assembly time, here we are just in the process of creating the mono but not triggering it. This is always called regardless of the emptiness of the parent Mono.");
    return Mono.fromCallable(() -> {
        System.out.println("Subscription time, this is the moment when the publisher got subscribed. It is got called only when the Mono was empty and fallback needed.");
        return 5;
    });
}

2、您可以使用 Mono.defer 延迟执行和组装内部Mono直到订阅:

public static void main(String[] args) {
    Mono<Integer> m = Mono.just(1);
    m.flatMap(a -> Mono.delay(Duration.ofMillis(5000)).flatMap(p -> Mono.empty()))
     .switchIfEmpty(Mono.defer(() -> getFallback()))
     .doOnNext(a -> System.out.println(a))
     .block();
}

private static Mono<Integer> getFallback() {
    System.out.println("Since we are using Mono.defer in the above pipeline, this message gets logged at subscription time.");
    return Mono.just(5);
}
请注意,您的原始解决方案也完全没问题。只需意识到在返回 Mono 之前执行代码的是汇编时间。

4

如果你在它周围加上括号,为什么它会在其他地方执行?这种误解经常出现,不确定源头是什么。

当你重新编写代码时,发生的事情应该变得更加明显:

Mono<Integer> m = Mono.just(1);
Mono<Integer> m2 = m.flatMap(a -> Mono.delay(Duration.ofMillis(5000))
                                      .flatMap(p -> Mono.empty()));

Mono<Integer> theFallback = getFallback(); // <------------------ still on the main thread!

m2.switchIfEmpty(theFallback)
    .doOnNext(a -> System.out.println(a))
    .block();

getFallback之所以运行是因为它的父方法正在那里执行。这与响应式编程无关,但是它是大多数编程语言的基本属性。


2
这让我强烈地想起了 java.util.Optional。例如:
String input = "not null"; // change to null

String result = Optional.ofNullable(input)
            .orElse(fallback());

System.out.println(result);

private static String fallback() {
    System.out.println("inside fallback");
    return "fallback";
}

无论input的值是null还是非null,它都会评估fallback方法。然而,与Mono不同,Optional提供了通过java.util.Function进行惰性评估的orElseGet。在我看来,使用.switchIfEmpty(Mono.defer(() -> getFallback()))会很奇怪。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接