diff --git a/bill-of-materials.json b/bill-of-materials.json index 49e4831088d4..9a4265c164cd 100644 --- a/bill-of-materials.json +++ b/bill-of-materials.json @@ -17,15 +17,6 @@ } ] }, - { - "project": "github.com/antithesishq/antithesis-sdk-go", - "licenses": [ - { - "type": "MIT License", - "confidence": 1 - } - ] - }, { "project": "github.com/beorn7/perks/quantile", "licenses": [ diff --git a/cache/cache.go b/cache/cache.go index 65d337087d07..add9795d5635 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -77,6 +77,14 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) cache.demux = NewDemux(internalCtx, &cache.waitGroup, cfg.HistoryWindowSize, cfg.ResyncInterval) + if cfg.MetricsMux != nil { + path := cfg.MetricsPath + if path == "" { + path = DefaultMetricsPath + } + HandleMetrics(cfg.MetricsMux, path) + } + cache.waitGroup.Add(1) go func() { defer cache.waitGroup.Done() @@ -89,7 +97,15 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) // Watch registers a cache-backed watcher for a given key or prefix. // It returns a WatchChan that streams WatchResponses containing events. 
func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { + startTime := time.Now() + result := "success" + defer func() { + watchTotal.WithLabelValues(result).Inc() + watchDurationSeconds.Observe(time.Since(startTime).Seconds()) + }() + if err := c.WaitReady(ctx); err != nil { + result = "error" emptyWatchChan := make(chan clientv3.WatchResponse) close(emptyWatchChan) return emptyWatchChan @@ -100,6 +116,7 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption pred, err := c.validateWatch(key, op) if err != nil { + result = "error" ch := make(chan clientv3.WatchResponse, 1) ch <- clientv3.WatchResponse{Canceled: true, CancelReason: err.Error()} close(ch) @@ -145,15 +162,25 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption return responseChan } -func (c *Cache) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { +func (c *Cache) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (_ *clientv3.GetResponse, err error) { + startTime := time.Now() + defer func() { + result := "success" + if err != nil { + result = "error" + } + getTotal.WithLabelValues(result).Inc() + getDurationSeconds.Observe(time.Since(startTime).Seconds()) + }() + if c.store.LatestRev() == 0 { - if err := c.WaitReady(ctx); err != nil { + if err = c.WaitReady(ctx); err != nil { return nil, err } } op := clientv3.OpGet(key, opts...) - if _, err := c.validateGet(key, op); err != nil { + if _, err = c.validateGet(key, op); err != nil { return nil, err } diff --git a/cache/config.go b/cache/config.go index cf18ffcc7468..4dda356c2fac 100644 --- a/cache/config.go +++ b/cache/config.go @@ -14,7 +14,10 @@ package cache -import "time" +import ( + "net/http" + "time" +) type Config struct { // PerWatcherBufferSize caps each watcher’s buffered channel. 
@@ -33,6 +36,10 @@ type Config struct { GetTimeout time.Duration // BTreeDegree controls the degree (branching factor) of the in-memory B-tree store. BTreeDegree int + // MetricsMux, when set, registers a Prometheus metrics handler at MetricsPath. + MetricsMux *http.ServeMux + // MetricsPath is the HTTP path for the Prometheus handler (default: "/cache/metrics"). + MetricsPath string } // TODO: tune via performance/load tests. @@ -77,3 +84,14 @@ func WithGetTimeout(d time.Duration) Option { func WithBTreeDegree(n int) Option { return func(c *Config) { c.BTreeDegree = n } } + +// WithMetrics registers a Prometheus metrics handler on the given ServeMux. +// The default path is "/cache/metrics"; use WithMetricsPath to override. +func WithMetrics(mux *http.ServeMux) Option { + return func(c *Config) { c.MetricsMux = mux } +} + +// WithMetricsPath overrides the default metrics endpoint path. +func WithMetricsPath(path string) Option { + return func(c *Config) { c.MetricsPath = path } +} diff --git a/cache/demux.go b/cache/demux.go index 06258d8d36cd..da761836c959 100644 --- a/cache/demux.go +++ b/cache/demux.go @@ -59,6 +59,11 @@ func newDemux(historyWindowSize int, resyncInterval time.Duration) *demux { } } +func (d *demux) reportWatcherMetrics() { + demuxActiveWatchers.Set(float64(len(d.activeWatchers))) + demuxLaggingWatchers.Set(float64(len(d.laggingWatchers))) +} + // resyncLoop periodically tries to catch lagging watchers up by replaying events from History. 
func (d *demux) resyncLoop(ctx context.Context) { timer := time.NewTimer(d.resyncInterval) @@ -84,6 +89,7 @@ func (d *demux) WaitForNextResync(ctx context.Context) error { func (d *demux) Register(w *watcher, startingRev int64) { d.mu.Lock() defer d.mu.Unlock() + defer d.reportWatcherMetrics() if d.maxRev == 0 { if startingRev == 0 { @@ -112,6 +118,7 @@ func (d *demux) Unregister(w *watcher) { defer d.mu.Unlock() delete(d.activeWatchers, w) delete(d.laggingWatchers, w) + d.reportWatcherMetrics() }() w.Stop() } @@ -154,6 +161,9 @@ func (d *demux) Broadcast(resp clientv3.WatchResponse) error { } d.updateStoreLocked(resp) d.broadcastLocked(resp) + + demuxHistorySize.Set(float64(d.history.size)) + d.reportWatcherMetrics() return nil } @@ -272,6 +282,8 @@ func (d *demux) purge() { } d.activeWatchers = make(map[*watcher]int64) d.laggingWatchers = make(map[*watcher]int64) + d.reportWatcherMetrics() + demuxHistorySize.Set(0) } // Compact is called when etcd reports a compaction at compactRev to rebase history; @@ -286,6 +298,8 @@ func (d *demux) resyncLaggingWatchers() { d.mu.Lock() defer d.mu.Unlock() + defer d.reportWatcherMetrics() + if d.minRev == 0 { return } diff --git a/cache/go.mod b/cache/go.mod index f918fa379624..03a5d9611a86 100644 --- a/cache/go.mod +++ b/cache/go.mod @@ -6,6 +6,7 @@ toolchain go1.26.0 require ( github.com/google/go-cmp v0.7.0 + github.com/prometheus/client_golang v1.23.2 github.com/stretchr/testify v1.11.1 go.etcd.io/etcd/api/v3 v3.6.0-alpha.0 go.etcd.io/etcd/client/v3 v3.6.0-alpha.0 @@ -13,15 +14,23 @@ require ( ) require ( + github.com/beorn7/perks v1.0.1 // indirect + github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/coreos/go-semver v0.3.1 // indirect github.com/coreos/go-systemd/v22 v22.7.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 // indirect + github.com/kylelemons/godebug v1.1.0 // 
indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/prometheus/client_model v0.6.2 // indirect + github.com/prometheus/common v0.67.5 // indirect + github.com/prometheus/procfs v0.16.1 // indirect go.etcd.io/etcd/client/pkg/v3 v3.6.0-alpha.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.1 // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect golang.org/x/net v0.51.0 // indirect golang.org/x/sys v0.41.0 // indirect golang.org/x/text v0.34.0 // indirect diff --git a/cache/go.sum b/cache/go.sum index 6ecdb96dfc91..44aca8e71f51 100644 --- a/cache/go.sum +++ b/cache/go.sum @@ -24,10 +24,14 @@ github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3 h1:B+8ClL/kCQkRiU82d9xajR github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.3.3/go.mod h1:NbCUVmiS4foBGBHOYlCT25+YmGpJ32dZPi75pGEUpj4= github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0 h1:HWRh5R2+9EifMyIHV7ZV+MIZqgz+PMpZ14Jynv3O2Zs= github.com/grpc-ecosystem/grpc-gateway/v2 v2.28.0/go.mod h1:JfhWUomR1baixubs02l85lZYYOm7LV6om4ceouMv45c= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= +github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod 
h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= diff --git a/cache/metrics.go b/cache/metrics.go new file mode 100644 index 000000000000..92e93974c747 --- /dev/null +++ b/cache/metrics.go @@ -0,0 +1,103 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +const DefaultMetricsPath = "/cache/metrics" + +var ( + storeKeysTotal = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "cache", + Name: "store_keys_total", + Help: "Total number of keys in the cache store.", + }) + storeLatestRevision = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "cache", + Name: "store_latest_revision", + Help: "Latest revision observed by the cache store.", + }) + + getDurationSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "cache", + Name: "get_duration_seconds", + Help: "Latency distribution of cache Get operations in seconds.", + Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16), + }) + getTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "cache", + Name: "get_total", + Help: "Total number of cache Get operations.", + }, 
[]string{"result"}) + + watchDurationSeconds = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "etcd", + Subsystem: "cache", + Name: "watch_register_duration_seconds", + Help: "Latency distribution of cache Watch registration in seconds.", + Buckets: prometheus.ExponentialBuckets(0.0001, 2, 16), + }) + watchTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "etcd", + Subsystem: "cache", + Name: "watch_total", + Help: "Total number of cache Watch operations.", + }, []string{"result"}) + + demuxActiveWatchers = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "cache", + Name: "demux_active_watchers", + Help: "Number of active (caught-up) watchers in the demux.", + }) + demuxLaggingWatchers = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "cache", + Name: "demux_lagging_watchers", + Help: "Number of lagging watchers waiting for resync.", + }) + demuxHistorySize = prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "etcd", + Subsystem: "cache", + Name: "demux_history_size", + Help: "Number of entries currently stored in the demux history ring buffer.", + }) +) + +func init() { + prometheus.MustRegister(storeKeysTotal) + prometheus.MustRegister(storeLatestRevision) + prometheus.MustRegister(getDurationSeconds) + prometheus.MustRegister(getTotal) + prometheus.MustRegister(watchDurationSeconds) + prometheus.MustRegister(watchTotal) + prometheus.MustRegister(demuxActiveWatchers) + prometheus.MustRegister(demuxLaggingWatchers) + prometheus.MustRegister(demuxHistorySize) +} + +// HandleMetrics registers the Prometheus metrics handler on the given mux at the specified path. 
+func HandleMetrics(mux *http.ServeMux, path string) { + mux.Handle(path, promhttp.Handler()) +} diff --git a/cache/metrics_test.go b/cache/metrics_test.go new file mode 100644 index 000000000000..d6a8372be805 --- /dev/null +++ b/cache/metrics_test.go @@ -0,0 +1,225 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/stretchr/testify/require" + + "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/mvccpb" + clientv3 "go.etcd.io/etcd/client/v3" +) + +func TestStoreMetricsRestore(t *testing.T) { + s := newStore(4, 8) + s.Restore([]*mvccpb.KeyValue{ + makeKV("/a", "1", 5), + makeKV("/b", "2", 5), + makeKV("/c", "3", 5), + }, 5) + + require.InEpsilon(t, 3, testutil.ToFloat64(storeKeysTotal), 0.01) + require.InEpsilon(t, 5, testutil.ToFloat64(storeLatestRevision), 0.01) +} + +func TestStoreMetricsApplyPutAndDelete(t *testing.T) { + s := newStore(4, 8) + s.Restore([]*mvccpb.KeyValue{ + makeKV("/a", "1", 5), + }, 5) + + err := s.Apply(clientv3.WatchResponse{ + Events: []*clientv3.Event{ + makePutEvent("/b", "2", 6), + }, + }) + require.NoError(t, err) + require.InEpsilon(t, 2, testutil.ToFloat64(storeKeysTotal), 0.01) + require.InEpsilon(t, 6, testutil.ToFloat64(storeLatestRevision), 0.01) + + err = s.Apply(clientv3.WatchResponse{ + Events: []*clientv3.Event{ 
+ makeDelEvent("/a", 7), + }, + }) + require.NoError(t, err) + require.InEpsilon(t, 1, testutil.ToFloat64(storeKeysTotal), 0.01) + require.InEpsilon(t, 7, testutil.ToFloat64(storeLatestRevision), 0.01) +} + +func TestStoreMetricsProgressNotify(t *testing.T) { + s := newStore(4, 8) + s.Restore([]*mvccpb.KeyValue{makeKV("/a", "1", 5)}, 5) + + err := s.Apply(progressNotify(10)) + require.NoError(t, err) + require.InEpsilon(t, 10, testutil.ToFloat64(storeLatestRevision), 0.01) +} + +func TestDemuxMetricsRegisterUnregister(t *testing.T) { + d := newDemux(8, 50*time.Millisecond) + d.minRev = 1 + + w1 := newWatcher(4, nil) + w2 := newWatcher(4, nil) + + d.Register(w1, 0) + require.InEpsilon(t, 1, testutil.ToFloat64(demuxActiveWatchers), 0.01) + require.InDelta(t, 0, testutil.ToFloat64(demuxLaggingWatchers), 0) + + d.Register(w2, 0) + require.InEpsilon(t, 2, testutil.ToFloat64(demuxActiveWatchers), 0.01) + + d.Unregister(w1) + require.InEpsilon(t, 1, testutil.ToFloat64(demuxActiveWatchers), 0.01) + + d.Unregister(w2) + require.InDelta(t, 0, testutil.ToFloat64(demuxActiveWatchers), 0) +} + +func TestDemuxMetricsLaggingWatcher(t *testing.T) { + d := newDemux(8, 1*time.Second) + d.minRev = 1 + d.maxRev = 10 + + w := newWatcher(4, nil) + d.Register(w, 5) + + require.InDelta(t, 0, testutil.ToFloat64(demuxActiveWatchers), 0) + require.InEpsilon(t, 1, testutil.ToFloat64(demuxLaggingWatchers), 0.01) + + d.Unregister(w) + require.InDelta(t, 0, testutil.ToFloat64(demuxLaggingWatchers), 0) +} + +func TestDemuxMetricsBroadcastUpdatesHistorySize(t *testing.T) { + d := newDemux(8, 50*time.Millisecond) + d.minRev = 1 + + w := newWatcher(16, nil) + d.Register(w, 0) + + err := d.Broadcast(clientv3.WatchResponse{ + Events: []*clientv3.Event{ + makePutEvent("/a", "1", 5), + makePutEvent("/b", "2", 5), + }, + }) + require.NoError(t, err) + require.InEpsilon(t, 1, testutil.ToFloat64(demuxHistorySize), 0.01) + + err = d.Broadcast(clientv3.WatchResponse{ + Events: []*clientv3.Event{ + 
makePutEvent("/c", "3", 6), + }, + }) + require.NoError(t, err) + require.InEpsilon(t, 2, testutil.ToFloat64(demuxHistorySize), 0.01) + + d.Unregister(w) +} + +func TestDemuxMetricsBroadcastOverflowPromotesToLagging(t *testing.T) { + d := newDemux(8, 50*time.Millisecond) + d.minRev = 1 + + w := newWatcher(1, nil) + d.Register(w, 0) + + err := d.Broadcast(clientv3.WatchResponse{ + Events: []*clientv3.Event{makePutEvent("/a", "1", 5)}, + }) + require.NoError(t, err) + require.InEpsilon(t, 1, testutil.ToFloat64(demuxActiveWatchers), 0.01) + + err = d.Broadcast(clientv3.WatchResponse{ + Events: []*clientv3.Event{makePutEvent("/b", "2", 6)}, + }) + require.NoError(t, err) + require.InDelta(t, 0, testutil.ToFloat64(demuxActiveWatchers), 0) + require.InEpsilon(t, 1, testutil.ToFloat64(demuxLaggingWatchers), 0.01) + + d.Unregister(w) +} + +func TestDemuxMetricsPurge(t *testing.T) { + d := newDemux(8, 50*time.Millisecond) + d.minRev = 1 + + w := newWatcher(16, nil) + d.Register(w, 0) + + _ = d.Broadcast(clientv3.WatchResponse{ + Events: []*clientv3.Event{makePutEvent("/a", "1", 5)}, + }) + + require.InEpsilon(t, 1, testutil.ToFloat64(demuxActiveWatchers), 0.01) + require.InEpsilon(t, 1, testutil.ToFloat64(demuxHistorySize), 0.01) + + d.Purge() + + require.InDelta(t, 0, testutil.ToFloat64(demuxActiveWatchers), 0) + require.InDelta(t, 0, testutil.ToFloat64(demuxLaggingWatchers), 0) + require.InDelta(t, 0, testutil.ToFloat64(demuxHistorySize), 0) +} + +func TestHandleMetricsEndpoint(t *testing.T) { + mux := http.NewServeMux() + HandleMetrics(mux, DefaultMetricsPath) + + req := httptest.NewRequest(http.MethodGet, DefaultMetricsPath, nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + body := rec.Body.String() + for _, want := range []string{ + "etcd_cache_store_keys_total", + "etcd_cache_store_latest_revision", + "etcd_cache_get_duration_seconds", + "etcd_cache_get_total", + "etcd_cache_watch_register_duration_seconds", 
+ "etcd_cache_watch_total", + "etcd_cache_demux_active_watchers", + "etcd_cache_demux_lagging_watchers", + "etcd_cache_demux_history_size", + } { + require.Containsf(t, body, want, "expected %q in metrics output", want) + } +} + +func TestHandleMetricsCustomPath(t *testing.T) { + mux := http.NewServeMux() + HandleMetrics(mux, "/custom/metrics") + + req := httptest.NewRequest(http.MethodGet, "/custom/metrics", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + require.Equal(t, http.StatusOK, rec.Code) + require.Contains(t, rec.Body.String(), "etcd_cache_store_keys_total") +} + +func progressNotify(rev int64) clientv3.WatchResponse { + return clientv3.WatchResponse{ + Header: etcdserverpb.ResponseHeader{Revision: rev}, + } +} diff --git a/cache/store.go b/cache/store.go index 96151de91391..aea22bc197af 100644 --- a/cache/store.go +++ b/cache/store.go @@ -31,10 +31,11 @@ var ErrNotReady = fmt.Errorf("cache: store not ready") // The store keeps a bounded history of snapshots using ringBuffer so that // reads at historical revisions can be served until they fall out of the window. 
type store struct {
-	mu      sync.RWMutex
-	degree  int
-	latest  snapshot              // latest is the mutable working snapshot
-	history ringBuffer[*snapshot] // history stores immutable cloned snapshots
+	mu       sync.RWMutex
+	degree   int
+	keyCount int64                 // number of keys in the latest tree
+	latest   snapshot              // latest is the mutable working snapshot
+	history  ringBuffer[*snapshot] // history stores immutable cloned snapshots
 }
 
 func newStore(degree int, historyCapacity int) *store {
@@ -112,7 +113,11 @@ func (s *store) Restore(kvs []*mvccpb.KeyValue, rev int64) {
 	}
 	s.history.RebaseHistory()
 	s.latest.rev = rev
+	s.keyCount = int64(len(kvs))
 	s.history.Append(newClonedSnapshot(rev, s.latest.tree))
+
+	storeKeysTotal.Set(float64(s.keyCount))
+	storeLatestRevision.Set(float64(rev))
 }
 
 func (s *store) Apply(resp clientv3.WatchResponse) error {
@@ -142,6 +147,7 @@ func (s *store) applyProgressNotifyLocked(revision int64) {
 		return
 	}
 	s.latest.rev = revision
+	storeLatestRevision.Set(float64(revision))
 }
 
 func (s *store) applyEventsLocked(events []*clientv3.Event) error {
@@ -155,7 +161,12 @@ func (s *store) applyEventsLocked(events []*clientv3.Event) error {
 				if _, ok := s.latest.tree.Delete(&kvItem{key: string(ev.Kv.Key)}); !ok {
 					return fmt.Errorf("cache: delete non-existent key %s", string(ev.Kv.Key))
 				}
+				s.keyCount--
 			case clientv3.EventTypePut:
-				s.latest.tree.ReplaceOrInsert(newKVItem(ev.Kv))
+				// ReplaceOrInsert reports whether an existing item was replaced, so a
+				// single tree walk both stores the value and detects a brand-new key.
+				if _, replaced := s.latest.tree.ReplaceOrInsert(newKVItem(ev.Kv)); !replaced {
+					s.keyCount++
+				}
 			}
 			i++
@@ -163,6 +174,9 @@ func (s *store) applyEventsLocked(events []*clientv3.Event) error {
 		s.latest.rev = rev
 		s.history.Append(newClonedSnapshot(rev, s.latest.tree))
 	}
+
+	storeKeysTotal.Set(float64(s.keyCount))
+	storeLatestRevision.Set(float64(s.latest.rev))
 	return nil
 }
 
diff --git a/tests/integration/cache_metrics_test.go b/tests/integration/cache_metrics_test.go
new file mode 100644
index 000000000000..9db7a58051c7
--- /dev/null
+++ b/tests/integration/cache_metrics_test.go
@@ -0,0 +1,271 @@
+// Copyright 2025 The etcd
Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package integration + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "net/http/httptest" + "testing" + "time" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + + cache "go.etcd.io/etcd/cache/v3" + clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/tests/v3/framework/integration" +) + +func TestCacheMetricsStoreAfterPutsAndGets(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + t.Cleanup(func() { clus.Terminate(t) }) + client := clus.Client(0) + ctx := t.Context() + + mux := http.NewServeMux() + c, err := cache.New(client, "/m/", + cache.WithHistoryWindowSize(32), + cache.WithMetrics(mux), + cache.WithMetricsPath("/test/metrics"), + ) + require.NoError(t, err) + t.Cleanup(c.Close) + require.NoError(t, c.WaitReady(ctx)) + + var lastRev int64 + for i := 0; i < 5; i++ { + resp, putErr := client.Put(ctx, fmt.Sprintf("/m/key%c", 'a'+i), "val") + require.NoError(t, putErr) + lastRev = resp.Header.Revision + } + require.NoError(t, c.WaitForRevision(ctx, lastRev)) + + _, err = c.Get(ctx, "/m/", clientv3.WithPrefix(), clientv3.WithSerializable()) + require.NoError(t, err) + _, err = c.Get(ctx, "/m/keya", clientv3.WithSerializable()) + require.NoError(t, err) + + families := scrapeHTTPMetrics(t, mux, 
"/test/metrics") + + for _, name := range []string{ + "etcd_cache_store_keys_total", + "etcd_cache_store_latest_revision", + "etcd_cache_get_duration_seconds", + "etcd_cache_get_total", + "etcd_cache_demux_active_watchers", + "etcd_cache_demux_history_size", + } { + require.Containsf(t, families, name, "expected metric family %q", name) + } + + keysTotal := gaugeValue(t, families, "etcd_cache_store_keys_total") + require.InEpsilonf(t, 5.0, keysTotal, 0.01, "expected store_keys_total to be 5") + + latestRev := gaugeValue(t, families, "etcd_cache_store_latest_revision") + require.InEpsilonf(t, float64(lastRev), latestRev, 0.01, "expected store_latest_revision to be %d", lastRev) + + getSuccessCount := counterValue(t, families, "etcd_cache_get_total", "result", "success") + require.InEpsilonf(t, 2.0, getSuccessCount, 0.01, "expected get_total{success} to be 2") +} + +func TestCacheMetricsWatchRegistration(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + t.Cleanup(func() { clus.Terminate(t) }) + client := clus.Client(0) + ctx := t.Context() + + mux := http.NewServeMux() + c, err := cache.New(client, "/w/", + cache.WithHistoryWindowSize(32), + cache.WithMetrics(mux), + ) + require.NoError(t, err) + t.Cleanup(c.Close) + require.NoError(t, c.WaitReady(ctx)) + + watchCtx1, cancel1 := context.WithCancel(ctx) + defer cancel1() + ch1 := c.Watch(watchCtx1, "/w/", clientv3.WithPrefix()) + require.NotNil(t, ch1) + + watchCtx2, cancel2 := context.WithCancel(ctx) + defer cancel2() + ch2 := c.Watch(watchCtx2, "/w/key", clientv3.WithPrefix()) + require.NotNil(t, ch2) + + time.Sleep(50 * time.Millisecond) + + families := scrapeHTTPMetrics(t, mux, cache.DefaultMetricsPath) + + watchSuccess := counterValue(t, families, "etcd_cache_watch_total", "result", "success") + require.InEpsilonf(t, 2.0, watchSuccess, 0.01, "expected watch_total{success} to be 2") + require.Containsf(t, families, 
"etcd_cache_watch_register_duration_seconds", + "expected watch_register_duration_seconds metric") + + cancel1() + cancel2() +} + +func TestCacheMetricsDeleteUpdatesKeyCount(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + t.Cleanup(func() { clus.Terminate(t) }) + client := clus.Client(0) + ctx := t.Context() + + mux := http.NewServeMux() + c, err := cache.New(client, "/d/", + cache.WithHistoryWindowSize(32), + cache.WithMetrics(mux), + ) + require.NoError(t, err) + t.Cleanup(c.Close) + require.NoError(t, c.WaitReady(ctx)) + + var rev int64 + for _, key := range []string{"/d/x", "/d/y", "/d/z"} { + resp, putErr := client.Put(ctx, key, "v") + require.NoError(t, putErr) + rev = resp.Header.Revision + } + require.NoError(t, c.WaitForRevision(ctx, rev)) + + families := scrapeHTTPMetrics(t, mux, cache.DefaultMetricsPath) + keysAfterPuts := gaugeValue(t, families, "etcd_cache_store_keys_total") + require.InEpsilon(t, 3.0, keysAfterPuts, 0.01) + + // Update operation should not increase key count + resp, putErr := client.Put(ctx, "/d/x", "newVal") + require.NoError(t, putErr) + require.NoError(t, c.WaitForRevision(ctx, resp.Header.Revision)) + + families = scrapeHTTPMetrics(t, mux, cache.DefaultMetricsPath) + keysAfterPuts = gaugeValue(t, families, "etcd_cache_store_keys_total") + require.InEpsilon(t, 3.0, keysAfterPuts, 0.01) + + delResp, err := client.Delete(ctx, "/d/z") + require.NoError(t, err) + require.NoError(t, c.WaitForRevision(ctx, delResp.Header.Revision)) + + families = scrapeHTTPMetrics(t, mux, cache.DefaultMetricsPath) + keysAfterDelete := gaugeValue(t, families, "etcd_cache_store_keys_total") + require.InEpsilon(t, 2.0, keysAfterDelete, 0.01) +} + +func TestCacheMetricsGetErrors(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + t.Cleanup(func() { clus.Terminate(t) }) + client := clus.Client(0) + ctx := t.Context() + + 
mux := http.NewServeMux() + c, err := cache.New(client, "/g/", + cache.WithHistoryWindowSize(32), + cache.WithMetrics(mux), + ) + require.NoError(t, err) + t.Cleanup(c.Close) + require.NoError(t, c.WaitReady(ctx)) + + _, err = c.Get(ctx, "/outside/prefix", clientv3.WithSerializable()) + require.Error(t, err) + + families := scrapeHTTPMetrics(t, mux, cache.DefaultMetricsPath) + errorCount := counterValue(t, families, "etcd_cache_get_total", "result", "error") + require.InEpsilonf(t, 1.0, errorCount, 0.01, "expected get_total{error} >= 1 after out-of-scope Get") +} + +func TestCacheMetricsDefaultPath(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + t.Cleanup(func() { clus.Terminate(t) }) + client := clus.Client(0) + + mux := http.NewServeMux() + c, err := cache.New(client, "", + cache.WithHistoryWindowSize(8), + cache.WithMetrics(mux), + ) + require.NoError(t, err) + t.Cleanup(c.Close) + + families := scrapeHTTPMetrics(t, mux, cache.DefaultMetricsPath) + require.Containsf(t, families, "etcd_cache_store_keys_total", + "expected metrics served on default path %s", cache.DefaultMetricsPath) +} + +func TestCacheMetricsNilMuxNoEndpoint(t *testing.T) { + integration.BeforeTest(t) + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + t.Cleanup(func() { clus.Terminate(t) }) + client := clus.Client(0) + + c, err := cache.New(client, "", + cache.WithHistoryWindowSize(8), + ) + require.NoError(t, err) + t.Cleanup(c.Close) + require.NoError(t, c.WaitReady(t.Context())) +} + +// scrapeHTTPMetrics does an in-process HTTP GET against the mux and parses the +// Prometheus text exposition format into metric families. 
+func scrapeHTTPMetrics(t *testing.T, mux *http.ServeMux, path string) map[string]*dto.MetricFamily {
+	t.Helper()
+	recorder := httptest.NewRecorder()
+	mux.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, path, nil))
+	require.Equal(t, http.StatusOK, recorder.Code)
+	payload, readErr := io.ReadAll(recorder.Body) // drain the recorded body before parsing
+	require.NoError(t, readErr)
+	textParser := expfmt.NewTextParser(model.LegacyValidation)
+	parsed, parseErr := textParser.TextToMetricFamilies(bytes.NewReader(payload))
+	require.NoError(t, parseErr)
+	return parsed
+}
+
+// gaugeValue reads the first sample of the named family as a gauge; the family must exist and be non-empty.
+func gaugeValue(t *testing.T, families map[string]*dto.MetricFamily, name string) float64 {
+	t.Helper()
+	family, found := families[name]
+	require.Truef(t, found, "metric family %q not found", name)
+	require.NotEmptyf(t, family.GetMetric(), "metric family %q has no samples", name)
+	return family.GetMetric()[0].GetGauge().GetValue()
+}
+
+// counterValue returns the counter value of the first sample in the named family labelled labelKey=labelVal.
+func counterValue(t *testing.T, families map[string]*dto.MetricFamily, name, labelKey, labelVal string) float64 {
+	t.Helper()
+	family, found := families[name]
+	require.Truef(t, found, "metric family %q not found", name)
+	for _, sample := range family.GetMetric() {
+		for _, pair := range sample.GetLabel() {
+			if pair.GetName() != labelKey || pair.GetValue() != labelVal {
+				continue
+			}
+			return sample.GetCounter().GetValue()
+		}
+	}
+	t.Fatalf("metric %s{%s=%q} not found", name, labelKey, labelVal)
+	return 0
+}