响应式编程:Spring WebFlux:如何构建微服务调用链?

3

Spring Boot 应用程序:

一个 @RestController 接收以下有效负载:

{
  "cartoon": "The Little Mermaid",
  "characterNames": ["Ariel", "Prince Eric", "Sebastian", "Flounder"]
}

我需要按照以下方式处理它:
  1. 获取每个角色名称的唯一ID:向“cartoon-characters”微服务发出HTTP调用,该微服务通过名称返回ID
  2. 转换控制器接收到的数据: 使用从“cartoon-characters”微服务在上一步骤中接收到的适当ID替换角色名称。 { "cartoon": "小美人鱼", "characterIds": [1, 2, 3, 4] }

  3. 将转换后的数据发送到“cartoon-db”微服务的HTTP POST请求。

  4. 将来自“cartoon-db”的响应映射到控制器返回值的内部表示形式。

我的问题:

我需要使用Reactive Programming(非阻塞/异步处理)的编程范例以及Spring WebFluxMono|Flux)和Spring Reactive WebClient实现所有这些步骤 - 但我对这个技术栈没有任何经验,正在尽可能多地阅读相关文档并进行大量搜索,但仍然有很多未解答的问题,例如:

Q1. 我已经配置了反应式Web客户端,向“cartoon-characters”微服务发送请求:

      public Mono<Integer> getCartoonCharacterIdbyName(String characterName) {
    return WebClient.builder().baseUrl("http://cartoon-characters").build()
        .get()
        .uri("/character/{characterName}", characterName)
        .retrieve()
        .bodyToMono(Integer.class);
  }

您可能已经看到,我有一个卡通人物名称列表,对于每个名称,我需要调用getCartoonCharacterIdbyName(String name)方法,我不确定连续调用它是否是正确的选择,我相信正确的选择是并行执行。

编写了以下方法:

  public List<Integer> getCartoonCharacterIds(List<String> names) {
Flux<Integer> flux = Flux.fromStream(names.stream())
    .flatMap(this::getCartoonCharacterIdbyName);

return StreamSupport.stream(flux.toIterable().spliterator(), false)
    .collect(Collectors.toList());

但我有疑虑,这段代码是否可以并行执行WebClient,而且代码调用了flux.toIterable()阻塞了线程,因此我失去了非阻塞机制。

我的假设正确吗?

我需要如何重写才能实现并行和非阻塞?

Q2. 技术上是否可能以反应式样式转换控制器接收到的输入数据(我的意思是将名称替换为ID):当我们使用Flux<Integer> characterIds进行操作时,而不是使用List<Integer> characterIds?

Q3.步骤2之后,是否可能获得不仅是转换后的Data对象,还可以在步骤3中由另一个WebClient消耗的Mono<>?


你应该使用collectList操作符而不是toIterable,它是非阻塞的,并给你一个Mono<List>。 - Martin Tarjányi
1个回答

6
实际上这是个好问题,因为要理解WebFlux或者Project Reactor框架,当需要链接微服务时需要几个步骤。
第一步是意识到WebClient应该接受一个发布者并返回一个发布者。将其推广到4个不同的方法签名来帮助思考。
- Mono -> Mono - Flux -> Flux - Mono -> Flux - Flux -> Mono
在所有情况下,无疑都是Publisher->Publisher, 但在你更好地理解之前,暂且只考虑前两种。你只需使用".map(...)"来处理流中的对象即可,但你需要学习如何处理后两个。正如上面所评论的,从Flux -> Mono可以使用".collectList()"或".reduce(...)"完成。而从Mono -> Flux似乎通常使用".flatMapMany"或".flatMapIterable"或其某种变体。可能还有其他技巧。在任何WebFlux代码中都不应使用".block()",如果尝试这样做,通常会收到运行时错误。
在你的示例中,你想要转换成
(Mono -> Flux) -> (Flux -> Flux) -> (Flux -> Flux)
就像你说的那样,你想要
Mono -> Flux -> Flux
第二部分是了解如何链接流(chaining Flows)。你可以这样做 p3(p2(p1(object)));
这将链式调用p1 -> p2 -> p3,但我总觉得将其转化为"Service Layer"更容易理解。
o2 = p1(object); o3 = p2(o2); result = p3(o3);
这段代码更容易阅读和维护,随着一些成熟的经验,你会明白这种说法的价值。
我在您的示例中遇到的唯一问题是使用WebClient作为@RequestBody来执行Flux<String>。这不起作用。请参见WebClient bodyToFlux(String.class) for string list doesn't separate individual values。除此之外,这是一个非常直接的应用程序。当您调试它时,您会发现它在执行Flux<Integer> ids = mapNamesToIds(fn)行之前就已经到达了.subscribe(System.out::println)行。这是因为在执行之前设置了流量。需要花些时间理解,但这是项目反应器框架的重点。
@SpringBootApplication
@RestController
@RequestMapping("/demo")
public class DemoApplication implements ApplicationRunner {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    Map<Integer, CartoonCharacter> characters;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        String[] names = new String[] {"Ariel", "Prince Eric", "Sebastian", "Flounder"};
        characters = Arrays.asList( new CartoonCharacter[] {
                new CartoonCharacter(names[0].hashCode(), names[0], "Mermaid"), 
                new CartoonCharacter(names[1].hashCode(), names[1], "Human"), 
                new CartoonCharacter(names[2].hashCode(), names[2], "Crustacean"), 
                new CartoonCharacter(names[3].hashCode(), names[3], "Fish")} 
        )
        .stream().collect(Collectors.toMap(CartoonCharacter::getId, Function.identity()));
        // TODO Auto-generated method stub
        CartoonRequest cr = CartoonRequest.builder()
        .cartoon("The Little Mermaid")
        .characterNames(Arrays.asList(names))
        .build();
        thisLocalClient
            .post()
            .uri("cartoonDetails")
            .body(Mono.just(cr), CartoonRequest.class)
            .retrieve()
            .bodyToFlux(CartoonCharacter.class)
            .subscribe(System.out::println);
    }

    @Bean
    WebClient localClient() {
        return WebClient.create("http://localhost:8080/demo/");
    }

    @Autowired
    WebClient thisLocalClient;

    @PostMapping("cartoonDetails")
    Flux<CartoonCharacter> getDetails(@RequestBody Mono<CartoonRequest> cartoonRequest) {
        Flux<StringWrapper> fn = cartoonRequest.flatMapIterable(cr->cr.getCharacterNames().stream().map(StringWrapper::new).collect(Collectors.toList()));
        Flux<Integer> ids = mapNamesToIds(fn);
        Flux<CartoonCharacter> details = mapIdsToDetails(ids);
        return details;
    }
    //  Service Layer Methods
    private Flux<Integer> mapNamesToIds(Flux<StringWrapper> names) {
        return thisLocalClient
            .post()
            .uri("findIds")
            .body(names, StringWrapper.class)
            .retrieve()
            .bodyToFlux(Integer.class);
    }
    private Flux<CartoonCharacter> mapIdsToDetails(Flux<Integer> ids) {
        return thisLocalClient
            .post()
            .uri("findDetails")
            .body(ids, Integer.class)
            .retrieve()
            .bodyToFlux(CartoonCharacter.class);
    }
    // Services
    @PostMapping("findIds")
    Flux<Integer> getIds(@RequestBody Flux<StringWrapper> names) {
        return names.map(name->name.getString().hashCode());
    }
    @PostMapping("findDetails")
    Flux<CartoonCharacter> getDetails(@RequestBody Flux<Integer> ids) {
        return ids.map(characters::get);
    }
}

另外:

@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class StringWrapper {
    private String string;
}
@Data
@Builder
public class CartoonRequest {
    private String cartoon;
    private List<String> characterNames;
}
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class CartoonCharacter {
    Integer id;
    String name;
    String species;
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接