Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) {
mergedCfg := localCfg
mergedCfg.Replication = leaderCfg.Replication
mergedCfg.Schedule = leaderCfg.Schedule
mergedCfg.LeaderLease = leaderCfg.LeaderLease
h.rd.JSON(w, http.StatusOK, mergedCfg)
return
}
Expand Down Expand Up @@ -215,6 +216,8 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value any) er
return h.updateMicroserviceConfig(cfg, kp[len(kp)-1], value)
case "controller":
return h.updateControllerConfig(kp[len(kp)-1], value)
case "lease":
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider version-gating this API until all PD members understand the persisted lease. During a rolling upgrade, an old PD can still persist `lease: 0` in the config blob, which makes the new code ignore the online value and fall back to the local TOML value on the next campaign.

return h.updateLeaderLease(cfg, value)
}
return errors.Errorf("config prefix %s not found", kp[0])
}
Expand Down Expand Up @@ -254,6 +257,20 @@ func (h *confHandler) updateMicroserviceConfig(config *config.Config, key string
return err
}

// updateLeaderLease writes value into the top-level "lease" item of config
// and, only when that write actually changed the config, forwards the new
// lease to the server. It returns an error when the value cannot be applied or
// when the "lease" item is not found in the config.
func (h *confHandler) updateLeaderLease(config *config.Config, value any) error {
	changed, exists, err := jsonutil.AddKeyValue(config, "lease", value)
	switch {
	case err != nil:
		return err
	case !exists:
		return errors.New("config item lease not found")
	case !changed:
		// Same value as before: nothing to hand to the server.
		return nil
	}
	return h.svr.SetLeaderLease(config.LeaderLease)
}

func (h *confHandler) updateSchedule(config *config.Config, key string, value any) error {
updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value)
if err != nil {
Expand Down
90 changes: 90 additions & 0 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,96 @@ func TestReloadConfig(t *testing.T) {
re.Equal(int64(512), newOpt.GetMaxMovableHotPeerSize())
}

// TestLeaderLeasePersistAndReload checks that a lease taken from the startup
// config is written under the top-level "lease" key by Persist, and that a
// later Reload reports that persisted value even when the options were built
// from a config carrying a different lease.
func TestLeaderLeasePersistAndReload(t *testing.T) {
	// leaseOnly picks just the "lease" key out of the stored blob.
	type leaseOnly struct {
		LeaderLease int64 `json:"lease"`
	}
	const (
		persistedLease = int64(7)
		startupLease   = int64(9)
	)
	re := require.New(t)

	cfg := NewConfig()
	re.NoError(cfg.Adjust(nil, false))
	cfg.LeaderLease = persistedLease
	opt := NewPersistOptions(cfg)
	opt.SetMaxReplicas(5)
	store := storage.NewStorageWithMemoryBackend()
	re.NoError(opt.Persist(store))

	var stored leaseOnly
	exists, err := store.LoadConfig(&stored)
	re.NoError(err)
	re.True(exists)
	re.Equal(persistedLease, stored.LeaderLease)

	// Rebuild the options with another startup lease; Reload must win.
	cfg.LeaderLease = startupLease
	reloaded := NewPersistOptions(cfg)
	re.NoError(reloaded.Reload(store))
	re.Equal(persistedLease, reloaded.GetLeaderLease())
}

// TestLeaderLeaseSetPersistAndReload checks that a lease applied through
// SetLeaderLease is written under the top-level "lease" key by Persist, and
// that options built from a fresh config with a different lease report the
// persisted value after Reload.
func TestLeaderLeaseSetPersistAndReload(t *testing.T) {
	// leaseOnly picks just the "lease" key out of the stored blob.
	type leaseOnly struct {
		LeaderLease int64 `json:"lease"`
	}
	const (
		persistedLease = int64(7)
		startupLease   = int64(9)
	)
	re := require.New(t)

	opt, err := newTestScheduleOption()
	re.NoError(err)
	opt.SetLeaderLease(persistedLease)
	store := storage.NewStorageWithMemoryBackend()
	re.NoError(opt.Persist(store))

	var stored leaseOnly
	exists, err := store.LoadConfig(&stored)
	re.NoError(err)
	re.True(exists)
	re.Equal(persistedLease, stored.LeaderLease)

	// A brand-new config with its own lease must still yield the stored one.
	freshCfg := NewConfig()
	re.NoError(freshCfg.Adjust(nil, false))
	freshCfg.LeaderLease = startupLease
	reloaded := NewPersistOptions(freshCfg)
	re.NoError(reloaded.Reload(store))
	re.Equal(persistedLease, reloaded.GetLeaderLease())
}

// TestLeaderLeaseReloadIgnoresNonPositivePersistedValues stores a blob whose
// "lease" is zero or negative and checks that Reload keeps the lease the
// options were created with instead of adopting the stored one.
func TestLeaderLeaseReloadIgnoresNonPositivePersistedValues(t *testing.T) {
	// leaseBlob is a stored config that contains nothing but the lease.
	type leaseBlob struct {
		LeaderLease int64 `json:"lease"`
	}
	const startupLease = int64(7)

	invalidLeases := []int64{0, -1}
	for _, invalid := range invalidLeases {
		name := fmt.Sprintf("lease-%d", invalid)
		t.Run(name, func(t *testing.T) {
			re := require.New(t)
			store := storage.NewStorageWithMemoryBackend()
			re.NoError(store.SaveConfig(leaseBlob{LeaderLease: invalid}))

			cfg := NewConfig()
			re.NoError(cfg.Adjust(nil, false))
			cfg.LeaderLease = startupLease
			opt := NewPersistOptions(cfg)
			re.NoError(opt.Reload(store))
			re.Equal(startupLease, opt.GetLeaderLease())
		})
	}
}

// TestReloadKeepsCustomLeaderLeaseWhenPersistedConfigMissesLease stores a blob
// that has schedule and replication sections but no "lease" key, and checks
// that Reload leaves the lease from the startup config in place.
func TestReloadKeepsCustomLeaderLeaseWhenPersistedConfigMissesLease(t *testing.T) {
	// OldConfig mimics a persisted blob without a lease field.
	type OldConfig struct {
		Schedule    sc.ScheduleConfig    `toml:"schedule" json:"schedule"`
		Replication sc.ReplicationConfig `toml:"replication" json:"replication"`
	}
	const startupLease = int64(7)
	re := require.New(t)
	store := storage.NewStorageWithMemoryBackend()

	seed, err := newTestScheduleOption()
	re.NoError(err)
	re.NoError(store.SaveConfig(&OldConfig{
		Schedule:    *seed.GetScheduleConfig(),
		Replication: *seed.GetReplicationConfig(),
	}))

	cfg := NewConfig()
	re.NoError(cfg.Adjust(nil, false))
	cfg.LeaderLease = startupLease
	reloaded := NewPersistOptions(cfg)
	re.NoError(reloaded.Reload(store))
	re.Equal(startupLease, reloaded.GetLeaderLease())
}

func TestReloadUpgrade(t *testing.T) {
re := require.New(t)
opt, err := newTestScheduleOption()
Expand Down
47 changes: 46 additions & 1 deletion server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config

import (
"context"
"encoding/json"
"reflect"
"strconv"
"sync/atomic"
Expand Down Expand Up @@ -55,6 +56,7 @@ type PersistOptions struct {
keyspace atomic.Value
microservice atomic.Value
storeConfig atomic.Value
leaderLease atomic.Int64
clusterVersion unsafe.Pointer
}

Expand All @@ -68,6 +70,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions {
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.microservice.Store(&cfg.Microservice)
o.leaderLease.Store(cfg.LeaderLease)
// storeConfig will be fetched from TiKV later,
// set it to an empty config here first.
o.storeConfig.Store(&sc.StoreConfig{})
Expand Down Expand Up @@ -156,6 +159,16 @@ func (o *PersistOptions) SetStoreConfig(cfg *sc.StoreConfig) {
o.storeConfig.Store(cfg)
}

// GetLeaderLease returns the PD leader lease timeout in seconds.
// The value is read atomically from the leaderLease field, so it reflects the
// most recent SetLeaderLease call (or the value stored at construction).
func (o *PersistOptions) GetLeaderLease() int64 {
return o.leaderLease.Load()
}

// SetLeaderLease sets the PD leader lease timeout in seconds.
// The value is stored atomically and as given: this method performs no
// validation and does not persist anything by itself.
func (o *PersistOptions) SetLeaderLease(lease int64) {
o.leaderLease.Store(lease)
}

// GetClusterVersion returns the cluster version.
func (o *PersistOptions) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
Expand Down Expand Up @@ -761,6 +774,30 @@ type persistedConfig struct {
*Config
// StoreConfig is injected into Config to avoid breaking the original API.
StoreConfig sc.StoreConfig `json:"store"`
// leaseLoaded records whether the persisted etcd blob explicitly included
// the top-level "lease" field. Older blobs without that field should not
// silently overwrite the startup value with the default-filled lease.
leaseLoaded bool
}

// UnmarshalJSON decodes a persisted config blob into cfg and records in
// leaseLoaded whether the blob carried a usable top-level "lease" entry.
//
// The blob is parsed twice: first into a map of raw fields, only to detect the
// "lease" key, and then through a local alias type. The alias does not declare
// this method, so the second pass uses the default struct decoding instead of
// recursing back into UnmarshalJSON.
//
// A "lease" key whose value is the JSON literal null is treated as absent.
// encoding/json leaves an int64 untouched when it decodes null, so LeaderLease
// would keep whatever it held before decoding; reporting that as an explicitly
// persisted lease would defeat the purpose of leaseLoaded.
//
// It returns the first decoding error encountered; a nil embedded Config is
// allocated up front so promoted fields always have a target.
func (cfg *persistedConfig) UnmarshalJSON(data []byte) error {
	if cfg.Config == nil {
		cfg.Config = &Config{}
	}
	var fields map[string]json.RawMessage
	if err := json.Unmarshal(data, &fields); err != nil {
		return err
	}
	rawLease, present := fields["lease"]
	cfg.leaseLoaded = present && string(rawLease) != "null"

	type persistedConfigAlias persistedConfig
	return json.Unmarshal(data, (*persistedConfigAlias)(cfg))
}

// hasValidLeaderLease reports whether the decoded blob explicitly carried a
// lease and that lease passes IsValidLeaderLease.
func (cfg *persistedConfig) hasValidLeaderLease() bool {
	if !cfg.leaseLoaded {
		return false
	}
	return IsValidLeaderLease(cfg.LeaderLease)
}

// SwitchRaftV2 update some config if tikv raft engine switch into partition raft v2
Expand All @@ -781,6 +818,7 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
Keyspace: *o.GetKeyspaceConfig(),
Microservice: *o.GetMicroserviceConfig(),
ClusterVersion: *o.GetClusterVersion(),
LeaderLease: o.GetLeaderLease(),
},
StoreConfig: *o.GetStoreConfig(),
}
Expand All @@ -797,7 +835,6 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error {
if err := cfg.Adjust(nil, true); err != nil {
return err
}

isExist, err := storage.LoadConfig(cfg)
if err != nil {
return err
Expand All @@ -813,12 +850,20 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error {
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.microservice.Store(&cfg.Microservice)
if cfg.hasValidLeaderLease() {
o.SetLeaderLease(cfg.LeaderLease)
}
o.storeConfig.Store(&cfg.StoreConfig)
o.SetClusterVersion(&cfg.ClusterVersion)
}
return nil
}

// IsValidLeaderLease reports whether lease is an acceptable PD leader lease
// timeout. Only strictly positive values are accepted.
func IsValidLeaderLease(lease int64) bool {
	if lease <= 0 {
		return false
	}
	return true
}

func adjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) {
// In case we add new default schedulers.
for _, ps := range sc.DefaultSchedulers {
Expand Down
36 changes: 31 additions & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1013,6 +1013,7 @@ func (s *Server) GetServiceMiddlewareConfig() *config.ServiceMiddlewareConfig {
// GetConfig gets the config information.
func (s *Server) GetConfig() *config.Config {
cfg := s.cfg.Clone()
cfg.LeaderLease = s.persistOptions.GetLeaderLease()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This keeps the leader response fresh, but follower GET /config still only merges schedule/replication from the leader. Can we also copy LeaderLease there so followers don't return the old lease after a POST?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in c334f2830 by copying LeaderLease from the leader config in the follower GET /config merge path.

cfg.Schedule = *s.persistOptions.GetScheduleConfig().Clone()
cfg.Replication = *s.persistOptions.GetReplicationConfig().Clone()
cfg.PDServerCfg = *s.persistOptions.GetPDServerConfig().Clone()
Expand Down Expand Up @@ -1356,6 +1357,25 @@ func (s *Server) SetPDServerConfig(cfg config.PDServerConfig) error {
return nil
}

// SetLeaderLease validates lease, applies it to the persist options and
// persists the options to storage. A non-positive lease is rejected before any
// state is touched. If persisting fails, the previous lease is restored in the
// persist options and the persist error is returned.
func (s *Server) SetLeaderLease(lease int64) error {
	if valid := config.IsValidLeaderLease(lease); !valid {
		return errors.Errorf("leader lease must be positive, got %d", lease)
	}

	previous := s.persistOptions.GetLeaderLease()
	s.persistOptions.SetLeaderLease(lease)

	err := s.persistOptions.Persist(s.storage)
	if err == nil {
		log.Info("leader lease is updated", zap.Int64("new", lease), zap.Int64("old", previous))
		return nil
	}

	// Persist failed: undo the in-memory change before reporting the error.
	s.persistOptions.SetLeaderLease(previous)
	log.Error("failed to update leader lease",
		zap.Int64("new", lease),
		zap.Int64("old", previous),
		errs.ZapError(err))
	return err
}

// SetLabelPropertyConfig sets the label property config.
func (s *Server) SetLabelPropertyConfig(cfg config.LabelPropertyConfig) error {
old := s.persistOptions.GetLabelPropertyConfig()
Expand Down Expand Up @@ -1737,7 +1757,13 @@ func (s *Server) leaderLoop() {

func (s *Server) campaignLeader() {
log.Info("start to campaign PD leader", zap.String("campaign-leader-name", s.Name()))
if err := s.member.Campaign(s.ctx, s.cfg.LeaderLease); err != nil {
if err := s.persistOptions.Reload(s.storage); err != nil {
log.Warn("failed to reload persisted configuration before campaign",
errs.ZapError(err),
zap.String("campaign-leader-name", s.Name()))
return
}
if err := s.member.Campaign(s.ctx, s.persistOptions.GetLeaderLease()); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign PD leader meets error due to txn conflict, another PD may campaign successfully",
zap.String("campaign-leader-name", s.Name()))
Expand Down Expand Up @@ -1770,12 +1796,12 @@ func (s *Server) campaignLeader() {
keepLeaderDuration := time.Since(keepLeaderStart)
log.Info("keep leader lease completed", zap.Duration("cost", keepLeaderDuration), zap.String("campaign-leader-name", s.Name()))

reloadConfigStart := time.Now()
postKeepReloadStart := time.Now()
if err := s.reloadConfigFromKV(); err != nil {
log.Warn("failed to reload configuration", errs.ZapError(err), zap.Duration("cost", time.Since(reloadConfigStart)))
log.Warn("failed to reload configuration", errs.ZapError(err), zap.Duration("cost", time.Since(postKeepReloadStart)))
return
}
reloadConfigDuration := time.Since(reloadConfigStart)
reloadConfigDuration := time.Since(postKeepReloadStart)
log.Info("reload config from KV completed", zap.Duration("cost", reloadConfigDuration))

loadTTLStart := time.Now()
Expand Down Expand Up @@ -2230,7 +2256,7 @@ func (s *Server) GetTSOProxyRecvFromClientTimeout() time.Duration {

// GetLease returns the leader lease.
// The value is read from the persist options, the same source GetConfig uses
// to fill LeaderLease, rather than from the static startup config, so a lease
// stored through SetLeaderLease is what callers see.
func (s *Server) GetLease() int64 {
	return s.persistOptions.GetLeaderLease()
}

// GetTSOSaveInterval returns TSO save interval.
Expand Down
54 changes: 54 additions & 0 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,60 @@ func TestReconnect(t *testing.T) {
}
}

// TestLeaderLeaseConfigAPI exercises the leader lease through the config HTTP
// API on a three-member cluster started with lease 7:
//   - GET on the leader reports the startup lease under "lease" and exposes no
//     "leader" key;
//   - POST {"lease": 10} updates the leader's persist options and is visible in
//     a following GET on the leader and on a follower;
//   - POST with a zero or negative lease is rejected with 400 and leaves the
//     stored lease unchanged.
func TestLeaderLeaseConfigAPI(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	cluster, err := tests.NewTestCluster(ctx, 3, func(conf *config.Config, _ string) {
		conf.LeaderLease = 7
	})
	re.NoError(err)
	defer cluster.Destroy()

	re.NoError(cluster.RunInitialServers())
	re.NotEmpty(cluster.WaitLeader())
	leader := cluster.GetLeaderServer()
	configURL := leader.GetAddr() + "/pd/api/v1/config"

	// readConfig decodes every response into a fresh map. encoding/json reuses
	// a non-nil map and keeps its existing entries, so sharing one map across
	// requests could let a key left over from an earlier response satisfy a
	// later assertion (assuming ReadGetJSON decodes with encoding/json
	// semantics — TODO confirm).
	readConfig := func(url string) map[string]any {
		var cfg map[string]any
		re.NoError(testutil.ReadGetJSON(re, tests.TestDialClient, url, &cfg))
		return cfg
	}

	cfg := readConfig(configURL)
	re.Equal(float64(7), cfg["lease"])
	_, hasLeader := cfg["leader"]
	re.False(hasLeader)

	data, err := json.Marshal(map[string]any{"lease": 10})
	re.NoError(err)
	err = testutil.CheckPostJSON(tests.TestDialClient, configURL, data, testutil.StatusOK(re))
	re.NoError(err)
	re.Equal(int64(10), leader.GetServer().GetPersistOptions().GetLeaderLease())

	cfg = readConfig(configURL)
	re.Equal(float64(10), cfg["lease"])
	_, hasLeader = cfg["leader"]
	re.False(hasLeader)

	// One follower is enough to cover the follower GET path.
	for _, s := range cluster.GetServers() {
		if s.GetServer().Name() == leader.GetServer().Name() {
			continue
		}
		followerCfg := readConfig(s.GetAddr() + "/pd/api/v1/config")
		re.Equal(float64(10), followerCfg["lease"])
		break
	}

	for _, lease := range []int64{0, -1} {
		data, err = json.Marshal(map[string]any{"lease": lease})
		re.NoError(err)
		err = testutil.CheckPostJSON(tests.TestDialClient, configURL, data, testutil.Status(re, http.StatusBadRequest))
		re.NoError(err)
		re.Equal(int64(10), leader.GetServer().GetPersistOptions().GetLeaderLease())
	}
}

type middlewareTestSuite struct {
suite.Suite
cleanup func()
Expand Down
Loading