Spring Batch 服务器故障后如何恢复执行

20
我正在使用Spring Batch解析文件,并且有以下情况: 我正在运行一个作业。此作业必须解析给定的文件。由于意外原因(比如停电),服务器失败并且我必须重新启动机器。现在,在重新启动服务器后,我希望从停电前停止的点恢复作业。这意味着如果系统已经读取了10,000行中的1,300行,现在必须从第1,301行开始读取。 如何使用Spring Batch实现这种情况? 关于配置:我使用Spring Integration轮询新文件所在的目录。当文件到达时,Spring Integration会创建Spring Batch作业。此外,Spring Batch使用FlatFileItemReader来解析文件。

1
自从这个问题被提出以来已经很长时间了。现在是否有官方支持的解决方案? - Steffen Harbich
有没有解决这个问题的办法?我尝试过以下方法,但它会创建一个具有null工作参数的新作业实例,因此无法处理输入文件。我正在使用文件轮询器,该轮询器应在文件可用时启动作业。一旦失败,重新启动应用程序后,作业应从离开的同一输入行开始。但现在却没实现。 - raj
4个回答

4

以下是在JVM崩溃后重新启动作业的完整解决方案。

  1. 将restarable属性设置为"true",使作业可以重新启动

<job id="jobName" xmlns="http://www.springframework.org/schema/batch" restartable="true">

2. 重新启动作业的代码

import java.util.Date;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.batch.core.BatchStatus;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobInstance;
import org.springframework.batch.core.explore.JobExplorer;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.beans.factory.annotation.Autowired;

public class ResartJob {

    @Autowired
    private JobExplorer jobExplorer;
    @Autowired
    JobRepository jobRepository;
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired 
    JobOperator jobOperator;

    public void restart(){
        try {
            List<JobInstance> jobInstances = jobExplorer.getJobInstances("jobName",0,1);// this will get one latest job from the database
            if(CollectionUtils.isNotEmpty(jobInstances)){
               JobInstance jobInstance =  jobInstances.get(0);
               List<JobExecution> jobExecutions = jobExplorer.getJobExecutions(jobInstance);
               if(CollectionUtils.isNotEmpty(jobExecutions)){
                   for(JobExecution execution: jobExecutions){
                       // If the job status is STARTED then update the status to FAILED and restart the job using JobOperator.java
                       if(execution.getStatus().equals(BatchStatus.STARTED)){ 
                           execution.setEndTime(new Date());
                           execution.setStatus(BatchStatus.FAILED);                               
                           execution.setExitStatus(ExitStatus.FAILED);                               
                           jobRepository.update(execution);
                           jobOperator.restart(execution.getId());
                       }
                   }
               }
            }
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }
}

3.

<bean id="jobRepository" class="org.springframework.batch.core.repository.support.JobRepositoryFactoryBean" p:dataSource-ref="dataSource" p:transactionManager-ref="transactionManager" p:lobHandler-ref="oracleLobHandler"/>

<bean id="oracleLobHandler" class="org.springframework.jdbc.support.lob.DefaultLobHandler"/>


<bean id="jobExplorer" class="org.springframework.batch.core.explore.support.JobExplorerFactoryBean" p:dataSource-ref="dataSource" />

<bean id="jobRegistry" class="org.springframework.batch.core.configuration.support.MapJobRegistry" />

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
        <property name="taskExecutor" ref="jobLauncherTaskExecutor" /> 
</bean> <task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />

<bean id="jobOperator" class="org.springframework.batch.core.launch.support.SimpleJobOperator" p:jobLauncher-ref="jobLauncher" p:jobExplorer-re`enter code here`f="jobExplorer" p:jobRepository-ref="jobRepository" p:jobRegistry-ref="jobRegistry"/>

以上逻辑如果有任何作业步骤处于“已启动”状态,则不会重新启动作业。这些步骤执行还需要标记为“失败”,除了作业执行状态之外。此外,对于长时间运行的单步作业,重新启动变得毫无意义,因为逻辑从头开始启动步骤,忽略已处理的块。如何实现考虑到已处理的块的步骤级别重新启动/恢复?我的意思是,纯粹的恢复 - Sabir Khan
大家好,有人知道在哪里放置RestartJob代码或调用重启函数吗? - raj
@raj 你可以在轮询或调度程序中调用RestartJob。 - Sumit Sundriyal
@raj 我有类似的流程。我使用一个每分钟运行的Spring Job来处理这个问题。该Job负责启动新的Job或恢复先前停止的Job。如果您没有自定义Job,则必须查看API文档,扩展和覆盖提供的处理方法(这些方法由Spring Batch内部调用)。 - Sumit Sundriyal
1
嗨Sumit,感谢您的指导。我是通过将上面的代码片段插入但不运行jobOperator.run()来解决它,只需在使用jobLauncher.run()启动作业的CustomJobLaunchingMessageHandler中编辑jobreposotory数据库即可。这样就可以自动处理重新启动。jobOperator.run()也使用jobLauncher.run(),因此从代码中删除了那一部分。工作得很好。非常感谢,它节省了我的夜晚,并且我在这里学到了基础知识。 - raj
显示剩余6条评论

3

Spring batch 4的最新解决方案已更新。考虑了JVM启动时间,以便检测破损作业。请注意,在多个服务器启动作业的集群环境中,此方法可能无效。

@Bean
public ApplicationListener<ContextRefreshedEvent> resumeJobsListener(JobOperator jobOperator, JobRepository jobRepository,
        JobExplorer jobExplorer) {
    // restart jobs that failed due to
    return event -> {
        Date jvmStartTime = new Date(ManagementFactory.getRuntimeMXBean().getStartTime());

        // for each job
        for (String jobName : jobExplorer.getJobNames()) {
            // get latest job instance
            for (JobInstance instance : jobExplorer.getJobInstances(jobName, 0, 1)) {
                // for each of the executions
                for (JobExecution execution : jobExplorer.getJobExecutions(instance)) {
                    if (execution.getStatus().equals(BatchStatus.STARTED) && execution.getCreateTime().before(jvmStartTime)) {
                        // this job is broken and must be restarted
                        execution.setEndTime(new Date());
                        execution.setStatus(BatchStatus.FAILED);
                        execution.setExitStatus(ExitStatus.FAILED);

                        for (StepExecution se : execution.getStepExecutions()) {
                            if (se.getStatus().equals(BatchStatus.STARTED)) {
                                se.setEndTime(new Date());
                                se.setStatus(BatchStatus.FAILED);
                                se.setExitStatus(ExitStatus.FAILED);
                                jobRepository.update(se);
                            }
                        }

                        jobRepository.update(execution);
                        try {
                            jobOperator.restart(execution.getId());
                        }
                        catch (JobExecutionException e) {
                            LOG.warn("Couldn't resume job execution {}", execution, e);
                        }
                    }
                }
            }
        }
    };
}

你有关于如何在集群环境中处理这种情况的想法吗?我现在遇到了这个问题 https://dev59.com/Ya3la4cB1Zd3GeqPJTus - alexanoid
@alexanoid 不是很确定。 - Steffen Harbich
通常,批处理逻辑会被放置在自己的应用程序中,而不是与集群化的 Web 应用程序结合使用。然而,这并非总是可能的。为了确保此逻辑仅在一个集群上运行,一种简单的解决方案是在此逻辑周围放置分布式锁,以便集群中只有一个节点可以同时调用此逻辑。请查看 Hazelcast 的分布式锁功能。 - mad_fox

0

你也可以像下面这样写:

    @RequestMapping(value = "/updateStatusAndRestart/{jobId}/{stepId}", method = GET)
    public ResponseEntity<String> updateBatchStatus(@PathVariable("jobId") Long jobExecutionId ,@PathVariable("stepId")Long stepExecutionId )throws Exception {

       StepExecution stepExecution =  jobExplorer.getStepExecution(jobExecutionId,stepExecutionId);
            stepExecution.setEndTime(new Date(System.currentTimeMillis()));
            stepExecution.setStatus(BatchStatus.FAILED);
            stepExecution.setExitStatus(ExitStatus.FAILED);
        jobRepository.update(stepExecution);

       JobExecution jobExecution =  stepExecution.getJobExecution();
            jobExecution.setEndTime(new Date(System.currentTimeMillis()));
            jobExecution.setStatus(BatchStatus.FAILED);
            jobExecution.setExitStatus(ExitStatus.FAILED);
        jobRepository.update(jobExecution);
        jobOperator.restart(execution.getId());
        
        return new ResponseEntity<String>("<h1> Batch Status Updated !! </h1>", HttpStatus.OK);
    }

在这里,我使用了restApi端点来传递jobExecutionId和stepExecutionId,并将job_execution和step_execution的状态都设置为FAIL。然后使用batch操作员重新启动。


有没有任何方法可以重新启动作业/步骤,从它停止的地方继续执行? - PAA
在这种情况下,您需要根据数据库中的失败步骤找到jobExecutionIdStepExecutionId,然后可以在请求参数中不提供jobIdstepId的情况下重新启动。 - Afsar Ali

0
在你的情况下,我会创建一个步骤来记录文件中上次处理的行。然后创建第二个作业来读取这个文件,并从特定的行号开始处理。
因此,如果作业由于任何原因停止,您将能够运行新作业,以恢复处理。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接