diff --git a/cache/README.md b/cache/README.md index 2c6bf8e59f2f..5a66c5855c96 100644 --- a/cache/README.md +++ b/cache/README.md @@ -1,3 +1,5 @@ # etcd cache Experimental etcd client cache library. + +**Note:** gRPC proxy is not supported. The cache relies on `RequestProgress` RPCs, which the gRPC proxy does not forward. diff --git a/cache/cache.go b/cache/cache.go index 65d337087d07..e148a30dc4c2 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -23,6 +23,7 @@ import ( "time" pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -31,24 +32,33 @@ var ( ErrUnsupportedRequest = errors.New("cache: unsupported request parameters") // Returned when the requested key or key‑range is invalid (empty or reversed) or lies outside c.prefix. ErrKeyRangeInvalid = errors.New("cache: invalid or out‑of‑range key range") + // Returned when the cache timed out waiting for the requested revision + ErrCacheTimeout = errors.New("cache: timed out waiting for revision") ) // Cache buffers a single etcd Watch for a given key‐prefix and fan‑outs local watchers. +// +// Note: gRPC proxy is not supported. Cache relies on RequestProgress RPCs, +// which the gRPC proxy does not forward. type Cache struct { - prefix string // prefix is the key-prefix this shard is responsible for ("" = root). - cfg Config // immutable runtime configuration - watcher clientv3.Watcher - kv clientv3.KV - demux *demux // demux fans incoming events out to active watchers and manages resync. - store *store // last‑observed snapshot - ready *ready - stop context.CancelFunc - waitGroup sync.WaitGroup - internalCtx context.Context + prefix string // prefix is the key-prefix this shard is responsible for ("" = root). + cfg Config // immutable runtime configuration + watcher clientv3.Watcher + kv clientv3.KV + demux *demux // demux fans incoming events out to active watchers and manages resync. + store *store // last‑observed snapshot + ready *ready + stop context.CancelFunc + waitGroup sync.WaitGroup + internalCtx context.Context + progressRequestor progressRequestor } // New builds a cache shard that watches only the requested prefix. // For the root cache pass "". +// +// Note: gRPC proxy is not supported. Cache relies on RequestProgress RPCs, +// which the gRPC proxy does not forward. func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) { cfg := defaultConfig() for _, opt := range opts { @@ -65,23 +75,28 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error) internalCtx, cancel := context.WithCancel(context.Background()) cache := &Cache{ - prefix: prefix, - cfg: cfg, - watcher: client.Watcher, - kv: client.KV, - store: newStore(cfg.BTreeDegree, cfg.HistoryWindowSize), - ready: newReady(), - stop: cancel, - internalCtx: internalCtx, + prefix: prefix, + cfg: cfg, + watcher: client.Watcher, + kv: client.KV, + store: newStore(cfg.BTreeDegree, cfg.HistoryWindowSize), + ready: newReady(), + stop: cancel, + internalCtx: internalCtx, + progressRequestor: newConditionalProgressRequestor(client.Watcher, realClock{}, cfg.ProgressRequestInterval), } cache.demux = NewDemux(internalCtx, &cache.waitGroup, cfg.HistoryWindowSize, cfg.ResyncInterval) - cache.waitGroup.Add(1) + cache.waitGroup.Add(2) go func() { defer cache.waitGroup.Done() cache.getWatchLoop() }() + go func() { + defer cache.waitGroup.Done() + cache.progressRequestor.run(internalCtx) + }() return cache, nil } @@ -161,6 +176,19 @@ func (c *Cache) Get(ctx context.Context, key string, opts ...clientv3.OpOption) endKey := op.RangeBytes() requestedRev := op.Rev() + if !op.IsSerializable() { + serverRev, err := c.serverRevision(ctx) + if err != nil { + return nil, err + } + if requestedRev > serverRev { + return nil, rpctypes.ErrFutureRev + } + if err = c.waitTillRevision(ctx, serverRev); err != nil { + return nil, err + } + } + kvs, latestRev, err := c.store.Get(startKey, endKey, requestedRev) if err != nil { return nil, err @@ -196,6 +224,45 @@ func (c *Cache) WaitForRevision(ctx context.Context, rev int64) error { } } +func (c *Cache) serverRevision(ctx context.Context) (int64, error) { + key := c.prefix + if key == "" { + key = "/" + } + resp, err := c.kv.Get(ctx, key, clientv3.WithLimit(1), clientv3.WithCountOnly()) + if err != nil { + return 0, err + } + return resp.Header.Revision, nil +} + +func (c *Cache) waitTillRevision(ctx context.Context, rev int64) error { + if c.store.LatestRev() >= rev { + return nil + } + + c.progressRequestor.add() + defer c.progressRequestor.remove() + + ticker := time.NewTicker(revisionPollInterval) + defer ticker.Stop() + timeout := time.After(c.cfg.WaitTimeout) + + // TODO: rewrite from periodic polling to passive notification + for { + if c.store.LatestRev() >= rev { + return nil + } + select { + case <-ticker.C: + case <-timeout: + return ErrCacheTimeout + case <-ctx.Done(): + return ctx.Err() + } + } +} + // Close cancels the private context and blocks until all goroutines return. func (c *Cache) Close() { c.stop() @@ -358,9 +425,6 @@ func (c *Cache) validateGet(key string, op clientv3.Op) (KeyPredicate, error) { return nil, fmt.Errorf("%w: MinCreateRev(%d) not supported", ErrUnsupportedRequest, op.MinCreateRev()) case op.MaxCreateRev() != 0: return nil, fmt.Errorf("%w: MaxCreateRev(%d) not supported", ErrUnsupportedRequest, op.MaxCreateRev()) - // cache now only serves serializable reads of the latest revision (rev == 0). - case !op.IsSerializable(): - return nil, fmt.Errorf("%w: non-serializable request", ErrUnsupportedRequest) } startKey := []byte(key) diff --git a/cache/cache_test.go b/cache/cache_test.go index e2380747e0d1..dabafcc6e849 100644 --- a/cache/cache_test.go +++ b/cache/cache_test.go @@ -16,6 +16,8 @@ package cache import ( "context" + "errors" + "fmt" "sync" "testing" "time" @@ -501,6 +503,7 @@ type mockWatcher struct { wg sync.WaitGroup mu sync.Mutex lastStartRev int64 + progressErr error } func newMockWatcher(buf int) *mockWatcher { @@ -522,7 +525,7 @@ func (m *mockWatcher) Watch(ctx context.Context, _ string, opts ...clientv3.OpOp return out } -func (m *mockWatcher) RequestProgress(_ context.Context) error { return nil } +func (m *mockWatcher) RequestProgress(_ context.Context) error { return m.progressErr } func (m *mockWatcher) Close() error { m.closeOnce.Do(func() { close(m.responses) }) @@ -600,6 +603,7 @@ func (m *mockWatcher) streamResponses(ctx context.Context, out chan<- clientv3.W type kvStub struct { queued []*clientv3.GetResponse defaultResp *clientv3.GetResponse + defaultErr error } func newKVStub(resps ...*clientv3.GetResponse) *kvStub { @@ -610,7 +614,11 @@ func newKVStub(resps ...*clientv3.GetResponse) *kvStub { } } -func (s *kvStub) Get(ctx context.Context, key string, _ ...clientv3.OpOption) (*clientv3.GetResponse, error) { +func (s *kvStub) Get(_ context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + if s.defaultErr != nil { + return nil, s.defaultErr + } + if len(s.queued) > 0 { next := s.queued[0] s.queued = s.queued[1:] @@ -692,3 +700,171 @@ func verifySnapshot(t *testing.T, cache *Cache, want []*mvccpb.KeyValue) { t.Fatalf("cache snapshot mismatch (-want +got):\n%s", diff) } } + +type noopProgressNotifier struct{} + +func (n *noopProgressNotifier) RequestProgress(_ context.Context) error { + return nil +} + +func newTestProgressRequestor() *conditionalProgressRequestor { + return newConditionalProgressRequestor(&noopProgressNotifier{}, realClock{}, 100*time.Millisecond) +} + +func newCacheForWaitTest(serverRev int64, localRev int64, pr progressRequestor) (*Cache, *store) { + cfg := defaultConfig() + st := newStore(cfg.BTreeDegree, cfg.HistoryWindowSize) + if localRev > 0 { + st.Restore(nil, localRev) + } + kv := &kvStub{ + defaultResp: &clientv3.GetResponse{Header: &pb.ResponseHeader{Revision: serverRev}}, + } + return &Cache{ + kv: kv, + store: st, + prefix: "/", + progressRequestor: pr, + cfg: cfg, + }, st +} + +func TestWaitTillRevision(t *testing.T) { + t.Run("cache_already_caught_up", func(t *testing.T) { + c, _ := newCacheForWaitTest(10, 10, newTestProgressRequestor()) + + if err := c.waitTillRevision(context.Background(), 10); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("local_rev_sufficient_skips_server_call", func(t *testing.T) { + cfg := defaultConfig() + st := newStore(cfg.BTreeDegree, cfg.HistoryWindowSize) + st.Restore(nil, 10) + c := &Cache{ + kv: &kvStub{defaultErr: fmt.Errorf("should not be called")}, + store: st, + prefix: "/", + progressRequestor: newTestProgressRequestor(), + cfg: cfg, + } + + if err := c.waitTillRevision(context.Background(), 5); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("cache_catches_up", func(t *testing.T) { + c, st := newCacheForWaitTest(15, 5, newTestProgressRequestor()) + + go func() { + time.Sleep(200 * time.Millisecond) + st.Restore(nil, 10) + }() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err := c.waitTillRevision(ctx, 10); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("rev_zero_cache_caught_up", func(t *testing.T) { + c, _ := newCacheForWaitTest(10, 10, newTestProgressRequestor()) + + if err := c.waitTillRevision(context.Background(), 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("rev_zero_waits_for_server_rev", func(t *testing.T) { + c, st := newCacheForWaitTest(10, 5, newTestProgressRequestor()) + + go func() { + time.Sleep(200 * time.Millisecond) + st.Restore(nil, 10) + }() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + if err := c.waitTillRevision(ctx, 0); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("context_cancelled", func(t *testing.T) { + c, _ := newCacheForWaitTest(10, 5, newTestProgressRequestor()) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + err := c.waitTillRevision(ctx, 10) + if !errors.Is(err, context.DeadlineExceeded) { + t.Fatalf("got %v, want context.DeadlineExceeded", err) + } + }) + + t.Run("timeout", func(t *testing.T) { + c, _ := newCacheForWaitTest(10, 5, newTestProgressRequestor()) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + err := c.waitTillRevision(ctx, 10) + if !errors.Is(err, ErrCacheTimeout) { + t.Fatalf("got %v, want ErrCacheTimeout", err) + } + }) +} + +func TestWaitTillRevisionTriggersProgressRequests(t *testing.T) { + fc := newFakeClock() + pr := newTestConditionalProgressRequestor(fc, 50*time.Millisecond) + c, st := newCacheForWaitTest(15, 5, pr) + + // Start progress requestor + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go pr.run(ctx) + + // Wait for goroutine to start + time.Sleep(10 * time.Millisecond) + + // Initially, no progress requests should be sent (no waiters) + fc.Advance(100 * time.Millisecond) + if err := pollConditionNoChange(func() bool { + return pr.progressRequestsSentCount.Load() == 0 + }); err != nil { + t.Fatal("expected no progress requests without active waiters") + } + + // Start waiting - this should trigger progress requests + errCh := make(chan error, 1) + go func() { + errCh <- c.waitTillRevision(context.Background(), 10) + }() + + // Advance time and wait for progress requests to start + fc.Advance(50 * time.Millisecond) + time.Sleep(10 * time.Millisecond) + + // Verify progress requests are being sent while waiting + if pr.progressRequestsSentCount.Load() == 0 { + t.Fatal("expected progress requests during wait") + } + + // Complete the wait + st.Restore(nil, 15) + + if err := <-errCh; err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // After completion, progress requests should stop + finalCount := pr.progressRequestsSentCount.Load() + fc.Advance(100 * time.Millisecond) + if err := pollConditionNoChange(func() bool { + return pr.progressRequestsSentCount.Load() == finalCount + }); err != nil { + t.Fatalf("expected no new progress requests after completion, got %d initially, then changed", finalCount) + } +} diff --git a/cache/clock.go b/cache/clock.go new file mode 100644 index 000000000000..62b33a668c76 --- /dev/null +++ b/cache/clock.go @@ -0,0 +1,54 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import "time" + +// Clock allows for injecting fake or real clocks into code +// that needs to do arbitrary things based on time. +type Clock interface { + NewTimer(d time.Duration) Timer +} + +// Timer allows for injecting fake or real timers into code +// that needs to do arbitrary things based on time. +type Timer interface { + Chan() <-chan time.Time + Stop() bool + Reset(d time.Duration) bool +} + +// realClock implements Clock using the standard time package. +type realClock struct{} + +func (realClock) NewTimer(d time.Duration) Timer { + return &realTimer{timer: time.NewTimer(d)} +} + +type realTimer struct { + timer *time.Timer +} + +func (r *realTimer) Chan() <-chan time.Time { + return r.timer.C +} + +func (r *realTimer) Stop() bool { + return r.timer.Stop() +} + +func (r *realTimer) Reset(d time.Duration) bool { + return r.timer.Reset(d) +} diff --git a/cache/config.go b/cache/config.go index cf18ffcc7468..3419ef40134c 100644 --- a/cache/config.go +++ b/cache/config.go @@ -16,6 +16,8 @@ package cache import "time" +const revisionPollInterval = 50 * time.Millisecond + type Config struct { // PerWatcherBufferSize caps each watcher’s buffered channel. // Bigger values tolerate brief client slow-downs at the cost of extra memory. @@ -31,20 +33,26 @@ type Config struct { MaxBackoff time.Duration // GetTimeout is the timeout applied to the first Get() used to bootstrap the cache. GetTimeout time.Duration + // WaitTimeout is the maximum time a consistent Get will wait for the local cache to catch up before returning ErrCacheTimeout. + WaitTimeout time.Duration // BTreeDegree controls the degree (branching factor) of the in-memory B-tree store. BTreeDegree int + // ProgressRequestInterval controls how often progress notifications are requested from the etcd watch stream during a consistent Get. + ProgressRequestInterval time.Duration } // TODO: tune via performance/load tests. func defaultConfig() Config { return Config{ - PerWatcherBufferSize: 10, - HistoryWindowSize: 2048, - ResyncInterval: 50 * time.Millisecond, - InitialBackoff: 50 * time.Millisecond, - MaxBackoff: 2 * time.Second, - GetTimeout: 5 * time.Second, - BTreeDegree: 32, + PerWatcherBufferSize: 10, + HistoryWindowSize: 2048, + ResyncInterval: 50 * time.Millisecond, + InitialBackoff: 50 * time.Millisecond, + MaxBackoff: 2 * time.Second, + GetTimeout: 5 * time.Second, + WaitTimeout: 3 * time.Second, + BTreeDegree: 32, + ProgressRequestInterval: 100 * time.Millisecond, } } @@ -77,3 +85,11 @@ func WithGetTimeout(d time.Duration) Option { func WithBTreeDegree(n int) Option { return func(c *Config) { c.BTreeDegree = n } } + +func WithProgressRequestInterval(d time.Duration) Option { + return func(c *Config) { c.ProgressRequestInterval = d } +} + +func WithWaitTimeout(d time.Duration) Option { + return func(c *Config) { c.WaitTimeout = d } +} diff --git a/cache/fake_clock_test.go b/cache/fake_clock_test.go new file mode 100644 index 000000000000..65bec2f8fa67 --- /dev/null +++ b/cache/fake_clock_test.go @@ -0,0 +1,111 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +// Minimal fake clock for testing, based on +// https://github.com/kubernetes/utils/blob/master/clock/testing/fake_clock.go + +import ( + "sync" + "time" +) + +type fakeClock struct { + lock sync.Mutex + time time.Time + waiters []*fakeClockWaiter +} + +type fakeClockWaiter struct { + targetTime time.Time + destChan chan time.Time +} + +func newFakeClock() *fakeClock { + return &fakeClock{time: time.Now()} +} + +func (f *fakeClock) NewTimer(d time.Duration) Timer { + f.lock.Lock() + defer f.lock.Unlock() + ch := make(chan time.Time, 1) + timer := &fakeTimer{ + fakeClock: f, + waiter: fakeClockWaiter{ + targetTime: f.time.Add(d), + destChan: ch, + }, + } + f.waiters = append(f.waiters, &timer.waiter) + return timer +} + +func (f *fakeClock) Advance(d time.Duration) { + f.lock.Lock() + defer f.lock.Unlock() + f.time = f.time.Add(d) + newWaiters := make([]*fakeClockWaiter, 0, len(f.waiters)) + for _, w := range f.waiters { + if !w.targetTime.After(f.time) { + w.destChan <- f.time + } else { + newWaiters = append(newWaiters, w) + } + } + f.waiters = newWaiters +} + +type fakeTimer struct { + fakeClock *fakeClock + waiter fakeClockWaiter +} + +func (t *fakeTimer) Chan() <-chan time.Time { + return t.waiter.destChan +} + +func (t *fakeTimer) Stop() bool { + t.fakeClock.lock.Lock() + defer t.fakeClock.lock.Unlock() + active := false + newWaiters := make([]*fakeClockWaiter, 0, len(t.fakeClock.waiters)) + for _, w := range t.fakeClock.waiters { + if w != &t.waiter { + newWaiters = append(newWaiters, w) + } else { + active = true + } + } + t.fakeClock.waiters = newWaiters + return active +} + +func (t *fakeTimer) Reset(d time.Duration) bool { + t.fakeClock.lock.Lock() + defer t.fakeClock.lock.Unlock() + active := false + t.waiter.targetTime = t.fakeClock.time.Add(d) + for _, w := range t.fakeClock.waiters { + if w == &t.waiter { + // If timer is found, it has not been fired yet. + active = true + break + } + } + if !active { + t.fakeClock.waiters = append(t.fakeClock.waiters, &t.waiter) + } + return active +} diff --git a/cache/progress_requestor.go b/cache/progress_requestor.go new file mode 100644 index 000000000000..4bde695799fa --- /dev/null +++ b/cache/progress_requestor.go @@ -0,0 +1,114 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "context" + "fmt" + "sync" + "time" +) + +type progressNotifier interface { + RequestProgress(ctx context.Context) error +} + +// Copied from https://github.com/kubernetes/kubernetes/blob/4116c15/staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress.go +type progressRequestor interface { + // run starts the background loop that sends RequestProgress RPCs. + run(ctx context.Context) + // add increments the count of active waiters so the run loop knows to send RequestProgress RPCs. + add() + // remove decrements the count of active waiters. + remove() +} + +type conditionalProgressRequestor struct { + mux sync.Mutex + cond *sync.Cond + waiting int32 + stopped bool + notifier progressNotifier + requestInterval time.Duration + clock Clock +} + +func newConditionalProgressRequestor(notifier progressNotifier, clock Clock, requestInterval time.Duration) *conditionalProgressRequestor { + waiter := &conditionalProgressRequestor{ + notifier: notifier, + requestInterval: requestInterval, + clock: clock, + } + waiter.cond = sync.NewCond(&waiter.mux) + return waiter +} + +func (p *conditionalProgressRequestor) run(ctx context.Context) { + go func() { + <-ctx.Done() + p.mux.Lock() + defer p.mux.Unlock() + p.stopped = true + p.cond.Signal() + }() + + timer := p.clock.NewTimer(p.requestInterval) + defer timer.Stop() + for { + stopped := func() bool { + p.mux.Lock() + defer p.mux.Unlock() + for p.waiting == 0 && !p.stopped { + p.cond.Wait() + } + return p.stopped + }() + if stopped { + return + } + + select { + case <-timer.Chan(): + shouldContinue := func() bool { + p.mux.Lock() + defer p.mux.Unlock() + return p.waiting > 0 && !p.stopped + }() + if !shouldContinue { + timer.Reset(0) + continue + } + timer.Reset(p.requestInterval) + if err := p.notifier.RequestProgress(ctx); err != nil { + fmt.Printf("RequestProgress failed: %v\n", err) + } + case <-ctx.Done(): + return + } + } +} + +func (p *conditionalProgressRequestor) add() { + p.mux.Lock() + defer p.mux.Unlock() + p.waiting++ + p.cond.Signal() +} + +func (p *conditionalProgressRequestor) remove() { + p.mux.Lock() + defer p.mux.Unlock() + p.waiting-- +} diff --git a/cache/progress_requestor_test.go b/cache/progress_requestor_test.go new file mode 100644 index 000000000000..92837c6221dd --- /dev/null +++ b/cache/progress_requestor_test.go @@ -0,0 +1,155 @@ +// Copyright 2025 The etcd Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package cache + +import ( + "context" + "sync/atomic" + "testing" + "time" +) + +const ( + pollPeriod = 1 * time.Millisecond + minimalNoChange = 20 * time.Millisecond + pollTimeout = 5 * time.Second +) + +// Copied from https://github.com/kubernetes/kubernetes/blob/4116c15/staging/src/k8s.io/apiserver/pkg/storage/cacher/progress/watch_progress_test.go +func TestConditionalProgressRequestor(t *testing.T) { + ctx := context.Background() + + fc := newFakeClock() + pr := newTestConditionalProgressRequestor(fc, 100*time.Millisecond) + stopCtx, cancel := context.WithCancel(ctx) + defer cancel() + go pr.run(stopCtx) + + var wantRequestsSent int32 + var requestsSent int32 + + t.Log("Wait for goroutine to start") + time.Sleep(10 * time.Millisecond) + + t.Log("No progress requests if no-one is waiting") + fc.Advance(250 * time.Millisecond) + + if err := pollConditionNoChange(func() bool { + requestsSent = pr.progressRequestsSentCount.Load() + return requestsSent == wantRequestsSent + }); err != nil { + t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent) + } + + t.Log("Adding waiters allows progress request to be sent") + pr.add() + fc.Advance(150 * time.Millisecond) + wantRequestsSent++ + if err := pollConditionNoChange(func() bool { + requestsSent = pr.progressRequestsSentCount.Load() + return requestsSent == wantRequestsSent + }); err != nil { + t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent) + } + + t.Log("Periodically request progress to be sent every period") + for wantRequestsSent < 5 { + fc.Advance(100 * time.Millisecond) + wantRequestsSent++ + + if err := pollConditionNoChange(func() bool { + requestsSent = pr.progressRequestsSentCount.Load() + return requestsSent == wantRequestsSent + }); err != nil { + t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent) + } + } + pr.remove() + + t.Log("No progress requests if no-one is waiting") + fc.Advance(250 * time.Millisecond) + if err := pollConditionNoChange(func() bool { + requestsSent = pr.progressRequestsSentCount.Load() + return requestsSent == wantRequestsSent + }); err != nil { + t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent) + } + + t.Log("No progress after stopping") + cancel() + fc.Advance(50 * time.Millisecond) + if err := pollConditionNoChange(func() bool { + requestsSent = pr.progressRequestsSentCount.Load() + return requestsSent == wantRequestsSent + }); err != nil { + t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent) + } + pr.add() + fc.Advance(250 * time.Millisecond) + if err := pollConditionNoChange(func() bool { + requestsSent = pr.progressRequestsSentCount.Load() + return requestsSent == wantRequestsSent + }); err != nil { + t.Fatalf("Failed to wait progress requests, err: %s, want: %d, got %d", err, wantRequestsSent, requestsSent) + } +} + +func newTestConditionalProgressRequestor(clock Clock, requestInterval time.Duration) *testConditionalProgressRequestor { + pr := &testConditionalProgressRequestor{} + pr.conditionalProgressRequestor = newConditionalProgressRequestor(pr, clock, requestInterval) + return pr +} + +type testConditionalProgressRequestor struct { + *conditionalProgressRequestor + progressRequestsSentCount atomic.Int32 +} + +func (pr *testConditionalProgressRequestor) RequestProgress(ctx context.Context) error { + pr.progressRequestsSentCount.Add(1) + return nil +} + +func pollConditionNoChange(condition func() bool) error { + passCounter := 0 + requiredNumberOfPasses := int(minimalNoChange/pollPeriod) + 1 + deadline := time.Now().Add(pollTimeout) + + ticker := time.NewTicker(pollPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if condition() { + passCounter++ + } else { + passCounter = 0 + } + if passCounter >= requiredNumberOfPasses { + return nil + } + if time.Now().After(deadline) { + return &timeoutError{} + } + } + } +} + +type timeoutError struct{} + +func (e *timeoutError) Error() string { + return "timed out waiting for condition to stabilize" +} diff --git a/tests/integration/cache_test.go b/tests/integration/cache_test.go index db0240eb118b..b2b990e3c34c 100644 --- a/tests/integration/cache_test.go +++ b/tests/integration/cache_test.go @@ -18,6 +18,7 @@ import ( "context" "errors" "fmt" + "math/rand" "strings" "testing" "time" @@ -28,6 +29,7 @@ import ( "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" cache "go.etcd.io/etcd/cache/v3" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/pkg/v3/stringutil" "go.etcd.io/etcd/tests/v3/framework/integration" ) @@ -545,6 +547,9 @@ func TestCacheWatchPrefixProgressNotify(t *testing.T) { } func TestCacheWithoutPrefixGet(t *testing.T) { + if integration.ThroughProxy { + t.Skip("grpc proxy currently does not support requesting progress notifications") + } tcs := []struct { name string initialEvents, followupEvents []*clientv3.Event @@ -590,12 +595,12 @@ func TestGet(t *testing.T) { func testGet(t *testing.T, kv clientv3.KV, getReader func() Getter, initialEvents, followupEvents []*clientv3.Event) { ctx := t.Context() t.Log("Setup") - initialRev := applyEvents(ctx, t, kv, initialEvents) + baseRev := applyEvents(ctx, t, kv, initialEvents) reader := getReader() if c, ok := reader.(*cache.Cache); ok { - if err := c.WaitForRevision(ctx, initialRev); err != nil { - t.Fatalf("cache never caught up to rev %d: %v", initialRev, err) + if err := c.WaitForRevision(ctx, baseRev); err != nil { + t.Fatalf("cache never caught up to rev %d: %v", baseRev, err) } } @@ -606,13 +611,27 @@ func testGet(t *testing.T, kv clientv3.KV, getReader func() Getter, initialEvent } } + if followupRev > baseRev { + baseRev = followupRev + } + + latestRev := baseRev + t.Log("Validate") for _, tc := range getTestCases { tc := tc t.Run(tc.name, func(t *testing.T) { - op := clientv3.OpGet(tc.key, tc.opts...) + latestRev += advanceRevision(t, kv) + + opts := tc.opts + if tc.optsFunc != nil { + opts = append(opts, tc.optsFunc(latestRev)...) + } + + op := clientv3.OpGet(tc.key, opts...) requestedRev := op.Rev() - resp, err := reader.Get(ctx, tc.key, tc.opts...) + + resp, err := reader.Get(ctx, tc.key, opts...) if tc.expectErr != nil { if !errors.Is(err, tc.expectErr) { t.Fatalf("expected %v for Get %q; got %v", tc.expectErr, tc.key, err) @@ -620,8 +639,8 @@ func testGet(t *testing.T, kv clientv3.KV, getReader func() Getter, initialEvent return } if err != nil { - if _, ok := reader.(*cache.Cache); ok && requestedRev > 0 && requestedRev < initialRev && errors.Is(err, rpctypes.ErrCompacted) { - t.Logf("expected ErrCompacted: requestedRev=%d < initialCompleteRev=%d", requestedRev, initialRev) + if _, ok := reader.(*cache.Cache); ok && requestedRev > 0 && requestedRev < baseRev && errors.Is(err, rpctypes.ErrCompacted) { + t.Logf("expected ErrCompacted: requestedRev=%d < baseCompleteRev=%d", requestedRev, baseRev) return } t.Fatalf("Get %q failed: %v", tc.key, err) @@ -629,8 +648,14 @@ func testGet(t *testing.T, kv clientv3.KV, getReader func() Getter, initialEvent if diff := cmp.Diff(tc.wantKVs, resp.Kvs); diff != "" { t.Fatalf("unexpected KVs (-want +got):\n%s", diff) } - if resp.Header.Revision != tc.wantRevision { - t.Fatalf("revision: got %d, want %d", resp.Header.Revision, tc.wantRevision) + if op.IsSerializable() { + if resp.Header.Revision < baseRev { + t.Fatalf("revision: got %d, want >= %d", resp.Header.Revision, baseRev) + } + } else { + if resp.Header.Revision != latestRev { + t.Fatalf("revision: got %d, want == %d", resp.Header.Revision, latestRev) + } } }) } @@ -721,147 +746,225 @@ var ( ) type getTestCase struct { - name string - key string - opts []clientv3.OpOption - wantKVs []*mvccpb.KeyValue - wantRevision int64 - expectErr error + name string + key string + opts []clientv3.OpOption + optsFunc func(latestRev int64) []clientv3.OpOption + wantKVs []*mvccpb.KeyValue + expectErr error } var getTestCases = []getTestCase{ { - name: "single key /foo/a", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable()}, - wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv}, - wantRevision: 8, + name: "single key /foo/a", + key: "/foo/a", + opts: []clientv3.OpOption{}, + wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv}, + }, + { + name: "single key /foo/a at rev=2", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRev(2)}, + wantKVs: []*mvccpb.KeyValue{Rev2PutFooA.Kv}, }, { - name: "single key /foo/a at rev=2", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRev(2)}, - wantKVs: []*mvccpb.KeyValue{Rev2PutFooA.Kv}, - wantRevision: 8, + name: "single key /foo/a at rev=7", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRev(7)}, + wantKVs: []*mvccpb.KeyValue{Rev7TxnPutFooA.Kv}, }, { - name: "single key /foo/a at rev=7", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRev(7)}, - wantKVs: []*mvccpb.KeyValue{Rev7TxnPutFooA.Kv}, - wantRevision: 8, + name: "single key /foo/a at rev=latest", + key: "/foo/a", + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev)} + }, + wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv}, }, { - name: "single key /foo/a at rev=8", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRev(8)}, - wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv}, - wantRevision: 8, + name: "single key /foo/a at rev=latest+1 (future), returns error", + key: "/foo/a", + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev + 1)} + }, + expectErr: rpctypes.ErrFutureRev, }, { - name: "single key /foo/a at rev=9 (future), returns error", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRev(9)}, + name: "non-existing key", + key: "/doesnotexist", + opts: []clientv3.OpOption{}, + wantKVs: nil, + }, + { + name: "non-existing key at rev=4", + key: "/doesnotexist", + opts: []clientv3.OpOption{clientv3.WithRev(4)}, + wantKVs: nil, + }, + { + name: "non-existing key at rev=latest+1 (future), returns error", + key: "/doesnotexist", + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev + 1)} + }, expectErr: rpctypes.ErrFutureRev, }, { - name: "non-existing key", - key: "/doesnotexist", - opts: []clientv3.OpOption{clientv3.WithSerializable()}, - wantKVs: nil, - wantRevision: 8, + name: "prefix /foo", + key: "/foo", + opts: []clientv3.OpOption{clientv3.WithPrefix()}, + wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv, Rev7TxnPutFooB.Kv, Rev4PutFooC.Kv}, }, { - name: "non-existing key at rev=4", - key: "/doesnotexist", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRev(4)}, - wantKVs: nil, - wantRevision: 8, + name: "prefix /foo at rev=5", + key: "/foo", + opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithRev(5)}, + wantKVs: []*mvccpb.KeyValue{Rev2PutFooA.Kv, Rev3PutFooB.Kv, Rev4PutFooC.Kv, Rev5PutFooD.Kv}, }, { - name: "non-existing key at rev=9 (future), returns error", - key: "/doesnotexist", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRev(9)}, + name: "prefix /foo/b at rev=4", + key: "/foo/b", + opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithRev(4)}, + wantKVs: []*mvccpb.KeyValue{Rev3PutFooB.Kv}, + }, + { + name: "prefix /foo/b at rev=7", + key: "/foo/b", + opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithRev(7)}, + wantKVs: []*mvccpb.KeyValue{Rev7TxnPutFooB.Kv}, + }, + { + name: "prefix /foo at rev=latest+1 (future), returns error", + key: "/foo", + opts: []clientv3.OpOption{clientv3.WithPrefix()}, + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev + 1)} + }, expectErr: rpctypes.ErrFutureRev, }, { - name: "prefix /foo", - key: "/foo", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithPrefix()}, - wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv, Rev7TxnPutFooB.Kv, Rev4PutFooC.Kv}, - wantRevision: 8, + name: "range [/foo/a, /foo/c)", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRange("/foo/c")}, + wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv, Rev7TxnPutFooB.Kv}, }, { - name: "prefix /foo at rev=5", - key: "/foo", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithPrefix(), clientv3.WithRev(5)}, - wantKVs: []*mvccpb.KeyValue{Rev2PutFooA.Kv, Rev3PutFooB.Kv, Rev4PutFooC.Kv, Rev5PutFooD.Kv}, - wantRevision: 8, + name: "range [/foo/a, /foo/d) at rev=5", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRange("/foo/d"), clientv3.WithRev(5)}, + wantKVs: []*mvccpb.KeyValue{Rev2PutFooA.Kv, Rev3PutFooB.Kv, Rev4PutFooC.Kv}, }, { - name: "prefix /foo/b at rev=4", - key: "/foo/b", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithPrefix(), clientv3.WithRev(4)}, - wantKVs: []*mvccpb.KeyValue{Rev3PutFooB.Kv}, - wantRevision: 8, + name: "range [/foo/a, /foo/c) at rev=latest+1 (future), returns error", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRange("/foo/c")}, + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev + 1)} + }, + expectErr: rpctypes.ErrFutureRev, + }, + { + name: "fromKey /foo/b", + key: "/foo/b", + opts: []clientv3.OpOption{clientv3.WithFromKey()}, + wantKVs: []*mvccpb.KeyValue{Rev7TxnPutFooB.Kv, Rev4PutFooC.Kv}, }, { - name: "prefix /foo/b at rev=7", - key: "/foo/b", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithPrefix(), clientv3.WithRev(7)}, - wantKVs: []*mvccpb.KeyValue{Rev7TxnPutFooB.Kv}, - wantRevision: 8, + name: "fromKey /foo/b at rev=7", + key: "/foo/b", + opts: []clientv3.OpOption{clientv3.WithFromKey(), clientv3.WithRev(7)}, + wantKVs: []*mvccpb.KeyValue{Rev7TxnPutFooB.Kv, Rev4PutFooC.Kv}, }, { - name: "prefix /foo at rev=9 (future), returns error", - key: "/foo", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithPrefix(), clientv3.WithRev(9)}, - wantKVs: []*mvccpb.KeyValue{Rev2PutFooA.Kv, Rev3PutFooB.Kv, Rev4PutFooC.Kv, Rev5PutFooD.Kv}, + name: "fromKey /foo/b at rev=latest+1 (future), returns error", + key: "/foo/b", + opts: []clientv3.OpOption{clientv3.WithFromKey()}, + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev + 1)} + }, expectErr: rpctypes.ErrFutureRev, }, { - name: "range [/foo/a, /foo/c)", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRange("/foo/c")}, - wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv, Rev7TxnPutFooB.Kv}, - wantRevision: 8, + name: "single key /foo/a serializable", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithSerializable()}, + wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv}, + }, + { + name: "single key /foo/a serializable at rev=latest+1 (future), returns error", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithSerializable()}, + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev + 1)} + }, + expectErr: rpctypes.ErrFutureRev, }, { - name: "range [/foo/a, /foo/d) at rev=5", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRange("/foo/d"), clientv3.WithRev(5)}, - wantKVs: []*mvccpb.KeyValue{Rev2PutFooA.Kv, Rev3PutFooB.Kv, Rev4PutFooC.Kv}, - wantRevision: 8, + name: "non-existing key serializable", + key: "/doesnotexist", + opts: []clientv3.OpOption{clientv3.WithSerializable()}, + wantKVs: nil, }, { - name: "range [/foo/a, /foo/c) at rev=9 (future), returns error", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRange("/foo/c"), clientv3.WithRev(9)}, + name: "non-existing key serializable at rev=latest+1 (future), returns error", + key: "/doesnotexist", + opts: []clientv3.OpOption{clientv3.WithSerializable()}, + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev + 1)} + }, expectErr: rpctypes.ErrFutureRev, }, { - name: "fromKey /foo/b", - key: "/foo/b", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithFromKey()}, - wantKVs: []*mvccpb.KeyValue{Rev7TxnPutFooB.Kv, Rev4PutFooC.Kv}, - wantRevision: 8, + name: "prefix /foo serializable", + key: "/foo", + opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithSerializable()}, + wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv, Rev7TxnPutFooB.Kv, Rev4PutFooC.Kv}, }, { - name: "fromKey /foo/b at rev=7", - key: "/foo/b", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithFromKey(), clientv3.WithRev(7)}, - wantKVs: []*mvccpb.KeyValue{Rev7TxnPutFooB.Kv, Rev4PutFooC.Kv}, - wantRevision: 8, + name: "prefix /foo serializable at rev=latest+1 (future), returns error", + key: "/foo", + opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithSerializable()}, + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev + 1)} + }, + expectErr: rpctypes.ErrFutureRev, }, { - name: "fromKey /foo/b at rev=9 (future), returns error", - key: "/foo/b", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithFromKey(), clientv3.WithRev(9)}, + name: "range [/foo/a, /foo/c) serializable", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRange("/foo/c"), clientv3.WithSerializable()}, + wantKVs: []*mvccpb.KeyValue{Rev8PutFooA.Kv, Rev7TxnPutFooB.Kv}, + }, + { + name: "range [/foo/a, /foo/c) serializable at rev=latest+1 (future), returns error", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRange("/foo/c"), clientv3.WithSerializable()}, + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev + 1)} + }, + expectErr: rpctypes.ErrFutureRev, + }, + { + name: "fromKey /foo/b serializable", + key: "/foo/b", + opts: []clientv3.OpOption{clientv3.WithFromKey(), clientv3.WithSerializable()}, + wantKVs: []*mvccpb.KeyValue{Rev7TxnPutFooB.Kv, Rev4PutFooC.Kv}, + }, + { + name: "fromKey /foo/b serializable at rev=latest+1 (future), returns error", + key: "/foo/b", + opts: []clientv3.OpOption{clientv3.WithFromKey(), clientv3.WithSerializable()}, + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev + 1)} + }, expectErr: rpctypes.ErrFutureRev, }, } func TestCacheWithPrefixGetInScope(t *testing.T) { + if integration.ThroughProxy { + t.Skip("grpc proxy currently does not support requesting progress notifications") + } integration.BeforeTest(t) clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) t.Cleanup(func() { clus.Terminate(t) }) @@ -926,61 +1029,108 @@ func testWithPrefixGet(t *testing.T, cli *clientv3.Client, getReader func() Gett Version: 1, } + baseRev := latestRev + testCases := []struct { - name string - key string - opts []clientv3.OpOption - wantKVs []*mvccpb.KeyValue - wantRevision int64 + name string + key string + opts []clientv3.OpOption + optsFunc func(latestRev int64) []clientv3.OpOption + wantKVs []*mvccpb.KeyValue }{ { - name: "single key within cache prefix", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable()}, - wantKVs: []*mvccpb.KeyValue{expectedFooA}, - wantRevision: latestRev, + name: "single key within cache prefix", + key: "/foo/a", + opts: []clientv3.OpOption{}, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, + }, + { + name: "single key within cache prefix at latest/progress rev", + key: "/foo/a", + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev)} + }, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, + }, + { + name: "prefix query within cache prefix", + key: "/foo", + opts: []clientv3.OpOption{clientv3.WithPrefix()}, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, }, { - name: "single key within cache prefix at latest/progress rev", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRev(latestRev)}, - wantKVs: []*mvccpb.KeyValue{expectedFooA}, - wantRevision: latestRev, + name: "prefix query within cache prefix at latest/progress rev", + key: "/foo", + opts: []clientv3.OpOption{clientv3.WithPrefix()}, + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev)} + }, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, + }, + { + name: "range query within cache prefix", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRange("/foo/b")}, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, + }, + { + name: "range query within cache prefix at latest/progress rev", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRange("/foo/z")}, + optsFunc: func(latestRev int64) []clientv3.OpOption { + return []clientv3.OpOption{clientv3.WithRev(latestRev)} + }, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, }, { - name: "prefix query within cache prefix", - key: "/foo", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithPrefix()}, - wantKVs: []*mvccpb.KeyValue{expectedFooA}, - wantRevision: latestRev, + name: "single key within cache prefix serializable", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithSerializable()}, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, }, { - name: "prefix query within cache prefix at latest/progress rev", - key: "/foo", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithPrefix(), clientv3.WithRev(latestRev)}, - wantKVs: []*mvccpb.KeyValue{expectedFooA}, - wantRevision: latestRev, + name: "single key within cache prefix at base rev serializable", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRev(baseRev)}, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, }, { - name: "range query within cache prefix", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRange("/foo/b")}, - wantKVs: []*mvccpb.KeyValue{expectedFooA}, - wantRevision: latestRev, + name: "prefix query within cache prefix serializable", + key: "/foo", + opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithSerializable()}, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, }, { - name: "range query within cache prefix at latest/progress rev", - key: "/foo/a", - opts: []clientv3.OpOption{clientv3.WithSerializable(), clientv3.WithRange("/foo/z"), clientv3.WithRev(latestRev)}, - wantKVs: []*mvccpb.KeyValue{expectedFooA}, - wantRevision: latestRev, + name: "prefix query within cache prefix at base rev serializable", + key: "/foo", + opts: []clientv3.OpOption{clientv3.WithPrefix(), clientv3.WithSerializable(), clientv3.WithRev(baseRev)}, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, + }, + { + name: "range query within cache prefix serializable", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRange("/foo/b"), clientv3.WithSerializable()}, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, + }, + { + name: "range query within cache prefix at base rev serializable", + key: "/foo/a", + opts: []clientv3.OpOption{clientv3.WithRange("/foo/z"), clientv3.WithSerializable(), clientv3.WithRev(baseRev)}, + wantKVs: []*mvccpb.KeyValue{expectedFooA}, }, } for _, tc := range testCases { tc := tc t.Run(tc.name, func(t *testing.T) { - resp, err := reader.Get(ctx, tc.key, tc.opts...) + latestRev += advanceRevision(t, cli) + + opts := tc.opts + if tc.optsFunc != nil { + opts = append(opts, tc.optsFunc(latestRev)...) + } + op := clientv3.OpGet(tc.key, opts...) + resp, err := reader.Get(ctx, tc.key, opts...) if err != nil { t.Fatalf("Get(%q): %v", tc.key, err) } @@ -989,14 +1139,23 @@ func testWithPrefixGet(t *testing.T, cli *clientv3.Client, getReader func() Gett t.Errorf("unexpected KVs (-want +got):\n%s", diff) } - if resp.Header.Revision != tc.wantRevision { - t.Errorf("Header.Revision=%d; want: %d", resp.Header.Revision, tc.wantRevision) + if op.IsSerializable() { + if resp.Header.Revision < baseRev { + t.Errorf("Header.Revision=%d; want >= %d", resp.Header.Revision, baseRev) + } + } else { + if resp.Header.Revision != latestRev { + t.Errorf("Header.Revision=%d; want: %d", resp.Header.Revision, latestRev) + } } }) } } func TestCacheWithPrefixGetOutOfScope(t *testing.T) { + if integration.ThroughProxy { + t.Skip("grpc proxy currently does not support requesting progress notifications") + } integration.BeforeTest(t) clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) t.Cleanup(func() { clus.Terminate(t) }) @@ -1184,6 +1343,9 @@ func TestCacheUnsupportedWatchOptions(t *testing.T) { } func TestCacheUnsupportedGetOptions(t *testing.T) { + if integration.ThroughProxy { + t.Skip("grpc proxy currently does not support requesting progress notifications") + } integration.BeforeTest(t) clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) t.Cleanup(func() { clus.Terminate(t) }) @@ -1210,7 +1372,6 @@ func TestCacheUnsupportedGetOptions(t *testing.T) { {"WithMaxModRevision", []clientv3.OpOption{clientv3.WithMaxModRev(10)}}, {"WithMinCreateRevision", []clientv3.OpOption{clientv3.WithMinCreateRev(3)}}, {"WithMaxCreateRevision", []clientv3.OpOption{clientv3.WithMaxCreateRev(5)}}, - {"NoSerializable", nil}, } for _, tc := range unsupported { @@ -1270,6 +1431,18 @@ func collectAndAssertAtomicEvents(t *testing.T, watch clientv3.WatchChan) (event } } +func advanceRevision(t *testing.T, kv clientv3.KV) int64 { + t.Helper() + n := rand.Intn(3) + 3 + for i := 0; i < n; i++ { + _, err := kv.Put(t.Context(), fmt.Sprintf("/bar/%d/%s", i, stringutil.RandString(10)), "v") + if err != nil { + t.Fatalf("Put: %v", err) + } + } + return int64(n) +} + func applyEvents(ctx context.Context, t *testing.T, kv clientv3.KV, evs []*clientv3.Event) int64 { var lastRev int64 for _, batches := range batchEventsByRevision(evs) {