Spring Scheduled Task running in clustered environment
Asked Answered
B

12

144

I am writing an application that has a cron job that executes every 60 seconds. The application is configured to scale when required onto multiple instances. I only want to execute the task on 1 instance every 60 seconds (On any node). Out of the box I can not find a solution to this and I am surprised it has not been asked multiple times before. I am using Spring 4.1.6.

    <task:scheduled-tasks>
        <task:scheduled ref="beanName" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
Baccate answered 8/7, 2015 at 9:36 Comment(3)
I think Quartz is the best solution for you: #6663682Cowden
Any suggestions on using CronJob in kubernetes?Bureaucrat
@Bureaucrat See my answer, I found the FencedLock more appropriate for cluster environment k8s (in cloud or not) than relying on a DB.Padraig
S
137

There is a ShedLock project that serves exactly this purpose. You just annotate tasks which should be locked when executed

@Scheduled( ... )
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
   // do something
}

Configure Spring and a LockProvider

@Configuration
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "10m")
class MySpringConfiguration {
    ...
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
       return new JdbcTemplateLockProvider(dataSource);
    }
    ...
}
Snowber answered 25/12, 2016 at 9:59 Comment(5)
I just want to say "Good work!". But... The nice feature will be if the library could discover database name without providing it explicit in code... Except that it works excellent!Moreau
Works for me with Oracle and Spring boot data jpa starter.Asthmatic
Does this solution works for Spring 3.1.1.RELEASE and java 6 ? Please tell.Worthen
I tried with MsSQL and Spring boot JPA and I used liquibase script fro the SQL part.. works well .. ThanksAtropine
It's indeed working well. However I met a little bit complex case here could you please take a look. Thanks!!! #57691705Doom
L
27

I think you have to use Quartz Clustering with JDBC-JobStore for this purpose

Luehrmann answered 10/3, 2016 at 9:0 Comment(0)
G
16

The is another simple and robust way to safe execute a job in a cluster. You can based on database and execute the task only if the node is the "leader" in the cluster.

Also when a node is failed or shutdown in the cluster another node became the leader.

All you have is to create a "leader election" mechanism and every time to check if your are the leader:

@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}

Follow those steps:

1.Define the object and table that holds one entry per node in the cluster:

@Entity(name = "SYS_NODE")
public class SystemNode {

/** The id. */
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;

/** The name. */
@Column(name = "TIMESTAMP")
private String timestamp;

/** The ip. */
@Column(name = "IP")
private String ip;

/** The last ping. */
@Column(name = "LAST_PING")
private Date lastPing;

/** The last ping. */
@Column(name = "CREATED_AT")
private Date createdAt = new Date();

/** The last ping. */
@Column(name = "IS_LEADER")
private Boolean isLeader = Boolean.FALSE;

public Long getId() {
    return id;
}

public void setId(final Long id) {
    this.id = id;
}

public String getTimestamp() {
    return timestamp;
}

public void setTimestamp(final String timestamp) {
    this.timestamp = timestamp;
}

public String getIp() {
    return ip;
}

public void setIp(final String ip) {
    this.ip = ip;
}

public Date getLastPing() {
    return lastPing;
}

public void setLastPing(final Date lastPing) {
    this.lastPing = lastPing;
}

public Date getCreatedAt() {
    return createdAt;
}

public void setCreatedAt(final Date createdAt) {
    this.createdAt = createdAt;
}

public Boolean getIsLeader() {
    return isLeader;
}

public void setIsLeader(final Boolean isLeader) {
    this.isLeader = isLeader;
}

@Override
public String toString() {
    return "SystemNode{" +
            "id=" + id +
            ", timestamp='" + timestamp + '\'' +
            ", ip='" + ip + '\'' +
            ", lastPing=" + lastPing +
            ", createdAt=" + createdAt +
            ", isLeader=" + isLeader +
            '}';
}

}

2.Create the service that a) insert the node in database , b) check for leader

@Service
@Transactional
public class SystemNodeServiceImpl implements SystemNodeService,    ApplicationListener {

/** The logger. */
private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);

/** The constant NO_ALIVE_NODES. */
private static final String NO_ALIVE_NODES = "Not alive nodes found in list {0}";

/** The ip. */
private String ip;

/** The system service. */
private SystemService systemService;

/** The system node repository. */
private SystemNodeRepository systemNodeRepository;

@Autowired
public void setSystemService(final SystemService systemService) {
    this.systemService = systemService;
}

@Autowired
public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
    this.systemNodeRepository = systemNodeRepository;
}

@Override
public void pingNode() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    if (node == null) {
        createNode();
    } else {
        updateNode(node);
    }
}

@Override
public void checkLeaderShip() {
    final List<SystemNode> allList = systemNodeRepository.findAll();
    final List<SystemNode> aliveList = filterAliveNodes(allList);

    SystemNode leader = findLeader(allList);
    if (leader != null && aliveList.contains(leader)) {
        setLeaderFlag(allList, Boolean.FALSE);
        leader.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    } else {
        final SystemNode node = findMinNode(aliveList);

        setLeaderFlag(allList, Boolean.FALSE);
        node.setIsLeader(Boolean.TRUE);
        systemNodeRepository.save(allList);
    }
}

/**
 * Returns the leaded
 * @param list
 *          the list
 * @return  the leader
 */
private SystemNode findLeader(final List<SystemNode> list) {
    for (SystemNode systemNode : list) {
        if (systemNode.getIsLeader()) {
            return systemNode;
        }
    }
    return null;
}

@Override
public boolean isLeader() {
    final SystemNode node = systemNodeRepository.findByIp(ip);
    return node != null && node.getIsLeader();
}

@Override
public void onApplicationEvent(final ApplicationEvent applicationEvent) {
    try {
        ip = InetAddress.getLocalHost().getHostAddress();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    if (applicationEvent instanceof ContextRefreshedEvent) {
        pingNode();
    }
}

/**
 * Creates the node
 */
private void createNode() {
    final SystemNode node = new SystemNode();
    node.setIp(ip);
    node.setTimestamp(String.valueOf(System.currentTimeMillis()));
    node.setCreatedAt(new Date());
    node.setLastPing(new Date());
    node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
    systemNodeRepository.save(node);
}

/**
 * Updates the node
 */
private void updateNode(final SystemNode node) {
    node.setLastPing(new Date());
    systemNodeRepository.save(node);
}

/**
 * Returns the alive nodes.
 *
 * @param list
 *         the list
 * @return the alive nodes
 */
private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
    int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
    final List<SystemNode> finalList = new LinkedList<>();
    for (SystemNode systemNode : list) {
        if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
            finalList.add(systemNode);
        }
    }
    if (CollectionUtils.isEmpty(finalList)) {
        LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
        throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
    }
    return finalList;
}

/**
 * Finds the min name node.
 *
 * @param list
 *         the list
 * @return the min node
 */
private SystemNode findMinNode(final List<SystemNode> list) {
    SystemNode min = list.get(0);
    for (SystemNode systemNode : list) {
        if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < -1) {
            min = systemNode;
        }
    }
    return min;
}

/**
 * Sets the leader flag.
 *
 * @param list
 *         the list
 * @param value
 *         the value
 */
private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
    for (SystemNode systemNode : list) {
        systemNode.setIsLeader(value);
    }
}

}

3.ping the database to send that your are alive

@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
    systemNodeService.pingNode();
}

@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
    systemNodeService.checkLeaderShip();
}

4.you are ready! Just check if you are the leader before execute the task:

@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
    if (checkIfLeader()) {
        final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
        for (EmailTask emailTask : list) {
            dispatchService.sendEmail(emailTask);
        }
    }
}
Greenman answered 5/2, 2016 at 14:12 Comment(3)
In this case what is SystemService and SettingEnum? Looks like it's extremely simple and just returns a timeout value. In that case why not just hard code the timeout?Mei
@mspapant, what is the SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT ? What is the optimal value i should be using here ?Mckenzie
@Mei did you implement this code, i have a question about the DateUtils.hasExpired method? is it custom method or is it a apache common utils ?Mckenzie
S
10

Batch and scheduled jobs are typically run on their own standalone servers, away from customer facing apps so it is not a common requirement to include a job in an application that is expected to run on a cluster. Additionally, jobs in clustered environments typically do not need to worry about other instances of the same job running in parallel so another reason why isolation of job instances is not a big requirement.

A simple solution would be to configure your jobs inside a Spring Profile. For example, if your current configuration is:

<beans>
  <bean id="someBean" .../>

  <task:scheduled-tasks>
    <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
  </task:scheduled-tasks>
</beans>

change it to:

<beans>
  <beans profile="scheduled">
    <bean id="someBean" .../>

    <task:scheduled-tasks>
      <task:scheduled ref="someBean" method="execute" cron="0/60 * * * * *"/>
    </task:scheduled-tasks>
  </beans>
</beans>

Then, launch your application on just one machine with the scheduled profile activated (-Dspring.profiles.active=scheduled).

If the primary server becomes unavailable for some reason, just launch another server with the profile enabled and things will continue to work just fine.


Things change if you want automatic failover for the jobs as well. Then, you will need to keep the job running on all servers and check synchronization through a common resource such as a database table, a clustered cache, a JMX variable, etc.

Shinny answered 8/7, 2015 at 9:59 Comment(4)
This is a valid workaround, but this will violate the idea behind having a clustered environment, where if a node is down, the other node can serve other requests. In this workaround, if the node with "scheduled" profile goes down, then this background job will not runCykana
I think we could use Redis with atomic get and set operation to archieve that.Moldy
There are several issues with your suggestion: 1. You would generally want each node of a cluster to have the exact same config, so they'll be 100% interchangeable and require the same resources under the same load they share. 2. Your solution would require manual intervention when the "task" node goes down. 3. It would still not guarantee that the job was actually ran successfully, because the "task" node went down before it finished processing the current execution and the new "task runner" has been created after the first one went down, not knowing whether it had finished or not.Algophobia
it simply violates the idea of clustered environments, there can't be any solution with the approach you suggested. You can't replicate even the profile servers to ensure availability because that will result in additional cost and unnecessary waste of resources as well. The solution suggested by @Thanh is much cleaner than this. Think of the same as a MUTEX. Any server running the script will acquire a temporary lock in some distributed cache like redis and then proceed with the concepts of traditional locking.Blastogenesis
P
2

I'm using a database table to do the locking. Only one task at a time can do a insert to the table. The other one will get a DuplicateKeyException. The insert and delete logic is handeld by an aspect around the @Scheduled annotation. I'm using Spring Boot 2.0

@Component
@Aspect
public class SchedulerLock {

    private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerLock.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;  

    @Around("execution(@org.springframework.scheduling.annotation.Scheduled * *(..))")
    public Object lockTask(ProceedingJoinPoint joinPoint) throws Throwable {

        String jobSignature = joinPoint.getSignature().toString();
        try {
            jdbcTemplate.update("INSERT INTO scheduler_lock (signature, date) VALUES (?, ?)", new Object[] {jobSignature, new Date()});

            Object proceed = joinPoint.proceed();

            jdbcTemplate.update("DELETE FROM scheduler_lock WHERE lock_signature = ?", new Object[] {jobSignature});
            return proceed;

        }catch (DuplicateKeyException e) {
            LOGGER.warn("Job is currently locked: "+jobSignature);
            return null;
        }
    }
}


@Component
public class EveryTenSecondJob {

    @Scheduled(cron = "0/10 * * * * *")
    public void taskExecution() {
        System.out.println("Hello World");
    }
}


CREATE TABLE scheduler_lock(
    signature varchar(255) NOT NULL,
    date datetime DEFAULT NULL,
    PRIMARY KEY(signature)
);
Phallic answered 23/10, 2018 at 13:51 Comment(1)
Do you think it will work perfectly? Because if one of node will get down after taking lock then others willn't get to know why there is lock (in your case row entry corresponding to job in table).Bound
S
2

dlock is designed to run tasks only once by using database indexes and constraints. You can simply do something like below.

@Scheduled(cron = "30 30 3 * * *")
@TryLock(name = "executeMyTask", owner = SERVER_NAME, lockFor = THREE_MINUTES)
public void execute() {

}

See the article about using it.

Speight answered 27/10, 2018 at 17:3 Comment(1)
If using dlock .Assume we are using DB for maintaining lock. And one of the node in cluster got down unexpectedly after taking lock then what will happen in this scenario ? Will it be in deadlock state ?Bound
T
2

You can use Zookeeper here to elect master instance and master instance will only run the scheduled job.

One implementation here is with Aspect and Apache Curator

@SpringBootApplication
@EnableScheduling
public class Application {

    private static final int PORT = 2181;

    @Bean
    public CuratorFramework curatorFramework() {
        CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:" + PORT, new ExponentialBackoffRetry(1000, 3));
        client.start();
        return client;
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}

Aspect class

 @Aspect
@Component
public class LeaderAspect implements LeaderLatchListener{

    private static final Logger log = LoggerFactory.getLogger(LeaderAspect.class);
    private static final String ELECTION_ROOT = "/election";

    private volatile boolean isLeader = false;

    @Autowired
    public LeaderAspect(CuratorFramework client) throws Exception {
        LeaderLatch ll = new LeaderLatch(client , ELECTION_ROOT);
        ll.addListener(this);
        ll.start();
    }


    @Override
    public void isLeader() {
        isLeader = true;
        log.info("Leadership granted.");
    }

    @Override
    public void notLeader() {
        isLeader = false;
        log.info("Leadership revoked.");
    }


    @Around("@annotation(com.example.apache.curator.annotation.LeaderOnly)")
    public void onlyExecuteForLeader(ProceedingJoinPoint joinPoint) {
        if (!isLeader) {
            log.debug("I'm not leader, skip leader-only tasks.");
            return;
        }

        try {
            log.debug("I'm leader, execute leader-only tasks.");
            joinPoint.proceed();
        } catch (Throwable ex) {
            log.error(ex.getMessage());
        }
    }

}

LeaderOnlyAnnotation

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface LeaderOnly {
}

Scheduled Task

@Component
public class HelloWorld {

    private static final Logger log = LoggerFactory.getLogger(HelloWorld.class);


    @LeaderOnly
    @Scheduled(fixedRate = 1000L)
    public void sayHello() {
        log.info("Hello, world!");
    }
}
Threatt answered 10/9, 2021 at 19:1 Comment(1)
Thank you so much for this.. This is a wonderful soln and no need for a db level locking. I just integrated this to my zookeper and everything worked as expectredJeremiahjeremias
S
2

I am using a different approach without need to setup a database for managing the lock between the nodes.

The component is called FencedLock and is provided by Hazelcast.

We're using it to prevent another node to make some operation (not necessarily linked to schedule) but it could also be used for sharing a locks between nodes for a schedule.

For doing this, we just set up two functions helper that can create different lock names:

@Scheduled(cron = "${cron.expression}")
public void executeMyScheduler(){
   
   // This can also be a member of the class.
   HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();

   Lock lock = hazelcastInstance.getCPSubsystem().getLock("mySchedulerName");

   lock.lock();
   try {
      // do your schedule tasks here

   } finally {
      // don't forget to release lock whatever happens: end of task or any exceptions.
      lock.unlock();
   }
}

Alternatively you can also release automatically the lock after a delay: let say your cron job is running every hour, you can setup an automatic release after e.g. 50 minutes like this:

@Scheduled(cron = "${cron.expression}")
public void executeMyScheduler(){
   
   // This can also be a member of the class.
   HazelcastInstance hazelcastInstance = Hazelcast.newHazelcastInstance();

   Lock lock = hazelcastInstance.getCPSubsystem().getLock("mySchedulerName");

   if ( lock.tryLock ( 50, TimeUnit.MINUTES ) ) {
      try {
         // do your schedule tasks here
      } finally {
         // don't forget to release lock whatever happens: end of task or any exceptions.
         lock.unlock();
      }
   } else {
     // warning: lock has been released by timeout!
   }
}

Note that this Hazelcast component works very good in a cloud based environment (e.g. k8s clusters) and without need to pay for an extra database.

Here is what you need to configure:

// We need to specify the name otherwise it can conflict with internal Hazelcast beans
@Bean("hazelcastInstance")
public HazelcastInstance hazelcastInstance() {
    Config config = new Config();
    config.setClusterName(hazelcastProperties.getGroup().getName());
    NetworkConfig networkConfig = config.getNetworkConfig();

    networkConfig.setPortAutoIncrement(false);
    networkConfig.getJoin().getKubernetesConfig().setEnabled(hazelcastProperties.isNetworkEnabled())
            .setProperty("service-dns", hazelcastProperties.getServiceDNS())
            .setProperty("service-port", hazelcastProperties.getServicePort().toString());
    config.setProperty("hazelcast.metrics.enabled", "false");

    networkConfig.getJoin().getMulticastConfig().setEnabled(false);

    return Hazelcast.newHazelcastInstance(config);
}

The HazelcastProperties being the ConfigurationProperties object mapped with the properties.

For local testing you can just disable the network config by using the properties in your local profile:

hazelcast:
  network-enabled: false
  service-port: 5701
  group:
    name: your-hazelcast-group-name
Scanlan answered 21/6, 2022 at 22:13 Comment(0)
T
1

You could use an embeddable scheduler like db-scheduler to accomplish this. It has persistent executions and uses a simple optimistic locking mechanism to guarantee execution by a single node.

Example code for how the use-case can be achieved:

   RecurringTask<Void> recurring1 = Tasks.recurring("my-task-name", FixedDelay.of(Duration.ofSeconds(60)))
    .execute((taskInstance, executionContext) -> {
        System.out.println("Executing " + taskInstance.getTaskAndInstance());
    });

   final Scheduler scheduler = Scheduler
          .create(dataSource)
          .startTasks(recurring1)
          .build();

   scheduler.start();
Timber answered 19/10, 2018 at 10:32 Comment(0)
Q
0

I am using an free HTTP service called kJob-Manager. https://kjob-manager.ciesielski-systems.de/

Advantage is that you dont create a new table in your database and also dont need any database connection because it is just a HTTP request.

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import org.apache.tomcat.util.json.JSONParser;
import org.apache.tomcat.util.json.ParseException;
import org.junit.jupiter.api.Test;

public class KJobManagerTest {

    @Test
    public void example() throws IOException, ParseException {

        String data = "{\"token\":\"<API-Token>\"}";
        URL url = new URL("https://kjob-manager.ciesielski-systems.de/api/ticket/<JOB-ID>");

        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        connection.getOutputStream().write(data.getBytes(StandardCharsets.UTF_8));

        JSONParser jsonParser = new JSONParser(connection.getInputStream());
        LinkedHashMap<String, LinkedHashMap<String, Object>> result = (LinkedHashMap<String, LinkedHashMap<String, Object>>) jsonParser.parse();

        if ((boolean) result.get("ticket").get("open")) {
            System.out.println("This replica could run the cronjob!");
        } else {
            System.out.println("This replica has nothing to do!");
        }

    }

}
Quirt answered 8/2, 2023 at 20:22 Comment(0)
A
0

This question I found as I had similar requirements, but I implemented a little different.

1- create a post construct method where it inserts its hostname/ip/nodeid to a table (with auto identifier or sequence)

So every time a new node comes up it ll insert a record to this table APPNAME_NODE.

2- when scheduler starts , check/validate the record with max id in that table and that’s how it ll decides which host to run.

Anni answered 14/12, 2023 at 12:36 Comment(0)
L
-4

Spring context is not clustered so manage the task in distributed application is a little bit difficult and you need to use systems supporting jgroup to synchronis the state and let your task take the priority to execute the action. Or you could use ejb context to manage clustered ha singleton service like jboss ha environment https://developers.redhat.com/quickstarts/eap/cluster-ha-singleton/?referrer=jbd Or you could use clustered cache and access lock resource between the service and first service take the lock will beform the action or implement you own jgroup to communicat your service and perform the action one one node

Locoism answered 2/6, 2020 at 0:9 Comment(0)

© 2022 - 2024 — McMap. All rights reserved.