为了账号安全,请及时绑定邮箱和手机立即绑定

Prometheus Exporter - 直接检测与自定义收集器

Prometheus Exporter - 直接检测与自定义收集器

Go
浮云间 2023-07-10 14:31:36
我目前正在为遥测网络应用程序编写一个 Prometheus 导出器。我已阅读此处的文档“编写导出器”,虽然我了解实现自定义收集器以避免竞争条件的用例,但我不确定我的用例是否适合直接检测。基本上,网络指标通过网络设备通过 gRPC 进行流式传输,因此我的导出器只需接收它们,而不必有效地抓取它们。我使用以下代码直接检测:我使用 promauto 包声明我的指标以保持代码紧凑:package metricsimport (    "github.com/lucabrasi83/prom-high-obs/proto/telemetry"    "github.com/prometheus/client_golang/prometheus"    "github.com/prometheus/client_golang/prometheus/promauto")var (    cpu5Sec = promauto.NewGaugeVec(        prometheus.GaugeOpts{            Name: "cisco_iosxe_iosd_cpu_busy_5_sec_percentage",            Help: "The IOSd daemon CPU busy percentage over the last 5 seconds",        },        []string{"node"},    )下面是我如何简单地设置 gRPC 协议缓冲区解码消息的指标值:cpu5Sec.WithLabelValues(msg.GetNodeIdStr()).Set(float64(val))最后,这是我的主循环,它基本上处理我感兴趣的指标的遥测 gRPC 流:for {        req, err := stream.Recv()        if err == io.EOF {            return nil        }        if err != nil {            logging.PeppaMonLog(                "error",                fmt.Sprintf("Error while reading client %v stream: %v", clientIPSocket, err))            return err        }        data := req.GetData()        msg := &telemetry.Telemetry{}        err = proto.Unmarshal(data, msg)        if err != nil {            log.Fatalln(err)        }        if !logFlag {            logging.PeppaMonLog(                "info",                fmt.Sprintf(                    "Telemetry Subscription Request Received - Client %v - Node %v - YANG Model Path %v",                    clientIPSocket, msg.GetNodeIdStr(), msg.GetEncodingPath(),                ),            )        }        }}我使用 Grafana 作为前端,到目前为止,在关联 Prometheus 公开的指标与直接在设备上检查指标时,还没有看到任何特定的差异。所以我想了解这是否遵循 Prometheus 最佳实践,或者我仍然应该采用自定义收集器路线。
查看完整描述

1 回答

?
凤凰求蛊

TA贡献1825条经验 获得超4个赞

您没有遵循最佳实践,因为您正在使用您链接到的文章所警告的全局指标。使用您当前的实现,在设备断开连接后(或者更准确地说,直到您的导出器重新启动),您的仪表板将永远显示 CPU 指标的一些任意且恒定的值。


相反,RPC 方法应该维护一组本地指标,并在方法返回后将其删除。这样,当设备断开连接时,设备的指标就会从抓取输出中消失。


这是执行此操作的一种方法。它使用包含当前活动指标的地图。每个映射元素都是一个特定流的一组指标(我理解它对应于一个设备)。一旦流结束,该条目就会被删除。


package main


import (

    "sync"


    "github.com/prometheus/client_golang/prometheus"

)


// Exporter is a prometheus.Collector implementation.

type Exporter struct {

    // We need some way to map gRPC streams to their metrics. Using the stream

    // itself as a map key is simple enough, but anything works as long as we

    // can remove metrics once the stream ends.

    sync.Mutex

    Metrics map[StreamServer]*DeviceMetrics

}


type DeviceMetrics struct {

    sync.Mutex


    CPU prometheus.Metric

}


// Globally defined descriptions are fine.

var cpu5SecDesc = prometheus.NewDesc(

    "cisco_iosxe_iosd_cpu_busy_5_sec_percentage",

    "The IOSd daemon CPU busy percentage over the last 5 seconds",

    []string{"node"},

    nil, // constant labels

)


// Collect implements prometheus.Collector.

func (e *Exporter) Collect(ch chan<- prometheus.Metric) {

    // Copy current metrics so we don't lock for very long if ch's consumer is

    // slow.

    var metrics []prometheus.Metric


    e.Lock()

    for _, deviceMetrics := range e.Metrics {

        deviceMetrics.Lock()

        metrics = append(metrics,

            deviceMetrics.CPU,

        )

        deviceMetrics.Unlock()

    }

    e.Unlock()


    for _, m := range metrics {

        if m != nil {

            ch <- m

        }

    }

}


// Describe implements prometheus.Collector.

func (e *Exporter) Describe(ch chan<- *prometheus.Desc) {

    ch <- cpu5SecDesc

}


// Service is the gRPC service implementation.

type Service struct {

    exp *Exporter

}


func (s *Service) RPCMethod(stream StreamServer) (*Response, error) {

    deviceMetrics := new(DeviceMetrics)


    s.exp.Lock()

    s.exp.Metrics[stream] = deviceMetrics

    s.exp.Unlock()


    defer func() {

        // Stop emitting metrics for this stream.

        s.exp.Lock()

        delete(s.exp.Metrics, stream)

        s.exp.Unlock()

    }()


    for {

        req, err := stream.Recv()

        // TODO: handle error


        var msg *Telemetry = parseRequest(req) // Your existing code that unmarshals the nested message.


        var (

            metricField *prometheus.Metric

            metric      prometheus.Metric

        )


        switch msg.GetEncodingPath() {

        case CpuYANGEncodingPath:

            metricField = &deviceMetrics.CPU

            metric = prometheus.MustNewConstMetric(

                cpu5SecDesc,

                prometheus.GaugeValue,

                ParsePBMsgCpuBusyPercent(msg), // func(*Telemetry) float64

                "node", msg.GetNodeIdStr(),

            )

        default:

            continue

        }


        deviceMetrics.Lock()

        *metricField = metric

        deviceMetrics.Unlock()

    }


    return nil, &Response{}

}



查看完整回答
反对 回复 2023-07-10
  • 1 回答
  • 0 关注
  • 109 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信