Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (gta *GlobalTSOAllocator) UpdateTSO() (err error) {
// next request succeeds with the new endpoint, according to https://github.com/etcd-io/etcd/issues/8711
maxRetryCount := 3
for range maxRetryCount {
_, err = gta.timestampOracle.UpdateTimestamp(intervalUpdate)
err = gta.timestampOracle.UpdateTimestamp()
if err == nil {
return nil
}
Expand Down
3 changes: 1 addition & 2 deletions pkg/tso/local_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,7 @@ func (lta *LocalTSOAllocator) IsInitialize() bool {
// UpdateTSO is used to update the TSO in memory and the time window in etcd
// for all local TSO allocators this PD server hold.
func (lta *LocalTSOAllocator) UpdateTSO() error {
_, err := lta.timestampOracle.UpdateTimestamp(intervalUpdate)
return err
return lta.timestampOracle.UpdateTimestamp()
}

// SetTSO sets the physical part with given TSO.
Expand Down
2 changes: 0 additions & 2 deletions pkg/tso/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ type tsoMetrics struct {
notLeaderAnymoreEvent prometheus.Counter
logicalOverflowEvent prometheus.Counter
exceededMaxRetryEvent prometheus.Counter
notAllowedSaveTimestampEvent prometheus.Counter
// timestampOracle operation duration
syncSaveDuration prometheus.Observer
resetSaveDuration prometheus.Observer
Expand Down Expand Up @@ -154,7 +153,6 @@ func newTSOMetrics(groupID, dcLocation string) *tsoMetrics {
notLeaderAnymoreEvent: tsoCounter.WithLabelValues("not_leader_anymore", groupID, dcLocation),
logicalOverflowEvent: tsoCounter.WithLabelValues("logical_overflow", groupID, dcLocation),
exceededMaxRetryEvent: tsoCounter.WithLabelValues("exceeded_max_retry", groupID, dcLocation),
notAllowedSaveTimestampEvent: tsoCounter.WithLabelValues("not_allowed_save_timestamp", groupID, dcLocation),
syncSaveDuration: tsoOpDuration.WithLabelValues("sync_save", groupID, dcLocation),
resetSaveDuration: tsoOpDuration.WithLabelValues("reset_save", groupID, dcLocation),
updateSaveDuration: tsoOpDuration.WithLabelValues("update_save", groupID, dcLocation),
Expand Down
104 changes: 19 additions & 85 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,47 +91,19 @@ func (t *timestampOracle) getStorageTimeout() time.Duration {
return timeout - time.Second
}

// setTSOCondition is the condition for updating physical time.
// if it returns false, the physical time will not be updated.
type setTSOCondition func(t *timestampOracle) bool

// mustInitialized is used to check whether the TSO in memory has been initialized before updating physical time.
func mustInitialized() setTSOCondition {
return func(t *timestampOracle) bool {
return !t.tsoMux.physical.Equal(typeutil.ZeroTime)
}
}

// mustOverflowed is used to check whether the TSO in memory has been overflowed before updating physical time.
func mustOverflowed() setTSOCondition {
return func(t *timestampOracle) bool {
return overflowedLogical(t.tsoMux.logical)
}
}

// setTSOPhysical sets the TSO's physical part with the given time.
// It returns true if the TSO's logical part is overflowed, the caller should wait for the next physical tick.
// It must satisfy all the following conditions before updating physical time.
// If not met, it will skip updating physical time and return whether the logical part is overflowed.
func (t *timestampOracle) setTSOPhysical(next time.Time, conditions ...setTSOCondition) bool {
func (t *timestampOracle) setTSOPhysical(next time.Time, force bool) {
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
// check all conditions before updating physical time, if any condition is not met,
// skip updating physical time and return whether the logical part is overflowed.
for _, condition := range conditions {
if !condition(t) {
return overflowedLogical(t.tsoMux.logical)
}
// Do not update the zero physical time if the `force` flag is false.
if t.tsoMux.physical == typeutil.ZeroTime && !force {
return
}

// make sure the ts won't fall back
if typeutil.SubTSOPhysicalByWallClock(next, t.tsoMux.physical) > 0 {
t.tsoMux.physical = next
t.tsoMux.logical = 0
t.tsoMux.updateTime = time.Now()
t.metrics.saveEvent.Inc()
}
return overflowedLogical(t.tsoMux.logical)
}

func (t *timestampOracle) getTSO() (time.Time, int64) {
Expand Down Expand Up @@ -262,7 +234,7 @@ func (t *timestampOracle) SyncTimestamp() error {
zap.Time("last", last), zap.Time("last-saved", lastSavedTime),
zap.Time("save", save), zap.Time("next", next))
// save into memory
t.setTSOPhysical(next)
t.setTSOPhysical(next, true)
return nil
}

Expand Down Expand Up @@ -339,18 +311,7 @@ func (t *timestampOracle) resetUserTimestampInner(leadership *election.Leadershi
return nil
}

type updatePurpose int

const (
intervalUpdate updatePurpose = iota
overflowUpdate
)

// updateTimestamp is used to update the timestamp.
// updatePurpose indicates the purpose of this update:
// - intervalUpdate: update timestamp due to the periodic interval.
// - overflowUpdate: update timestamp due to the logical overflow.
//
// UpdateTimestamp is used to update the timestamp.
// This function will do two things:
// 1. When the logical time is going to be used up, increase the current physical time.
// 2. When the time window is not big enough, which means the saved etcd time minus the next physical time
Expand All @@ -362,15 +323,11 @@ const (
// 2. The physical time is monotonically increasing.
// 3. The physical time is always less than the saved timestamp.
//
// The first returns a boolean indicates whether the logical time is overflowed, and the second returns an error if any error occurs.
// When met logical overflow, the caller should wait for the next physical tick to retry allocating TSO, which may cause more clock offset issues and etcd load,
// so we want to avoid it as much as possible by updating physical time in advance.
//
// NOTICE: this function should be called after the TSO in memory has been initialized
// and should not be called when the TSO in memory has been reset anymore.
func (t *timestampOracle) UpdateTimestamp(purpose updatePurpose) (bool, error) {
func (t *timestampOracle) UpdateTimestamp() error {
if !t.isInitialized() {
return false, errs.ErrUpdateTimestamp.FastGenByArgs("timestamp in memory has not been initialized")
return errs.ErrUpdateTimestamp.FastGenByArgs("timestamp in memory has not been initialized")
}
prevPhysical, prevLogical := t.getTSO()
t.metrics.tsoPhysicalGauge.Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond)))
Expand All @@ -383,10 +340,10 @@ func (t *timestampOracle) UpdateTimestamp(purpose updatePurpose) (bool, error) {
failpoint.Inject("systemTimeSlow", func() {
now = now.Add(-time.Hour)
})
jetLag := typeutil.SubRealTimeByWallClock(now, prevPhysical)
t.metrics.tsoPhysicalGauge.Set(float64(prevPhysical.UnixNano() / int64(time.Millisecond)))
t.metrics.tsoPhysicalGapGauge.Set(float64(jetLag.Milliseconds()))

t.metrics.saveEvent.Inc()

jetLag := typeutil.SubRealTimeByWallClock(now, prevPhysical)
if jetLag > 3*t.updatePhysicalInterval && jetLag > jetLagWarningThreshold {
log.Warn("clock offset",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
Expand Down Expand Up @@ -415,19 +372,12 @@ func (t *timestampOracle) UpdateTimestamp(purpose updatePurpose) (bool, error) {
} else {
// It will still use the previous physical time to alloc the timestamp.
t.metrics.skipSaveEvent.Inc()
return false, nil
return nil
}

// It is not safe to increase the physical time to `next`.
// The time window needs to be updated and saved to etcd.
if typeutil.SubRealTimeByWallClock(t.getLastSavedTime(), next) <= UpdateTimestampGuard {
// Only IntervalUpdate is allowed to save timestamp into etcd.
// It would be dangerous to save timestamp into etcd when handling overflowUpdate.
// We don't want to update the physical time too frequently because of logical overflow, which may cause more clock offset issues and etcd load.
if purpose != intervalUpdate {
t.metrics.notAllowedSaveTimestampEvent.Inc()
return true, nil
}
save := next.Add(t.saveInterval)
start := time.Now()
ctx, cancelCtx := context.WithTimeout(t.client.Ctx(), t.getStorageTimeout())
Expand All @@ -439,19 +389,15 @@ func (t *timestampOracle) UpdateTimestamp(purpose updatePurpose) (bool, error) {
zap.String("timestamp-path", t.GetTimestampPath()),
zap.Error(err))
t.metrics.errSaveUpdateTSEvent.Inc()
return false, err
return err
}
t.lastSavedTime.Store(save)
t.metrics.updateSaveDuration.Observe(time.Since(start).Seconds())
}
// If it's an IntervalUpdate, we don't need to check logical overflow, just update physical time directly.
// Otherwise, the caller met logical overflow, so it will allocate physical time to alloc more timestamp in concurrent.
// So we need to check logical overflow before updating physical time to avoid allocating too much physical time due to logical overflow.
conditions := []setTSOCondition{mustInitialized()}
if purpose != intervalUpdate {
conditions = append(conditions, mustOverflowed())
}
return t.setTSOPhysical(next, conditions...), nil
// save into memory
t.setTSOPhysical(next, false)

return nil
}

var maxRetryCount = 10
Expand Down Expand Up @@ -479,21 +425,13 @@ func (t *timestampOracle) getTS(ctx context.Context, leadership *election.Leader
if resp.GetPhysical() == 0 {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset")
}
if overflowedLogical(resp.GetLogical()) {
if resp.GetLogical() >= maxLogical {
log.Warn("logical part outside of max logical interval, please check ntp time, or adjust config item `tso-update-physical-interval`",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Reflect("response", resp),
zap.Int("retry-count", i), errs.ZapError(errs.ErrLogicOverflow))
t.metrics.logicalOverflowEvent.Inc()
if overflowed, err := t.UpdateTimestamp(overflowUpdate); err != nil {
log.Warn("update timestamp failed",
logutil.CondUint32("keyspace-group-id", t.keyspaceGroupID, t.keyspaceGroupID > 0),
zap.Bool("overflowed", overflowed),
zap.Error(err))
time.Sleep(t.updatePhysicalInterval)
} else if overflowed {
time.Sleep(t.updatePhysicalInterval)
}
time.Sleep(t.updatePhysicalInterval)
continue
}
// In case lease expired after the first check.
Expand All @@ -517,7 +455,3 @@ func (t *timestampOracle) ResetTimestamp() {
t.tsoMux.updateTime = typeutil.ZeroTime
t.lastSavedTime.Store(typeutil.ZeroTime)
}

func overflowedLogical(logical int64) bool {
return logical >= maxLogical
}
171 changes: 0 additions & 171 deletions pkg/tso/tso_test.go

This file was deleted.

Loading
Loading