Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
)

var (
// Returned when an option combination isn’t yet handled by the cache (e.g. WithPrevKV, WithProgressNotify for Watch(), WithCountOnly for Get()).
// Returned when an option combination isn’t yet handled by the cache (e.g. WithPrevKV for Watch(), WithCountOnly for Get()).
ErrUnsupportedRequest = errors.New("cache: unsupported request parameters")
// Returned when the requested key or key‑range is invalid (empty or reversed) or lies outside c.prefix.
ErrKeyRangeInvalid = errors.New("cache: invalid or out‑of‑range key range")
Expand All @@ -52,6 +52,7 @@ type Cache struct {
waitGroup sync.WaitGroup
internalCtx context.Context
progressRequestor progressRequestor
clock Clock
}

// New builds a cache shard that watches only the requested prefix.
Expand All @@ -64,7 +65,10 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error)
for _, opt := range opts {
opt(&cfg)
}
return newCache(client, prefix, cfg, realClock{})
}

func newCache(client *clientv3.Client, prefix string, cfg Config, clock Clock) (*Cache, error) {
if cfg.HistoryWindowSize <= 0 {
return nil, fmt.Errorf("invalid HistoryWindowSize %d (must be > 0)", cfg.HistoryWindowSize)
}
Expand All @@ -83,7 +87,8 @@ func New(client *clientv3.Client, prefix string, opts ...Option) (*Cache, error)
ready: newReady(),
stop: cancel,
internalCtx: internalCtx,
progressRequestor: newConditionalProgressRequestor(client.Watcher, realClock{}, cfg.ProgressRequestInterval),
progressRequestor: newConditionalProgressRequestor(client.Watcher, clock, cfg.ProgressRequestInterval),
clock: clock,
}

cache.demux = NewDemux(internalCtx, &cache.waitGroup, cfg.HistoryWindowSize, cfg.ResyncInterval)
Expand Down Expand Up @@ -135,12 +140,24 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption
defer c.waitGroup.Done()
defer close(responseChan)
defer c.demux.Unregister(w)

var progressTimer Timer
var progressChan <-chan time.Time
if op.IsProgressNotify() {
progressTimer = c.clock.NewTimer(c.cfg.ProgressNotifyInterval)
defer progressTimer.Stop()
progressChan = progressTimer.Chan()
}

for {
select {
case <-ctx.Done():
return
case <-c.internalCtx.Done():
return
case <-progressChan:
c.demux.BroadcastProgressTo(w)
progressTimer.Reset(c.cfg.ProgressNotifyInterval)
case resp, ok := <-w.respCh:
if !ok {
if w.cancelResp != nil {
Expand All @@ -158,6 +175,9 @@ func (c *Cache) Watch(ctx context.Context, key string, opts ...clientv3.OpOption
case <-c.internalCtx.Done():
return
case responseChan <- resp:
if progressTimer != nil {
progressTimer.Reset(c.cfg.ProgressNotifyInterval)
}
}
}
}
Expand Down Expand Up @@ -404,8 +424,6 @@ func (c *Cache) validateWatch(key string, op clientv3.Op) (pred KeyPredicate, er
return nil, fmt.Errorf("%w: PrevKV not supported", ErrUnsupportedRequest)
case op.IsFragment():
return nil, fmt.Errorf("%w: Fragment not supported", ErrUnsupportedRequest)
case op.IsProgressNotify():
return nil, fmt.Errorf("%w: ProgressNotify not supported", ErrUnsupportedRequest)
case op.IsCreatedNotify():
return nil, fmt.Errorf("%w: CreatedNotify not supported", ErrUnsupportedRequest)
case op.IsFilterPut():
Expand Down
104 changes: 104 additions & 0 deletions cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,110 @@ func newCacheForWaitTest(serverRev int64, localRev int64, pr progressRequestor)
}, st
}

func setupWatcherWithFakeClock(t *testing.T, opts ...clientv3.OpOption) (clientv3.WatchChan, *mockWatcher, *fakeClock) {
t.Helper()
fc := newFakeClock()
mw := newMockWatcher(8)
fakeClient := &clientv3.Client{
Watcher: mw,
KV: newKVStub(),
}
cfg := defaultConfig()
cfg.ProgressNotifyInterval = 100 * time.Millisecond
c, err := newCache(fakeClient, "", cfg, fc)
if err != nil {
t.Fatalf("newCache: %v", err)
}
t.Cleanup(c.Close)

mw.responses <- clientv3.WatchResponse{}
<-mw.registered

ctxWait, cancelWait := context.WithTimeout(t.Context(), time.Second)
defer cancelWait()
if err := c.WaitReady(ctxWait); err != nil {
t.Fatalf("cache did not become Ready: %v", err)
}

watchOpts := append([]clientv3.OpOption{clientv3.WithPrefix()}, opts...)
watchCh := c.Watch(t.Context(), "", watchOpts...)

// Seed an event so the cache has a known revision (rev 5),
// then drain it from the watch channel.
mw.responses <- clientv3.WatchResponse{
Events: []*clientv3.Event{event(mvccpb.Event_PUT, "/a", 5)},
}
readResponse(t, watchCh)

return watchCh, mw, fc
}

func TestCacheWatchProgressNotify(t *testing.T) {
t.Run("watcher requesting notification, receives them periodically", func(t *testing.T) {
progressCh, _, fc := setupWatcherWithFakeClock(t, clientv3.WithProgressNotify())

t.Log("First interval — progress notification arrives")
fc.Advance(100 * time.Millisecond)
resp := readResponse(t, progressCh)
if !resp.IsProgressNotify() {
t.Fatalf("expected progress notify, got events: %v", resp.Events)
}

t.Log("Second interval — another progress notification arrives")
fc.Advance(100 * time.Millisecond)
resp = readResponse(t, progressCh)
if !resp.IsProgressNotify() {
t.Fatalf("expected progress notify, got events: %v", resp.Events)
}
})

t.Run("watcher that didn't request progress doesn't receive any", func(t *testing.T) {
plainCh, _, fc := setupWatcherWithFakeClock(t)

t.Log("Advance past the interval — plain watcher should not receive anything")
fc.Advance(150 * time.Millisecond)

select {
case got, ok := <-plainCh:
if ok {
t.Fatalf("expected no response on plain watcher, got: IsProgressNotify=%v events=%v", got.IsProgressNotify(), got.Events)
}
default:
}
})

t.Run("event resets timer and delays sending progress", func(t *testing.T) {
progressCh, mw, fc := setupWatcherWithFakeClock(t, clientv3.WithProgressNotify())

t.Log("Advance partway into the interval, then deliver an event to reset the timer")
fc.Advance(50 * time.Millisecond)
mw.responses <- clientv3.WatchResponse{
Events: []*clientv3.Event{event(mvccpb.Event_PUT, "/b", 6)},
}
readResponse(t, progressCh)

t.Log("100 ms since original start but only 50 ms since event reset — no progress notify")
fc.Advance(50 * time.Millisecond)
select {
case got, ok := <-progressCh:
if ok {
t.Fatalf("expected no progress notify within interval after event, got: IsProgressNotify=%v events=%v", got.IsProgressNotify(), got.Events)
}
default:
}

t.Log("Full interval after the event — progress notify arrives")
fc.Advance(50 * time.Millisecond)
resp := readResponse(t, progressCh)
if !resp.IsProgressNotify() {
t.Fatalf("expected progress notify, got events: %v", resp.Events)
}
if resp.Header.Revision != 6 {
t.Fatalf("expected progress revision 6, got %d", resp.Header.Revision)
}
})
}

func TestWaitTillRevision(t *testing.T) {
t.Run("cache_already_caught_up", func(t *testing.T) {
c, _ := newCacheForWaitTest(10, 10, newTestProgressRequestor())
Expand Down
7 changes: 7 additions & 0 deletions cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Config struct {
BTreeDegree int
// ProgressRequestInterval controls how often progress notifications are requested from the etcd watch stream during a consistent Get.
ProgressRequestInterval time.Duration
// ProgressNotifyInterval controls how often progress notifications are sent to local watchers registered with WithProgressNotify().
ProgressNotifyInterval time.Duration
}

// TODO: tune via performance/load tests.
Expand All @@ -51,6 +53,7 @@ func defaultConfig() Config {
WaitTimeout: 3 * time.Second,
BTreeDegree: 32,
ProgressRequestInterval: 100 * time.Millisecond,
ProgressNotifyInterval: 10 * time.Minute,
}
}

Expand Down Expand Up @@ -88,6 +91,10 @@ func WithProgressRequestInterval(d time.Duration) Option {
return func(c *Config) { c.ProgressRequestInterval = d }
}

func WithProgressNotifyInterval(d time.Duration) Option {
return func(c *Config) { c.ProgressNotifyInterval = d }
}

func WithWaitTimeout(d time.Duration) Option {
return func(c *Config) { c.WaitTimeout = d }
}
17 changes: 17 additions & 0 deletions cache/demux.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,23 @@ func (d *demux) BroadcastProgress() {
}
}

func (d *demux) BroadcastProgressTo(w *watcher) {
d.mu.RLock()
defer d.mu.RUnlock()
if d.maxRev == 0 {
return
}
if _, active := d.activeWatchers[w]; !active {
return
}
resp := clientv3.WatchResponse{
Header: etcdserverpb.ResponseHeader{
Revision: d.maxRev,
},
}
w.enqueueResponse(resp)
}

func (d *demux) updateStoreLocked(resp clientv3.WatchResponse) {
if resp.IsProgressNotify() {
d.maxRev = resp.Header.Revision
Expand Down
92 changes: 73 additions & 19 deletions cache/demux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,13 @@ func TestBroadcastProgress(t *testing.T) {

d.BroadcastProgress()

resps1 := readResponses(t, w1, 1)
require.Truef(t, resps1[0].IsProgressNotify(), "expected progress notify")
require.Equal(t, int64(10), resps1[0].Header.Revision)
resp1 := readResponse(t, w1.respCh)
require.Truef(t, resp1.IsProgressNotify(), "expected progress notify")
require.Equal(t, int64(10), resp1.Header.Revision)

resps2 := readResponses(t, w2, 1)
require.Truef(t, resps2[0].IsProgressNotify(), "expected progress notify")
require.Equal(t, int64(10), resps2[0].Header.Revision)
resp2 := readResponse(t, w2.respCh)
require.Truef(t, resp2.IsProgressNotify(), "expected progress notify")
require.Equal(t, int64(10), resp2.Header.Revision)
})

t.Run("is no-op when maxRev is zero", func(t *testing.T) {
Expand Down Expand Up @@ -386,9 +386,9 @@ func TestBroadcastProgress(t *testing.T) {

d.BroadcastProgress()

activeResps := readResponses(t, active, 1)
require.Truef(t, activeResps[0].IsProgressNotify(), "expected progress notify for active watcher")
require.Equal(t, int64(10), activeResps[0].Header.Revision)
activeResp := readResponse(t, active.respCh)
require.Truef(t, activeResp.IsProgressNotify(), "expected progress notify for active watcher")
require.Equal(t, int64(10), activeResp.Header.Revision)

select {
case <-lagging.respCh:
Expand All @@ -398,20 +398,74 @@ func TestBroadcastProgress(t *testing.T) {
})
}

func readResponses(t *testing.T, w *watcher, count int) []clientv3.WatchResponse {
t.Helper()
responses := make([]clientv3.WatchResponse, 0, count)
func TestBroadcastProgressTo(t *testing.T) {
t.Run("sends progress only to the target watcher", func(t *testing.T) {
d := newDemux(16, 10*time.Millisecond)
d.Init(1)
d.maxRev = 10

target := newWatcher(8, nil)
other := newWatcher(8, nil)
d.Register(target, 0)
d.Register(other, 0)

d.BroadcastProgressTo(target)

resp := readResponse(t, target.respCh)
require.Truef(t, resp.IsProgressNotify(), "expected progress notify")
require.Equal(t, int64(10), resp.Header.Revision)

for i := 0; i < count; i++ {
select {
case resp := <-w.respCh:
responses = append(responses, resp)
case <-time.After(2 * time.Second):
t.Fatalf("timed out waiting for response %d/%d (got %d)", i+1, count, len(responses))
case <-other.respCh:
t.Fatal("expected no progress notify for other watcher")
default:
}
}
})

t.Run("is no-op when maxRev is zero", func(t *testing.T) {
d := newDemux(16, 10*time.Millisecond)
d.Init(1)

w := newWatcher(8, nil)
d.Register(w, 0)

d.maxRev = 0
d.BroadcastProgressTo(w)

select {
case <-w.respCh:
t.Fatal("expected no response when maxRev is 0")
default:
}
})

t.Run("is no-op for lagging watcher", func(t *testing.T) {
d := newDemux(16, 10*time.Millisecond)
d.Init(1)
d.maxRev = 10

w := newWatcher(8, nil)
d.Register(w, 5) // startingRev <= maxRev => lagging

return responses
d.BroadcastProgressTo(w)

select {
case <-w.respCh:
t.Fatal("expected no progress notify for lagging watcher")
default:
}
})
}

func readResponse(t *testing.T, ch <-chan clientv3.WatchResponse) clientv3.WatchResponse {
t.Helper()
select {
case resp := <-ch:
return resp
case <-time.After(2 * time.Second):
t.Fatalf("timed out waiting for response")
return clientv3.WatchResponse{}
}
}

func respWithEventRevs(revs ...int64) clientv3.WatchResponse {
Expand Down
Loading
Loading