From 4b6fe8f283090fdb1cd055a2e9bb6ec0e2aadcf8 Mon Sep 17 00:00:00 2001 From: tongjian <1045931706@qq.com> Date: Thu, 12 Feb 2026 17:27:26 +0800 Subject: [PATCH 1/2] tso:update tso timestamp when met logic exhaust (#10137) close tikv/pd#10138 1. Advance the physical timestamp by the GetTs request, not only by the ticker. 2. Only the ticker can save the timestamp into the storage when it reaches the save time to avoid frequent updating. Signed-off-by: bufferflies <1045931706@qq.com> --- pkg/tso/global_allocator.go | 2 +- pkg/tso/local_allocator.go | 3 +- pkg/tso/metrics.go | 2 + pkg/tso/tso.go | 104 ++++++++++++++++++---- pkg/tso/tso_test.go | 171 ++++++++++++++++++++++++++++++++++++ 5 files changed, 261 insertions(+), 21 deletions(-) create mode 100644 pkg/tso/tso_test.go diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index cffe84ed5c2..4230e0091ea 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -200,7 +200,7 @@ func (gta *GlobalTSOAllocator) UpdateTSO() (err error) { // next request succeeds with the new endpoint, according to https://github.com/etcd-io/etcd/issues/8711 maxRetryCount := 3 for range maxRetryCount { - err = gta.timestampOracle.UpdateTimestamp() + _, err = gta.timestampOracle.UpdateTimestamp(intervalUpdate) if err == nil { return nil } diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 040b5891d12..52eebcf83d4 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -112,7 +112,8 @@ func (lta *LocalTSOAllocator) IsInitialize() bool { // UpdateTSO is used to update the TSO in memory and the time window in etcd // for all local TSO allocators this PD server hold. func (lta *LocalTSOAllocator) UpdateTSO() error { - return lta.timestampOracle.UpdateTimestamp() + _, err := lta.timestampOracle.UpdateTimestamp(intervalUpdate) + return err } // SetTSO sets the physical part with given TSO. diff --git a/pkg/tso/metrics.go b/pkg/tso/metrics.go index 02e72ebb376..12eb251f68b 100644 --- a/pkg/tso/metrics.go +++ b/pkg/tso/metrics.go @@ -116,6 +116,7 @@ type tsoMetrics struct { notLeaderAnymoreEvent prometheus.Counter logicalOverflowEvent prometheus.Counter exceededMaxRetryEvent prometheus.Counter + notAllowedSaveTimestampEvent prometheus.Counter // timestampOracle operation duration syncSaveDuration prometheus.Observer resetSaveDuration prometheus.Observer @@ -153,6 +154,7 @@ func newTSOMetrics(groupID, dcLocation string) *tsoMetrics { notLeaderAnymoreEvent: tsoCounter.WithLabelValues("not_leader_anymore", groupID, dcLocation), logicalOverflowEvent: tsoCounter.WithLabelValues("logical_overflow", groupID, dcLocation), exceededMaxRetryEvent: tsoCounter.WithLabelValues("exceeded_max_retry", groupID, dcLocation), + notAllowedSaveTimestampEvent: tsoCounter.WithLabelValues("not_allowed_save_timestamp", groupID, dcLocation), syncSaveDuration: tsoOpDuration.WithLabelValues("sync_save", groupID, dcLocation), resetSaveDuration: tsoOpDuration.WithLabelValues("reset_save", groupID, dcLocation), updateSaveDuration: tsoOpDuration.WithLabelValues("update_save", groupID, dcLocation), diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index fac8de03889..70f1cee5d7d 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -91,19 +91,47 @@ func (t *timestampOracle) getStorageTimeout() time.Duration { return timeout - time.Second } -func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) { +// setTSOCondition is the condition for updating physical time. +// if it returns false, the physical time will not be updated. +type setTSOCondition func(t *timestampOracle) bool + +// mustInitialized is used to check whether the TSO in memory has been initialized before updating physical time. +func mustInitialized() setTSOCondition { + return func(t *timestampOracle) bool { + return !t.tsoMux.physical.Equal(typeutil.ZeroTime) + } +} + +// mustOverflowed is used to check whether the TSO in memory has been overflowed before updating physical time. +func mustOverflowed() setTSOCondition { + return func(t *timestampOracle) bool { + return overflowedLogical(t.tsoMux.logical) + } +} + +// setTSOPhysical sets the TSO's physical part with the given time. +// It returns true if the TSO's logical part is overflowed, the caller should wait for the next physical tick. +// It must satisfy all the following conditions before updating physical time. +// If not met, it will skip updating physical time and return whether the logical part is overflowed. +func (t *timestampOracle) setTSOPhysical(next time.Time, conditions ...setTSOCondition) bool { t.tsoMux.Lock() defer t.tsoMux.Unlock() - // Do not update the zero physical time if the `force` flag is false. - if t.tsoMux.physical == typeutil.ZeroTime && !force { - return + // check all conditions before updating physical time, if any condition is not met, + // skip updating physical time and return whether the logical part is overflowed. + for _, condition := range conditions { + if !condition(t) { + return overflowedLogical(t.tsoMux.logical) + } } + // make sure the ts won't fall back if typeutil.SubTSOPhysicalByWallClock(next, t.tsoMux.physical) > 0 { t.tsoMux.physical = next t.tsoMux.logical = 0 t.tsoMux.updateTime = time.Now() + t.metrics.saveEvent.Inc() } + return overflowedLogical(t.tsoMux.logical) } func (t *timestampOracle) getTSO() (time.Time, int64) { @@ -234,7 +262,7 @@ func (t *timestampOracle) SyncTimestamp() error { zap.Time("last", last), zap.Time("last-saved", lastSavedTime), zap.Time("save", save), zap.Time("next", next)) // save into memory - t.setTSOPhysical(next, true) + t.setTSOPhysical(next) return nil } @@ -311,7 +339,18 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi return nil } -// UpdateTimestamp is used to update the timestamp. +type updatePurpose int + +const ( + intervalUpdate updatePurpose = iota + overflowUpdate +) + +// updateTimestamp is used to update the timestamp. +// updatePurpose indicates the purpose of this update: +// - intervalUpdate: update timestamp due to the periodic interval. +// - overflowUpdate: update timestamp due to the logical overflow. +// // This function will do two things: // 1. When the logical time is going to be used up, increase the current physical time. // 2. When the time window is not big enough, which means the saved etcd time minus the next physical time @@ -323,11 +362,15 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi // 2. The physical time is monotonically increasing. // 3. The physical time is always less than the saved timestamp. // +// The first returns a boolean indicates whether the logical time is overflowed, and the second returns an error if any error occurs. +// When met logical overflow, the caller should wait for the next physical tick to retry allocating TSO, which may cause more clock offset issues and etcd load, +// so we want to avoid it as much as possible by updating physical time in advance. +// // NOTICE: this function should be called after the TSO in memory has been initialized // and should not be called when the TSO in memory has been reset anymore. -func (t *timestampOracle) UpdateTimestamp() error { +func (t *timestampOracle) UpdateTimestamp(purpose updatePurpose) (bool, error) { if !t.isInitialized() { - return errs.ErrUpdateTimestamp.FastGenByArgs("timestamp in memory has not been initialized") + return false, errs.ErrUpdateTimestamp.FastGenByArgs("timestamp in memory has not been initialized") } prevPhysical, prevLogical := t.getTSO() t.metrics.tsoPhysicalGauge.Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond))) @@ -340,10 +383,10 @@ func (t *timestampOracle) UpdateTimestamp() error { failpoint.Inject("systemTimeSlow", func() { now = now.Add(-time.Hour) }) - - t.metrics.saveEvent.Inc() - jetLag := typeutil.SubRealTimeByWallClock(now, prevPhysical) + t.metrics.tsoPhysicalGauge.Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond))) + t.metrics.tsoPhysicalGapGauge.Set(float64(jetLag.Milliseconds())) + if jetLag > 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold { log.Warn("clock offset", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0), @@ -372,12 +415,19 @@ func (t *timestampOracle) UpdateTimestamp() error { } else { // It will still use the previous physical time to alloc the timestamp. t.metrics.skipSaveEvent.Inc() - return nil + return false, nil } // It is not safe to increase the physical time to `next`. // The time window needs to be updated and saved to etcd. if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), next) <= UpdateTimestampGuard { + // Only IntervalUpdate is allowed to save timestamp into etcd. + // It would be dangerous to save timestamp into etcd when handling overflowUpdate. + // We don't want to update the physical time too frequently because of logical overflow, which may cause more clock offset issues and etcd load. + if purpose != intervalUpdate { + t.metrics.notAllowedSaveTimestampEvent.Inc() + return true, nil + } save := next.Add(t.saveInterval) start := time.Now() ctx, cancelCtx := context.WithTimeout(t.client.Ctx(), t.getStorageTimeout()) @@ -389,15 +439,19 @@ func (t *timestampOracle) UpdateTimestamp() error { zap.String("timestamp-path", t.GetTimestampPath()), zap.Error(err)) t.metrics.errSaveUpdateTSEvent.Inc() - return err + return false, err } t.lastSavedTime.Store(save) t.metrics.updateSaveDuration.Observe(time.Since(start).Seconds()) } - // save into memory - t.setTSOPhysical(next, false) - - return nil + // If it's an IntervalUpdate, we don't need to check logical overflow, just update physical time directly. + // Otherwise, the caller met logical overflow, so it will allocate physical time to alloc more timestamp in concurrent. + // So we need to check logical overflow before updating physical time to avoid allocating too much physical time due to logical overflow. + conditions := []setTSOCondition{mustInitialized()} + if purpose != intervalUpdate { + conditions = append(conditions, mustOverflowed()) + } + return t.setTSOPhysical(next, conditions...), nil } var maxRetryCount = 10 @@ -425,13 +479,21 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader if resp.GetPhysical() == 0 { return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset") } - if resp.GetLogical() >= maxLogical { + if overflowedLogical(resp.GetLogical()) { log.Warn("logical part outside of max logical interval, please check ntp time, or adjust config item `tso-update-physical-interval`", logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0), zap.Reflect("response", resp), zap.Int("retry-count", i), errs.ZapError(errs.ErrLogicOverflow)) t.metrics.logicalOverflowEvent.Inc() - time.Sleep(t.updatePhysicalInterval) + if overflowed, err := t.UpdateTimestamp(overflowUpdate); err != nil { + log.Warn("update timestamp failed", + logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0), + zap.Bool("overflowed", overflowed), + zap.Error(err)) + time.Sleep(t.updatePhysicalInterval) + } else if overflowed { + time.Sleep(t.updatePhysicalInterval) + } continue } // In case lease expired after the first check. @@ -455,3 +517,7 @@ func (t *timestampOracle) ResetTimestamp() { t.tsoMux.updateTime = typeutil.ZeroTime t.lastSavedTime.Store(typeutil.ZeroTime) } + +func overflowedLogical(logical int64) bool { + return logical >= maxLogical +} diff --git a/pkg/tso/tso_test.go b/pkg/tso/tso_test.go new file mode 100644 index 00000000000..2abcdcd0e2b --- /dev/null +++ b/pkg/tso/tso_test.go @@ -0,0 +1,171 @@ +// Copyright 2026 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tso + +import ( + "context" + "reflect" + "sync" + "sync/atomic" + "testing" + "time" + "unsafe" + + "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/election" + "github.com/tikv/pd/pkg/utils/typeutil" +) + +func mockLeadership() *election.Leadership { + leadership := &election.Leadership{} + lease := &election.Lease{} + expireTime := reflect.ValueOf(lease).Elem().FieldByName("expireTime") + expireTime = reflect.NewAt(expireTime.Type(), unsafe.Pointer(expireTime.UnsafeAddr())).Elem() + expireTime.Addr().Interface().(*atomic.Value).Store(time.Now().Add(time.Hour)) + leadership.SetLease(lease) + return leadership +} + +func TestLogicalOverflow(t *testing.T) { + re := require.New(t) + re.True(overflowedLogical(maxLogical)) + re.False(overflowedLogical(maxLogical - 1)) +} + +func TestSetTSO(t *testing.T) { + re := require.New(t) + ts := ×tampOracle{ + tsoMux: &tsoObject{ + physical: typeutil.ZeroTime, + logical: 0, + }, + metrics: newTSOMetrics("test", "dc"), + } + ts.setTSOPhysical(time.Now(), mustInitialized()) + physical, _ := ts.getTSO() + re.Equal(typeutil.ZeroTime, physical) + + ts.setTSOPhysical(time.Now()) + physical, _ = ts.getTSO() + re.NotEqual(typeutil.ZeroTime, physical) + + current := time.Now().Add(-time.Second) + ts = ×tampOracle{ + tsoMux: &tsoObject{ + physical: current, + logical: 0, + }, + metrics: newTSOMetrics("test", "dc"), + } + ts.setTSOPhysical(time.Now(), mustOverflowed()) + physical, _ = ts.getTSO() + re.Equal(current, physical) + + ts.setTSOPhysical(time.Now()) + physical, _ = ts.getTSO() + re.NotEqual(current, physical) +} + +func TestGenerateTSO(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + current := time.Now() + timestampOracle := ×tampOracle{ + tsoMux: &tsoObject{ + physical: current, + logical: maxLogical - 1, + }, + saveInterval: 5 * time.Second, + updatePhysicalInterval: 50 * time.Millisecond, + maxResetTSGap: func() time.Duration { return time.Hour }, + metrics: newTSOMetrics("test", "dc"), + } + leadership := mockLeadership() + + // update physical time interval failed due to reach the lastSavedTime, it needs to save storage first, but this behavior is not allowed. + _, err := timestampOracle.getTS(ctx, leadership, 2, 0) + re.Error(err) + physical, _ := timestampOracle.getTSO() + re.Equal(current, physical) + + // simulate the save to storage operation is done. + timestampOracle.lastSavedTime.Store(time.Now().Add(5 * time.Second)) + _, err = timestampOracle.getTS(ctx, leadership, 2, 0) + re.NoError(err) + physical, _ = timestampOracle.getTSO() + re.NotEqual(current, physical) +} + +func TestCurrentGetTSO(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + current := time.Now() + timestampOracle := ×tampOracle{ + tsoMux: &tsoObject{ + physical: current, + logical: maxLogical - 1, + }, + saveInterval: 5 * time.Second, + updatePhysicalInterval: 50 * time.Millisecond, + maxResetTSGap: func() time.Duration { return time.Hour }, + metrics: newTSOMetrics("test", "dc"), + } + leadership := mockLeadership() + runDuration := 10 * time.Second + runCtx, runCancel := context.WithTimeout(ctx, runDuration-2*time.Second) + defer runCancel() + + timestampOracle.lastSavedTime.Store(current.Add(runDuration)) + + wg := &sync.WaitGroup{} + concurrency := 20 + errCh := make(chan error, 1) + wg.Add(concurrency) + changes := atomic.Int32{} + totalTso := atomic.Int32{} + for range concurrency { + go func() { + defer wg.Done() + for { + select { + case <-runCtx.Done(): + return + default: + ts, err := timestampOracle.getTS(runCtx, leadership, 1, 0) + totalTso.Add(1) + if err != nil { + select { + case errCh <- err: + runCancel() + default: + } + } + if ts.Logical == 1 { + changes.Add(1) + } + } + } + }() + } + + wg.Wait() + close(errCh) + for err := range errCh { + re.NoError(err) + } + re.LessOrEqual(changes.Load(), totalTso.Load()/int32(maxLogical)+1) +} From 99ebff823f568f8c519c5a4bac302c1d9a32618a Mon Sep 17 00:00:00 2001 From: bufferflies <1045931706@qq.com> Date: Wed, 20 May 2026 11:52:48 +0800 Subject: [PATCH 2/2] pass ut Signed-off-by: bufferflies <1045931706@qq.com> --- tests/server/tso/global_tso_test.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/tests/server/tso/global_tso_test.go b/tests/server/tso/global_tso_test.go index bf39c57e3d6..fb7b5dd7864 100644 --- a/tests/server/tso/global_tso_test.go +++ b/tests/server/tso/global_tso_test.go @@ -156,7 +156,9 @@ func TestLogicalOverflow(t *testing.T) { re.NoError(err) defer tsoClient.CloseSend() - begin := time.Now() + var prevTS *pdpb.Timestamp + var firstTS *pdpb.Timestamp + var lastTS *pdpb.Timestamp for i := range 3 { req := &pdpb.TsoRequest{ Header: testutil.NewRequestHeader(clusterID), @@ -164,15 +166,23 @@ func TestLogicalOverflow(t *testing.T) { DcLocation: tso.GlobalDCLocation, } re.NoError(tsoClient.Send(req)) - _, err = tsoClient.Recv() + resp, err := tsoClient.Recv() re.NoError(err) - if i == 1 { - // the 2nd request may (but not must) overflow, as max logical interval is 262144 - re.Less(time.Since(begin), updateInterval+50*time.Millisecond) // additional 50ms for gRPC latency + currentTS := checkAndReturnTimestampResponse(re, req, resp) + if i == 0 { + firstTS = currentTS } + if prevTS != nil { + re.GreaterOrEqual(currentTS.GetPhysical(), prevTS.GetPhysical()) + if currentTS.GetPhysical() == prevTS.GetPhysical() { + re.Greater(currentTS.GetLogical(), prevTS.GetLogical()) + } + } + prevTS = currentTS + lastTS = currentTS } - // the 3rd request must overflow - re.GreaterOrEqual(time.Since(begin), updateInterval) + // 3 * 150000 > 262144, so the physical part must advance at least once. + re.Greater(lastTS.GetPhysical(), firstTS.GetPhysical()) } for _, updateInterval := range []int{1, 5, 30, 50} {