Spring Boot异步与多线程

14

我有一个Spring Boot微服务,在其中调用多个服务(假设为Service A和Service B)。我正在尝试基于某些条件异步地在多个线程上调用这两个服务,并且一旦处理完成,我希望合并来自Service A和Service B的响应。

我知道我们可以使用@Async异步运行进程,并使用ExecutorService为服务启动多个线程。

但我不确定如何将所有东西组合在一起。因此在此寻求任何建议?

              @Async
              Service A(thread1,thread2) \
MicroService /                             (Merge from Response of ServiceA and ServiceB)
             \ @Async
              Service B(thread1,thread2) /

我知道以上大部分都是理论解释,但我尝试了多个网站,大多数文章要么解释Aync或Multithreading,但不确定如何等待并在多线程中异步运行两个进程,并在这两个服务调用完成后继续执行!

欢迎任何建议或线索! TIA :)

2个回答

20

您需要使用Spring的AsyncResult类来包装您的结果,然后使用它的方法.completable()来返回CompletableFuture对象。

在合并Future对象时,请使用CompletableFuture.thenCompose()CompletableFuture.thenApply()方法来合并数据,如下所示:

CompletableFuture<Integer> result = futureData1.thenCompose(fd1Value -> 
                futureData2.thenApply(fd2Value -> 
                        merge(fd1Value, fd2Value)));

这是一个基本示例:

在 Spring Boot 主类上使用 @EnableAsync 注解进行注释。

@SpringBootApplication
@EnableAsync
public class StackOverflowApplication {

    public static void main(String[] args) {
        SpringApplication.run(StackOverflowApplication.class, args);
    }

}
创建一个示例服务,该服务将返回 CompletableFutureAservice.java
@Service
public class Aservice {

    @Async
    public CompletableFuture<Integer> getData() throws InterruptedException {
        Thread.sleep(3000); // sleep for 3 sec
        return new AsyncResult<Integer>(2).completable(); // wrap integer 2
    }
}

Bservice.java

@Service
public class Bservice {

    @Async
    public CompletableFuture<Integer> getData() throws InterruptedException {
        Thread.sleep(2000); // sleep for 2 sec
        return new AsyncResult<Integer>(1).completable(); // wrap integer 1
    }
}

创建另一个服务,用于合并其他两个服务的数据

ResultService.java

@Service
public class ResultService {

    @Autowired
    private Aservice aservice;
    @Autowired
    private Bservice bservice;

    public CompletableFuture<Integer> mergeResult() throws InterruptedException, ExecutionException {
        CompletableFuture<Integer> futureData1 = aservice.getData();
        CompletableFuture<Integer> futureData2 = bservice.getData();

        // Merge futures from Aservice and Bservice
        return futureData1.thenCompose(
            fd1Value -> futureData2.thenApply(fd2Value -> fd1Value + fd2Value));
    }
}

创建一个用于测试的示例控制器

ResultController.java

@RestController
public class ResultController {

    @Autowired
    private ResultService resultService;

    @GetMapping("/result")
    CompletableFuture<Integer> getResult() throws InterruptedException, ExecutionException {
        return resultService.mergeResult();
    }

}

为什么要使用 Thread.join?为什么不让 ResultController 直接返回另一个带有聚合的 CompletableFuture 呢? - Edwin Dalorzo
@EdwinDalorzo,根据您的评论,我已更新答案。 - Ajit Soman
@AjitSoman和Edwin,感谢您们的回复。我会尝试理解这里发生了什么,并尝试一下。 - JingJong
@AjitSoman 再次感谢您提供完整的示例。但是看起来上面只适用于异步处理,对吗?如果要打开多个线程,我可以使用ExecutorService吗? - JingJong
1
@JingJong,是的,您需要像这样为Execotor创建bean:@Bean public Executor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(3); executor.setMaxPoolSize(15); executor.setQueueCapacity(500); executor.setThreadNamePrefix("GithubLookup-"); executor.initialize(); return executor; }。请查看此链接以获取更多信息:https://spring.io/guides/gs/async-method/ - Ajit Soman

2
你可以看一下Java的CompletableFuture。CompletableFuture允许合并多个异步任务(也是CompletableFuture),并等待所有CompletableFuture的结果。我不确定它是否完全适合你的情况,但可能会有所帮助。 https://www.baeldung.com/java-completablefuture#Multiple

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接