Spring Webflux:如何为请求和响应使用不同的线程

4

我正在使用Spring Webflux,据我所知,通过使用它,用于接收请求和响应的线程应该是不同的。然而,无论我使用netty还是undertow,最终都会使用相同的线程。

我的应用程序是一个简单的MySQL DB crud应用程序。我没有使用r2dbc,而是使用与Executor和Scheduler耦合的jdbc。

如下日志所示,请求由线程[XNIO-1 I/O-6]处理,响应由同一线程给出。 因此,我假设该线程在等待db操作完成时被阻塞了。我该如何解决这个问题?

以下是日志:

2019-07-23 17:49:10.051  INFO 132 --- [           main] org.xnio                                 : XNIO version 3.3.8.Final
2019-07-23 17:49:10.059  INFO 132 --- [           main] org.xnio.nio                             : XNIO NIO Implementation Version 3.3.8.Final
2019-07-23 17:49:10.114  INFO 132 --- [           main] o.s.b.w.e.undertow.UndertowWebServer     : Undertow started on port(s) 8080 (http)
2019-07-23 17:49:10.116  INFO 132 --- [           main] c.n.webflux.demo.WebfluxFunctionalApp    : Started WebfluxFunctionalApp in 1.262 seconds (JVM running for 2.668)
2019-07-23 17:49:10.302 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.s.adapter.HttpWebHandlerAdapter    : [4c85975] HTTP GET "/api/findall"
2019-07-23 17:49:10.322 DEBUG 132 --- [   XNIO-1 I/O-6] s.w.r.r.m.a.RequestMappingHandlerMapping : [4c85975] Mapped to public reactor.core.publisher.Mono<java.util.List<com.webflux.demo.model.TypeStatus>> com.webflux.demo.controller.MonitoringController.findAll()
2019-07-23 17:49:10.337 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.r.r.m.a.ResponseBodyResultHandler  : Using 'application/json;charset=UTF-8' given [*/*] and supported [application/json;charset=UTF-8, application/*+json;charset=UTF-8, text/event-stream]
2019-07-23 17:49:10.338 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [4c85975] 0..1 [java.util.List<com.webflux.demo.model.TypeStatus>]
2019-07-23 17:49:10.347  INFO 132 --- [pool-1-thread-1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2019-07-23 17:49:10.785  INFO 132 --- [pool-1-thread-1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2019-07-23 17:49:10.838 DEBUG 132 --- [pool-1-thread-1] org.springframework.web.HttpLogging      : [4c85975] Encoding [[com.webflux.demo.model.TypeStatus@7b4509cb, com.webflux.demo.model.TypeStatus@22676ebe, (truncated)...]
2019-07-23 17:49:10.949 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.s.adapter.HttpWebHandlerAdapter    : [4c85975] Completed 200 OK

同时我的dao是

@Repository
public class TypeStatusJdbcTemplate {
    private JdbcTemplate jdbcTemplate;

    public TypeStatusJdbcTemplate(JdbcTemplate jdbcTemplate) {
        this.jdbcTemplate = jdbcTemplate;
    }

    private final static String SQL_FIND_ALL = "select * from `monitoring`.`type_status` limit 3";


    public List<TypeStatus> findAll() {
        return jdbcTemplate.query(SQL_FIND_ALL,
                new TypeStatusMapper());
    }
}

服务是

@Service
public class MonitoringService {
    private final Scheduler scheduler;
    private TypeStatusJdbcTemplate repository;

    public MonitoringService(Scheduler scheduler, TypeStatusJdbcTemplate repository) {
        this.scheduler = scheduler;
        this.repository = repository;
    }

    public Mono<List<TypeStatus>> findAll() {
        return Mono.fromCallable(repository::findAll).subscribeOn(scheduler);
    }

}

控制器是

@RestController
@RequestMapping("/api")
public class MonitoringController {
    private final MonitoringService monitoringService;
    private static final Logger logger = LoggerFactory.getLogger(MonitoringController.class);

    public MonitoringController(MonitoringService monitoringService) {
        this.monitoringService = monitoringService;
    }

    @GetMapping(value="/findall")
    public Mono<List<TypeStatus>> findAll() {
        return monitoringService.findAll();
    }
}

主文件(显示调度程序)

@SpringBootApplication
public class WebfluxFunctionalApp {
    public static void main(String[] args){
        SpringApplication.run(WebfluxFunctionalApp.class, args);
    }


    @PostConstruct
    public void init(){
        // Setting Spring Boot SetTimeZone
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
    }


    @Bean
    public Scheduler jdbcScheduler() {
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(30));
    }
}

自动装配的“Scheduler”是什么类型? - arap
无论如何,通过使用subscribeOn,所有阻塞数据库工作都被委托给另一个线程,这就是为什么XNIO-1 I/O-6不会被阻塞的原因。除非您没有自动装配Netty调度程序 ;) - arap
我添加了展示调度程序的代码。我不太明白你在第二条评论中的意思。我期望XNIO-1 I/O-6不被阻塞。然而,我假设它被阻塞了,因为它用于处理请求和响应,而不是另一个线程接手这项工作。 - yhware
也许你对“应该”这个词的理解太严格了...试着多加载一些!还有:我从“pool-”线程中看到(Mono)日志记录! - xerx593
尝试在Mono.fromCallable(()->{})块中放置类似于System.out.println("callable thread: "+Thread.currentThread().getName());的内容。您将看到类似于callable thread: pool-1-thread-1的东西。它显示请求是由XNIO-1线程处理的,然后将控制传递给线程池中的线程,当响应准备就绪时,它将被传回到NIO-1。我认为这基本上回答了您的问题 - XNIO-1没有被阻塞,因为所有阻塞工作都在另一个线程上执行,而XNIO-1仅用于处理请求和响应。 - arap
1个回答

2

线程执行并不总是不同的线程。摘自响应式文档:

响应式调度器

获取Flux或Mono并不一定意味着它将在专用线程中运行。相反,大多数操作符继续在上一个操作符执行的线程上工作。除非指定,否则顶层操作符(源)本身在进行subscribe()调用的线程上运行。

因此,并没有要求必须是新线程。

Translated:

除非特别指定 - 在这里通过将 subscribeOn 放在链中进行指定。 - arap

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接