Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ var (
WalDir = kingpin.Flag("wal-dir", "Path to where the agent stores data (e.g. the metrics Write-Ahead Log)").Default("/tmp/coroot-node-agent").Envar("WAL_DIR").String()
MaxSpoolSize = kingpin.Flag("max-spool-size", "Maximum size of the on-disk spool used to buffer data when it cannot be sent to collector. Supports size suffixes like KB, MB, or GB.").Default("500MB").Envar("MAX_SPOOL_SIZE").Bytes()

AgentMetricsListen = kingpin.Flag("agent-metrics-listen", "Listen address for agent's own metrics (e.g. Prometheus scraping)").Default(":9093").Envar("AGENT_METRICS_LISTEN").String()

agentVersion = kingpin.Flag("version", "Print version and exit").Default("false").Bool()
Version = "unknown"
)
Expand Down
17 changes: 15 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func main() {
nodeCollector := node.NewCollector(hostname, kv)

registry := prometheus.NewRegistry()
internalReg := prometheus.NewRegistry()

registerer := prometheus.WrapRegistererWith(
prometheus.Labels{"machine_id": machineId, "system_uuid": systemUuid},
Expand All @@ -154,7 +155,7 @@ func main() {
if err := registerer.Register(gpuCollector); err != nil {
klog.Exitln(err)
}
registerer.MustRegister(info("node_agent_info", version))
internalReg.MustRegister(info("node_agent_info", version))

if md := nodeCollector.Metadata(); md != nil {
region := md.Region
Expand All @@ -170,14 +171,18 @@ func main() {
}
profiling.Start()

if err := prom.StartAgent(registry, machineId, systemUuid); err != nil {
if err := prom.StartAgent(registry, internalReg, machineId, systemUuid); err != nil {
klog.Exitln(err)
}

http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorLog: logger{}, Registry: registerer}))
mux := http.NewServeMux()
mux.Handle("/metrics", promhttp.HandlerFor(internalReg, promhttp.HandlerOpts{ErrorLog: logger{}}))
klog.Infoln("listening on:", *flags.ListenAddress)
klog.Infoln("agent metrics listening on:", *flags.AgentMetricsListen)

srv := &http.Server{Addr: *flags.ListenAddress}
agentSrv := &http.Server{Addr: *flags.AgentMetricsListen, Handler: mux}

sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
Expand All @@ -187,6 +192,11 @@ func main() {
klog.Exitln(err)
}
}()
go func() {
if err := agentSrv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
klog.Exitln(err)
}
}()

sig := <-sigCh
klog.Infof("received %s, shutting down", sig)
Expand All @@ -196,6 +206,9 @@ func main() {
if err := srv.Shutdown(shutdownCtx); err != nil {
klog.Warningf("HTTP server shutdown error: %s", err)
}
if err := agentSrv.Shutdown(shutdownCtx); err != nil {
klog.Warningf("agent metrics server shutdown error: %s", err)
}

done := make(chan struct{})
go func() {
Expand Down
25 changes: 20 additions & 5 deletions prom/remote_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ type Agent struct {

httpClient http.Client

spoolDir string
maxSpoolSize int64
spoolDir string
maxSpoolSize int64
collectedMetrics prometheus.Gauge
}

func StartAgent(reg *prometheus.Registry, machineId, systemUuid string) error {
func StartAgent(reg *prometheus.Registry, internalReg *prometheus.Registry, machineId, systemUuid string) error {
if *flags.MetricsEndpoint == nil {
return nil
}
Expand All @@ -47,6 +48,14 @@ func StartAgent(reg *prometheus.Registry, machineId, systemUuid string) error {
up := prometheus.NewGauge(prometheus.GaugeOpts{Name: "up"})
up.Set(1)
reg.MustRegister(up)
internalReg.MustRegister(up)

collectedMetrics := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "node_agent_collected_metrics",
Help: "Number of time series collected in the last scrape",
})
reg.MustRegister(collectedMetrics)
internalReg.MustRegister(collectedMetrics)

instance := machineId
if s := strings.ReplaceAll(systemUuid, "-", ""); s != "" && s != machineId {
Expand All @@ -69,8 +78,9 @@ func StartAgent(reg *prometheus.Registry, machineId, systemUuid string) error {
TLSClientConfig: common.TlsConfig(),
},
},
spoolDir: path.Join(*flags.WalDir, "spool"),
maxSpoolSize: int64(*flags.MaxSpoolSize),
spoolDir: path.Join(*flags.WalDir, "spool"),
maxSpoolSize: int64(*flags.MaxSpoolSize),
collectedMetrics: collectedMetrics,
}
if _, err := os.Stat(*flags.WalDir); os.IsNotExist(err) {
if err = os.Mkdir(*flags.WalDir, 0750); err != nil {
Expand Down Expand Up @@ -166,6 +176,11 @@ func (a *Agent) scrape() error {
if err != nil {
return err
}
count := 0
for _, mf := range mfs {
count += len(mf.Metric)
}
a.collectedMetrics.Set(float64(count))
mfsByName := make(map[string]*dto.MetricFamily)
for _, mf := range mfs {
mfsByName[mf.GetName()] = mf
Expand Down
Loading