在Spring Batch中,我们如何在不同步骤之间共享数据?

92

深入了解Spring Batch,我想知道如何在作业(Job)的不同步骤之间共享数据?

我们可以使用JobRepository来实现这一点吗?如果是,如何做到这一点?

还有其他方法可以做到/达到相同的效果吗?

12个回答

53

在一个步骤中,您可以将数据放入StepExecutionContext。 然后,通过监听器,您可以将数据从StepExecutionContext提升到JobExecutionContext

这个JobExecutionContext在所有以下的步骤中都可用。

注意:数据必须是短小精悍的。 这些上下文通过序列化保存在JobRepository中,并且长度受限制(如果我没有记错,是2500个字符)。

因此,这些上下文适合共享字符串或简单值,但不适合共享集合或大量数据。

共享大量数据不是Spring Batch的哲学。 Spring Batch是一组独立的操作,而不是巨大的业务处理单元。


6
如何分享可能很大的数据集合?我的itemProcessor生成一个列表(记录要删除的内容),我需要将该列表传递到任务流中供tasklet进行处理(实际删除记录)。谢谢。 - Micho Rizo
工作范围能在这种情况下有所帮助吗? - gstackoverflow
@MichoRizo我建议使用像redis/ecache这样的缓存,如果列表很大。我喜欢保持上下文中的对象相对较小。 - vijayakumarpsg587

51

作业存储库间接用于在步骤之间传递数据(Jean-Philippe正确地指出,最好的方法是将数据放入StepExecutionContext中,然后使用具有冗长名称的ExecutionContextPromotionListener将步骤执行上下文键提升到JobExecutionContext中。

值得注意的是,还有一个用于将JobParameter键提升到StepExecutionContext的侦听器(名字更长,为JobParameterExecutionContextCopyListener);如果您的作业步骤不完全独立,您会发现经常使用它们。

否则,您将使用更加复杂的方案来在步骤之间传递数据,例如JMS队列或(倘若真的需要的话)硬编码的文件位置。

至于在上下文中传递的数据大小,我建议您将其保持较小(但我没有任何关于此的具体信息)。


4
这在此文档和示例确认:http://docs.spring.io/spring-batch/trunk/reference/html/patterns.html#passingDataToFutureSteps - Sébastien Nussbaumer
4
该死,五年过去了,这个问题仍然有影响力。Spring Batch 厉害了 :) - WineSoaked
工作范围能在这种情况下有所帮助吗? - gstackoverflow

30
我认为你有三个选择:
  1. 使用StepContext并将其提升为JobContext,然后从每个步骤中访问它,但需要注意大小限制。
  2. 创建@JobScope bean,并向该bean添加数据,在需要的地方@Autowire它并使用它(缺点是它是内存结构,如果作业失败,数据将丢失,可能会影响可重启性)。
  3. 我们需要处理跨步骤的较大数据集(读取csv中每一行并写入DB,从DB读取,聚合并发送到API),因此我们决定在与Spring Batch元表相同的数据库中对数据进行建模,在JobContext中保留ids,需要时进行访问,并在作业成功完成后删除该临时表。

3
关于您的第二个选项。我能否通过这种方式从写入器类访问读取器类中的bean集? - Bill Goldberg
你是怎么理解从reader中设置的?我们在配置文件之外创建了bean,并在需要的地方注入它。你可以尝试并查看如何将一些东西从reader提升到作业范围,但对我来说,在reader中定义作业范围的解决方案似乎很奇怪。 - Nenad Bozic
工作范围能在这种情况下有所帮助吗? - gstackoverflow

12

以下是我为保存一个可以在各个步骤中访问的对象所做的操作:

  1. 创建了一个用于将该对象设置到作业上下文中的监听器
@Component("myJobListener")
public class MyJobListener implements JobExecutionListener {

    public void beforeJob(JobExecution jobExecution) {

        String myValue = someService.getValue();
        jobExecution.getExecutionContext().putString("MY_VALUE", myValue);
    }
}
  1. 在作业上下文中定义了监听器
<listeners>
         <listener ref="myJobListener"/>
</listeners>
  1. 使用BeforeStep注释在步骤中获取值
@BeforeStep
public void initializeValues(StepExecution stepExecution) {

String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE");

}

9

您可以使用Java Bean对象

  1. 执行一步操作
  2. 将结果存储在Java对象中
  3. 下一步将使用同一个Java对象来获取步骤1存储的结果

通过这种方式,如果需要,您可以存储大量数据。


24
下一步我该如何得到第一步的物品?问题的核心在于此。 - Elbek
2
@Elbek 自动装配它。第一步中的类已经自动装配了POJO并设置了数据,第二步中的类也自动装配了相同的对象(除非你正在进行远程分区),并使用getter方法。 - IceBox13
1
你是如何在步骤1中自动装配新创建的实例的?你如何将新实例附加到Spring上下文中? - Chandru
2
在POJO中使用@Component注解,在第一步中使用@Autowired + Setters,在后续步骤中使用@Autowired + Getters。同时,在Tasklets中使用JobScope注解。 - Robert Kirsten

8
您可以将数据存储在简单对象中,例如:
AnyObject yourObject = new AnyObject();

public Job build(Step step1, Step step2) {
    return jobBuilderFactory.get("jobName")
            .incrementer(new RunIdIncrementer())
            .start(step1)
            .next(step2)
            .build();
}

public Step step1() {
    return stepBuilderFactory.get("step1Name")
            .<Some, Any> chunk(someInteger1)
            .reader(itemReader1())
            .processor(itemProcessor1())
            .writer(itemWriter1(yourObject))
            .build();
}

public Step step2() {
    return stepBuilderFactory.get("step2Name")
            .<Some, Any> chunk(someInteger2)
            .reader(itemReader2())
            .processor(itemProcessor2(yourObject))
            .writer(itemWriter2())
            .build();
}

只需在写入器或任何其他方法中添加数据到对象中,在下一步的任何阶段都可以获取该数据。


6
另一种非常简单的方法,留在这里供日后参考:
class MyTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
        getExecutionContext.put("foo", "bar");
    }
}

并且。
class MyOtherTasklet implements Tasklet {
    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
        getExecutionContext.get("foo");
    }   
}

getExecutionContext 在这里是:

ExecutionContext getExecutionContext(ChunkContext chunkContext) {
    return chunkContext.getStepContext()
                       .getStepExecution()
                       .getJobExecution()
                       .getExecutionContext();
}     

将它放在超类中,作为接口的default方法,或者简单地将其粘贴到您的Tasklet中。


4
使用ExecutionContextPromotionListener:
public class YourItemWriter implements ItemWriter<Object> {
    private StepExecution stepExecution;
    public void write(List<? extends Object> items) throws Exception {
        // Some Business Logic

        // put your data into stepexecution context
        ExecutionContext stepContext = this.stepExecution.getExecutionContext();
        stepContext.put("someKey", someObject);
    }
    @BeforeStep
    public void saveStepExecution(Final StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }
}

现在,您需要将promotionListener添加到您的工作中。
@Bean
public Step step1() {
        return stepBuilder
        .get("step1")<Company,Company>  chunk(10)
        .reader(reader()).processor(processor()).writer(writer())
        .listener(promotionListener()).build();
}

@Bean
public ExecutionContextPromotionListener promotionListener() {
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys(new String[] {"someKey"});
    listener.setStrict(true);
    return listener;
}

现在,在第二步中,从作业ExecutionContext获取数据。
public class RetrievingItemWriter implements ItemWriter<Object> {
    private Object someObject;
    public void write(List<? extends Object> items) throws Exception {
        // ...
    }
    @BeforeStep
    public void retrieveInterstepData(StepExecution stepExecution) {
        JobExecution jobExecution = stepExecution.getJobExecution();
        ExecutionContext jobContext = jobExecution.getExecutionContext();
        this.someObject = jobContext.get("someKey");
    }
}

如果您正在使用tasklets,则可以使用以下方法获取或放置ExecutionContext。
List<YourObject> yourObjects = (List<YourObject>) chunkContent.getStepContext().getJobExecutionContext().get("someKey");

2
从官方文档中复制和粘贴代码很容易。为什么不提供您自己的实现?每个人都知道它是写在文档中的。 - Eddy Bayonne
2
这就是我所做的。我提供了易于理解的代码部分。那么,文档中是否有相同的内容可用?我不知道。 - Nikhil Pareek

1
我被分配了一个任务,需要逐个调用批处理作业。每个作业都依赖于另一个作业。第一个作业的结果需要执行后续作业程序。我正在寻找如何在作业执行后传递数据。我发现 "ExecutionContextPromotionListener" 很有用。
1)我已经添加了一个 "ExecutionContextPromotionListener" bean,如下所示:
@Bean
public ExecutionContextPromotionListener promotionListener()
{
    ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
    listener.setKeys( new String[] { "entityRef" } );
    return listener;
}

2) 然后我将其中一个监听器附加到我的步骤上

Step step = builder.faultTolerant()
            .skipPolicy( policy )
            .listener( writer )
            .listener( promotionListener() )
            .listener( skiplistener )
            .stream( skiplistener )
            .build();

3) 我已经在我的Writer步骤实现中添加了stepExecution作为引用,并在Beforestep中填充了它。

@BeforeStep
public void saveStepExecution( StepExecution stepExecution )
{
    this.stepExecution = stepExecution;
}   

4)在我的编写步骤的最后,我将值填充到StepExecution中作为以下键:

lStepContext.put( "entityRef", lMap );

5) 在作业执行后,我从lExecution.getExecutionContext()中检索值,并将其填充为作业响应。

6) 从作业响应对象中,我将获取值并在其余作业中填充所需的值。

上述代码用于使用ExecutionContextPromotionListener将数据从步骤提升到ExecutionContext。可以在任何步骤中完成。


1
使用Tasklet的简单解决方案。无需访问执行上下文。我使用了地图作为数据元素进行移动。(Kotlin代码。)
Tasklet
class MyTasklet : Tasklet {

    lateinit var myMap: MutableMap<String, String>

    override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus? {
        myMap.put("key", "some value")
        return RepeatStatus.FINISHED
    }

}

批量配置

@Configuration
@EnableBatchProcessing
class BatchConfiguration {

    @Autowired
    lateinit var jobBuilderFactory: JobBuilderFactory

    @Autowired
    lateinit var stepBuilderFactory: StepBuilderFactory

    var myMap: MutableMap<String, String> = mutableMapOf()

    @Bean
    fun jobSincAdUsuario(): Job {
        return jobBuilderFactory
                .get("my-SO-job")
                .incrementer(RunIdIncrementer())
                .start(stepMyStep())    
                .next(stepMyOtherStep())        
                .build()
    }

    @Bean
    fun stepMyStep() = stepBuilderFactory.get("MyTaskletStep")        
        .tasklet(myTaskletAsBean())
        .build()

    @Bean
    fun myTaskletAsBean(): MyTasklet {
        val tasklet = MyTasklet()
        tasklet.myMap = myMap      // collection gets visible in the tasklet
        return tasklet
    }
}

然后在MyOtherStep中,您可以复制在MyStep中看到的相同习语。这个其他的Tasklet将会看到在MyStep中创建的数据。
重要提示:
- Tasklet是通过@Bean fun创建的,以便它们可以使用@Autowired完整解释)。 - 对于更健壮的实现,Tasklet应该实现InitializingBean,并在其中添加以下代码:
    override fun afterPropertiesSet() {
        Assert.notNull(myMap, "在调用Tasklet之前必须设置myMap")
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接