How could we make use of ClusterListener in Mongo?

I am looking for an example or usage of ClusterListener to improve the debugging information of a service that is integrated with the MongoDB Java client.

How could we use it effectively with our MongoDB cluster, which is a replica set?

Lewendal answered 10/7, 2017 at 17:23

TL;DR

The ClusterListener interface can be used to monitor some aspects of a replicaset. If you want to dig deeper, or to interrogate the replicaset status outside of the events for which ClusterListener provides callbacks, you might prefer to invoke the replSetGetStatus command and inspect its output.

Detail

The ClusterListener provides callbacks which allow you to watch and respond to changes in your replicaset. For example, the following ClusterListener ...

public class LoggingClusterListener implements ClusterListener {
    private static final Logger logger = LoggerFactory.getLogger(LoggingClusterListener.class);

    @Override
    public void clusterOpening(final ClusterOpeningEvent clusterOpeningEvent) {
        logger.info("clusterOpening: {}", clusterOpeningEvent.getClusterId().getValue());
    }

    @Override
    public void clusterClosed(final ClusterClosedEvent clusterClosedEvent) {
        logger.info("clusterClosed: {}", clusterClosedEvent.getClusterId().getValue());
    }

    @Override
    public void clusterDescriptionChanged(final ClusterDescriptionChangedEvent event) {
        logger.info("clusterDescriptionChanged: {}", event.getClusterId().getValue());
        for (ServerDescription sd : event.getNewDescription().getServerDescriptions()) {
            // Note: four {} placeholders but only three arguments, so SLF4J prints the last {} literally
            logger.info("{} / {} / {} / {}", sd.getType(), sd.getCanonicalAddress(), sd.getState().name());
        }
    }
}

... when associated with a MongoClient like this ...

final MongoClientOptions options = MongoClientOptions.builder()
  .addClusterListener(new LoggingClusterListener())
  .build();
return new MongoClient(serverAddresses, options);

... will emit the following logging:

// cluster starting up ...
2017-08-17 12:49:55,977 [main]  clusterOpening: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostB:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostC:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   clusterDescriptionChanged   599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_OTHER / hostB:27017 / CONNECTED / {}    
2017-08-17 12:49:56,077 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_OTHER / hostC:27017 / CONNECTED / {}    
2017-08-17 12:49:56,077 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_SECONDARY / hostA:27017 / CONNECTED / {}    
// ... the primary fails over to hostA:27017
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   clusterDescriptionChanged:  599582e36d47c231ec963b0b
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_OTHER / hostB:27017 / CONNECTED / {}    
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_SECONDARY / hostC:27017 / CONNECTED / {}    
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_PRIMARY / hostA:27017 / CONNECTED / {}  
2017-08-17 12:50:07,126 [main]  clusterClosed: 599582e36d47c231ec963b0b
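
The failover in the log above (hostA moving from REPLICA_SET_SECONDARY to REPLICA_SET_PRIMARY) could also be detected in code rather than by reading the log. The following is only a minimal sketch and not part of the driver: PrimaryChangeTracker is a hypothetical helper, and it assumes you build a map of server address to type name inside clusterDescriptionChanged, for example from sd.getCanonicalAddress() and sd.getType().name().

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical helper (not part of the MongoDB driver): remembers the last
// known primary and reports when a different one appears.
final class PrimaryChangeTracker {
    private String lastPrimary; // null until a primary has been seen

    // serverTypes maps a server address to its type name,
    // e.g. "hostA:27017" -> "REPLICA_SET_PRIMARY".
    // Returns the primary's address only when it differs from the last one seen.
    synchronized Optional<String> update(Map<String, String> serverTypes) {
        String current = null;
        for (Map.Entry<String, String> entry : serverTypes.entrySet()) {
            if ("REPLICA_SET_PRIMARY".equals(entry.getValue())) {
                current = entry.getKey();
                break;
            }
        }
        if (current == null || current.equals(lastPrimary)) {
            return Optional.empty(); // no primary visible, or primary unchanged
        }
        lastPrimary = current;
        return Optional.of(current);
    }
}
```

The method is synchronized because, as the thread names in the log show, the listener can be invoked from several monitor threads.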

Perhaps this is sufficient for you. If not, for example if you want to actively monitor replicaset status rather than only responding when one of the following happens ...

  • Cluster start
  • Cluster stop
  • Cluster description changes

... then you might prefer to periodically sample the replicaset status and report, log or alert on the results. You can do this by executing the replSetGetStatus command, which returns a document (its format is described in the MongoDB documentation for replSetGetStatus) that can be interrogated and logged.
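
One way to do the periodic sampling is with a ScheduledExecutorService from the JDK. This is a sketch under assumptions: ReplicaSetStatusSampler is a hypothetical class, fetchStatus stands in for the replSetGetStatus call and report stands in for whatever logs or alerts on the result.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical wrapper: periodically fetches a status value and hands it to a reporter.
final class ReplicaSetStatusSampler<T> implements AutoCloseable {
    private final Supplier<T> fetchStatus;
    private final Consumer<T> report;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    ReplicaSetStatusSampler(Supplier<T> fetchStatus, Consumer<T> report) {
        this.fetchStatus = fetchStatus;
        this.report = report;
    }

    // Takes one sample. A failure returns false instead of propagating, because an
    // exception thrown from a scheduled task would cancel all later runs.
    // A real implementation would also log the exception.
    boolean sampleOnce() {
        try {
            report.accept(fetchStatus.get());
            return true;
        } catch (RuntimeException e) {
            return false;
        }
    }

    // Samples immediately and then once per period.
    void start(long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(this::sampleOnce, 0, period, unit);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

With the MongoDB driver, fetchStatus would be a lambda that runs the command against the admin database, and start(30, TimeUnit.SECONDS) would sample every 30 seconds. The period is a choice for you to tune.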

Logging the status document is the simplest response but that approach could be enhanced to form the basis of a monitoring solution by raising alerts on the basis of the document's contents e.g.

  • replicationLag > configured threshold
  • lastHeartbeat < now() - configured threshold (i.e. the last heartbeat is too old)
  • identity of the primary has changed
  • health != 1
  • etc
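
To illustrate, some of these checks could be sketched as below. Everything here is hypothetical: MemberStatus and ReplicaSetAlertRules are illustrative names, the thresholds are yours to choose, and the values are assumed to have already been extracted from one entry of the status document's members array.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical holder for the per-member values the rules need.
final class MemberStatus {
    final String name;
    final double health;
    final long replicationLagMs; // lag in milliseconds, however you choose to compute it
    final long lastHeartbeatMs;  // epoch milliseconds

    MemberStatus(String name, double health, long replicationLagMs, long lastHeartbeatMs) {
        this.name = name;
        this.health = health;
        this.replicationLagMs = replicationLagMs;
        this.lastHeartbeatMs = lastHeartbeatMs;
    }
}

// Hypothetical rule set: one alert message per violated rule.
final class ReplicaSetAlertRules {
    private final long maxLagMs;
    private final long maxHeartbeatAgeMs;

    ReplicaSetAlertRules(long maxLagMs, long maxHeartbeatAgeMs) {
        this.maxLagMs = maxLagMs;
        this.maxHeartbeatAgeMs = maxHeartbeatAgeMs;
    }

    // An empty list means no rule was violated for this member.
    List<String> evaluate(MemberStatus m, long nowMs) {
        List<String> alerts = new ArrayList<>();
        if (m.health != 1.0) {
            alerts.add(m.name + ": health is " + m.health);
        }
        if (m.replicationLagMs > maxLagMs) {
            alerts.add(m.name + ": replication lag " + m.replicationLagMs + "ms exceeds " + maxLagMs + "ms");
        }
        if (m.lastHeartbeatMs < nowMs - maxHeartbeatAgeMs) {
            alerts.add(m.name + ": last heartbeat is older than " + maxHeartbeatAgeMs + "ms");
        }
        return alerts;
    }
}
```

The heartbeat rule only makes sense for members that actually report a lastHeartbeat, so the caller would need to skip it for any member where that field is absent. The "primary has changed" check is left out because it needs state from the previous sample.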

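The following code reads the replicaset status document, interrogates it and logs the output. The replication lag it reports is lastHeartbeat minus optimeDate for each member, which is a value in milliseconds even though the log label says replicationLag(s).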

MongoReplicaSetStatusLogger mongoReplicaSetStatusLogger = new MongoReplicaSetStatusLogger();

// periodically ...
MongoClient mongoClient = getMongoClient();

MongoDatabase admin = mongoClient.getDatabase("admin");
BsonDocument commandResult = admin.runCommand(new BsonDocument("replSetGetStatus", new BsonInt32(1)), BsonDocument.class);
mongoReplicaSetStatusLogger.report(commandResult);

Here's the MongoReplicaSetStatusLogger implementation:

import org.bson.BsonDocument;
import org.bson.BsonInvalidOperationException;
import org.bson.BsonNumber;
import org.bson.BsonValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;

public class MongoReplicaSetStatusLogger {
    private static final Logger logger = LoggerFactory.getLogger(MongoReplicaSetStatusLogger.class);

    private static final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss,SSSZ");

    private static final String DEFAULT_VALUE = "UNKNOWN";
    private static final String MEMBERS = "members";

    public void report(BsonDocument replicasetStatusDocument) {
        if (hasMembers(replicasetStatusDocument)) {
            replicasetStatusDocument.getArray(MEMBERS).stream()
                    .filter(BsonValue::isDocument)
                    .map(memberDocument -> (BsonDocument) memberDocument)
                    .forEach(memberDocument -> logMemberDocument(memberDocument));
        } else {
            logger.warn("The replicaset status document does not contain a '{}' attribute, perhaps there has been " +
                    "a MongoDB upgrade and the format has changed!", MEMBERS);
        }
    }

    private boolean hasMembers(BsonDocument replicasetStatusDocument) {
        return replicasetStatusDocument.containsKey(MEMBERS) && replicasetStatusDocument.get(MEMBERS).isArray();
    }

    private void logMemberDocument(BsonDocument memberDocument) {
        StringBuilder stringBuilder = new StringBuilder()
                .append(logAttribute("node", getStringValue(memberDocument, "name")))
                .append(logAttribute("health", getNumericValue(memberDocument, "health")))
                .append(logAttribute("state", getStringValue(memberDocument, "stateStr")))
                .append(logAttribute("uptime(s)", getNumericValue(memberDocument, "uptime")))
                .append(logAttribute("lastOptime", getDateTimeValue(memberDocument, "optimeDate")))
                .append(logAttribute("lastHeartbeat", getDateTimeValue(memberDocument, "lastHeartbeat")))
                .append(logAttribute("lastHeartbeatRecv", getDateTimeValue(memberDocument, "lastHeartbeatRecv")))
                .append(logAttribute("ping(ms)", getNumericValue(memberDocument, "pingMs")))
                .append(logAttribute("replicationLag(s)", getReplicationLag(memberDocument)));

        logger.error(stringBuilder.toString());
    }

    private String logAttribute(String key, Optional<String> value) {
        return new StringBuilder(key).append("=").append(value.orElse(DEFAULT_VALUE)).append("|").toString();
    }

    private Optional<String> getStringValue(BsonDocument memberDocument, String key) {
        if (memberDocument.containsKey(key)) {
            try {
                return Optional.of(memberDocument.getString(key).getValue().toUpperCase());
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading: {} from replicaset status document, message: {}.", key, e.getMessage());
            }
        }
        return Optional.empty();
    }

    private Optional<String> getNumericValue(BsonDocument memberDocument, String key) {
        if (memberDocument.containsKey(key)) {
            try {
                // getNumber throws BsonInvalidOperationException if the value is not numeric
                BsonNumber bsonNumber = memberDocument.getNumber(key);
                if (bsonNumber.isInt32()) {
                    return Optional.of(Integer.toString(bsonNumber.intValue()));
                } else if (bsonNumber.isInt64()) {
                    return Optional.of(Long.toString(bsonNumber.longValue()));
                } else if (bsonNumber.isDouble()) {
                    return Optional.of(Double.toString(bsonNumber.doubleValue()));
                }
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading: {} from replicaset status document, message: {}.", key, e.getMessage());
            }
        }
        return Optional.empty();
    }

    private Optional<String> getDateTimeValue(BsonDocument memberDocument, String key) {
        if (memberDocument.containsKey(key)) {
            try {
                return Optional.of(dateFormatter.format(new Date(memberDocument.getDateTime(key).getValue())));
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading: {} from replicaset status document due to: {}!", key, e.getMessage());
            }
        }
        return Optional.empty();
    }

    private Optional<String> getReplicationLag(BsonDocument memberDocument) {
        if (memberDocument.containsKey("optimeDate") && memberDocument.containsKey("lastHeartbeat")) {
            try {
                long optimeDate = memberDocument.getDateTime("optimeDate").getValue();
                long lastHeartbeat = memberDocument.getDateTime("lastHeartbeat").getValue();
                long replicationLag = lastHeartbeat - optimeDate;
                return Optional.of(Long.toString(replicationLag));
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading 'optimeDate' or 'lastHeartbeat' from replicaset status document due to: {}!", e.getMessage());
            } catch (IllegalArgumentException e) {
                logger.warn("Exception calculating the replication lag due to: {}!", e.getMessage());
            }
        }
        return Optional.empty();
    }
}

Here's an example of the output:

2017-08-17 15:44:35,192|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostA:27017|health=1.0|state=PRIMARY|uptime(s)=21|lastOptime=2017-08-17T15:43:32,000+0100|lastHeartbeat=UNKNOWN|lastHeartbeatRecv=UNKNOWN|ping(ms)=UNKNOWN|replicationLag(s)=UNKNOWN|
2017-08-17 15:44:35,193|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostB:27017|health=1.0|state=SECONDARY|uptime(s)=17|lastOptime=2017-08-17T15:43:20,000+0100|lastHeartbeat=2017-08-17T15:43:35,443+0100|lastHeartbeatRecv=2017-08-17T15:43:36,412+0100|ping(ms)=0|replicationLag(s)=15443|
2017-08-17 15:44:35,193|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostC:27017|health=1.0|state=SECONDARY|uptime(s)=17|lastOptime=2017-08-17T15:43:20,000+0100|lastHeartbeat=2017-08-17T15:43:35,444+0100|lastHeartbeatRecv=2017-08-17T15:43:36,470+0100|ping(ms)=0|replicationLag(s)=15444|
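
Replication lag is more commonly taken as the difference between the primary's optimeDate and a secondary's optimeDate. The following is a minimal sketch of that alternative calculation. ReplicationLag is a hypothetical helper, and it assumes you have already picked out the primary's optime from the members array.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical helper: lag of a secondary measured against the primary's optime.
final class ReplicationLag {
    // Returns how far the secondary's last applied operation is behind the primary's.
    // Clamped to zero so that a secondary sampled slightly ahead never reports negative lag.
    static Duration between(Instant primaryOptime, Instant secondaryOptime) {
        Duration lag = Duration.between(secondaryOptime, primaryOptime);
        return lag.isNegative() ? Duration.ZERO : lag;
    }
}
```

With the sample output above, hostA (the primary) has lastOptime 15:43:32 and hostB has 15:43:20, so this definition gives a lag of 12 seconds for hostB.
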
Butterball answered 17/8, 2017 at 14:52 Comment(1)
Thank you for the insight. :) - Lewendal
