Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions pkg/election/lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ const (
revokeLeaseTimeout = time.Second
requestTimeout = etcdutil.DefaultRequestTimeout
slowRequestTime = etcdutil.DefaultSlowRequestTime
// maxLeaseKeepAliveInterval caps the keepalive cadence at 500ms, decoupling
// renewal frequency from the configured lease timeout. etcd's clientv3
// keepalive uses TTL/3 with no upper bound; we deviate so that operators who
// raise the lease TTL (e.g. to tolerate longer GC pauses) do not also slow
// down PD's leader-failover reaction time. PD has only a handful of leases
// per cluster, so the extra RPCs at large TTLs are negligible.
maxLeaseKeepAliveInterval = 500 * time.Millisecond
)

// Lease is used as the low-level mechanism for campaigning and renewing elected leadership.
Expand Down Expand Up @@ -147,6 +154,17 @@ func (l *Lease) loadExpireTime() time.Time {
return expireTime
}

// getKeepAliveInterval returns the interval used to drive the keepalive ticker.
// It takes the minimum of `leaseTimeout/3` and `maxLeaseKeepAliveInterval` so the
// renewal cadence never gets slower as the lease timeout grows.
func (l *Lease) getKeepAliveInterval() time.Duration {
interval := l.leaseTimeout / 3
if interval > maxLeaseKeepAliveInterval {
return maxLeaseKeepAliveInterval
}
return interval
}

// KeepAlive auto renews the lease and update expireTime.
func (l *Lease) KeepAlive(ctx context.Context) {
defer logutil.LogPanic()
Expand All @@ -156,7 +174,7 @@ func (l *Lease) KeepAlive(ctx context.Context) {
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
timeCh := l.keepAliveWorker(ctx, l.leaseTimeout/3)
timeCh := l.keepAliveWorker(ctx)
defer log.Info("lease keep alive stopped", zap.String("purpose", l.purpose))

var (
Expand Down Expand Up @@ -221,9 +239,10 @@ func (l *Lease) KeepAlive(ctx context.Context) {
}

// Periodically call `lease.KeepAliveOnce` and post back latest received expire time into the channel.
func (l *Lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-chan time.Time {
func (l *Lease) keepAliveWorker(ctx context.Context) <-chan time.Time {
ch := make(chan time.Time)

interval := l.getKeepAliveInterval()
go func() {
defer logutil.LogPanic()
ticker := time.NewTicker(interval)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the ticker to 500ms for large leases, but each KeepAliveOnce below still uses l.leaseTimeout as its context timeout. With a large lease and slow etcd, that can accumulate many overlapping RPCs. Please cap the request timeout separately, or limit in-flight attempts.

Expand Down
9 changes: 6 additions & 3 deletions pkg/election/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,11 @@ func TestLeaseKeepAlive(t *testing.T) {
lease := NewLease(client, "test_lease")

re.NoError(lease.Grant(defaultLeaseTimeout))
ch := lease.keepAliveWorker(ctx, 2*time.Second)
time.Sleep(2 * time.Second)
<-ch
ch := lease.keepAliveWorker(ctx)
select {
case <-ch:
case <-time.After(2 * lease.getKeepAliveInterval()):
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not really verify the capped cadence: the first keepalive is sent immediately, and the timeout is derived from the implementation under test. Please add a direct interval test, or assert timing on the second keepalive.

re.Fail("timed out waiting for lease keepalive")
}
re.NoError(lease.Close())
}
Loading