在集群环境下运行的Spring定时任务

143

我正在编写一个应用程序,其中有一个定时任务,每60秒执行一次。该应用程序已经配置为在需要时扩展到多个实例。我只想在任何节点上每60秒钟执行1个实例的任务。我找不到直接解决这个问题的方法,我很惊讶这个问题以前没有被问过多次。 我正在使用Spring 4.1.6。

    <task:scheduled-tasks>
        <task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>

10
我认为Quartz对你来说是最好的解决方案: https://dev59.com/EGw15IYBdhLWcg3wWqj- - selalerer
有关在 kubernetes 中使用 CronJob 的任何建议? - ch271828n
@ch271828n 请看我的回答,我发现在集群环境k8s(无论是云端还是非云端)中,FencedLock比依赖于数据库更合适。 - рüффп
11个回答

133

有一个名为 ShedLock 的项目,可以完美地实现此目的。您只需对应该在执行时被锁定的任务进行注释即可。

@Scheduled( ... )
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
   // do something
}

配置Spring和LockProvider

@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
class MySpringConfiguration {
    ...
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
       return new JdbcTemplateLockProvider(dataSource);
    }
    ...
}

1
我只想说“干得好!”但是……如果这个库能够在不在代码中显式提供数据库名称的情况下发现它,那将是一个很好的功能。除此之外,它的表现非常出色! - Krzysiek
1
在我的环境中,使用Oracle和Spring Boot Data JPA Starter是可行的。 - Mahendran Ayyarsamy Kandiar
这个解决方案适用于Spring 3.1.1.RELEASE和Java 6吗?请告诉我。 - Vikas Sharma
1
我尝试了MsSQL和Spring Boot JPA,并使用Liquibase脚本处理了SQL部分...效果很好...谢谢。 - sheetal
确实运行良好。但是我在这里遇到了一个稍微复杂的情况,你能否请看一下。谢谢!!! https://stackoverflow.com/questions/57691205/issue-about-distributed-lock-by-using-shedlock-to-have-threadpooltaskscheduler-p - Dayton Wang

27

16
这是另一种在集群中安全执行任务的简单而健壮的方法。您可以基于数据库,并仅在节点是集群中的“领导者”时执行任务。
此外,当集群中的一个节点失败或关闭时,另一个节点将成为领导者。
您所需要做的就是创建一个“领导者选举”机制,并每次检查自己是否为领导者:
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

请按照以下步骤进行:
1.定义包含集群中每个节点的条目的对象和表:
@Entity(name = "SYS_NODE")
public class SystemNode {

/** The id. */
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

/** The name. */
@Column(name = "TIMESTAMP")
private String timestamp;

/** The ip. */
@Column(name = "IP")
private String ip;

/** The last ping. */
@Column(name = "LAST_PING")
private Date lastPing;

/** The last ping. */
@Column(name = "CREATED_AT")
private Date createdAt = new Date();

/** The last ping. */
@Column(name = "IS_LEADER")
private Boolean isLeader = Boolean.FALSE;

public Long getId() {
    return id;
}

public void setId(final Long id) {
    this.id = id;
}

public String getTimestamp() {
    return timestamp;
}

public void setTimestamp(final String timestamp) {
    this.timestamp = timestamp;
}

public String getIp() {
    return ip;
}

public void setIp(final String ip) {
    this.ip = ip;
}

public Date getLastPing() {
    return lastPing;
}

public void setLastPing(final Date lastPing) {
    this.lastPing = lastPing;
}

public Date getCreatedAt() {
    return createdAt;
}

public void setCreatedAt(final Date createdAt) {
    this.createdAt = createdAt;
}

public Boolean getIsLeader() {
    return isLeader;
}

public void setIsLeader(final Boolean isLeader) {
    this.isLeader = isLeader;
}

@Override
public String toString() {
    return "SystemNode{" +
            "id=" + id +
            ", timestamp='" + timestamp + '\'' +
            ", ip='" + ip + '\'' +
            ", lastPing=" + lastPing +
            ", createdAt=" + createdAt +
            ", isLeader=" + isLeader +
            '}';
}

2. 创建服务,a) 在数据库中插入节点,b) 检查领导者。

@Service
@Transactional
public class SystemNodeServiceImpl implements SystemNodeService,    ApplicationListener {

/** The logger. */
private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);

/** The constant NO_ALIVE_NODES. */
private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";

/** The ip. */
private String ip;

/** The system service. */
private SystemService systemService;

/** The system node repository. */
private SystemNodeRepository systemNodeRepository;

@Autowired
public void setSystemService(final SystemService systemService) {
    this.systemService = systemService;
}

@Autowired
public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
    this.systemNodeRepository = systemNodeRepository;
}

@Override
public void pingNode() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    if (node == null) {
        createNode();
    } else {
        updateNode(node);
    }
}

@Override
public void checkLeaderShip() {
    final List<SystemNode> allList = systemNodeRepository.findAll();
    final List<SystemNode> aliveList = filterAliveNodes(allList);

    SystemNode leader = findLeader(allList);
    if (leader != null && aliveList.contains(leader)) {
        setLeaderFlag(allList, Boolean.FALSE);
        leader.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    } else {
        final SystemNode node = findMinNode(aliveList);

        setLeaderFlag(allList, Boolean.FALSE);
        node.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    }
}

/**
 * Returns the leaded
 * @param list
 *          the list
 * @return  the leader
 */
private SystemNode findLeader(final List<SystemNode> list) {
    for (SystemNode systemNode : list) {
        if (systemNode.getIsLeader()) {
            return systemNode;
        }
    }
    return null;
}

@Override
public boolean isLeader() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    return node != null && node.getIsLeader();
}

@Override
public void onApplicationEvent(final ApplicationEvent applicationEvent) {
    try {
        ip = InetAddress.getLocalHost().getHostAddress();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    if (applicationEvent instanceof ContextRefreshedEvent) {
        pingNode();
    }
}

/**
 * Creates the node
 */
private void createNode() {
    final SystemNode node = new SystemNode();
    node.setIp(ip);
    node.setTimestamp(String.valueOf(System.currentTimeMillis()));
    node.setCreatedAt(new Date());
    node.setLastPing(new Date());
    node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
    systemNodeRepository.save(node);
}

/**
 * Updates the node
 */
private void updateNode(final SystemNode node) {
    node.setLastPing(new Date());
    systemNodeRepository.save(node);
}

/**
 * Returns the alive nodes.
 *
 * @param list
 *         the list
 * @return the alive nodes
 */
private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
    int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
    final List<SystemNode> finalList = new LinkedList<>();
    for (SystemNode systemNode : list) {
        if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
            finalList.add(systemNode);
        }
    }
    if (CollectionUtils.isEmpty(finalList)) {
        LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
        throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
    }
    return finalList;
}

/**
 * Finds the min name node.
 *
 * @param list
 *         the list
 * @return the min node
 */
private SystemNode findMinNode(final List<SystemNode> list) {
    SystemNode min = list.get(0);
    for (SystemNode systemNode : list) {
        if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) {
            min = systemNode;
        }
    }
    return min;
}

/**
 * Sets the leader flag.
 *
 * @param list
 *         the list
 * @param value
 *         the value
 */
private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
    for (SystemNode systemNode : list) {
        systemNode.setIsLeader(value);
    }
}

}

3.向数据库发送ping消息以表明您的在线状态

@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
    systemNodeService.pingNode();
}

@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
    systemNodeService.checkLeaderShip();
}

4. 您已经准备好了!在执行任务之前,请检查您是否是领导者:

@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

在这种情况下,SystemService和SettingEnum是什么?看起来非常简单,只返回一个超时值。那么为什么不直接硬编码超时时间呢? - tlavarea
@mspapant,SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT是什么?我应该在这里使用什么最优值? - user525146
@tlavarea,你实现了这段代码吗?我对DateUtils.hasExpired方法有疑问。它是自定义方法还是Apache Common Utils中的方法? - user525146

10

批处理和定时作业通常在单独的服务器上运行,远离面向客户的应用程序,因此在预期在集群上运行的应用程序中包含作业不是常见要求。此外,在集群环境中,作业通常不需要担心同一作业实例的其他实例并行运行,因此隔离作业实例不是一个重要的要求。

一个简单的解决方案是在Spring配置文件中配置您的作业。例如,如果您当前的配置是:

<beans>
  <bean id="someBean" .../>

  <task:scheduled-tasks>
    <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
  </task:scheduled-tasks>
</beans>

将其更改为:

<beans>
  <beans profile="scheduled">
    <bean id="someBean" .../>

    <task:scheduled-tasks>
      <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
  </beans>
</beans>

接下来,启动你的应用程序在只有一个机器上激活 scheduled 配置文件(-Dspring.profiles.active=scheduled)。

如果主服务器由于某些原因不可用,只需启动另一个启用了该配置文件的服务器,一切都将正常工作。


如果您还想实现作业的自动故障转移,情况就有所不同了。然后,您需要在所有服务器上保持作业运行,并通过共享资源(例如数据库表、集群缓存、JMX变量等)检查同步。


64
这是一个可行的解决方案,但这将违反具有集群环境的背景下的思想,在这种环境下,如果一个节点挂掉,其他节点可以处理其他请求。在这个解决方案中,如果带有“scheduled”配置文件的节点出现问题,那么这个后台作业就无法运行。 - Ahmed Hashem
3
我认为我们可以使用Redis的原子性“get”和“set”操作来实现这一点。 - Thanh Nguyen Van
1
你的建议存在几个问题:
  1. 通常情况下,您希望集群的每个节点具有完全相同的配置,这样它们将是100%可互换的,并在相同负载下需要相同的资源。
  2. 当“任务”节点关闭时,您的解决方案将需要手动干预。
  3. 它仍然不能保证作业实际上已成功运行,因为“任务”节点在完成当前执行之前关闭,并且新的“任务运行程序”在第一个关闭后创建,不知道它是否已经完成。
- Moshe Bixenshpaner
1
它简单地违反了集群环境的理念,你提出的方法没有任何解决方案。即使是复制配置文件服务器以确保可用性也不行,因为这将导致额外的成本和资源的浪费。@Thanh建议的解决方案比这个更加清晰。把它看作一个MUTEX。运行脚本的任何服务器都会在像redis这样的分布式缓存中获取临时锁,然后按照传统锁定的概念进行操作。 - anuj pradhan

2
我使用数据库表进行锁定。每次只能有一个任务向表中插入数据,否则会出现DuplicateKeyException异常。插入和删除逻辑由@Scheduled注解周围的aspect处理。我正在使用Spring Boot 2.0。
@Component
@Aspect
public class SchedulerLock {

    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;  

    @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))")
    public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable {

        String jobSignature = joinPoint.getSignature().toString();
        try {
            jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()});

            Object proceed = joinPoint.proceed();

            jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature});
            return proceed;

        }catch (DuplicateKeyException e) {
            LOGGER.warn("Job is currently locked: "+jobSignature);
            return null;
        }
    }
}


@Component
public class EveryTenSecondJob {

    @Scheduled(cron = "0/10 * * * * *")
    public void taskExecution() {
        System.out.println("Hello World");
    }
}


CREATE TABLE scheduler_lock(
    signature varchar(255) NOT NULL,
    date datetime DEFAULT NULL,
    PRIMARY KEY(signature)
);

5
你觉得它能够完美运作吗?因为如果其中一个节点在获取锁之后崩溃,其他节点将不知道为什么有锁(在你的情况下是与表中工作对应的行条目)。 - Badman

2

dlock旨在通过使用数据库索引和约束条件仅运行一次任务。您可以简单地执行以下操作。

@Scheduled(cron = "30 30 3 * * *")
@TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES)
public void execute() {

}

请查看有关使用它的文章


4
如果使用dlock,假设我们使用数据库来维护锁。如果在获取锁之后,集群中的某个节点意外宕机,那么会发生什么情况?会陷入死锁状态吗? - Badman

2
你可以在这里使用Zookeeper来选举主实例,而主实例只会运行预定的作业。
其中一种实现方式是使用Aspect和Apache Curator。
@SpringBootApplication
@EnableScheduling
public class Application {

    private static final int PORT = 2181;

    @Bean
    public CuratorFramework curatorFramework() {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:" + PORT, new ExponentialBackoffRetry(1000, 3));
        client.start();
        return client;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

Aspect类

 @Aspect
@Component
public class LeaderAspect implements LeaderLatchListener{

    private static final Logger log = LoggerFactory.getLogger(LeaderAspect.class);
    private static final String ELECTION_ROOT = "/election";

    private volatile boolean isLeader = false;

    @Autowired
    public LeaderAspect(CuratorFramework client) throws Exception {
        LeaderLatch ll = new LeaderLatch(client , ELECTION_ROOT);
        ll.addListener(this);
        ll.start();
    }


    @Override
    public void isLeader() {
        isLeader = true;
        log.info("Leadership granted.");
    }

    @Override
    public void notLeader() {
        isLeader = false;
        log.info("Leadership revoked.");
    }


    @Around("@annotation(com.example.apache.curator.annotation.LeaderOnly)")
    public void onlyExecuteForLeader(ProceedingJoinPoint joinPoint) {
        if (!isLeader) {
            log.debug("I'm not leader, skip leader-only tasks.");
            return;
        }

        try {
            log.debug("I'm leader, execute leader-only tasks.");
            joinPoint.proceed();
        } catch (Throwable ex) {
            log.error(ex.getMessage());
        }
    }

}

仅限领导批注

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LeaderOnly {
}

计划任务
@Component
public class HelloWorld {

    private static final Logger log = LoggerFactory.getLogger(HelloWorld.class);


    @LeaderOnly
    @Scheduled(fixedRate = 1000L)
    public void sayHello() {
        log.info("Hello, world!");
    }
}

非常感谢您的帮助。这是一个很棒的解决方案,无需进行数据库层面的锁定。我刚将其集成到我的Zookeeper中,一切都按照预期工作了。 - Arundev

1

您可以使用可嵌入的调度程序,例如db-scheduler来实现此目的。它具有持久执行功能,并使用简单的乐观锁定机制来保证由单个节点执行。

以下是如何实现用例的示例代码:

   RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60)))
    .execute((taskInstance, executionContext) -> {
        System.out.println("Executing " + taskInstance.getTaskAndInstance());
    });

   final Scheduler scheduler = Scheduler
          .create(dataSource)
          .startTasks(recurring1)
          .build();

   scheduler.start();

1
我正在使用一种不需要设置数据库来管理节点之间锁定的不同方法。
这个组件叫做FencedLock,由Hazelcast提供。
我们正在使用它来防止另一个节点执行某些操作(不一定与计划相关),但它也可以用于在计划之间共享锁。
为了实现这一点,我们只需设置两个函数助手,可以创建不同的锁名称:
@Scheduled(cron = "${cron.expression}")
public void executeMyScheduler(){
   
   // This can also be a member of the class.
   HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();

   Lock lock = hazelcastInstance.getCPSubsystem().getLock("mySchedulerName");

   lock.lock();
   try {
      // do your schedule tasks here

   } finally {
      // don't forget to release lock whatever happens: end of task or any exceptions.
      lock.unlock();
   }
}

您还可以在一段时间后自动释放锁定:假设您的定时作业每小时运行一次,您可以设置在50分钟后自动释放,如下所示:
@Scheduled(cron = "${cron.expression}")
public void executeMyScheduler(){
   
   // This can also be a member of the class.
   HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();

   Lock lock = hazelcastInstance.getCPSubsystem().getLock("mySchedulerName");

   if ( lock.tryLock ( 50, TimeUnit.MINUTES ) ) {
      try {
         // do your schedule tasks here
      } finally {
         // don't forget to release lock whatever happens: end of task or any exceptions.
         lock.unlock();
      }
   } else {
     // warning: lock has been released by timeout!
   }
}

请注意,这个Hazelcast组件在云环境(例如k8s集群)中运行非常好,并且不需要支付额外的数据库费用。
以下是您需要配置的内容:
// We need to specify the name otherwise it can conflict with internal Hazelcast beans
@Bean("hazelcastInstance")
public HazelcastInstance hazelcastInstance() {
    Config config = new Config();
    config.setClusterName(hazelcastProperties.getGroup().getName());
    NetworkConfig networkConfig = config.getNetworkConfig();

    networkConfig.setPortAutoIncrement(false);
    networkConfig.getJoin().getKubernetesConfig().setEnabled(hazelcastProperties.isNetworkEnabled())
            .setProperty("service-dns", hazelcastProperties.getServiceDNS())
            .setProperty("service-port", hazelcastProperties.getServicePort().toString());
    config.setProperty("hazelcast.metrics.enabled", "false");

    networkConfig.getJoin().getMulticastConfig().setEnabled(false);

    return Hazelcast.newHazelcastInstance(config);
}

HazelcastProperties 是与属性映射的 ConfigurationProperties 对象。
对于本地测试,您可以在本地配置文件中使用属性来禁用网络配置:
hazelcast:
  network-enabled: false
  service-port: 5701
  group:
    name: your-hazelcast-group-name

0

我正在使用一个名为kJob-Manager的免费HTTP服务。https://kjob-manager.ciesielski-systems.de/

优点是您不需要在数据库中创建新表,也不需要任何数据库连接,因为它只是一个HTTP请求。

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import org.apache.tomcat.util.json.JSONParser;
import org.apache.tomcat.util.json.ParseException;
import org.junit.jupiter.api.Test;

public class KJobManagerTest {

    @Test
    public void example() throws IOException, ParseException {

        String data = "{\"token\":\"<API-Token>\"}";
        URL url = new URL("https://kjob-manager.ciesielski-systems.de/api/ticket/<JOB-ID>");

        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));

        JSONParser jsonParser = new JSONParser(connection.getInputStream());
        LinkedHashMap<String, LinkedHashMap<String, Object>> result = (LinkedHashMap<String, LinkedHashMap<String, Object>>) jsonParser.parse();

        if ((boolean) result.get("ticket").get("open")) {
            System.out.println("This replica could run the cronjob!");
        } else {
            System.out.println("This replica has nothing to do!");
        }

    }

}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接