如何在反应式流中包含if-else语句

14
我有一个Spring Webflux响应式服务,它接收DTO并将其插入到多个表中。 有时我们需要根据传入的DTO跳过插入到某些表中。
这是要求:
  1. 创建新客户端
  2. 如果DTO中存在客户推荐,则创建新客户推荐。
  3. 如果DTO中存在客户次要联系人,则创建客户次要联系人
  4. 如果DTO中存在客户电话,则创建客户电话
问题:
  1. 不确定如何在响应式流中应用if条件。
  2. 是否有更好的方法来做到这一点?
  3. 在此处,除第一个操作外,所有其他操作都可以并行运行。
public Mono<ServerResponse> createClientProfile(ServerRequest request) {
        return secContext.retrieveUser().flatMap(usr -> {
            return request.bodyToMono(ClientDto.class).flatMap(client -> {
                return toNewClient(client, usr).flatMap(clientRepository::save).flatMap(clientRes -> {
                    return toNewClientReferral(clientRes.getClientId(), client.getDiscount(), usr)
                            .flatMap(clientReferralRepository::save).flatMap(clientReferralRes -> {
                                return toNewClientSyContact(clientRes.getClientId(), client.getSecondary(), usr)
                                        .flatMap(clientSyContactRepository::save).flatMap(clientSyContactRes -> {
                                            return clientPhoneRepository
                                                    .saveAll(toNewClientPhone(clientRes.getClientId(), client.getPhones(), usr))
                                                    .collectList().flatMap(phoneRes -> {
                                                        return ServerResponse
                                                                .created(URI.create(String.format(CLIENT_URI_FORMAT,
                                                                        clientRes.getClientId())))
                                                                .contentType(APPLICATION_JSON).build();
                                                    });
                                        });
                            });
                });
            });
        });

    }

private Mono<Referral> toNewClientReferral(final long clientId, final Discount dto) {
        Referral referral = Referral.of(clientId, 
                dto.getName(), dto.getType(), dto.getAmount(), dto.getStatus());

        return Mono.just(referral);
    }

client.getDiscount() 可能为空,
client.getSecondary() 可能为空, client.getPhones() 可能为空。

我使用了3个不同的方法来分开流程。

public void createSyContact(ServerRequest request, long clientId) {
        secContext.retrieveUser().flatMap(usr -> {
            return request.bodyToMono(ClientDto.class).flatMap(client -> {
                if (client.getSecondary() != null) {
                    return toNewClientSyContact(clientId, client.getSecondary(), usr)
                            .flatMap(clientSyContactRepository::save).flatMap(clientRes -> {
                                return Mono.just(clientRes.getClientId());
                            });
                } else {
                    return Mono.empty();
                }
            });
        });
    }

    public void createReferral(ServerRequest request, long clientId) {
        secContext.retrieveUser().flatMap(usr -> {
            return request.bodyToMono(ClientDto.class).flatMap(client -> {
                if (client.getDiscount() != null) {
                    return toNewClientReferral(clientId, client.getDiscount(), usr)
                            .flatMap(clientReferralRepository::save).flatMap(clientRes -> {
                                return Mono.just(clientRes.getClientId());
                            });
                } else {
                    return Mono.empty();
                }
            });
        });
    }

    public Mono<Long> createClientWithPhones(ServerRequest request) {
        return secContext.retrieveUser().flatMap(usr -> {
            return request.bodyToMono(ClientDto.class).flatMap(client -> {
                return toNewClient(client, usr).flatMap(clientRepository::save).flatMap(clientRes -> {
                    return clientPhoneRepository
                            .saveAll(toNewClientPhone(clientRes.getClientId(), client.getPhones(), usr)).collectList()
                            .flatMap(phoneRes -> {
                                return Mono.just(clientRes.getClientId());
                            });
                });
            });
        });
    }

在这里,createClientWithPhones是必需的,因此不需要进行if检查。但是另外两种方法createReferral和createSyContact需要进行if检查。需要先执行createClientWithPhones,它将返回clientId。应该在createReferral和createSyContact中使用此clientId。

public Mono<ServerResponse> createClientProfile(ServerRequest request) {
        final List<Long> clinetIdList = new ArrayList<>();
        createClientWithPhones(request).subscribe(result -> {
            clinetIdList.add(result.longValue());
            createSyContact(request, result.longValue());
            createReferral(request, result.longValue());
        });
        return ServerResponse
                .created(URI.create(String.format(CLIENT_URI_FORMAT,
                        clinetIdList.get(0))))
                .contentType(APPLICATION_JSON).build();
        
    }

这种处理方式是否正确?


@K.Nicholas 不确定我们如何在这里进行过滤。 - user1578872
你是否正在使用响应式存储库? - K.Nicholas
是的,Spring Data R2DBC。它是响应式的端到端。公共接口UserRepository扩展了ReactiveCrudRepository<User, Long>。 - user1578872
1
createClientProfile 方法中存在竞态条件。由于它是在组装时执行的,而不是作为反应链的一部分执行的,因此可能会在 createClientWithPhones 完成之前调用 ServerResponse 方法。使用此代码很可能会出现 IndexOutOfBounds 异常。 - Michael McFadyen
正确。不确定如何改进这种方法。它可以与第一种方法一起工作,但使用破损的方法会出现问题。 - user1578872
显示剩余3条评论
3个回答

13

我不认为一般人对反应式库有很好的理解。我的意思是,通常人们像Java 8流一样尝试做函数式编程。当然,反应式库基于函数式编程,但我认为它的目的是异步处理阻塞I/O。考虑WebFlux项目的(当前)主页。

什么是反应式处理? 反应式处理是一种范式,使开发人员能够构建非阻塞、异步应用程序,可以处理背压(流量控制)。

所以,这是说我认为最好关注I/O发生的位置,而不是创建功能代码的冗长方式。如果你需要if语句,那么你需要if语句。与其试图弄清楚如何使用函数式编程来执行if语句,还不如试图找出I/O发生的位置,并以异步方式处理它。我喜欢使用Mono::zipFlux::zip这样的“技巧”。这些函数将许多I/O调用组合成一个发布者返回给客户端。因此,请考虑以下示例代码。

让我们创建一些反应式r2dbc函数:

Mono<Client> save(Client client) {
    client.id = 1L;
    System.out.println("Save client: " + client.id);
    return Mono.just(client);
}
Mono<Phone> save(Phone phone) {
    System.out.println("Save phone: " + phone.clientId);
    return Mono.just(phone);
}
Mono<Referral> save(Referral referral) {
    System.out.println("Save referral: " + referral.clientId);
    return Mono.just(referral);
}
Mono<Contact> save(Contact contact) {
    System.out.println("Save contact: " + contact.clientId);
    return Mono.just(contact);
}
我们需要一些示例类来使用:

We need some example classes to use:

class DTO {
    Client client;
    List<Phone> phones;
    Optional<Contact> contact;
    Optional<Referral> referral;
}

class Client {
    Long id;
}

class Contact {
    Long clientId;
}

class Referral {
    Long clientId;
}

class Phone {
    Long clientId;
}

我们的输入可能是Mono<DTO>,因为这就是请求应该提供的内容,所以我们的Service层需要从那里开始,并返回客户端ID的Mono<Long>

Mono<Long> doWork(Mono<DTO> monoDto) {
    return monoDto.flatMap(dto->{
        return save(dto.client).flatMap(client->{
            List<Mono<?>> publishers = new ArrayList<>();
            dto.phones.forEach(phone->{
                phone.clientId = client.id;
                publishers.add(save(phone));    
            });
            if ( dto.contact.isPresent()) {
                Contact c = dto.contact.get();
                c.clientId = client.id;
                publishers.add(save(c));
            }
            if ( dto.referral.isPresent()) {
                Referral r = dto.referral.get();
                r.clientId = client.id;
                publishers.add(save(r));
            }
            if ( publishers.size() > 0 )
                return Mono.zip(publishers, obs->client.id);
            else
                return Mono.just(client.id);
        });
    });
}

我使用以下示例代码运行了这个程序:

@Override
public void run(ApplicationArguments args) throws Exception {
    saveClient(new Client(), null, null, null).subscribe(System.out::println);
    saveClient(new Client(), new Phone(), null, null).subscribe(System.out::println);
    saveClient(new Client(), new Phone(), new Contact(), null).subscribe(System.out::println);
    saveClient(new Client(), new Phone(), new Contact(), new Referral()).subscribe(System.out::println);
}


private Mono<Long> saveClient(Client client, Phone phone, Contact contact,
        Referral referral) {
    // TODO Auto-generated method stub
    DTO dto = new DTO();
    dto.client = client;
    dto.phones = new ArrayList<>();
    if ( phone != null ) dto.phones.add(phone);     
    dto.contact = Optional.ofNullable(contact);
    dto.referral = Optional.ofNullable(referral);
    return doWork(Mono.just(dto));
}

因此,这里使用了Mono.zip技巧。保存的客户端被压平以便先处理它。然后为所有需要进行的后续保存创建了一个单体列表。这些单体均由Mono.zip函数异步执行。"合并器"函数不对结果做任何操作,只返回所需的客户端ID。Mono.zip将所有单体组合成单个单体返回给客户端。从某种意义上说,这只是将过程式代码包装在反应库中,而不是过度关注功能编程。如果业务“过程”发生变化,这很容易阅读和修改。

如果您喜欢这个起点,这是一个开始。我没有使用Repository::saveAll,因此可以改进。

确保链接所有FluxMono发布者很重要。在最终示例中,您似乎丢弃了它们。仅仅创建它们是不够的,它们都必须以某种方式返回给客户端。另外,您的代码有一个subscribe调用,这是不行的。只有客户端应该订阅。我认为您应该在那里使用map

编辑:修复了一个错误。仔细检查您的代码。

编辑II:我注意到我在类中使用了一个"Optional"作为参数。这是一种反模式。正确的方法是将类型用作参数,并在特殊getter中使用"Optional.of"包装它。


4

例如,在 flatMap 中可以使用简单的 if 语句,然后进行操作。

public Mono<String> foobar() {
    return Mono.just("foo").flatMap(value -> {
        if(value != null)
            return Mono.just("Has value");
        else
            return Mono.empty();
    }
}

foobar()
    .switchIfEmpty(Mono.just("Is empty"))
    .subscribe(output -> System.out.println(output);

我已经根据这个更新了我的答案。但我依然做错了什么。有没有更好的方法? - user1578872

1
你可以使用filterswitchIfEmpty的组合。我认为这是一种相当简洁和易读的方法。
public void write(String content) {
  Mono.just(content)
    .filter(s -> !s.isBlank())
    .switchIfEmpty(Mono.just("No content"))
    .subscribe(System.out::println);
}

filter接受一个谓词来测试提供的值。如果谓词评估为false,则它将在没有值的情况下完成,并调用switchIfEmpty


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接