Spring Batch 限制并发线程数量为10。

5

我有一个使用Spring Batch框架的Spring Boot应用程序。我的目标很简单-同时运行某个作业。我希望能够同时运行15个线程,并拒绝每一个过多的线程。这是我的配置类:

import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.JobRegistry;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.support.JobRegistryBeanPostProcessor;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableBatchProcessing
public class GeneratingReportJobConfiguration {

    @Autowired
    private GeneratingReportTask task;

    @Autowired
    private JobRepository jobRepository;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor(JobRegistry jobRegistry) {
        JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor = new JobRegistryBeanPostProcessor();
        jobRegistryBeanPostProcessor.setJobRegistry(jobRegistry);
        return jobRegistryBeanPostProcessor;
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(15);
        taskExecutor.setMaxPoolSize(15);
        taskExecutor.setQueueCapacity(0);
        return taskExecutor;
    }

    @Bean
    public JobLauncher jobLauncher(TaskExecutor taskExecutor) {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(taskExecutor);
        return jobLauncher;
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("generatingReportJob")
                .start(step())
                .build();
    }

    @Bean
    public Step step() {
        return stepBuilderFactory.get("generatingReportTask")
                .tasklet(task)
                .build();
    }
}

无论我做什么,这个应用程序最多同时运行10个作业。我尝试使用TaskExecutor进行实验-我的第一个逻辑选择是SimpleAsyncTaskExecutor,它似乎会将所有过度请求堆积起来,然后以不希望的随机顺序运行它们。
所以后来我试图操纵那个限制,并且如你所见,在代码中我开始使用ThreadPoolTaskExecutor,它允许我将限制设置为5,并且它按预期工作-最多同时可以运行5个线程,而在这5个线程正在运行时,后续的线程都会被拒绝。但是,将限制设置为15会导致与上一个类似的行为。仍然会排队第11个请求,而第16个请求则被拒绝。这几乎是我的预期行为,但我需要能够完全控制线程执行。
作业通过@RestController使用jobLauncher.run()调用。每个过度请求都会导致浏览器加载,直到能够开始执行。似乎程序在jobLauncher.run()内部某处被冻结(当能够启动作业执行时,它就退出了-这就是异步运行的方式)。

我正在学习一些关于参数的知识,因为它很有趣 :) 当你说:“将限制设置为15会导致类似于之前的行为”时,你是指核心还是最大池大小? - rick
我认为你的配置没有问题。你应该能够同时运行超过10个作业(根据你的示例最多可达15个)。可能是你的作业消耗了大量资源,而且你有硬件限制?(例如,你的JVM无法生成超过10个线程?) - Mahmoud Ben Hassine
@rick 都可以。你可以在我的代码预览中的 taskExecutor() 方法下看到这一点。 - dominikbrandon
@MahmoudBenHassine 嗯,工作只是 Thread.sleep() 30 秒,所以我想这不可能是问题的原因。 - dominikbrandon
1
我也尝试过这样做,并且可以同时运行多达15个作业:https://gist.github.com/benas/fb729bd25c75667c8dc24c9c4d9d6b1d。在这种情况下,我不确定为什么您受到了10的限制。 - Mahmoud Ben Hassine
显示剩余2条评论
1个回答

2

你可能是对的,但我无法验证这是否仍然有效。不管怎样,感谢你的回答。 - dominikbrandon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接