企业🤖AI Agent构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] EtcdMetrics这个Measurement主要是用来收集Etcd的一些指标,该Measurement的结构体定义如下: ``` // clusterloader2/pkg/measurement/common/etcd_metrics.go type etcdMetricsMeasurement struct { sync.Mutex isRunning bool stopCh chan struct{} wg *sync.WaitGroup metrics *etcdMetrics } type etcdMetrics struct { BackendCommitDuration measurementutil.HistogramVec `json:"backendCommitDuration"` SnapshotSaveTotalDuration measurementutil.HistogramVec `json:"snapshotSaveTotalDuration"` PeerRoundTripTime measurementutil.HistogramVec `json:"peerRoundTripTime"` WalFsyncDuration measurementutil.HistogramVec `json:"walFsyncDuration"` MaxDatabaseSize float64 `json:"maxDatabaseSize"` } ``` 可以看出,它主要收集的Etcd的BackendCommitDuration等五个指标,这五个指标其实在Grafana中也可以看到。如果需要了解这五个指标具体的含义及类型,可以参考etcd的metrics接口或者自行google。 我们再来看一下它是如何收集这些指标的。我们来分析它的Excute函数: ``` // Execute supports two actions: // - start - Starts collecting etcd metrics. // - gather - Gathers and prints etcd metrics summary. func (e *etcdMetricsMeasurement) Execute(config *measurement.Config) ([]measurement.Summary, error) { provider := config.ClusterFramework.GetClusterConfig().Provider // 判断provider是否支持ssh到master // Etcd is only exposed on localhost level. We are using ssh method if !provider.Features().SupportSSHToMaster { klog.Warningf("not grabbing etcd metrics through master SSH: unsupported for provider, %s", config.ClusterFramework.GetClusterConfig().Provider.Name()) return nil, nil } action, err := util.GetString(config.Params, "action") if err != nil { return nil, err } // 从config(即从clusterloader的启动参数--masterip)中获取MasterIPs hosts := config.ClusterFramework.GetClusterConfig().MasterIPs if len(hosts) < 1 { klog.Warningf("ETCD measurements will be disabled due to no MasterIps: %v", hosts) return nil, nil } // 从config(即从clusterloader的启动参数--etcd-insecure-port)中获取etcd的metrics端口,clusterloader2默认为2382 etcdInsecurePort := config.ClusterFramework.GetClusterConfig().EtcdInsecurePort switch action { case "start": klog.V(2).Infof("%s: starting etcd metrics collecting...", e) waitTime, err := util.GetDurationOrDefault(config.Params, "waitTime", time.Minute) if err != nil { return nil, err } for _, h := range hosts { // 调用startCollecting函数开始收集metrics e.startCollecting(h, provider, waitTime, etcdInsecurePort) } return nil, nil case "gather": for _, h := range hosts { // 调用该函数停止收集并统计结果 if err = e.stopAndSummarize(h, provider, etcdInsecurePort); err != nil { return nil, err } } content, err := util.PrettyPrintJSON(e.metrics) if err != nil { return nil, err } summary := measurement.CreateSummary(etcdMetricsMetricName, "json", content) return []measurement.Summary{summary}, nil default: return nil, fmt.Errorf("unknown action %v", action) } } ``` 从上面可以看到,代码中使用ssh到每个master节点上,去获取etcd的指标。收集指标时调用的函数为`startColleting`,该函数如下: ``` func (e *etcdMetricsMeasurement) startCollecting(host string, provider provider.Provider, interval time.Duration, port int) { e.isRunning = true e.wg.Add(1) collectEtcdDatabaseSize := func() error { dbSize, err := e.getEtcdDatabaseSize(host, provider, port) if err != nil { return err } e.Lock() defer e.Unlock() e.metrics.MaxDatabaseSize = math.Max(e.metrics.MaxDatabaseSize, dbSize) return nil } go func() { defer e.wg.Done() for { select { case <-time.After(interval): err := collectEtcdDatabaseSize() if err != nil { klog.Errorf("%s: failed to collect etcd database size", e) continue } case <-e.stopCh: return } } }() } ``` 可以看出,它只是调用了getEtcdDatabaseSize函数来获取db的大小。该函数如下: ``` func (e *etcdMetricsMeasurement) getEtcdDatabaseSize(host string, provider provider.Provider, port int) (float64, error) { // 从metrics接口获取所有指标 samples, err := e.getEtcdMetrics(host, provider, port) if err != nil { return 0, err } for _, sample := range samples { // 过滤只返回这一个指标 if sample.Metric[model.MetricNameLabel] == "etcd_debugging_mvcc_db_total_size_in_bytes" { return float64(sample.Value), nil } } return 0, fmt.Errorf("couldn't find etcd database size metric") } ``` 可以看出,该函数首先从etcd的metrics接口获取所有的指标,然后从中拿到`etcd_debugging_mvcc_db_total_size_in_bytes`这个指标,返回。获取etcd所有指标调用的函数为getEtcdMetrics,该函数如下: ``` func (e *etcdMetricsMeasurement) getEtcdMetrics(host string, provider provider.Provider, port int) ([]*model.Sample, error) { // In https://github.com/kubernetes/kubernetes/pull/74690, mTLS is enabled for etcd server // in order to bypass TLS credential requirement when checking etc /metrics and /health, you // need to provide the insecure http port number to access etcd, http://localhost:2382 for // example. // ssh到master主机上,首先通过curl http://localhost:2382/metrics 获取指标 cmd := fmt.Sprintf("curl http://localhost:%d/metrics", port) if samples, err := e.sshEtcdMetrics(cmd, host, provider); err == nil { return samples, nil } // Use old endpoint if new one fails, "2379" is hard-coded here as well, it is kept as is since // we don't want to bloat the cluster config only for a fall-back attempt. // 如果上面的方式获取失败,则先在执行clusterloader2命令的主机上获取etcd的key cert host etcdCert, etcdKey, etcdHost := os.Getenv("ETCD_CERTIFICATE"), os.Getenv("ETCD_KEY"), os.Getenv("ETCD_HOST") if etcdHost == "" { etcdHost = "localhost" } if etcdCert == "" || etcdKey == "" { klog.Warning("empty etcd cert or key, using http") // 如果没有获取到key和cert,则使用http://{etcdHost}:2379/metrics去获取,注意端口已经是2379 cmd = fmt.Sprintf("curl http://%s:2379/metrics", etcdHost) } else { // 如果没有获取到key和cert,则使用https://{etcdHost}:2379/metrics去获取,注意端口已经是2379 cmd = fmt.Sprintf("curl -k --cert %s --key %s https://%s:2379/metrics", etcdCert, etcdKey, etcdHost) } // 调用该函数,该函数就是ssh到host上执行cmd return e.sshEtcdMetrics(cmd, host, provider) } ``` 可以看出,getEtcdMetrics函数,本质就是ssh到master节点上,执行curl命令来获取etcd的metrics。 上面的start action阶段,只获取了db size这个指标,其他的指标是在gather阶段获取,可以自行分析函数stopAndSummarize。