深入了解Spring Batch,我想知道如何在作业(Job)的不同步骤之间共享数据?
我们可以使用JobRepository来实现这一点吗?如果是,如何做到这一点?
还有其他方法可以做到/达到相同的效果吗?
深入了解Spring Batch,我想知道如何在作业(Job)的不同步骤之间共享数据?
我们可以使用JobRepository来实现这一点吗?如果是,如何做到这一点?
还有其他方法可以做到/达到相同的效果吗?
在一个步骤中,您可以将数据放入StepExecutionContext
。
然后,通过监听器,您可以将数据从StepExecutionContext
提升到JobExecutionContext
。
这个JobExecutionContext
在所有以下的步骤中都可用。
注意:数据必须是短小精悍的。
这些上下文通过序列化保存在JobRepository
中,并且长度受限制(如果我没有记错,是2500个字符)。
因此,这些上下文适合共享字符串或简单值,但不适合共享集合或大量数据。
共享大量数据不是Spring Batch的哲学。 Spring Batch是一组独立的操作,而不是巨大的业务处理单元。
作业存储库间接用于在步骤之间传递数据(Jean-Philippe正确地指出,最好的方法是将数据放入StepExecutionContext
中,然后使用具有冗长名称的ExecutionContextPromotionListener
将步骤执行上下文键提升到JobExecutionContext
中。
值得注意的是,还有一个用于将JobParameter
键提升到StepExecutionContext
的侦听器(名字更长,为JobParameterExecutionContextCopyListener
);如果您的作业步骤不完全独立,您会发现经常使用它们。
否则,您将使用更加复杂的方案来在步骤之间传递数据,例如JMS队列或(倘若真的需要的话)硬编码的文件位置。
至于在上下文中传递的数据大小,我建议您将其保持较小(但我没有任何关于此的具体信息)。
StepContext
并将其提升为JobContext
,然后从每个步骤中访问它,但需要注意大小限制。@JobScope
bean,并向该bean添加数据,在需要的地方@Autowire
它并使用它(缺点是它是内存结构,如果作业失败,数据将丢失,可能会影响可重启性)。JobContext
中保留ids
,需要时进行访问,并在作业成功完成后删除该临时表。以下是我为保存一个可以在各个步骤中访问的对象所做的操作:
@Component("myJobListener")
public class MyJobListener implements JobExecutionListener {
public void beforeJob(JobExecution jobExecution) {
String myValue = someService.getValue();
jobExecution.getExecutionContext().putString("MY_VALUE", myValue);
}
}
<listeners>
<listener ref="myJobListener"/>
</listeners>
@BeforeStep
public void initializeValues(StepExecution stepExecution) {
String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE");
}
您可以使用Java Bean对象
通过这种方式,如果需要,您可以存储大量数据。
AnyObject yourObject = new AnyObject();
public Job build(Step step1, Step step2) {
return jobBuilderFactory.get("jobName")
.incrementer(new RunIdIncrementer())
.start(step1)
.next(step2)
.build();
}
public Step step1() {
return stepBuilderFactory.get("step1Name")
.<Some, Any> chunk(someInteger1)
.reader(itemReader1())
.processor(itemProcessor1())
.writer(itemWriter1(yourObject))
.build();
}
public Step step2() {
return stepBuilderFactory.get("step2Name")
.<Some, Any> chunk(someInteger2)
.reader(itemReader2())
.processor(itemProcessor2(yourObject))
.writer(itemWriter2())
.build();
}
只需在写入器或任何其他方法中添加数据到对象中,在下一步的任何阶段都可以获取该数据。
class MyTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
getExecutionContext.put("foo", "bar");
}
}
class MyOtherTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
getExecutionContext.get("foo");
}
}
getExecutionContext
在这里是:
ExecutionContext getExecutionContext(ChunkContext chunkContext) {
return chunkContext.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext();
}
将它放在超类中,作为接口的default
方法,或者简单地将其粘贴到您的Tasklet
中。
ExecutionContextPromotionListener
:public class YourItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(List<? extends Object> items) throws Exception {
// Some Business Logic
// put your data into stepexecution context
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(Final StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
@Bean
public Step step1() {
return stepBuilder
.get("step1")<Company,Company> chunk(10)
.reader(reader()).processor(processor()).writer(writer())
.listener(promotionListener()).build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
listener.setStrict(true);
return listener;
}
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(List<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}
List<YourObject> yourObjects = (List<YourObject>) chunkContent.getStepContext().getJobExecutionContext().get("someKey");
@Bean
public ExecutionContextPromotionListener promotionListener()
{
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys( new String[] { "entityRef" } );
return listener;
}
2) 然后我将其中一个监听器附加到我的步骤上
Step step = builder.faultTolerant()
.skipPolicy( policy )
.listener( writer )
.listener( promotionListener() )
.listener( skiplistener )
.stream( skiplistener )
.build();
3) 我已经在我的Writer步骤实现中添加了stepExecution作为引用,并在Beforestep中填充了它。
@BeforeStep
public void saveStepExecution( StepExecution stepExecution )
{
this.stepExecution = stepExecution;
}
4)在我的编写步骤的最后,我将值填充到StepExecution中作为以下键:
lStepContext.put( "entityRef", lMap );
5) 在作业执行后,我从lExecution.getExecutionContext()
中检索值,并将其填充为作业响应。
6) 从作业响应对象中,我将获取值并在其余作业中填充所需的值。
上述代码用于使用ExecutionContextPromotionListener将数据从步骤提升到ExecutionContext。可以在任何步骤中完成。
class MyTasklet : Tasklet {
lateinit var myMap: MutableMap<String, String>
override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus? {
myMap.put("key", "some value")
return RepeatStatus.FINISHED
}
}
@Configuration
@EnableBatchProcessing
class BatchConfiguration {
@Autowired
lateinit var jobBuilderFactory: JobBuilderFactory
@Autowired
lateinit var stepBuilderFactory: StepBuilderFactory
var myMap: MutableMap<String, String> = mutableMapOf()
@Bean
fun jobSincAdUsuario(): Job {
return jobBuilderFactory
.get("my-SO-job")
.incrementer(RunIdIncrementer())
.start(stepMyStep())
.next(stepMyOtherStep())
.build()
}
@Bean
fun stepMyStep() = stepBuilderFactory.get("MyTaskletStep")
.tasklet(myTaskletAsBean())
.build()
@Bean
fun myTaskletAsBean(): MyTasklet {
val tasklet = MyTasklet()
tasklet.myMap = myMap // collection gets visible in the tasklet
return tasklet
}
}
MyOtherStep
中,您可以复制在MyStep
中看到的相同习语。这个其他的Tasklet将会看到在MyStep
中创建的数据。@Bean fun
创建的,以便它们可以使用@Autowired
(完整解释)。
- 对于更健壮的实现,Tasklet应该实现InitializingBean
,并在其中添加以下代码:
override fun afterPropertiesSet() { Assert.notNull(myMap, "在调用Tasklet之前必须设置myMap") }