Mono switchIfEmpty() 总是被调用

64

我有两种方法。
主要方法:

@PostMapping("/login")
public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
    return socialService.verifyAccount(loginUser)
            .flatMap(socialAccountIsValid -> {
                if (socialAccountIsValid) {
                    return this.userService.getUserByEmail(loginUser.getEmail())
                            .switchIfEmpty(insertUser(loginUser))
                            .flatMap(foundUser -> updateUser(loginUser, foundUser))
                            .map(savedUser -> {
                                String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                return new ResponseEntity<>(HttpStatus.OK);
                            });
                } else {
                    return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                }
            });

}

这个被调用的方法(该服务调用一个外部 API):

public Mono<User> getUserByEmail(String email) {
    UriComponentsBuilder builder = UriComponentsBuilder
            .fromHttpUrl(USER_API_BASE_URI)
            .queryParam("email", email);
    return this.webClient.get()
            .uri(builder.toUriString())
            .exchange()
            .flatMap(resp -> {
                if (Integer.valueOf(404).equals(resp.statusCode().value())) {
                    return Mono.empty();
                } else {
                    return resp.bodyToMono(User.class);
                }
            });
} 
在上面的示例中,即使返回结果为Mono.empty()switchIfEmpty()总是从主方法调用。
我找不到这个简单问题的解决方案。 以下内容也无法工作:
Mono.just(null) 

由于该方法会抛出一个NullPointerException异常。

我也无法使用flatMap方法来检查foundUser是否为null。
不幸的是,如果我返回Mono.empty(),则根本不会调用flatMap方法,因此我也不能在这里添加条件。

@SimY4

   @PostMapping("/login")
    public Mono<ResponseEntity<ApiResponseLogin>> loginUser(@RequestBody final LoginUser loginUser) {
        userExists = false;
        return socialService.verifyAccount(loginUser)
                .flatMap(socialAccountIsValid -> {
                    if (socialAccountIsValid) {
                        return this.userService.getUserByEmail(loginUser.getEmail())
                                .flatMap(foundUser -> {
                                    return updateUser(loginUser, foundUser);
                                })
                                .switchIfEmpty(Mono.defer(() -> insertUser(loginUser)))
                                .map(savedUser -> {
                                    String jwts = jwt.createJwts(savedUser.get_id(), savedUser.getFirstName(), "user");
                                    return new ResponseEntity<>(HttpStatus.OK);
                                });
                    } else {
                        return Mono.just(new ResponseEntity<>(HttpStatus.UNAUTHORIZED));
                    }
                });

    }

2
我不确定我是否理解这句话正确。switchIfEmpty()总是从主方法中调用,即使返回了一个带有Mono.empty()的结果。它应该被调用,对吗? - Barath
1
你能详细阐述一下你的问题吗?即使返回了一个空的 Mono,switchIfEmpty() 方法始终是从主方法调用的。这是预期的行为。 - Prashant Pandey
1
@PrashantPandey 请查看上面的评论。 - html_programmer
问题在于,当Mono不为空时,也会调用switchIfEmpty,这是我不想要的。条件应为->如果为空则插入,否则更新。 - html_programmer
一定不可能。请启用日志并分享日志输出。 - Barath
显示剩余7条评论
2个回答

100

switchIfEmpty之所以能够生效,是因为它“按值”接受Mono。这意味着即使在您订阅Mono之前,该替代Mono的运算已经被触发。

想象一下这样的一个方法:

Mono<String> asyncAlternative() {
    return Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }));
}

如果您定义代码如下:

Mono<String> result = Mono.just("Some payload").switchIfEmpty(asyncAlternative());

在流构造期间,它将始终触发替代操作。为了解决这个问题,您可以使用Mono.defer延迟评估第二个单体。

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.defer(() -> asyncAlternative()));

这样做只会在请求另一种方式时打印“Hi there”。

更新:

稍微解释一下我的回答。你面临的问题与Reactor无关,而是与Java语言本身以及它如何解析方法参数有关。让我们来看一下我提供的第一个例子中的代码。

Mono<String> result = Mono.just("Some payload").switchIfEmpty(asyncAlternative());

我们可以将其重写为:

Mono<String> firstMono = Mono.just("Some payload");
Mono<String> alternativeMono = asyncAlternative();
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

这两个代码片段在语义上是等价的。我们可以继续展开它们,看看问题出在哪里:

Mono<String> firstMono = Mono.just("Some payload");
CompletableFuture<String> alternativePromise = CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }); // future computation already tiggered
Mono<String> alternativeMono = Mono.fromFuture(alternativePromise);
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

正如您所看到的,未来计算已在我们开始组合Mono类型时触发。为了防止不必要的计算,我们可以将未来包装成一个延迟评估:

正如您所看到的,我们在开始组合Mono类型时就已经触发了未来计算。为了避免出现不必要的计算,我们可以将未来计算进行延迟评估:

Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.defer(() -> asyncAlternative()));

它将展开成

Mono<String> firstMono = Mono.just("Some payload");
Mono<String> alternativeMono = Mono.defer(() -> Mono.fromFuture(CompletableFuture.supplyAsync(() -> {
        System.out.println("Hi there");
        return "Alternative";
    }))); // future computation defered
Mono<String> result = firstMono.switchIfEmpty(alternativeMono);

在第二个示例中,未来被困在一个懒惰的供应商中,并且只有在被请求时才会被执行。UPD: 2022: 自从一段时间以来,项目反应器提供了另一种API,用于包装急切计算未来的结果相同-将急切计算困在懒惰的供应商中。
Mono<String> result = Mono.just("Some payload")
        .switchIfEmpty(Mono.fromCompletionStage(() -> alternativePromise()));

2
@Trace,你能给我展示一下你尝试过的内容吗?因为defer的唯一目的就是在规定时间之前不允许进行评估。 - Alex
4
无论如何,在流构建过程中它总是会触发备选方案。那么它有什么用呢?它是一个名为switchIfEmpty的方法。有些东西真的没有意义。 - html_programmer
谢谢你的解释。但是,正如你在我代码示例中看到的那样,我使用了你的延迟方法,但回调仍然始终会执行,尽管使用了 defergetUserByEmail 返回一个 Mono,insertUserupdateUser 方法也是如此。我真的不明白为什么会这样。 - html_programmer
2
你的答案是正确的。我进行了分析,结果发现 switchIfEmpty 被触发的原因是 updateUser 实际上返回了一个带有 HTTP 状态码 204 的空响应体!我有些不情愿地修改了 API,但现在它可以正常工作了。感谢你的帮助! - html_programmer
@Akshay,经验法则是:如果备选分支具有副作用,则推迟它。Mono.defer(() -> "Alternative")没有副作用,因此不需要推迟。 - Alex
显示剩余5条评论

41

尽管有一个很好的回答,但仍有人不理解为什么会出现这种行为:

反应器源(Mono.xxx 和 Flux.xxx)要么是:

  • 惰性评估:只有在订阅者订阅它时,才会评估/触发源内容;

  • 急切地评估:源内容会在订阅者订阅之前立即进行评估。

Mono.just(xxx)Flux.just(xxx)Flux.fromIterable(x,y,z)这样的表达式是急切的。

通过使用defer(),您可以强制使源进行惰性评估。这就是为什么被接受的答案有效的原因。

所以做这个:

 someMethodReturningAMono()
  .switchIfEmpty(buildError());

buildError() 方法依赖于一个急切的源来创建一个替代的 Mono,在订阅之前总是会被评估:

Mono<String> buildError(){
       return Mono.just("An error occured!"); //<-- evaluated as soon as read
}

为了防止这种情况,请执行以下操作:

 someMethodReturningAMono()
  .switchIfEmpty(Mono.defer(() -> buildError()));

阅读此答案以获取更多信息。


1
简单地解释一下,谢谢。 - Arundev

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接