Spring Boot Starter JTA Atomikos和Spring Boot Starter Batch

3

在单个应用程序中同时使用这两个启动器是否可能?

我想将CSV文件中的记录加载到数据库表中。Spring Batch表存储在不同的数据库中,因此我认为需要使用JTA来处理事务。

每当我将@EnableBatchProcessing添加到@Configuration类中时,它会配置一个PlatformTransactionManager,这会阻止Atomikos自动配置它。

是否有关于spring boot + batch + jta的示例可用,展示如何做到这一点?

非常感谢, 詹姆斯

2个回答

1
我刚刚阅读了这篇文章,发现了一些看似可行的方法。如你所述,@EnableBatchProcessing 会创建一个 DataSourceTransactionManager ,这会导致问题。我在 @EnableBatchProcessing 中使用了 modular=true,因此激活了 ModularBatchConfiguration 类。
我所做的是停止使用 @EnableBatchProcessing,并将整个 ModularBatchConfiguration 类复制到我的项目中。然后,我注释掉了 transactionManager() 方法,因为 Atomikos 配置会创建 JtaTransactionManager。我还不得不覆盖 jobRepository() 方法,因为它被硬编码为使用 DefaultBatchConfiguration 中创建的 DataSourceTransactionManager
我还必须显式导入 JtaAutoConfiguration 类。这样可以正确地连接所有内容(根据执行器的“beans”端点 - 感谢上帝)。但是,当您运行它时,事务管理器会抛出异常,因为某个地方设置了显式事务隔离级别。所以我还编写了一个 BeanPostProcessor 来查找事务管理器并调用 txnMgr.setAllowCustomIsolationLevels(true)
现在一切都正常了,但是在作业运行时,即使我可以在 SQLYog 中看到数据,我也无法使用 JdbcTemplate 从 batch_step_execution 表中获取当前数据。这一定与事务隔离有关,但我还没有理解它。
这是我配置类的内容,从 Spring 复制并按上述说明进行修改。PS,我的指向带有批处理表的数据库的 DataSource 被注释为 @Primary。另外,我将我的 DataSource beans 更改为 org.apache.tomcat.jdbc.pool.XADataSource 实例;我不确定是否需要这样做。
@Configuration
@Import(ScopeConfiguration.class)
public class ModularJtaBatchConfiguration implements ImportAware 
{
    @Autowired(required = false)
    private Collection<DataSource> dataSources;

    private BatchConfigurer configurer;

    @Autowired
    private ApplicationContext context;

    @Autowired(required = false)
    private Collection<BatchConfigurer> configurers;

    private AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();

    @Bean
    public JobRepository jobRepository(DataSource batchDataSource, JtaTransactionManager jtaTransactionManager) throws Exception 
    {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(batchDataSource);
        factory.setTransactionManager(jtaTransactionManager);
        factory.afterPropertiesSet();
        return  factory.getObject();
    }

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        return getConfigurer(configurers).getJobLauncher();
    }

//  @Bean
//  public PlatformTransactionManager transactionManager() throws Exception {
//      return getConfigurer(configurers).getTransactionManager();
//  }

    @Bean
    public JobExplorer jobExplorer() throws Exception {
        return getConfigurer(configurers).getJobExplorer();
    }

    @Bean
    public AutomaticJobRegistrar jobRegistrar() throws Exception {
        registrar.setJobLoader(new DefaultJobLoader(jobRegistry()));
        for (ApplicationContextFactory factory : context.getBeansOfType(ApplicationContextFactory.class).values()) {
            registrar.addApplicationContextFactory(factory);
        }
        return registrar;
    }

    @Bean
    public JobBuilderFactory jobBuilders(JobRepository jobRepository) throws Exception {
        return new JobBuilderFactory(jobRepository);
    }

    @Bean
    // hopefully this will autowire the Atomikos JTA txn manager
    public StepBuilderFactory stepBuilders(JobRepository jobRepository, JtaTransactionManager ptm) throws Exception {
        return new StepBuilderFactory(jobRepository, ptm);
    }

    @Bean
    public JobRegistry jobRegistry() throws Exception {
        return new MapJobRegistry();
    }

    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        AnnotationAttributes enabled = AnnotationAttributes.fromMap(importMetadata.getAnnotationAttributes(
                EnableBatchProcessing.class.getName(), false));
        Assert.notNull(enabled,
                "@EnableBatchProcessing is not present on importing class " + importMetadata.getClassName());
    }

    protected BatchConfigurer getConfigurer(Collection<BatchConfigurer> configurers) throws Exception {
        if (this.configurer != null) {
            return this.configurer;
        }
        if (configurers == null || configurers.isEmpty()) {
            if (dataSources == null || dataSources.isEmpty()) {
                throw new UnsupportedOperationException("You are screwed");
            } else if(dataSources != null && dataSources.size() == 1) {
                DataSource dataSource = dataSources.iterator().next();
                DefaultBatchConfigurer configurer = new DefaultBatchConfigurer(dataSource);
                configurer.initialize();
                this.configurer = configurer;
                return configurer;
            } else {
                throw new IllegalStateException("To use the default BatchConfigurer the context must contain no more than" +
                                                        "one DataSource, found " + dataSources.size());
            }
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException(
                    "To use a custom BatchConfigurer the context must contain precisely one, found "
                            + configurers.size());
        }
        this.configurer = configurers.iterator().next();
        return this.configurer;
    }

}

@Configuration
class ScopeConfiguration {

    private StepScope stepScope = new StepScope();

    private JobScope jobScope = new JobScope();

    @Bean
    public StepScope stepScope() {
        stepScope.setAutoProxy(false);
        return stepScope;
    }

    @Bean
    public JobScope jobScope() {
        jobScope.setAutoProxy(false);
        return jobScope;
    }

}

最终,即使这样也对我没有用。我无法在不让Atomikos JTA Txn Mgr发疯、锁定和杀死所有作业的情况下查询数据库。然后我意识到我的第二个数据源仅适用于单个作业的只读模式,因此我将所有配置还原为标准的非JTA配置,完全删除了Atomikos,并将第二个只读数据源创建为Tomcat DataSource池bean,autoCommit=true,并且只在启动特定作业时才创建它。 - Ken DeLong

1

我找到了一个解决方案,可以保留@EnableBatchProcessing,但需要实现BatchConfigurer和atomikos beans,详见我的so answer


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接