How can we make use of ClusterListener in Mongo?

9

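I have been looking for an example, or a use case, of ClusterListener for optimizing and improving the debugging information of a service integrated with the MongoDB Java client.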

How can we use it effectively to improve our Mongo cluster setup, which uses replication?

1 Answer

9

TL;DR

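The ClusterListener interface can be used to monitor some aspects of a replica set, but if you want to dig deeper and/or query the replica set's status outside the events for which ClusterListener provides callbacks, you may prefer to invoke the replSetGetStatus command and inspect its output.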

Detail

ClusterListener provides callbacks that let you observe and react to changes in the replica set. For example, the following ClusterListener...

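import com.mongodb.connection.ServerDescription;
import com.mongodb.event.ClusterClosedEvent;
import com.mongodb.event.ClusterDescriptionChangedEvent;
import com.mongodb.event.ClusterListener;
import com.mongodb.event.ClusterOpeningEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingClusterListener implements ClusterListener {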
    private static final Logger logger = LoggerFactory.getLogger(LoggingClusterListener.class);

    @Override
    public void clusterOpening(final ClusterOpeningEvent clusterOpeningEvent) {
        logger.info("clusterOpening: {}", clusterOpeningEvent.getClusterId().getValue());
    }

    @Override
    public void clusterClosed(final ClusterClosedEvent clusterClosedEvent) {
        logger.info("clusterClosed: {}", clusterClosedEvent.getClusterId().getValue());
    }

    @Override
    public void clusterDescriptionChanged(final ClusterDescriptionChangedEvent event) {
        logger.info("clusterDescriptionChanged: {}", event.getClusterId().getValue());
        for (ServerDescription sd : event.getNewDescription().getServerDescriptions()) {
            // Three placeholders for three arguments; an unmatched fourth "{}" would be logged literally.
            logger.info("{} / {} / {}", sd.getType(), sd.getCanonicalAddress(), sd.getState().name());
        }
    }
}

...when associated with a MongoClient like so...

final MongoClientOptions options = MongoClientOptions.builder()
  .addClusterListener(new LoggingClusterListener())
  .build();
return new MongoClient(serverAddresses, options);

...will emit the following logging:

// cluster starting up ...
2017-08-17 12:49:55,977 [main]  clusterOpening: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostB:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostC:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:49:56,076 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_OTHER / hostB:27017 / CONNECTED / {}    
2017-08-17 12:49:56,077 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_OTHER / hostC:27017 / CONNECTED / {}    
2017-08-17 12:49:56,077 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_SECONDARY / hostA:27017 / CONNECTED / {}    
// ... the primary fails over to hostA:27017
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   clusterDescriptionChanged: 599582e36d47c231ec963b0b
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_OTHER / hostB:27017 / CONNECTED / {}    
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_SECONDARY / hostC:27017 / CONNECTED / {}    
2017-08-17 12:50:06,080 [cluster-ClusterId{value='599582e36d47c231ec963b0b', description='null'}-hostA:27017]   REPLICA_SET_PRIMARY / hostA:27017 / CONNECTED / {}  
2017-08-17 12:50:07,126 [main]  clusterClosed: 599582e36d47c231ec963b0b
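
The failover visible in the log above could also be detected in code rather than by reading log lines. The following is only a minimal sketch: PrimaryChangeTracker is a hypothetical helper, not part of the MongoDB driver, and it assumes that your clusterDescriptionChanged callback passes it the address of whichever server currently has type REPLICA_SET_PRIMARY (or null when there is none).

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical helper (not a driver class): remembers the last primary
// address it was given and reports when that address changes.
final class PrimaryChangeTracker {
    private String lastPrimary; // null until a primary has been seen

    /**
     * @param currentPrimary address of the current primary, or null if none is known
     * @return a description of the change, or empty if the primary is unchanged
     */
    synchronized Optional<String> update(String currentPrimary) {
        if (Objects.equals(lastPrimary, currentPrimary)) {
            return Optional.empty();
        }
        String message = "primary changed: " + lastPrimary + " -> " + currentPrimary;
        lastPrimary = currentPrimary;
        return Optional.of(message);
    }
}
```

The method is synchronized because, as the thread names in the log suggest, the callbacks are invoked from the driver's monitoring threads rather than from the application's main thread.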

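Perhaps that is sufficient for you, but if not, for example if you want to actively monitor the replica set's status rather than only reacting when one of the following happens...
  • the cluster starts
  • the cluster stops
  • the cluster description changes
...then you may prefer to sample the replica set status periodically and report/log/alert on the results. You can do this by executing the replSetGetStatus command and inspecting the result. The command returns a BsonDocument (its format is described in the MongoDB documentation for replSetGetStatus) that can be inspected and logged.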
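Logging the status document is the simplest response, but the same approach can form the basis of a monitoring solution by raising alerts based on the document's contents, for example:
  • replicationLag > configured threshold
  • lastHeartbeat older than now() - configured threshold
  • the identity of the primary has changed
  • health != 1
  • etc.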
The following code reads the replica set status document, inspects it (including calculating the replication lag) and logs the output.
MongoReplicaSetStatusLogger mongoReplicaSetStatusLogger = new MongoReplicaSetStatusLogger();

// periodically ...
MongoClient mongoClient = getMongoClient();

MongoDatabase admin = mongoClient.getDatabase("admin");
BsonDocument commandResult = admin.runCommand(new BsonDocument("replSetGetStatus", new BsonInt32(1)), BsonDocument.class);
mongoReplicaSetStatusLogger.report(commandResult);
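
The "// periodically ..." step above is left open. One possible way to drive it, sketched here with only the JDK's ScheduledExecutorService, is shown below. PeriodicSampler is a hypothetical name, and the Runnable you pass in is assumed to wrap the runCommand and report calls shown above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs a sampling task repeatedly on a single background thread.
final class PeriodicSampler implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    /** Runs the task immediately, then again after each run with the given delay in between. */
    void start(Runnable sample, long delay, TimeUnit unit) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                sample.run();
            } catch (RuntimeException e) {
                // An exception escaping the task would cancel all later runs,
                // so it is caught and reported here instead.
                System.err.println("Sampling failed: " + e);
            }
        }, 0, delay, unit);
    }

    /** Stops sampling; call this on shutdown so the background thread does not keep the JVM alive. */
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

For example, sampler.start(task, 30, TimeUnit.SECONDS) would sample every 30 seconds; the interval is an arbitrary illustration, not a recommendation.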

Here is the implementation of `MongoReplicaSetStatusLogger`:
import org.bson.BsonDocument;
import org.bson.BsonInvalidOperationException;
import org.bson.BsonNumber;
import org.bson.BsonValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;

public class MongoReplicaSetStatusLogger {
    private static final Logger logger = LoggerFactory.getLogger(MongoReplicaSetStatusLogger.class);

    private static final SimpleDateFormat dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss,SSSZ");

    private static final String DEFAULT_VALUE = "UNKNOWN";
    private static final String MEMBERS = "members";

    public void report(BsonDocument replicasetStatusDocument) {
        if (hasMembers(replicasetStatusDocument)) {
            replicasetStatusDocument.getArray(MEMBERS).stream()
                    .filter(BsonValue::isDocument)
                    .map(BsonValue::asDocument)
                    .forEach(this::logMemberDocument);
        } else {
            logger.warn("The replicaset status document does not contain a '{}' attribute, perhaps there has been " +
                    "a MongoDB upgrade and the format has changed!", MEMBERS);
        }
    }

    private boolean hasMembers(BsonDocument replicasetStatusDocument) {
        return replicasetStatusDocument.containsKey(MEMBERS) && replicasetStatusDocument.get(MEMBERS).isArray();
    }

    private void logMemberDocument(BsonDocument memberDocument) {
        StringBuilder stringBuilder = new StringBuilder()
                .append(logAttribute("node", getStringValue(memberDocument, "name")))
                .append(logAttribute("health", getNumericValue(memberDocument, "health")))
                .append(logAttribute("state", getStringValue(memberDocument, "stateStr")))
                .append(logAttribute("uptime(s)", getNumericValue(memberDocument, "uptime")))
                .append(logAttribute("lastOptime", getDateTimeValue(memberDocument, "optimeDate")))
                .append(logAttribute("lastHeartbeat", getDateTimeValue(memberDocument, "lastHeartbeat")))
                .append(logAttribute("lastHeartbeatRecv", getDateTimeValue(memberDocument, "lastHeartbeatRecv")))
                .append(logAttribute("ping(ms)", getNumericValue(memberDocument, "pingMs")))
                // The lag is the difference of two epoch-millisecond timestamps, so its unit is milliseconds.
                .append(logAttribute("replicationLag(ms)", getReplicationLag(memberDocument)));

        logger.error(stringBuilder.toString());
    }

    private String logAttribute(String key, Optional<String> value) {
        return new StringBuilder(key).append("=").append(value.orElse(DEFAULT_VALUE)).append("|").toString();
    }

    private Optional<String> getStringValue(BsonDocument memberDocument, String key) {
        if (memberDocument.containsKey(key)) {
            try {
                return Optional.of(memberDocument.getString(key).getValue().toUpperCase());
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading: {} from replicaset status document, message: {}.", key, e.getMessage());
            }
        }
        return Optional.empty();
    }

    private Optional<String> getNumericValue(BsonDocument memberDocument, String key) {
        if (memberDocument.containsKey(key)) {
            try {
                // getNumber throws BsonInvalidOperationException if the value is not numeric
                BsonNumber bsonNumber = memberDocument.getNumber(key);
                if (bsonNumber.isInt32()) {
                    return Optional.of(Integer.toString(bsonNumber.intValue()));
                } else if (bsonNumber.isInt64()) {
                    return Optional.of(Long.toString(bsonNumber.longValue()));
                } else if (bsonNumber.isDouble()) {
                    return Optional.of(Double.toString(bsonNumber.doubleValue()));
                }
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading: {} from replicaset status document, message: {}.", key, e.getMessage());
            }
        }
        return Optional.empty();
    }

    private Optional<String> getDateTimeValue(BsonDocument memberDocument, String key) {
        if (memberDocument.containsKey(key)) {
            try {
                return Optional.of(dateFormatter.format(new Date(memberDocument.getDateTime(key).getValue())));
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading: {} from replicaset status document due to: {}!", key, e.getMessage());
            }
        }
        return Optional.empty();
    }

    private Optional<String> getReplicationLag(BsonDocument memberDocument) {
        if (memberDocument.containsKey("optimeDate") && memberDocument.containsKey("lastHeartbeat")) {
            try {
                long optimeDate = memberDocument.getDateTime("optimeDate").getValue();
                long lastHeartbeat = memberDocument.getDateTime("lastHeartbeat").getValue();
                long replicationLag = lastHeartbeat - optimeDate;
                return Optional.of(Long.toString(replicationLag));
            } catch (BsonInvalidOperationException e) {
                logger.warn("Exception reading 'optimeDate' or 'lastHeartbeat' from replicaset status document due to: {}!", e.getMessage());
            } catch (IllegalArgumentException e) {
                logger.warn("Exception calculating the replication lag due to: {}!", e.getMessage());
            }
        }
        return Optional.empty();
    }
}

Here is an example of the output (the replicationLag values are in milliseconds):
2017-08-17 15:44:35,192|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostA:27017|health=1.0|state=PRIMARY|uptime(s)=21|lastOptime=2017-08-17T15:43:32,000+0100|lastHeartbeat=UNKNOWN|lastHeartbeatRecv=UNKNOWN|ping(ms)=UNKNOWN|replicationLag(s)=UNKNOWN|
2017-08-17 15:44:35,193|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostB:27017|health=1.0|state=SECONDARY|uptime(s)=17|lastOptime=2017-08-17T15:43:20,000+0100|lastHeartbeat=2017-08-17T15:43:35,443+0100|lastHeartbeatRecv=2017-08-17T15:43:36,412+0100|ping(ms)=0|replicationLag(s)=15443|
2017-08-17 15:44:35,193|[main]|ERROR|MongoReplicaSetStatusLogger|node=hostC:27017|health=1.0|state=SECONDARY|uptime(s)=17|lastOptime=2017-08-17T15:43:20,000+0100|lastHeartbeat=2017-08-17T15:43:35,444+0100|lastHeartbeatRecv=2017-08-17T15:43:36,470+0100|ping(ms)=0|replicationLag(s)=15444|

Thanks for the insight. :) - Naman
