There is another simple and robust way to safely execute a job in a cluster. You can rely on the database and execute the task only if the node is the "leader" in the cluster.
Also, when a node fails or shuts down, another node in the cluster becomes the leader.
All you have to do is create a "leader election" mechanism and check every time whether you are the leader:
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
if (checkIfLeader()) {
final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
for (EmailTask emailTask : list) {
dispatchService.sendEmail(emailTask);
}
}
}
Follow these steps:
1. Define the object and table that hold one entry per node in the cluster:
@Entity(name = "SYS_NODE")
public class SystemNode {
/** The id. */
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
/** The registration timestamp (epoch millis as a string); the smallest value marks the oldest node. */
@Column(name = "TIMESTAMP")
private String timestamp;
/** The ip. */
@Column(name = "IP")
private String ip;
/** The last ping. */
@Column(name = "LAST_PING")
private Date lastPing;
/** The creation date. */
@Column(name = "CREATED_AT")
private Date createdAt = new Date();
/** The leader flag. */
@Column(name = "IS_LEADER")
private Boolean isLeader = Boolean.FALSE;
public Long getId() {
return id;
}
public void setId(final Long id) {
this.id = id;
}
public String getTimestamp() {
return timestamp;
}
public void setTimestamp(final String timestamp) {
this.timestamp = timestamp;
}
public String getIp() {
return ip;
}
public void setIp(final String ip) {
this.ip = ip;
}
public Date getLastPing() {
return lastPing;
}
public void setLastPing(final Date lastPing) {
this.lastPing = lastPing;
}
public Date getCreatedAt() {
return createdAt;
}
public void setCreatedAt(final Date createdAt) {
this.createdAt = createdAt;
}
public Boolean getIsLeader() {
return isLeader;
}
public void setIsLeader(final Boolean isLeader) {
this.isLeader = isLeader;
}
@Override
public String toString() {
return "SystemNode{" +
"id=" + id +
", timestamp='" + timestamp + '\'' +
", ip='" + ip + '\'' +
", lastPing=" + lastPing +
", createdAt=" + createdAt +
", isLeader=" + isLeader +
'}';
}
}
2. Create the service that a) inserts the node into the database and b) checks for the leader:
@Service
@Transactional
public class SystemNodeServiceImpl implements SystemNodeService, ApplicationListener {
/** The logger. */
private static final Logger LOGGER = Logger.getLogger(SystemNodeService.class);
/** The constant NO_ALIVE_NODES. */
private static final String NO_ALIVE_NODES = "No alive nodes found in list {0}";
/** The ip. */
private String ip;
/** The system service. */
private SystemService systemService;
/** The system node repository. */
private SystemNodeRepository systemNodeRepository;
@Autowired
public void setSystemService(final SystemService systemService) {
this.systemService = systemService;
}
@Autowired
public void setSystemNodeRepository(final SystemNodeRepository systemNodeRepository) {
this.systemNodeRepository = systemNodeRepository;
}
@Override
public void pingNode() {
final SystemNode node = systemNodeRepository.findByIp(ip);
if (node == null) {
createNode();
} else {
updateNode(node);
}
}
@Override
public void checkLeaderShip() {
final List<SystemNode> allList = systemNodeRepository.findAll();
final List<SystemNode> aliveList = filterAliveNodes(allList);
SystemNode leader = findLeader(allList);
if (leader != null && aliveList.contains(leader)) {
setLeaderFlag(allList, Boolean.FALSE);
leader.setIsLeader(Boolean.TRUE);
systemNodeRepository.save(allList);
} else {
final SystemNode node = findMinNode(aliveList);
setLeaderFlag(allList, Boolean.FALSE);
node.setIsLeader(Boolean.TRUE);
systemNodeRepository.save(allList);
}
}
/**
* Returns the current leader, or null if no node is flagged as leader.
* @param list
* the list
* @return the leader
*/
private SystemNode findLeader(final List<SystemNode> list) {
for (SystemNode systemNode : list) {
if (systemNode.getIsLeader()) {
return systemNode;
}
}
return null;
}
@Override
public boolean isLeader() {
final SystemNode node = systemNodeRepository.findByIp(ip);
return node != null && node.getIsLeader();
}
@Override
public void onApplicationEvent(final ApplicationEvent applicationEvent) {
try {
ip = InetAddress.getLocalHost().getHostAddress();
} catch (Exception e) {
throw new RuntimeException(e);
}
if (applicationEvent instanceof ContextRefreshedEvent) {
pingNode();
}
}
/**
* Creates the node
*/
private void createNode() {
final SystemNode node = new SystemNode();
node.setIp(ip);
node.setTimestamp(String.valueOf(System.currentTimeMillis()));
node.setCreatedAt(new Date());
node.setLastPing(new Date());
node.setIsLeader(CollectionUtils.isEmpty(systemNodeRepository.findAll()));
systemNodeRepository.save(node);
}
/**
* Updates the node
*/
private void updateNode(final SystemNode node) {
node.setLastPing(new Date());
systemNodeRepository.save(node);
}
/**
* Returns the alive nodes.
*
* @param list
* the list
* @return the alive nodes
*/
private List<SystemNode> filterAliveNodes(final List<SystemNode> list) {
int timeout = systemService.getSetting(SettingEnum.SYSTEM_CONFIGURATION_SYSTEM_NODE_ALIVE_TIMEOUT, Integer.class);
final List<SystemNode> finalList = new LinkedList<>();
for (SystemNode systemNode : list) {
if (!DateUtils.hasExpired(systemNode.getLastPing(), timeout)) {
finalList.add(systemNode);
}
}
if (CollectionUtils.isEmpty(finalList)) {
LOGGER.warn(MessageFormat.format(NO_ALIVE_NODES, list));
throw new RuntimeException(MessageFormat.format(NO_ALIVE_NODES, list));
}
return finalList;
}
/**
* Finds the node with the smallest timestamp (the oldest node).
*
* @param list
* the list
* @return the min node
*/
private SystemNode findMinNode(final List<SystemNode> list) {
SystemNode min = list.get(0);
for (SystemNode systemNode : list) {
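// compareTo returns any negative number when the receiver sorts first, so compare against 0, not -1
if (systemNode.getTimestamp().compareTo(min.getTimestamp()) < 0) {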
min = systemNode;
}
}
return min;
}
/**
* Sets the leader flag.
*
* @param list
* the list
* @param value
* the value
*/
private void setLeaderFlag(final List<SystemNode> list, final Boolean value) {
for (SystemNode systemNode : list) {
systemNode.setIsLeader(value);
}
}
}
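To make the election rule easier to follow, here is a minimal in-memory sketch of what `checkLeaderShip()` appears to do: keep the current leader if it is still alive, otherwise promote the oldest alive node. It is not the service above. The `Node` class, the millisecond fields and the `elect` method are hypothetical stand-ins without JPA or Spring, and the timestamp is compared numerically rather than as a string.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** In-memory sketch of the election rule; hypothetical, not the Spring service above. */
final class LeaderElectionSketch {

    /** Simplified stand-in for SystemNode (assumption: all times are epoch millis). */
    static final class Node {
        final String ip;
        final long registeredAtMillis;
        final long lastPingMillis;
        boolean leader;

        Node(String ip, long registeredAtMillis, long lastPingMillis, boolean leader) {
            this.ip = ip;
            this.registeredAtMillis = registeredAtMillis;
            this.lastPingMillis = lastPingMillis;
            this.leader = leader;
        }
    }

    /** Flags exactly one alive node as leader and returns it; empty if no node is alive. */
    static Optional<Node> elect(List<Node> all, long nowMillis, long timeoutMillis) {
        // A node counts as alive if its last ping is within the timeout.
        List<Node> alive = new ArrayList<>();
        for (Node node : all) {
            if (nowMillis - node.lastPingMillis <= timeoutMillis) {
                alive.add(node);
            }
        }
        if (alive.isEmpty()) {
            return Optional.empty(); // the original service throws in this case
        }
        // Keep the current leader if it is alive, otherwise take the oldest alive node.
        Node chosen = alive.stream()
                .filter(node -> node.leader)
                .findFirst()
                .orElseGet(() -> alive.stream()
                        .min(Comparator.comparingLong((Node node) -> node.registeredAtMillis))
                        .get());
        for (Node node : all) {
            node.leader = false;
        }
        chosen.leader = true;
        return Optional.of(chosen);
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        List<Node> nodes = List.of(
                new Node("10.0.0.1", 100L, now - 700_000L, true),  // leader, but ping expired
                new Node("10.0.0.2", 200L, now - 1_000L, false),
                new Node("10.0.0.3", 300L, now - 2_000L, false));
        Optional<Node> leader = elect(nodes, now, 600_000L);
        System.out.println(leader.map(node -> node.ip).orElse("none")); // prints 10.0.0.2
    }
}
```

Comparing the timestamps as numbers avoids relying on the strings having the same length, which the lexicographic `compareTo` in `findMinNode` silently depends on.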
3. Ping the database to signal that you are alive, and resolve the leader periodically:
@Override
@Scheduled(cron = "0 0/5 * * * ?")
public void executeSystemNodePing() {
systemNodeService.pingNode();
}
@Override
@Scheduled(cron = "0 0/10 * * * ?")
public void executeLeaderResolution() {
systemNodeService.checkLeaderShip();
}
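A note on the intervals: with the cron expressions above, each node pings every 5 minutes and the election runs every 10 minutes. The value of the alive timeout setting is not shown here, so the sketch below assumes it is a plain duration (6 minutes is a made-up example). Under that assumption, the timeout should be longer than the ping interval, otherwise healthy nodes look dead between pings, and a dead leader can stay flagged for roughly the timeout plus one election interval.

```java
import java.time.Duration;

/** Back-of-the-envelope timing check; all values below are illustrative assumptions. */
final class FailoverTimingSketch {

    /** A live node must not expire between two consecutive pings. */
    static boolean timeoutCoversPing(Duration pingInterval, Duration aliveTimeout) {
        return aliveTimeout.compareTo(pingInterval) > 0;
    }

    /**
     * Rough upper bound on how long a dead leader stays flagged: it pinged just
     * before dying, expires after the timeout, and the next election run may be
     * up to one full election interval later.
     */
    static Duration worstCaseFailover(Duration aliveTimeout, Duration electionInterval) {
        return aliveTimeout.plus(electionInterval);
    }

    public static void main(String[] args) {
        Duration ping = Duration.ofMinutes(5);      // from "0 0/5 * * * ?"
        Duration election = Duration.ofMinutes(10); // from "0 0/10 * * * ?"
        Duration timeout = Duration.ofMinutes(6);   // hypothetical setting value

        System.out.println(timeoutCoversPing(ping, timeout));                // prints true
        System.out.println(worstCaseFailover(timeout, election).toMinutes()); // prints 16
    }
}
```

If a 16-minute gap without a leader is too long for your job, shorten the ping and election intervals together with the timeout.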
4. You are ready! Just check whether you are the leader before executing the task:
@Override
@Scheduled(cron = "*/30 * * * * *")
public void executeFailedEmailTasks() {
if (checkIfLeader()) {
final List<EmailTask> list = emailTaskService.getFailedEmailTasks();
for (EmailTask emailTask : list) {
dispatchService.sendEmail(emailTask);
}
}
}
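The `checkIfLeader()` call is not defined in the snippets above; presumably it delegates to `systemNodeService.isLeader()`. If you have several scheduled jobs, one way to avoid repeating the `if` is a small guard like the sketch below. `LeaderGuardSketch` and `runIfLeader` are hypothetical names, and the leader check is passed in as a `BooleanSupplier` so the example runs without Spring.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper that runs a task only on the leader node. */
final class LeaderGuardSketch {

    /**
     * Runs the task if the check reports that this node is the leader.
     *
     * @return true if the task was executed, false if it was skipped
     */
    static boolean runIfLeader(BooleanSupplier leaderCheck, Runnable task) {
        if (!leaderCheck.getAsBoolean()) {
            return false; // not the leader: another node handles the job
        }
        task.run();
        return true;
    }

    public static void main(String[] args) {
        // In the real service the check would be something like systemNodeService::isLeader.
        boolean ranOnLeader = runIfLeader(() -> true, () -> System.out.println("sending failed emails"));
        boolean ranOnFollower = runIfLeader(() -> false, () -> System.out.println("never printed"));
        System.out.println(ranOnLeader + " " + ranOnFollower); // prints true false
    }
}
```

Because the check is evaluated on every run, a node that loses leadership stops executing the job at its next scheduled tick.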
CronJob in kubernetes? – Bureaucrat
FencedLock more appropriate for cluster environment k8s (in cloud or not) than relying on a DB. – Padraig