如何检查 Mono 是否为空?

58

我正在使用Spring Boot 2.0和 Kotlin开发一个应用,使用WebFlux框架。

我想在保存交易之前检查用户ID是否已存在。我卡在了一个简单的事情上,就是验证Mono是否为空。

fun createTransaction(serverRequest: ServerRequest) : Mono<ServerResponse> {
    val transaction = serverRequest.body(BodyExtractors.toMono(Transaction::class.java))

    transaction.flatMap {
        val user = userRepository.findById(it.userId)
        // If it's empty, return badRequest() 
    } 

    return transaction.flatMap { transactionRepository.save(it).then(created(URI.create("/transaction/" + it.id)).build()) }
}

我想做的事情是可行的吗?

5个回答

60

允许检查Flux/Mono是否为空的技术

使用操作符.switchIfEmpty/.defaultIfEmpty/Mono.repeatWhenEmpty

使用上述操作符,您将能够对流已完成但未发出任何元素的情况做出反应。

首先,请记住,如果没有调用onNext,那么诸如.map.flatMap.filter等操作符根本不会被调用。 这意味着在您的情况下,下一个代码

transaction.flatMap {
    val user = userRepository.findById(it.userId)
    // If it's empty, return badRequest() 
} 

return transaction.flatMap { transactionRepository.save(it).then(created(URI.create("/transaction/" + it.id)).build()) }

如果transaction为空,则根本不会被调用。

如果您的流程为空,但有处理需求,您应该考虑像以下这样使用next操作符:

transaction
   .flatMap(it -> {
      val user = userRepository.findById(it.userId)
   })
   .swithIfEmpty(Flux.defer(() -> Flux.just(badRequest())));

实际解决方案

此外,我注意到您从主要的事务中创建了两个子流程。 实际上,以下代码根本不会被执行:

transaction.flatMap {
    val user = userRepository.findById(it.userId)
    // If it's empty, return badRequest() 
}  

只有最后一个被执行并从该方法返回,这是因为您没有使用运算符 .subscribe(...) 进行订阅。

第二点,您不能多次订阅相同的请求主体(是 WebClient 响应的一种限制)。因此,您需要以以下方式共享请求主体,以完成示例:

fun createTransaction(serverRequest: ServerRequest): Mono<ServerResponse> {
    val transaction = serverRequest.body(BodyExtractors.toMono(Transaction::class.java)).cache()

    transaction
            .flatMap { userRepository.findById(it.userId) }
            .flatMap { transaction.flatMap { transactionRepository.save(it) } }
            .flatMap { ServerResponse.created(URI.create("/transaction/" + it.id)).build() }
            .switchIfEmpty(transaction.flatMap { ServerResponse.badRequest().syncBody("missed User for transaction " + it.id) })
}

或者更简单的情况是不共享事务流,而是使用Tuple

fun createTransaction(serverRequest: ServerRequest): Mono<ServerResponse> {
    val emptyUser = !User()
    val transaction = serverRequest.body<Mono<Transaction>>(BodyExtractors.toMono(Transaction::class.java))

    transaction
            .flatMap { t ->
                userRepository.findById(t.userId)
                        .map { Tuples.of(t, it) }
                        .defaultIfEmpty(Tuples.of(t, emptyUser))
            }
            .flatMap {
                if (it.t2 != emptyUser) {
                    transactionRepository.save(it.t1)
                            .flatMap { ServerResponse.created(URI.create("/transaction/" + it.id)).build() }
                } else {
                    ServerResponse.badRequest().syncBody("missed User for transaction " + it.t1.id)
                }
            }
}

第一个解决方案无法编译...你能检查一下吗?另外,没有带有字符串参数的badRequest方法。 - voliveira89
@voliveira,您能告诉我这个示例是用哪种语言编写的吗?因为我刚试图遵循您的代码约定。 - Oleh Dokuka
@voliveira89已修复。 - Oleh Dokuka
还不能编译呢!你给出的例子(跟我代码里写的一模一样)是有意义的,但 IntelliJ 报了一个错误,指向了关闭函数括号处:“需要在一个块函数中包含一个 'return' 表达式 ('{...}')”。 - voliveira89
请问您能否指出上面代码片段中的特定行? - Oleh Dokuka
显示剩余4条评论

7
您可以使用Mono提供的方法hasElement()来检查它,该方法类似于Optional的isPresent()。该方法定义如下:
Mono<Boolean> hasElement()

更多详细信息请查看:Project Reactor文档

如果您需要根据此值执行某些操作,可以进一步使用switchIfEmpty()提供备用Publisher。


3

使用 Mono 和 Optional:

return findExistingUserMono
  .map(Optional::of)
  .defaultIfEmpty(Optional.empty())
  .flatMap(optionalUser -> {
    if(optionalUser.isPresent()) {
      return Mono.error('xxxx');
    }
    return this.userService.create(optionalUser.get());
  });

通过这种方式,它将始终发出可选值,使流永远不会中断。


2

首先,我是一个关于响应式(Java)和这个论坛的新手。

我认为你不能真正检查一个mono是否为空,因为mono代表将在以后执行的代码,所以在这个代码主体中,你还不知道它是否为空。这有意义吗?

我刚刚用Java写了类似的东西,似乎可以工作(但并不完美,这可能也不是最好的方法):

    public Mono<ServerResponse> queryStore(ServerRequest request) { 

        Optional<String> postalCode = request.queryParam("postalCode");                            

        Mono<ServerResponse> badQuery = ServerResponse.badRequest().build();
        Mono<ServerResponse> notFound = ServerResponse.notFound().build();

        if (!postalCode.isPresent()) { return  badQuery; }

        Flux<Store> stores = this.repository
                .getNearByStores(postalCode.get(), 5);

        return ServerResponse.ok().contentType(APPLICATION_JSON)
                .body(stores, Store.class)
                .switchIfEmpty(notFound);
}

0
我们可以使用 switchIfEmpty 方法来实现这个功能。
在下面的例子中,我正在检查用户是否存在于电子邮件中,如果不存在,则添加它。
userRepository.findByEmail(user.getEmail())
                .switchIfEmpty(s -> {
                    user.setStatus("InActive");
                    String encodedPassword = DigestUtils.sha256Hex(user.getPassword());
                    user.setPassword(encodedPassword);
                    userRepository.save(user).subscribe();
                    s.onComplete();
                }).then(Mono.just(user));

没有带有 switchIfEmpty(Consumer<T>) 签名的方法。只有 switchIfEmpty(Mono<? extends T> alternate),请参见 https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html#switchIfEmpty-reactor.core.publisher.Mono-。 - Honza Zidek

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接