Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions bill-of-materials.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,6 @@
}
]
},
{
"project": "github.com/antithesishq/antithesis-sdk-go",
"licenses": [
{
"type": "MIT License",
"confidence": 1
}
]
},
{
"project": "github.com/beorn7/perks/quantile",
"licenses": [
Expand Down
33 changes: 30 additions & 3 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,14 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error)

cache.demux = NewDemux(internalCtx, &cache.waitGroup, cfg.HistoryWindowSize, cfg.ResyncInterval)

if cfg.MetricsMux != nil {
path := cfg.MetricsPath
if path == "" {
path = DefaultMetricsPath
}
HandleMetrics(cfg.MetricsMux, path)
}

cache.waitGroup.Add(1)
go func() {
defer cache.waitGroup.Done()
Expand All @@ -89,7 +97,15 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error)
// Watch registers a cache-backed watcher for a given key or prefix.
// It returns a WatchChan that streams WatchResponses containing events.
func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan {
startTime := time.Now()
result := "success"
defer func() {
watchTotal.WithLabelValues(result).Inc()
watchDurationSeconds.Observe(time.Since(startTime).Seconds())
}()

if err := c.WaitReady(ctx); err != nil {
result = "error"
emptyWatchChan := make(chan clientv3.WatchResponse)
close(emptyWatchChan)
return emptyWatchChan
Expand All @@ -100,6 +116,7 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption

pred, err := c.validateWatch(key, op)
if err != nil {
result = "error"
ch := make(chan clientv3.WatchResponse, 1)
ch <- clientv3.WatchResponse{Canceled: true, CancelReason: err.Error()}
close(ch)
Expand Down Expand Up @@ -145,15 +162,25 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption
return responseChan
}

func (c *Cache) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
func (c *Cache) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (_ *clientv3.GetResponse, err error) {
startTime := time.Now()
defer func() {
result := "success"
if err != nil {
result = "error"
}
getTotal.WithLabelValues(result).Inc()
getDurationSeconds.Observe(time.Since(startTime).Seconds())
}()

if c.store.LatestRev() == 0 {
if err := c.WaitReady(ctx); err != nil {
if err = c.WaitReady(ctx); err != nil {
return nil, err
}
}
op := clientv3.OpGet(key, opts...)

if _, err := c.validateGet(key, op); err != nil {
if _, err = c.validateGet(key, op); err != nil {
return nil, err
}

Expand Down
20 changes: 19 additions & 1 deletion cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package cache

import "time"
import (
"net/http"
"time"
)

type Config struct {
// PerWatcherBufferSize caps each watcher’s buffered channel.
Expand All @@ -33,6 +36,10 @@ type Config struct {
GetTimeout time.Duration
// BTreeDegree controls the degree (branching factor) of the in-memory B-tree store.
BTreeDegree int
// MetricsMux, when set, registers a Prometheus metrics handler at MetricsPath.
MetricsMux *http.ServeMux
// MetricsPath is the HTTP path for the Prometheus handler (default: "/cache/metrics").
MetricsPath string
}

// TODO: tune via performance/load tests.
Expand Down Expand Up @@ -77,3 +84,14 @@ func WithGetTimeout(d time.Duration) Option {
func WithBTreeDegree(n int) Option {
return func(c *Config) { c.BTreeDegree = n }
}

// WithMetrics registers a Prometheus metrics handler on the given ServeMux.
// The default path is "/cache/metrics"; use WithMetricsPath to override.
func WithMetrics(mux *http.ServeMux) Option {
return func(c *Config) { c.MetricsMux = mux }
}

// WithMetricsPath overrides the default metrics endpoint path.
func WithMetricsPath(path string) Option {
return func(c *Config) { c.MetricsPath = path }
}
14 changes: 14 additions & 0 deletions cache/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ func newDemux(historyWindowSize int, resyncInterval time.Duration) *demux {
}
}

func (d *demux) reportWatcherMetrics() {
demuxActiveWatchers.Set(float64(len(d.activeWatchers)))
demuxLaggingWatchers.Set(float64(len(d.laggingWatchers)))
}

// resyncLoop periodically tries to catch lagging watchers up by replaying events from History.
func (d *demux) resyncLoop(ctx context.Context) {
timer := time.NewTimer(d.resyncInterval)
Expand All @@ -84,6 +89,7 @@ func (d *demux) WaitForNextResync(ctx context.Context) error {
func (d *demux) Register(w *watcher, startingRev int64) {
d.mu.Lock()
defer d.mu.Unlock()
defer d.reportWatcherMetrics()

if d.maxRev == 0 {
if startingRev == 0 {
Expand Down Expand Up @@ -112,6 +118,7 @@ func (d *demux) Unregister(w *watcher) {
defer d.mu.Unlock()
delete(d.activeWatchers, w)
delete(d.laggingWatchers, w)
d.reportWatcherMetrics()
}()
w.Stop()
}
Expand Down Expand Up @@ -154,6 +161,9 @@ func (d *demux) Broadcast(resp clientv3.WatchResponse) error {
}
d.updateStoreLocked(resp)
d.broadcastLocked(resp)

demuxHistorySize.Set(float64(d.history.size))
d.reportWatcherMetrics()
return nil
}

Expand Down Expand Up @@ -272,6 +282,8 @@ func (d *demux) purge() {
}
d.activeWatchers = make(map[*watcher]int64)
d.laggingWatchers = make(map[*watcher]int64)
d.reportWatcherMetrics()
demuxHistorySize.Set(0)
}

// Compact is called when etcd reports a compaction at compactRev to rebase history;
Expand All @@ -286,6 +298,8 @@ func (d *demux) resyncLaggingWatchers() {
d.mu.Lock()
defer d.mu.Unlock()

defer d.reportWatcherMetrics()

if d.minRev == 0 {
return
}
Expand Down
9 changes: 9 additions & 0 deletions cache/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,22 +6,31 @@ toolchain go1.26.0

require (
github.com/google/go-cmp v0.7.0
github.com/prometheus/client_golang v1.23.2
github.com/stretchr/testify v1.11.1
go.etcd.io/etcd/api/v3 v3.6.0-alpha.0
go.etcd.io/etcd/client/v3 v3.6.0-alpha.0
k8s.io/utils v0.0.0-20260108192941-914a6e750570
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/coreos/go-semver v0.3.1 // indirect
github.com/coreos/go-systemd/v22 v22.7.0 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/client_model v0.6.2 // indirect
github.com/prometheus/common v0.67.5 // indirect
github.com/prometheus/procfs v0.16.1 // indirect
go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.27.1 // indirect
go.yaml.in/yaml/v2 v2.4.3 // indirect
golang.org/x/net v0.51.0 // indirect
golang.org/x/sys v0.41.0 // indirect
golang.org/x/text v0.34.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions cache/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,14 @@ github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3 h1:B+8ClL/kCQkRiU82d9xajR
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3/go.mod h1:NbCUVmiS4foBGBHOYlCT25+YmGpJ32dZPi75pGEUpj4=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
Expand Down
103 changes: 103 additions & 0 deletions cache/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright 2025 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cache

import (
"net/http"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

const DefaultMetricsPath = "/cache/metrics"

var (
storeKeysTotal = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "cache",
Name: "store_keys_total",
Help: "Total number of keys in the cache store.",
})
storeLatestRevision = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "cache",
Name: "store_latest_revision",
Help: "Latest revision observed by the cache store.",
})

getDurationSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "cache",
Name: "get_duration_seconds",
Help: "Latency distribution of cache Get operations in seconds.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
})
getTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "cache",
Name: "get_total",
Help: "Total number of cache Get operations.",
}, []string{"result"})

watchDurationSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "etcd",
Subsystem: "cache",
Name: "watch_register_duration_seconds",
Help: "Latency distribution of cache Watch registration in seconds.",
Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16),
})
watchTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "etcd",
Subsystem: "cache",
Name: "watch_total",
Help: "Total number of cache Watch operations.",
}, []string{"result"})

demuxActiveWatchers = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "cache",
Name: "demux_active_watchers",
Help: "Number of active (caught-up) watchers in the demux.",
})
demuxLaggingWatchers = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "cache",
Name: "demux_lagging_watchers",
Help: "Number of lagging watchers waiting for resync.",
})
demuxHistorySize = prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "etcd",
Subsystem: "cache",
Name: "demux_history_size",
Help: "Number of entries currently stored in the demux history ring buffer.",
})
)

func init() {
prometheus.MustRegister(storeKeysTotal)
prometheus.MustRegister(storeLatestRevision)
prometheus.MustRegister(getDurationSeconds)
prometheus.MustRegister(getTotal)
prometheus.MustRegister(watchDurationSeconds)
prometheus.MustRegister(watchTotal)
prometheus.MustRegister(demuxActiveWatchers)
prometheus.MustRegister(demuxLaggingWatchers)
prometheus.MustRegister(demuxHistorySize)
}

// HandleMetrics registers the Prometheus metrics handler on the given mux at the specified path.
func HandleMetrics(mux *http.ServeMux, path string) {
mux.Handle(path, promhttp.Handler())
}
Loading