Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (gta *GlobalTSOAllocator) UpdateTSO() (err error) {
// next request succeeds with the new endpoint, according to https://github.com/etcd-io/etcd/issues/8711
maxRetryCount := 3
for range maxRetryCount {
err = gta.timestampOracle.UpdateTimestamp()
_, err = gta.timestampOracle.UpdateTimestamp(intervalUpdate)
if err == nil {
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,8 @@ func (lta *LocalTSOAllocator) IsInitialize() bool {
// UpdateTSO is used to update the TSO in memory and the time window in etcd
// for all local TSO allocators this PD server hold.
func (lta *LocalTSOAllocator) UpdateTSO() error {
return lta.timestampOracle.UpdateTimestamp()
_, err := lta.timestampOracle.UpdateTimestamp(intervalUpdate)
return err
}

// SetTSO sets the physical part with given TSO.
Expand Down
2 changes: 2 additions & 0 deletions pkg/tso/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ type tsoMetrics struct {
notLeaderAnymoreEvent prometheus.Counter
logicalOverflowEvent prometheus.Counter
exceededMaxRetryEvent prometheus.Counter
notAllowedSaveTimestampEvent prometheus.Counter
// timestampOracle operation duration
syncSaveDuration prometheus.Observer
resetSaveDuration prometheus.Observer
Expand Down Expand Up @@ -153,6 +154,7 @@ func newTSOMetrics(groupID, dcLocation string) *tsoMetrics {
notLeaderAnymoreEvent: tsoCounter.WithLabelValues("not_leader_anymore", groupID, dcLocation),
logicalOverflowEvent: tsoCounter.WithLabelValues("logical_overflow", groupID, dcLocation),
exceededMaxRetryEvent: tsoCounter.WithLabelValues("exceeded_max_retry", groupID, dcLocation),
notAllowedSaveTimestampEvent: tsoCounter.WithLabelValues("not_allowed_save_timestamp", groupID, dcLocation),
syncSaveDuration: tsoOpDuration.WithLabelValues("sync_save", groupID, dcLocation),
resetSaveDuration: tsoOpDuration.WithLabelValues("reset_save", groupID, dcLocation),
updateSaveDuration: tsoOpDuration.WithLabelValues("update_save", groupID, dcLocation),
Expand Down
104 changes: 85 additions & 19 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,47 @@ func (t *timestampOracle) getStorageTimeout() time.Duration {
return timeout - time.Second
}

func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) {
// setTSOCondition is the condition for updating physical time.
// if it returns false, the physical time will not be updated.
type setTSOCondition func(t *timestampOracle) bool

// mustInitialized is used to check whether the TSO in memory has been initialized before updating physical time.
func mustInitialized() setTSOCondition {
return func(t *timestampOracle) bool {
return !t.tsoMux.physical.Equal(typeutil.ZeroTime)
}
}

// mustOverflowed is used to check whether the TSO in memory has been overflowed before updating physical time.
func mustOverflowed() setTSOCondition {
return func(t *timestampOracle) bool {
return overflowedLogical(t.tsoMux.logical)
}
}

// setTSOPhysical sets the TSO's physical part with the given time.
// It returns true if the TSO's logical part is overflowed, the caller should wait for the next physical tick.
// It must satisfy all the following conditions before updating physical time.
// If not met, it will skip updating physical time and return whether the logical part is overflowed.
func (t *timestampOracle) setTSOPhysical(next time.Time, conditions ...setTSOCondition) bool {
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
// Do not update the zero physical time if the `force` flag is false.
if t.tsoMux.physical == typeutil.ZeroTime && !force {
return
// check all conditions before updating physical time, if any condition is not met,
// skip updating physical time and return whether the logical part is overflowed.
for _, condition := range conditions {
if !condition(t) {
return overflowedLogical(t.tsoMux.logical)
}
}

// make sure the ts won't fall back
if typeutil.SubTSOPhysicalByWallClock(next, t.tsoMux.physical) > 0 {
t.tsoMux.physical = next
t.tsoMux.logical = 0
t.tsoMux.updateTime = time.Now()
t.metrics.saveEvent.Inc()
}
return overflowedLogical(t.tsoMux.logical)
}

func (t *timestampOracle) getTSO() (time.Time, int64) {
Expand Down Expand Up @@ -234,7 +262,7 @@ func (t *timestampOracle) SyncTimestamp() error {
zap.Time("last", last), zap.Time("last-saved", lastSavedTime),
zap.Time("save", save), zap.Time("next", next))
// save into memory
t.setTSOPhysical(next, true)
t.setTSOPhysical(next)
return nil
}

Expand Down Expand Up @@ -311,7 +339,18 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi
return nil
}

// UpdateTimestamp is used to update the timestamp.
type updatePurpose int

const (
intervalUpdate updatePurpose = iota
overflowUpdate
)

// updateTimestamp is used to update the timestamp.
// updatePurpose indicates the purpose of this update:
// - intervalUpdate: update timestamp due to the periodic interval.
// - overflowUpdate: update timestamp due to the logical overflow.
//
// This function will do two things:
// 1. When the logical time is going to be used up, increase the current physical time.
// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time
Expand All @@ -323,11 +362,15 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi
// 2. The physical time is monotonically increasing.
// 3. The physical time is always less than the saved timestamp.
//
// The first returns a boolean indicates whether the logical time is overflowed, and the second returns an error if any error occurs.
// When met logical overflow, the caller should wait for the next physical tick to retry allocating TSO, which may cause more clock offset issues and etcd load,
// so we want to avoid it as much as possible by updating physical time in advance.
//
// NOTICE: this function should be called after the TSO in memory has been initialized
// and should not be called when the TSO in memory has been reset anymore.
func (t *timestampOracle) UpdateTimestamp() error {
func (t *timestampOracle) UpdateTimestamp(purpose updatePurpose) (bool, error) {
if !t.isInitialized() {
return errs.ErrUpdateTimestamp.FastGenByArgs("timestamp in memory has not been initialized")
return false, errs.ErrUpdateTimestamp.FastGenByArgs("timestamp in memory has not been initialized")
}
prevPhysical, prevLogical := t.getTSO()
t.metrics.tsoPhysicalGauge.Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond)))
Expand All @@ -340,10 +383,10 @@ func (t *timestampOracle) UpdateTimestamp() error {
failpoint.Inject("systemTimeSlow", func() {
now = now.Add(-time.Hour)
})

t.metrics.saveEvent.Inc()

jetLag := typeutil.SubRealTimeByWallClock(now, prevPhysical)
t.metrics.tsoPhysicalGauge.Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond)))
t.metrics.tsoPhysicalGapGauge.Set(float64(jetLag.Milliseconds()))

if jetLag > 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold {
log.Warn("clock offset",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
Expand Down Expand Up @@ -372,12 +415,19 @@ func (t *timestampOracle) UpdateTimestamp() error {
} else {
// It will still use the previous physical time to alloc the timestamp.
t.metrics.skipSaveEvent.Inc()
return nil
return false, nil
}

// It is not safe to increase the physical time to `next`.
// The time window needs to be updated and saved to etcd.
if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), next) <= UpdateTimestampGuard {
// Only IntervalUpdate is allowed to save timestamp into etcd.
// It would be dangerous to save timestamp into etcd when handling overflowUpdate.
// We don't want to update the physical time too frequently because of logical overflow, which may cause more clock offset issues and etcd load.
if purpose != intervalUpdate {
t.metrics.notAllowedSaveTimestampEvent.Inc()
return true, nil
}
save := next.Add(t.saveInterval)
start := time.Now()
ctx, cancelCtx := context.WithTimeout(t.client.Ctx(), t.getStorageTimeout())
Expand All @@ -389,15 +439,19 @@ func (t *timestampOracle) UpdateTimestamp() error {
zap.String("timestamp-path", t.GetTimestampPath()),
zap.Error(err))
t.metrics.errSaveUpdateTSEvent.Inc()
return err
return false, err
}
t.lastSavedTime.Store(save)
t.metrics.updateSaveDuration.Observe(time.Since(start).Seconds())
}
// save into memory
t.setTSOPhysical(next, false)

return nil
// If it's an IntervalUpdate, we don't need to check logical overflow, just update physical time directly.
// Otherwise, the caller met logical overflow, so it will allocate physical time to alloc more timestamp in concurrent.
// So we need to check logical overflow before updating physical time to avoid allocating too much physical time due to logical overflow.
conditions := []setTSOCondition{mustInitialized()}
if purpose != intervalUpdate {
conditions = append(conditions, mustOverflowed())
}
return t.setTSOPhysical(next, conditions...), nil
}

var maxRetryCount = 10
Expand Down Expand Up @@ -425,13 +479,21 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader
if resp.GetPhysical() == 0 {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset")
}
if resp.GetLogical() >= maxLogical {
if overflowedLogical(resp.GetLogical()) {
log.Warn("logical part outside of max logical interval, please check ntp time, or adjust config item `tso-update-physical-interval`",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Reflect("response", resp),
zap.Int("retry-count", i), errs.ZapError(errs.ErrLogicOverflow))
t.metrics.logicalOverflowEvent.Inc()
time.Sleep(t.updatePhysicalInterval)
if overflowed, err := t.UpdateTimestamp(overflowUpdate); err != nil {
log.Warn("update timestamp failed",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Bool("overflowed", overflowed),
zap.Error(err))
time.Sleep(t.updatePhysicalInterval)
} else if overflowed {
time.Sleep(t.updatePhysicalInterval)
}
continue
}
// In case lease expired after the first check.
Expand All @@ -455,3 +517,7 @@ func (t *timestampOracle) ResetTimestamp() {
t.tsoMux.updateTime = typeutil.ZeroTime
t.lastSavedTime.Store(typeutil.ZeroTime)
}

func overflowedLogical(logical int64) bool {
return logical >= maxLogical
}
171 changes: 171 additions & 0 deletions pkg/tso/tso_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2026 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package tso

import (
"context"
"reflect"
"sync"
"sync/atomic"
"testing"
"time"
"unsafe"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/election"
"github.com/tikv/pd/pkg/utils/typeutil"
)

func mockLeadership() *election.Leadership {
leadership := &election.Leadership{}
lease := &election.Lease{}
expireTime := reflect.ValueOf(lease).Elem().FieldByName("expireTime")
expireTime = reflect.NewAt(expireTime.Type(), unsafe.Pointer(expireTime.UnsafeAddr())).Elem()
expireTime.Addr().Interface().(*atomic.Value).Store(time.Now().Add(time.Hour))
leadership.SetLease(lease)
return leadership
}

func TestLogicalOverflow(t *testing.T) {
re := require.New(t)
re.True(overflowedLogical(maxLogical))
re.False(overflowedLogical(maxLogical - 1))
}

func TestSetTSO(t *testing.T) {
re := require.New(t)
ts := &timestampOracle{
tsoMux: &tsoObject{
physical: typeutil.ZeroTime,
logical: 0,
},
metrics: newTSOMetrics("test", "dc"),
}
ts.setTSOPhysical(time.Now(), mustInitialized())
physical, _ := ts.getTSO()
re.Equal(typeutil.ZeroTime, physical)

ts.setTSOPhysical(time.Now())
physical, _ = ts.getTSO()
re.NotEqual(typeutil.ZeroTime, physical)

current := time.Now().Add(-time.Second)
ts = &timestampOracle{
tsoMux: &tsoObject{
physical: current,
logical: 0,
},
metrics: newTSOMetrics("test", "dc"),
}
ts.setTSOPhysical(time.Now(), mustOverflowed())
physical, _ = ts.getTSO()
re.Equal(current, physical)

ts.setTSOPhysical(time.Now())
physical, _ = ts.getTSO()
re.NotEqual(current, physical)
}

func TestGenerateTSO(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
current := time.Now()
timestampOracle := &timestampOracle{
tsoMux: &tsoObject{
physical: current,
logical: maxLogical - 1,
},
saveInterval: 5 * time.Second,
updatePhysicalInterval: 50 * time.Millisecond,
maxResetTSGap: func() time.Duration { return time.Hour },
metrics: newTSOMetrics("test", "dc"),
}
leadership := mockLeadership()

// update physical time interval failed due to reach the lastSavedTime, it needs to save storage first, but this behavior is not allowed.
_, err := timestampOracle.getTS(ctx, leadership, 2, 0)
re.Error(err)
physical, _ := timestampOracle.getTSO()
re.Equal(current, physical)

// simulate the save to storage operation is done.
timestampOracle.lastSavedTime.Store(time.Now().Add(5 * time.Second))
_, err = timestampOracle.getTS(ctx, leadership, 2, 0)
re.NoError(err)
physical, _ = timestampOracle.getTSO()
re.NotEqual(current, physical)
}

func TestCurrentGetTSO(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
current := time.Now()
timestampOracle := &timestampOracle{
tsoMux: &tsoObject{
physical: current,
logical: maxLogical - 1,
},
saveInterval: 5 * time.Second,
updatePhysicalInterval: 50 * time.Millisecond,
maxResetTSGap: func() time.Duration { return time.Hour },
metrics: newTSOMetrics("test", "dc"),
}
leadership := mockLeadership()
runDuration := 10 * time.Second
runCtx, runCancel := context.WithTimeout(ctx, runDuration-2*time.Second)
defer runCancel()

timestampOracle.lastSavedTime.Store(current.Add(runDuration))

wg := &sync.WaitGroup{}
concurrency := 20
errCh := make(chan error, 1)
wg.Add(concurrency)
changes := atomic.Int32{}
totalTso := atomic.Int32{}
for range concurrency {
go func() {
defer wg.Done()
for {
select {
case <-runCtx.Done():
return
default:
ts, err := timestampOracle.getTS(runCtx, leadership, 1, 0)
totalTso.Add(1)
if err != nil {
select {
case errCh <- err:
runCancel()
default:
}
}
if ts.Logical == 1 {
changes.Add(1)
}
}
}
}()
}

wg.Wait()
close(errCh)
for err := range errCh {
re.NoError(err)
}
re.LessOrEqual(changes.Load(), totalTso.Load()/int32(maxLogical)+1)
}
Loading
Loading