Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions server/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (h *confHandler) GetConfig(w http.ResponseWriter, r *http.Request) {
mergedCfg := localCfg
mergedCfg.Replication = leaderCfg.Replication
mergedCfg.Schedule = leaderCfg.Schedule
mergedCfg.LeaderLease = leaderCfg.LeaderLease
h.rd.JSON(w, http.StatusOK, mergedCfg)
return
}
Expand Down Expand Up @@ -215,6 +216,8 @@ func (h *confHandler) updateConfig(cfg *config.Config, key string, value any) er
return h.updateMicroserviceConfig(cfg, kp[len(kp)-1], value)
case "controller":
return h.updateControllerConfig(kp[len(kp)-1], value)
case "lease":
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please consider version-gating this API until all PD members understand the persisted lease. During rolling upgrade, an old PD can still persist lease: 0 in the config blob, which makes the new code ignore the online value and fall back to local toml on the next campaign.

return h.updateLeaderLease(cfg, value)
}
return errors.Errorf("config prefix %s not found", kp[0])
}
Expand Down Expand Up @@ -254,6 +257,20 @@ func (h *confHandler) updateMicroserviceConfig(config *config.Config, key string
return err
}

func (h *confHandler) updateLeaderLease(config *config.Config, value any) error {
updated, found, err := jsonutil.AddKeyValue(config, "lease", value)
if err != nil {
return err
}
if !found {
return errors.New("config item lease not found")
}
if updated {
return h.svr.SetLeaderLease(config.LeaderLease)
}
return nil
}

func (h *confHandler) updateSchedule(config *config.Config, key string, value any) error {
updated, found, err := jsonutil.AddKeyValue(&config.Schedule, key, value)
if err != nil {
Expand Down
144 changes: 144 additions & 0 deletions server/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,150 @@ func TestReloadConfig(t *testing.T) {
re.Equal(int64(512), newOpt.GetMaxMovableHotPeerSize())
}

func TestLeaderLeasePersistAndReload(t *testing.T) {
re := require.New(t)
cfg := NewConfig()
re.NoError(cfg.Adjust(nil, false))
cfg.LeaderLease = 7
opt := NewPersistOptions(cfg)
opt.SetMaxReplicas(5)
storage := storage.NewStorageWithMemoryBackend()
re.NoError(opt.Persist(storage))

var stored struct {
LeaderLease int64 `json:"lease"`
}
exists, err := storage.LoadConfig(&stored)
re.NoError(err)
re.True(exists)
re.Equal(int64(7), stored.LeaderLease)

cfg.LeaderLease = 9
newOpt := NewPersistOptions(cfg)
re.NoError(newOpt.Reload(storage))
re.Equal(int64(7), newOpt.GetLeaderLease())
}

func TestLeaderLeaseSetPersistAndReload(t *testing.T) {
re := require.New(t)
opt, err := newTestScheduleOption()
re.NoError(err)
opt.SetLeaderLease(7)
storage := storage.NewStorageWithMemoryBackend()
re.NoError(opt.Persist(storage))
var stored struct {
LeaderLease int64 `json:"lease"`
}
exists, err := storage.LoadConfig(&stored)
re.NoError(err)
re.True(exists)
re.Equal(int64(7), stored.LeaderLease)

cfg := NewConfig()
re.NoError(cfg.Adjust(nil, false))
cfg.LeaderLease = 9
newOpt := NewPersistOptions(cfg)
re.NoError(newOpt.Reload(storage))
re.Equal(int64(7), newOpt.GetLeaderLease())
}

func TestLeaderLeaseReloadIgnoresNonPositivePersistedValues(t *testing.T) {
re := require.New(t)
assertReloadKeepsLeaderLeaseForPersistedValue := func(persistedLease int64) {
storage := storage.NewStorageWithMemoryBackend()
re.NoError(storage.SaveConfig(struct {
LeaderLease int64 `json:"lease"`
}{LeaderLease: persistedLease}))

cfg := NewConfig()
re.NoError(cfg.Adjust(nil, false))
cfg.LeaderLease = 7
opt := NewPersistOptions(cfg)
re.NoError(opt.Reload(storage))
re.Equal(int64(7), opt.GetLeaderLease())
}

assertReloadKeepsLeaderLeaseForPersistedValue(0)
assertReloadKeepsLeaderLeaseForPersistedValue(-1)
}

func TestReloadKeepsCustomLeaderLeaseWhenPersistedConfigMissesLease(t *testing.T) {
re := require.New(t)
storage := storage.NewStorageWithMemoryBackend()

opt, err := newTestScheduleOption()
re.NoError(err)
type OldConfig struct {
Schedule sc.ScheduleConfig `toml:"schedule" json:"schedule"`
Replication sc.ReplicationConfig `toml:"replication" json:"replication"`
}
old := &OldConfig{
Schedule: *opt.GetScheduleConfig(),
Replication: *opt.GetReplicationConfig(),
}
re.NoError(storage.SaveConfig(old))

cfg := NewConfig()
re.NoError(cfg.Adjust(nil, false))
cfg.LeaderLease = 7
newOpt := NewPersistOptions(cfg)
re.NoError(newOpt.Reload(storage))
re.Equal(int64(7), newOpt.GetLeaderLease())
}

func TestValidateLeaderLease(t *testing.T) {
re := require.New(t)
re.NoError(ValidateLeaderLease(1))
re.Error(ValidateLeaderLease(0))
re.Error(ValidateLeaderLease(-1))
}

func TestLoadPersistedLeaderLease(t *testing.T) {
re := require.New(t)
const inMemoryLease = int64(7)

newOpt := func() *PersistOptions {
cfg := NewConfig()
re.NoError(cfg.Adjust(nil, false))
cfg.LeaderLease = inMemoryLease
return NewPersistOptions(cfg)
}

st := storage.NewStorageWithMemoryBackend()
lease, err := newOpt().LoadPersistedLeaderLease(st)
re.NoError(err)
re.Equal(inMemoryLease, lease)

st = storage.NewStorageWithMemoryBackend()
writer, err := newTestScheduleOption()
re.NoError(err)
writer.SetLeaderLease(11)
re.NoError(writer.Persist(st))
lease, err = newOpt().LoadPersistedLeaderLease(st)
re.NoError(err)
re.Equal(int64(11), lease)

st = storage.NewStorageWithMemoryBackend()
re.NoError(st.SaveConfig(struct {
Schedule sc.ScheduleConfig `json:"schedule"`
}{}))
lease, err = newOpt().LoadPersistedLeaderLease(st)
re.NoError(err)
re.Equal(inMemoryLease, lease)

assertLoadKeepsLeaderLeaseForPersistedValue := func(persistedLease int64) {
st := storage.NewStorageWithMemoryBackend()
re.NoError(st.SaveConfig(struct {
LeaderLease int64 `json:"lease"`
}{LeaderLease: persistedLease}))
lease, err := newOpt().LoadPersistedLeaderLease(st)
re.NoError(err)
re.Equal(inMemoryLease, lease)
}
assertLoadKeepsLeaderLeaseForPersistedValue(0)
assertLoadKeepsLeaderLeaseForPersistedValue(-1)
}

func TestReloadUpgrade(t *testing.T) {
re := require.New(t)
opt, err := newTestScheduleOption()
Expand Down
82 changes: 81 additions & 1 deletion server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package config

import (
"context"
"encoding/json"
"reflect"
"strconv"
"sync/atomic"
Expand Down Expand Up @@ -55,6 +56,7 @@ type PersistOptions struct {
keyspace atomic.Value
microservice atomic.Value
storeConfig atomic.Value
leaderLease atomic.Int64
clusterVersion unsafe.Pointer
}

Expand All @@ -68,6 +70,7 @@ func NewPersistOptions(cfg *Config) *PersistOptions {
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.microservice.Store(&cfg.Microservice)
o.leaderLease.Store(cfg.LeaderLease)
// storeConfig will be fetched from TiKV later,
// set it to an empty config here first.
o.storeConfig.Store(&sc.StoreConfig{})
Expand Down Expand Up @@ -156,6 +159,16 @@ func (o *PersistOptions) SetStoreConfig(cfg *sc.StoreConfig) {
o.storeConfig.Store(cfg)
}

// GetLeaderLease returns the PD leader lease timeout in seconds.
func (o *PersistOptions) GetLeaderLease() int64 {
return o.leaderLease.Load()
}

// SetLeaderLease sets the PD leader lease timeout in seconds.
func (o *PersistOptions) SetLeaderLease(lease int64) {
o.leaderLease.Store(lease)
}

// GetClusterVersion returns the cluster version.
func (o *PersistOptions) GetClusterVersion() *semver.Version {
return (*semver.Version)(atomic.LoadPointer(&o.clusterVersion))
Expand Down Expand Up @@ -761,6 +774,30 @@ type persistedConfig struct {
*Config
// StoreConfig is injected into Config to avoid breaking the original API.
StoreConfig sc.StoreConfig `json:"store"`
// leaseLoaded records whether the persisted etcd blob explicitly included
// the top-level "lease" field. Older blobs without that field should not
// silently overwrite the startup value with the default-filled lease.
leaseLoaded bool
}

// UnmarshalJSON records whether the persisted blob explicitly contains the
// leader lease field while preserving the default Config JSON decoding.
func (cfg *persistedConfig) UnmarshalJSON(data []byte) error {
if cfg.Config == nil {
cfg.Config = &Config{}
}
var fields map[string]json.RawMessage
if err := json.Unmarshal(data, &fields); err != nil {
return err
}
_, cfg.leaseLoaded = fields["lease"]

type persistedConfigAlias persistedConfig
return json.Unmarshal(data, (*persistedConfigAlias)(cfg))
}

func (cfg *persistedConfig) hasValidLeaderLease() bool {
return cfg.leaseLoaded && IsValidLeaderLease(cfg.LeaderLease)
}

// SwitchRaftV2 update some config if tikv raft engine switch into partition raft v2
Expand All @@ -781,6 +818,7 @@ func (o *PersistOptions) Persist(storage endpoint.ConfigStorage) error {
Keyspace: *o.GetKeyspaceConfig(),
Microservice: *o.GetMicroserviceConfig(),
ClusterVersion: *o.GetClusterVersion(),
LeaderLease: o.GetLeaderLease(),
},
StoreConfig: *o.GetStoreConfig(),
}
Expand All @@ -797,7 +835,6 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error {
if err := cfg.Adjust(nil, true); err != nil {
return err
}

isExist, err := storage.LoadConfig(cfg)
if err != nil {
return err
Expand All @@ -813,12 +850,55 @@ func (o *PersistOptions) Reload(storage endpoint.ConfigStorage) error {
o.labelProperty.Store(cfg.LabelProperty)
o.keyspace.Store(&cfg.Keyspace)
o.microservice.Store(&cfg.Microservice)
if cfg.hasValidLeaderLease() {
o.SetLeaderLease(cfg.LeaderLease)
}
o.storeConfig.Store(&cfg.StoreConfig)
o.SetClusterVersion(&cfg.ClusterVersion)
}
return nil
}

// LoadPersistedLeaderLease returns the leader lease that should be used for the
// next campaign. It reads only the persisted leader lease instead of reloading
// the whole configuration, so it is cheap enough to run on the election path.
// When the persisted blob carries no valid lease (e.g. a blob written before
// online lease update was supported), the current in-memory value is returned,
// preserving the previous campaign behavior. Storage and decode errors are
// returned to the caller, which decides whether to fall back.
func (o *PersistOptions) LoadPersistedLeaderLease(storage endpoint.ConfigStorage) (int64, error) {
failpoint.Inject("loadLeaderLeaseFail", func() {
failpoint.Return(int64(0), errors.New("failpoint: fail to load persisted leader lease"))
})
cfg := &persistedConfig{Config: &Config{}}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still unmarshals the full persisted config on the election path. Could we decode only a small {lease} struct here? That avoids extra work and prevents unrelated config decode issues from changing campaign behavior.

isExist, err := storage.LoadConfig(cfg)
if err != nil {
return 0, err
}
if isExist && cfg.hasValidLeaderLease() {
return cfg.LeaderLease, nil
}
return o.GetLeaderLease(), nil
}

// IsValidLeaderLease returns whether the given PD leader lease timeout is valid.
// It only checks positivity, because it also gates whether a persisted lease
// should be honored on reload.
func IsValidLeaderLease(lease int64) bool {
return lease > 0
}

// ValidateLeaderLease validates a leader lease (in seconds) supplied through
// the online update API. Keep the accepted range aligned with file config
// behavior: reject explicit non-positive updates, but do not add an online-only
// upper bound.
func ValidateLeaderLease(lease int64) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add an upper bound here? Values larger than math.MaxInt64 / int64(time.Second) can be persisted successfully, but time.Duration(lease) * time.Second overflows later in election keepalive and may break the next campaign.

if !IsValidLeaderLease(lease) {
return errors.Errorf("leader lease must be positive, got %d", lease)
}
return nil
}

func adjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) {
// In case we add new default schedulers.
for _, ps := range sc.DefaultSchedulers {
Expand Down
Loading
Loading