Mono.defer()是什么意思?

64
我在一些Spring webflux代码中遇到了Mono.defer()。
我在文档中查找了这个方法,但不理解其中的解释:
"创建一个Mono提供程序,该程序将为每个下游订阅者提供目标Mono"
请给我一个解释和一个例子。有没有一个包含Reactor示例代码(单元测试)的地方可以供我参考。
谢谢

你查看过Javadoc了吗?Flux/Mono的大部分方法都包含有关响应式流如何工作的图表。 - Darren Forsythe
3
是的,上面引用的语句来自Javadoc。个人觉得这种语言难以理解;一些简单的代码示例会对我有所帮助。我正在克隆reactor-core项目,并将查看单元测试以确定是否有所帮助。 - James Render
例子代码、链接和评论的组合有助于我理解,在阅读RxJava示例时应该使用Mono或Flux替代Observable。 - James Render
16
反应堆代码难以阅读和预测,至少在最初的阶段是这样的。此外,这些图表是另一种需要教程才能理解的语言。 - bodrin
4个回答

145

这可能有些简化,但在概念上,Reactor的源要么是惰性的,要么是急切的。像HTTP请求这样的高级请求,期望被惰性地评估。而最简单的源,例如Mono.justFlux.fromIterable,则是急切的。

我意思是说,调用Mono.just(System.currentTimeMillis())将立即调用currentTimeMillis()方法并捕获结果。只有在订阅时,该结果才会被Mono 发出。多次订阅也不会改变值:

Mono<Long> clock = Mono.just(System.currentTimeMillis());
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //we use block for demonstration purposes, returns t0

Thread.sleep(7_000);
//time == t17
clock.block(); //we re-subscribe to clock, still returns t0
defer 操作符用于使源变得惰性,每当有新的订阅者时重新评估 lambda 的内容。
Mono<Long> clock = Mono.defer(() -> Mono.just(System.currentTimeMillis()));
//time == t0

Thread.sleep(10_000);
//time == t10
clock.block(); //invoked currentTimeMillis() here and returns t10

Thread.sleep(7_000);
//time == t17
clock.block(); //invoke currentTimeMillis() once again here and returns t17

1
当我理解这个答案时,我从来没有喜欢过这种解释方式,因为它有点暗示了库的选择Mono.just(System.currentTimeMillis())在调用时会立即执行。但这只是Java的工作原理,没有人会期望某种方法调用会被库删除并延迟。 - GotoFinal
4
没错,这就是Java的工作方式,没有被包裹在lambda中的方法调用无法自动变为惰性调用。我并不想让它像库在做一个“选择”。 - Simon Baslé
@SimonBaslé 我编写了 Netty HTTP 客户端(为什么我不能使用 Reactor Netty 是另一个话题),它返回 Flux。在有人订阅之前,我不希望发生任何发射,并且正在使用 defer 如下。这是正确的用法吗?publisher = UnicastProcessor.create<ByteBuf>(); chunks = Flux.defer { publisher }. 将 return chunks 返回给调用者并在 publisher 上发布。 - Abhijit Sarkar
看起来你的库本身保证了1个订阅者(否则你会在单播处理器中看到错误)。我认为用defer包装没有任何价值,unicastProcessor已经实现了Flux。还有一个你应该看看的Flux.create。 - Simon Baslé
@SimonBaslé,你能否看一下我的问题https://dev59.com/1XkPtIcB2Jgan1znrN5r吗?非常感谢! - Honza Zidek
@SimonBaslé,你能否看一下我的问题https://stackoverflow.com/q/75475170/2886891吗?非常感谢! - Honza Zidek

35

简单说 如果你第一眼看起来像 Mono.just() 但实际上不是。 当你运行 Mono.just() 时,它会立即创建一个 Observable(Mono) 并重复使用它,但当你使用 defer 时,它不会立即创建,而是每次订阅时都会创建一个新的 Observable。

一个用例来看到差异

    int a = 5;
@Override
public void run(String... args) throws Exception {

    Mono<Integer> monoJust = Mono.just(a);
    Mono<Integer> monoDefer = Mono.defer(() -> Mono.just(a));

    monoJust.subscribe(integer1 -> System.out.println(integer1));
    monoDefer.subscribe(integer1 -> System.out.println(integer1));

    a = 7;
    monoJust.subscribe(integer1 -> System.out.println(integer1));
    monoDefer.subscribe(integer1 -> System.out.println(integer1));
}

打印:

5
5
5
7

如果你看到 mono.just 创建了 Observable 并且立即执行,即使值已改变也不会更改,但是 defer 在订阅时创建 Observable,因此你将使用当前 onSubscribe 值。


1
你的解释并不完全正确。不存在“创建新 Observable”这样的东西。just 和 defer 只是创建了一个 Observable。Mono.just 对每个订阅都返回相同的值,而 Mono.defer 则评估 Supplier 函数。 - HTN
我不是在谈论Observable作为对象,而是作为逻辑。Mono是一个Observable。Mono.just创建新的Observable,这意味着创建新的Mono @htn。 - Ricard Kollcaku
@jrender 在 Mono.just() 的情况下,它不能真正地应用于状态的快照。在这个例子中,他使用了一个原始类型,所以结果不会改变,但是如果你使用一个对象(比如列表)并且在两个订阅之间对它进行了修改(添加一个新值),你将会得到2个不同的结果,但它仍然是同一个对象。 - HTN
@RicardKollcaku 不用担心,我非常了解defer运算符的工作原理。 对于Spring Reactor来说,“Observable”这个术语不存在,因此对我而言,“Observable = Mono | Flux”(可观察对象=单一元素流 | 多元素流)。 - HTN
@RichardKollcaku,学习project-reactor时RxJava的多少内容是适用的? - James Render
显示剩余6条评论

6
我尝试使用defer解决不同的用例。我编写了下面的代码进行检查,并分享出来,希望能帮助其他人。我的用例是链接两个Mono并确保第一个完成后才开始处理第二个。第二个包含一个阻塞调用,其结果用于响应一个Mono,要么是empty,要么是error响应。如果没有defer,我的阻塞调用会在第一个完成的情况下执行。但是,在使用defer时,只有第一个Mono完成后,阻塞调用才会执行。如下所示:
public static void main(String[] args) {
    long cur = System.currentTimeMillis();
    boolean succeed = true;

    Mono<Integer> monoJust = Mono.create(consumer -> {
        System.out.println("MonoJust inside " + (System.currentTimeMillis() - cur));
        if (succeed) {
            consumer.success(1);
        } else {
            consumer.error(new RuntimeException("aaa"));
        }
    });

    Mono<String> monoJustStr = Mono.create(consumer -> {
        System.out.println("MonoJustStr inside: " + (System.currentTimeMillis() - cur));
        consumer.success("one");
    });

    System.out.println("##1##: Begin");
    monoJust.then(evaluator() ? Mono.empty() : monoJustStr).subscribe(d -> System.out.println("##1##: "+d), e-> System.err.println(e));
    System.out.println("##1##: Done: "+(System.currentTimeMillis() - cur));

    System.out.println("\n\n\n##2##: Begin");
    monoJust.then(Mono.defer(() -> evaluator() ? Mono.empty() : monoJustStr)).subscribe(d -> System.out.println("##2##: "+d), e-> System.err.println(e));
    System.out.println("##2##: Done: " + (System.currentTimeMillis() - cur));

}

private static boolean evaluator() {
    System.out.println("Inside Evaluator");
    return false;
}

succeed=true的输出 - 观察"Inside Evaluator"和"MonoJust inside"的顺序

##1##: Begin
Inside Evaluator
MonoJust inside 540
MonoJustStr inside: 542
##1##: one
##1##: Done: 542



##2##: Begin
MonoJust inside 544
Inside Evaluator
MonoJustStr inside: 544
##2##: one
##2##: Done: 544

以下是 succeed = false 的输出结果 - 注意评估器未被调用。
##1##: Begin
Inside Evaluator
MonoJust inside 565
java.lang.RuntimeException: aaa
##1##: Done: 567



##2##: Begin
MonoJust inside 569
java.lang.RuntimeException: aaa
##2##: Done: 569

2

初学者的简单答案:

当在monoJust变量上调用subscribe时,它将打印一个随机整数三次。但是在monoDefer变量上调用subscribe时,它可以每次打印一个随机数。

   Mono<Integer> justMono = Mono.just((new Random()).nextInt(10));

    //this will print same random number thrice
    for(int i=0;i<3;i++)
        justMono.subscribe(x -> {System.out.println("Just Mono: " + x);});

    Mono<Integer> deferMono = Mono.defer(() -> Mono.just((new Random()).nextInt(10)));

    //this might print three different random numbers
    for(int i=0;i<3;i++)
        deferMono.subscribe(x -> {System.out.println("Defer Mono: " + x);});

在 Mono.just() 中,实例化只会在第一次订阅时发生。而在 Mono.defer() 中,每次调用订阅时都会进行实例化。

如需参考,请查看: https://www.youtube.com/watch?v=eupNfdKMFL4&t=381s,3:15 分钟处


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接