Spring Integration 2与Quartz调度程序

3

我是Spring Integration的新手。

我配置了一个Spring文件入站通道适配器,例如:

<file:inbound-channel-adapter channel="channel1" directory="${location}" prevent-duplicates="true" filename-pattern="*.csv">
        <si:poller>
            <si:interval-trigger interval="1000"/>
        </si:poller>
</file:inbound-channel-adapter>

<si:service-activator input-channel="channel1" output-channel="channel2" ref="filenameGenerator" method="generate"/>

现在这个工作正常。但是需要在集群环境中部署。我想确保集群中的多个实例不会尝试读取相同的文件。那么在这样的环境中,这将起作用吗?

如果不行,我可以像这样使用Quartz调度器吗:

    <file:inbound-channel-adapter channel="channel1" directory="${location}" prevent-duplicates="true" filename-pattern="*.csv">
             <si:poller task-executor="taskExecutor" fixed-rate="1000"/>
    </file:inbound-channel-adapter>

    <si:service-activator input-channel="channel1" output-channel="channel2" ref="filenameGenerator" method="generate"/>

    <bean id="taskExecutor" class="org.springframework.scheduling.quartz.SimpleThreadPoolTaskExecutor">
        <property name="threadCount" value="20"/>
        <property name="threadNamePrefix" value="consumer"/>
    </bean>

这个方法能解决我的问题吗?还是我必须使用事务(Transaction)?

希望我的问题表述清楚了。

谢谢, Adi


重申问题:问题在于集群环境中的“inbound-channel-adapter”文件。例如,当文件放置在文件夹中时,应该被拾取、处理,最后重命名。在集群中,当一个实例拾取了一个特定的文件并仍在处理它时,另一个节点的文件适配器也会拾取并尝试处理。第二个适配器失败,并出现文件找不到异常,因为第一个适配器在此期间进行了处理和重命名。那么我该怎么做才能避免这种情况发生? - adi
2个回答

2
当多个进程从同一目录中读取时,锁定文件以防止它们同时被选中是很有必要的。您可以使用FileLocker来实现这一点。
请查看关于文件锁定器的文档here。似乎您可以像下面这样做:
<file:inbound-channel-adapter ... >
  <file:nio-locker/>
</file:inbound-channel-adapter>

当多个进程从同一目录中读取时,为防止它们同时被获取,锁定文件可能是有用的。要实现这一点,您可以使用FileLocker。

我现在遇到了这个错误:java.io.IOException: 由于另一个进程锁定了文件的某个部分,因此无法访问该文件... 我有另一个通道可以重命名该文件... - adi

1
为了确保在集群中只执行一次石英计划作业,请配置持久化的、集群化的石英作业计划。以下是Quartz 1.6.6的示例配置:
  <bean id="scheduler" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
    <!--        Set whether any jobs defined on this SchedulerFactoryBean should
            overwrite existing job definitions.
      --> 
    <property name="overwriteExistingJobs" value="true" /> 
  <property name="dataSource" ref="myTransactionalDataSource" /> 

<!-- nonTransactionalDataSource is only necessary with clustered Quartz with an XA DataSource.  
  --> 
  <property name="nonTransactionalDataSource" ref="myNonTransactionalDataSource" /> 

 <property name="quartzProperties">
  <props>
  <prop key="org.quartz.jobStore.selectWithLockSQL">SELECT * FROM {0}LOCKS WITH(UPDLOCK,HOLDLOCK) WHERE LOCK_NAME = ?</prop> 
  <!-- 
    Run in cluster.  Quartz ensures persisted jobs are executed once within the 
                      cluster
  --> 
  <prop key="org.quartz.jobStore.isClustered">true</prop> 

 <!--   Each node in the cluster must have a unique instance id.  
  --> 
  <prop key="org.quartz.scheduler.instanceId">AUTO</prop> 
 <!--   Default clusterCheckinInterval is 15000 
  --> 
  <!--  <prop key="org.quartz.jobStore.clusterCheckinInterval">20000</prop> 
  --> 
 </props>
  </property>
  <property name="transactionManager" ref="transactionManager" /> 
- <!-- 
        In Quartz 1.6.6, Quartz's ThreadPool interface is used when firing job triggers, 
        in org.quartz.core.QuartzSchedulerThread. 
        Quartz 1.x still starts some unmanaged threads, notably org.quartz.impl.jdbcjobstore.JobStoreSupport's
        ClusterManager which is used when clustered=true. Quartz 2.0 should correct this problem.       
  --> 
  <property name="taskExecutor" ref="myTaskExecutor" /> 
  </bean>

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接