Skip to content

Commit 403d5e6

Browse files
committed
Switch all metrics to distributions via v1 distribution_points API
Use distributions instead of gauges so we get p50/p95/p99 across all cache keys. Add speed_mbps and size_bytes to every command. Remove high-cardinality branch tag from delta metrics.
1 parent 26ffa7b commit 403d5e6

3 files changed

Lines changed: 48 additions & 64 deletions

File tree

cmd/gradle-cache/main.go

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -301,8 +301,13 @@ func (c *RestoreCmd) Run(ctx context.Context, metrics metricsClient) error {
301301
// total ≈ download time + a small flush of buffered pipeline stages.
302302
slog.Info("restore pipeline complete",
303303
"total_duration", totalElapsed.Round(time.Millisecond))
304-
emitTiming(metrics, "gradle_cache.restore.duration_ms", totalElapsed.Milliseconds(), "cache_key:"+c.CacheKey)
305-
emitGauge(metrics, "gradle_cache.restore.size_bytes", cb.n, "cache_key:"+c.CacheKey)
304+
metrics.distribution("gradle_cache.restore.duration_ms", float64(totalElapsed.Milliseconds()), "cache_key:"+c.CacheKey)
305+
metrics.distribution("gradle_cache.restore.size_bytes", float64(cb.n), "cache_key:"+c.CacheKey)
306+
if !cb.eofAt.IsZero() {
307+
dlElapsed := cb.eofAt.Sub(dlStart)
308+
mbps := float64(cb.n) / dlElapsed.Seconds() / 1e6
309+
metrics.distribution("gradle_cache.restore.speed_mbps", mbps, "cache_key:"+c.CacheKey)
310+
}
306311

307312
// Write a marker recording when the base restore finished.
308313
// save-delta compares file mtimes against this to identify files created
@@ -458,8 +463,16 @@ func (c *RestoreDeltaCmd) Run(ctx context.Context, metrics metricsClient) error
458463
deltaElapsed := time.Since(dlStart)
459464
slog.Info("applied delta bundle", "branch", c.Branch, "cache-key", c.CacheKey,
460465
"total_duration", deltaElapsed.Round(time.Millisecond))
461-
emitTiming(metrics, "gradle_cache.restore_delta.duration_ms", deltaElapsed.Milliseconds(),
462-
"cache_key:"+c.CacheKey, "branch:"+c.Branch)
466+
metrics.distribution("gradle_cache.restore_delta.duration_ms", float64(deltaElapsed.Milliseconds()),
467+
"cache_key:"+c.CacheKey)
468+
metrics.distribution("gradle_cache.restore_delta.size_bytes", float64(cb.n),
469+
"cache_key:"+c.CacheKey)
470+
if !cb.eofAt.IsZero() {
471+
dlElapsed := cb.eofAt.Sub(dlStart)
472+
mbps := float64(cb.n) / dlElapsed.Seconds() / 1e6
473+
metrics.distribution("gradle_cache.restore_delta.speed_mbps", mbps,
474+
"cache_key:"+c.CacheKey)
475+
}
463476
return nil
464477
}
465478

@@ -580,8 +593,9 @@ func (c *SaveCmd) Run(ctx context.Context, metrics metricsClient) error {
580593
"size_mb", fmt.Sprintf("%.1f", float64(size)/1e6),
581594
"speed_mbps", fmt.Sprintf("%.1f", mbps))
582595
slog.Info("saved bundle", "commit", c.Commit[:min(8, len(c.Commit))], "cache-key", c.CacheKey)
583-
emitTiming(metrics, "gradle_cache.save.duration_ms", elapsed.Milliseconds(), "cache_key:"+c.CacheKey)
584-
emitGauge(metrics, "gradle_cache.save.size_bytes", size, "cache_key:"+c.CacheKey)
596+
metrics.distribution("gradle_cache.save.duration_ms", float64(elapsed.Milliseconds()), "cache_key:"+c.CacheKey)
597+
metrics.distribution("gradle_cache.save.size_bytes", float64(size), "cache_key:"+c.CacheKey)
598+
metrics.distribution("gradle_cache.save.speed_mbps", mbps, "cache_key:"+c.CacheKey)
585599
return nil
586600
}
587601

@@ -692,10 +706,12 @@ func (c *SaveDeltaCmd) Run(ctx context.Context, metrics metricsClient) error {
692706
"duration", elapsed.Round(time.Millisecond),
693707
"size_mb", fmt.Sprintf("%.1f", float64(size)/1e6),
694708
"speed_mbps", fmt.Sprintf("%.1f", mbps))
695-
emitTiming(metrics, "gradle_cache.save_delta.duration_ms", elapsed.Milliseconds(),
696-
"cache_key:"+c.CacheKey, "branch:"+c.Branch)
697-
emitGauge(metrics, "gradle_cache.save_delta.size_bytes", size,
698-
"cache_key:"+c.CacheKey, "branch:"+c.Branch)
709+
metrics.distribution("gradle_cache.save_delta.duration_ms", float64(elapsed.Milliseconds()),
710+
"cache_key:"+c.CacheKey)
711+
metrics.distribution("gradle_cache.save_delta.size_bytes", float64(size),
712+
"cache_key:"+c.CacheKey)
713+
metrics.distribution("gradle_cache.save_delta.speed_mbps", mbps,
714+
"cache_key:"+c.CacheKey)
699715
return nil
700716
}
701717

cmd/gradle-cache/metrics.go

Lines changed: 16 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,11 @@ import (
1313
"time"
1414
)
1515

16-
// metricsClient emits timing and gauge metrics to a backend.
16+
// metricsClient emits distribution metrics to a backend.
17+
// All metrics are submitted as distributions to support percentile aggregation.
1718
type metricsClient interface {
18-
// timing records a duration metric in milliseconds.
19-
timing(name string, ms int64, tags ...string)
20-
// gauge records a point-in-time value.
21-
gauge(name string, value int64, tags ...string)
19+
// distribution records a single sample for a distribution metric.
20+
distribution(name string, value float64, tags ...string)
2221
// close flushes any pending data.
2322
close()
2423
}
@@ -27,9 +26,8 @@ type metricsClient interface {
2726
// It exists because kong cannot bind a nil interface value.
2827
type noopMetrics struct{}
2928

30-
func (noopMetrics) timing(string, int64, ...string) {}
31-
func (noopMetrics) gauge(string, int64, ...string) {}
32-
func (noopMetrics) close() {}
29+
func (noopMetrics) distribution(string, float64, ...string) {}
30+
func (noopMetrics) close() {}
3331

3432
// metricsFlags are CLI flags for configuring metrics emission.
3533
type metricsFlags struct {
@@ -94,12 +92,8 @@ func newStatsdClient(addr string, baseTags []string) *statsdClient {
9492
return &statsdClient{conn: conn, tags: baseTags}
9593
}
9694

97-
func (s *statsdClient) timing(name string, ms int64, tags ...string) {
98-
s.send(fmt.Sprintf("%s:%d|d", name, ms), tags) // |d = distribution for DD percentile support
99-
}
100-
101-
func (s *statsdClient) gauge(name string, value int64, tags ...string) {
102-
s.send(fmt.Sprintf("%s:%d|g", name, value), tags)
95+
func (s *statsdClient) distribution(name string, value float64, tags ...string) {
96+
s.send(fmt.Sprintf("%s:%g|d", name, value), tags)
10397
}
10498

10599
func (s *statsdClient) send(stat string, extraTags []string) {
@@ -114,9 +108,9 @@ func (s *statsdClient) close() {
114108
s.conn.Close() //nolint:errcheck,gosec
115109
}
116110

117-
// ── DataDog HTTP API ────────────────────────────────────────────────────────
111+
// ── DataDog HTTP API (v1 distribution_points) ───────────────────────────────
118112

119-
const datadogSeriesURL = "https://api.datadoghq.com/api/v2/series"
113+
const datadogDistURL = "https://api.datadoghq.com/api/v1/distribution_points"
120114

121115
type datadogAPIClient struct {
122116
apiKey string
@@ -132,34 +126,17 @@ func newDatadogAPIClient(apiKey string, baseTags []string) *datadogAPIClient {
132126
}
133127
}
134128

135-
// Datadog v2 metric type enum values.
136-
const (
137-
ddMetricTypeGauge = 3
138-
ddMetricTypeCount = 1
139-
)
140-
141-
func (d *datadogAPIClient) timing(name string, ms int64, tags ...string) {
142-
d.submit(name, float64(ms), ddMetricTypeGauge, tags)
143-
}
144-
145-
func (d *datadogAPIClient) gauge(name string, value int64, tags ...string) {
146-
d.submit(name, float64(value), ddMetricTypeGauge, tags)
147-
}
148-
149-
func (d *datadogAPIClient) submit(name string, value float64, metricType int, extraTags []string) {
150-
allTags := append(d.tags, extraTags...)
129+
func (d *datadogAPIClient) distribution(name string, value float64, tags ...string) {
130+
allTags := append(d.tags, tags...)
151131
now := time.Now().Unix()
152132

133+
// v1 distribution_points format: points is [[timestamp, [value, ...]]]
153134
payload := map[string]interface{}{
154135
"series": []map[string]interface{}{
155136
{
156137
"metric": name,
157-
"type": metricType,
158-
"points": []map[string]interface{}{
159-
{
160-
"timestamp": now,
161-
"value": value,
162-
},
138+
"points": []interface{}{
139+
[]interface{}{now, []float64{value}},
163140
},
164141
"tags": allTags,
165142
},
@@ -172,7 +149,7 @@ func (d *datadogAPIClient) submit(name string, value float64, metricType int, ex
172149
return
173150
}
174151

175-
req, err := http.NewRequest("POST", datadogSeriesURL, bytes.NewReader(body))
152+
req, err := http.NewRequest("POST", datadogDistURL, bytes.NewReader(body))
176153
if err != nil {
177154
slog.Warn("metrics: failed to create request", "error", err)
178155
return
@@ -193,11 +170,3 @@ func (d *datadogAPIClient) submit(name string, value float64, metricType int, ex
193170
}
194171

195172
func (d *datadogAPIClient) close() {}
196-
197-
func emitTiming(m metricsClient, name string, ms int64, tags ...string) {
198-
m.timing(name, ms, tags...)
199-
}
200-
201-
func emitGauge(m metricsClient, name string, value int64, tags ...string) {
202-
m.gauge(name, value, tags...)
203-
}

cmd/gradle-cache/metrics_test.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"time"
77
)
88

9-
func TestStatsdClientTiming(t *testing.T) {
9+
func TestStatsdClientDistribution(t *testing.T) {
1010
// Start a UDP listener to capture the statsd packet.
1111
pc, err := net.ListenPacket("udp", "127.0.0.1:0")
1212
if err != nil {
@@ -21,7 +21,7 @@ func TestStatsdClientTiming(t *testing.T) {
2121
}
2222
defer client.close()
2323

24-
client.timing("gradle_cache.restore.duration_ms", 1234, "cache_key:foo")
24+
client.distribution("gradle_cache.restore.duration_ms", 1234, "cache_key:foo")
2525

2626
buf := make([]byte, 1024)
2727
_ = pc.SetReadDeadline(time.Now().Add(2 * time.Second))
@@ -37,7 +37,7 @@ func TestStatsdClientTiming(t *testing.T) {
3737
}
3838
}
3939

40-
func TestStatsdClientGauge(t *testing.T) {
40+
func TestStatsdClientDistributionFloat(t *testing.T) {
4141
pc, err := net.ListenPacket("udp", "127.0.0.1:0")
4242
if err != nil {
4343
t.Fatal(err)
@@ -51,7 +51,7 @@ func TestStatsdClientGauge(t *testing.T) {
5151
}
5252
defer client.close()
5353

54-
client.gauge("gradle_cache.save.size_bytes", 5678)
54+
client.distribution("gradle_cache.restore.speed_mbps", 155.6)
5555

5656
buf := make([]byte, 1024)
5757
_ = pc.SetReadDeadline(time.Now().Add(2 * time.Second))
@@ -61,7 +61,7 @@ func TestStatsdClientGauge(t *testing.T) {
6161
}
6262

6363
got := string(buf[:n])
64-
want := "gradle_cache.save.size_bytes:5678|g"
64+
want := "gradle_cache.restore.speed_mbps:155.6|d"
6565
if got != want {
6666
t.Errorf("got %q, want %q", got, want)
6767
}
@@ -70,8 +70,7 @@ func TestStatsdClientGauge(t *testing.T) {
7070
func TestNoopMetrics(t *testing.T) {
7171
// noopMetrics should not panic.
7272
var m metricsClient = noopMetrics{}
73-
emitTiming(m, "test.metric", 100)
74-
emitGauge(m, "test.metric", 200)
73+
m.distribution("test.metric", 100)
7574
m.close()
7675
}
7776

0 commit comments

Comments
 (0)