diff --git a/client/clients/gc/client.go b/client/clients/gc/client.go index e1b4ea05116..e9551680c00 100644 --- a/client/clients/gc/client.go +++ b/client/clients/gc/client.go @@ -18,6 +18,8 @@ import ( "context" "math" "time" + + "github.com/pingcap/errors" ) // Client is the interface for GC client. @@ -32,6 +34,45 @@ type Client interface { GetGCStatesClient(keyspaceID uint32) GCStatesClient } +// GCStatesAPIOptions represents all options for GC states API. +// +//nolint:revive +type GCStatesAPIOptions struct { + ExcludeGCBarriers bool + ExcludeGlobalGCBarriers bool +} + +// DefaultGCStatesAPIOptions returns the default options for GC states API. +func DefaultGCStatesAPIOptions() GCStatesAPIOptions { + return GCStatesAPIOptions{ + ExcludeGCBarriers: true, + ExcludeGlobalGCBarriers: true, + } +} + +// GCStatesAPIOption is the type of option for GC states API. +// +//nolint:revive +type GCStatesAPIOption func(*GCStatesAPIOptions) + +// ExcludeGCBarriers controls whether GetGCState and GetAllKeyspacesGCStates should exclude GC barriers. +// Enabling this can reduce the cost of reading GC states, and is recommended for most use cases. +// When GC barriers are needed, explicitly set false to this option. +func ExcludeGCBarriers(whetherToExclude bool) GCStatesAPIOption { + return func(opts *GCStatesAPIOptions) { + opts.ExcludeGCBarriers = whetherToExclude + } +} + +// ExcludeGlobalGCBarriers controls whether GetAllKeyspacesGCStates should exclude global GC barriers. +// Enabling this can reduce the cost of reading GC states, and is recommended for most use cases. +// When global GC barriers are needed, explicitly set false to this option. +func ExcludeGlobalGCBarriers(whetherToExclude bool) GCStatesAPIOption { + return func(opts *GCStatesAPIOptions) { + opts.ExcludeGlobalGCBarriers = whetherToExclude + } +} + // GCStatesClient is the interface for users to access GC states. // KeyspaceID is already bound to this type when created. 
// @@ -72,7 +113,7 @@ type GCStatesClient interface { // // When this method is called on a keyspace without keyspace-level GC enabled, it will be equivalent to calling it on // the NullKeyspace. - GetGCState(ctx context.Context) (GCState, error) + GetGCState(ctx context.Context, opts ...GCStatesAPIOption) (GCState, error) // SetGlobalGCBarrier sets a global GC barrier, which blocks GC like how GC barriers do, but is effective for all // keyspaces. This API is designed for some special needs to block GC of all keyspaces. // @@ -105,11 +146,11 @@ type GCStatesClient interface { SetGlobalGCBarrier(ctx context.Context, barrierID string, barrierTS uint64, ttl time.Duration) (*GlobalGCBarrierInfo, error) // DeleteGlobalGCBarrier deletes a global GC barrier. DeleteGlobalGCBarrier(ctx context.Context, barrierID string) (*GlobalGCBarrierInfo, error) - // Get the GC states from all keyspaces. + // GetAllKeyspacesGCStates gets GC states from all keyspaces. // The return value includes both GC states and global GC barriers information. // If a keyspace's state is not ENABLED(like DISABLE/ARCHIVED/TOMBSTONE), that keyspace is skipped. // If a keyspace is not configured with keyspace level GC, its GCState data is missing. - GetAllKeyspacesGCStates(ctx context.Context) (ClusterGCStates, error) + GetAllKeyspacesGCStates(ctx context.Context, opts ...GCStatesAPIOption) (ClusterGCStates, error) } // InternalController is the interface for controlling GC execution. @@ -245,16 +286,89 @@ func (b *GlobalGCBarrierInfo) isExpiredImpl(now time.Time) bool { //nolint:revive type GCState struct { // The ID of the keyspace this GC state belongs to. - KeyspaceID uint32 - TxnSafePoint uint64 - GCSafePoint uint64 - GCBarriers []*GCBarrierInfo + KeyspaceID uint32 + TxnSafePoint uint64 + GCSafePoint uint64 + hasGCBarriers bool + gcBarriers []*GCBarrierInfo +} + +// NewGCStateWithoutGCBarriers creates a GCState instance without GC barriers info. 
+func NewGCStateWithoutGCBarriers(keyspaceID uint32, txnSafePoint uint64, gcSafePoint uint64) GCState { + return GCState{ + KeyspaceID: keyspaceID, + TxnSafePoint: txnSafePoint, + GCSafePoint: gcSafePoint, + hasGCBarriers: false, + gcBarriers: nil, + } +} + +// NewGCStateWithGCBarriers creates a GCState instance with GC barriers info. +func NewGCStateWithGCBarriers(keyspaceID uint32, txnSafePoint uint64, gcSafePoint uint64, gcBarriers []*GCBarrierInfo) GCState { + return GCState{ + KeyspaceID: keyspaceID, + TxnSafePoint: txnSafePoint, + GCSafePoint: gcSafePoint, + hasGCBarriers: true, + gcBarriers: gcBarriers, + } +} + +// HasGCBarriers returns whether the GCState instance carries GC barriers info. Note that valid GC barriers info +// can be empty. +func (s GCState) HasGCBarriers() bool { + return s.hasGCBarriers +} + +// GetGCBarriers retrieves GC barriers from the GCState instance, or returns an error if it doesn't carry any GC barrier +// info. +func (s GCState) GetGCBarriers() ([]*GCBarrierInfo, error) { + if !s.HasGCBarriers() { + return nil, errors.New("trying to get GC barriers from GCState that doesn't provide GC barriers info. " + + "to retrieve GC barriers, pass false to excludeGCBarriers parameter to GC APIs") + } + return s.gcBarriers, nil } // ClusterGCStates represents the information of the GC state for all keyspaces. type ClusterGCStates struct { // Maps from keyspace id to GC state of that keyspace. - GCStates map[uint32]GCState - // All existing global GC barriers. - GlobalGCBarriers []*GlobalGCBarrierInfo + GCStates map[uint32]GCState + hasGlobalGCBarriers bool + globalGCBarriers []*GlobalGCBarrierInfo +} + +// NewClusterGCStatesWithoutGlobalGCBarriers creates a ClusterGCStates instance without global GC barriers info. 
+func NewClusterGCStatesWithoutGlobalGCBarriers(gcStates map[uint32]GCState) ClusterGCStates { + return ClusterGCStates{ + GCStates: gcStates, + hasGlobalGCBarriers: false, + globalGCBarriers: nil, + } +} + +// NewClusterGCStatesWithGlobalGCBarriers creates a ClusterGCStates instance with global GC barriers info. +func NewClusterGCStatesWithGlobalGCBarriers(gcStates map[uint32]GCState, globalGCBarriers []*GlobalGCBarrierInfo) ClusterGCStates { + return ClusterGCStates{ + GCStates: gcStates, + hasGlobalGCBarriers: true, + globalGCBarriers: globalGCBarriers, + } +} + +// HasGlobalGCBarriers returns whether the ClusterGCStates instance carries global GC barriers info. Note that valid +// global GC barriers info can be empty. +func (s ClusterGCStates) HasGlobalGCBarriers() bool { + return s.hasGlobalGCBarriers +} + +// GetGlobalGCBarriers retrieves global GC barriers from the ClusterGCStates instance, or returns an error if it doesn't +// carry any global GC barrier info. +func (s ClusterGCStates) GetGlobalGCBarriers() ([]*GlobalGCBarrierInfo, error) { + if !s.HasGlobalGCBarriers() { + return nil, errors.New("trying to get global GC barriers from ClusterGCStates that doesn't provide global GC barriers info. 
" + + "to retrieve global GC barriers, pass false to excludeGlobalGCBarriers parameter to GC APIs") + } + return s.globalGCBarriers, nil } diff --git a/client/clients/gc/client_test.go b/client/clients/gc/client_test.go index e96b7c83999..0361d2fa2c3 100644 --- a/client/clients/gc/client_test.go +++ b/client/clients/gc/client_test.go @@ -58,3 +58,42 @@ func TestGCBarrierInfoExpiration(t *testing.T) { re.False(b1.isExpiredImpl(now)) re.False(b1.isExpiredImpl(now.Add(time.Hour * 24 * 365 * 10))) } + +func TestGCStateAccessors(t *testing.T) { + re := require.New(t) + + state := NewGCStateWithoutGCBarriers(1, 2, 3) + re.False(state.HasGCBarriers()) + barriers, err := state.GetGCBarriers() + re.Error(err) + re.Nil(barriers) + + state = NewGCStateWithGCBarriers(1, 2, 3, []*GCBarrierInfo{ + NewGCBarrierInfo("b1", 4, time.Second, time.Now()), + }) + re.True(state.HasGCBarriers()) + barriers, err = state.GetGCBarriers() + re.NoError(err) + re.Len(barriers, 1) + re.Equal("b1", barriers[0].BarrierID) +} + +func TestClusterGCStatesAccessors(t *testing.T) { + re := require.New(t) + + state := NewGCStateWithoutGCBarriers(1, 2, 3) + clusterState := NewClusterGCStatesWithoutGlobalGCBarriers(map[uint32]GCState{1: state}) + re.False(clusterState.HasGlobalGCBarriers()) + barriers, err := clusterState.GetGlobalGCBarriers() + re.Error(err) + re.Nil(barriers) + + clusterState = NewClusterGCStatesWithGlobalGCBarriers(map[uint32]GCState{1: state}, []*GlobalGCBarrierInfo{ + NewGlobalGCBarrierInfo("b1", 4, time.Second, time.Now()), + }) + re.True(clusterState.HasGlobalGCBarriers()) + barriers, err = clusterState.GetGlobalGCBarriers() + re.NoError(err) + re.Len(barriers, 1) + re.Equal("b1", barriers[0].BarrierID) +} diff --git a/client/gc_client.go b/client/gc_client.go index 9d813fd0180..40548a36edc 100644 --- a/client/gc_client.go +++ b/client/gc_client.go @@ -295,7 +295,7 @@ func (c gcStatesClient) DeleteGCBarrier(ctx context.Context, barrierID string) ( } // GetGCState gets the current 
GC state. -func (c gcStatesClient) GetGCState(ctx context.Context) (gc.GCState, error) { +func (c gcStatesClient) GetGCState(ctx context.Context, opts ...gc.GCStatesAPIOption) (gc.GCState, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetGCState", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -303,11 +303,17 @@ func (c gcStatesClient) GetGCState(ctx context.Context) (gc.GCState, error) { start := time.Now() defer func() { metrics.CmdDurationGetGCState.Observe(time.Since(start).Seconds()) }() + options := gc.DefaultGCStatesAPIOptions() + for _, opt := range opts { + opt(&options) + } + ctx, cancel := context.WithTimeout(ctx, c.client.inner.option.Timeout) defer cancel() req := &pdpb.GetGCStateRequest{ - Header: c.client.requestHeader(), - KeyspaceScope: wrapKeyspaceScope(c.keyspaceID), + Header: c.client.requestHeader(), + KeyspaceScope: wrapKeyspaceScope(c.keyspaceID), + ExcludeGcBarriers: options.ExcludeGCBarriers, } protoClient, ctx := c.client.getClientAndContext(ctx) if protoClient == nil { @@ -319,10 +325,10 @@ func (c gcStatesClient) GetGCState(ctx context.Context) (gc.GCState, error) { } gcState := resp.GetGcState() - return pbToGCState(gcState, start), nil + return pbToGCState(gcState, start, options.ExcludeGCBarriers), nil } -func pbToGCState(pb *pdpb.GCState, reqStartTime time.Time) gc.GCState { +func pbToGCState(pb *pdpb.GCState, reqStartTime time.Time, excludeGCBarriers bool) gc.GCState { keyspaceID := constants.NullKeyspaceID if pb.KeyspaceScope != nil { keyspaceID = pb.KeyspaceScope.KeyspaceId @@ -331,12 +337,10 @@ func pbToGCState(pb *pdpb.GCState, reqStartTime time.Time) gc.GCState { for _, b := range pb.GetGcBarriers() { gcBarriers = append(gcBarriers, pbToGCBarrierInfo(b, reqStartTime)) } - return gc.GCState{ - KeyspaceID: keyspaceID, - TxnSafePoint: pb.GetTxnSafePoint(), - GCSafePoint: pb.GetGcSafePoint(), - GCBarriers: gcBarriers, + if 
!excludeGCBarriers { + return gc.NewGCStateWithGCBarriers(keyspaceID, pb.GetTxnSafePoint(), pb.GetGcSafePoint(), gcBarriers) } + return gc.NewGCStateWithoutGCBarriers(keyspaceID, pb.GetTxnSafePoint(), pb.GetGcSafePoint()) } // SetGlobalGCBarrier sets (creates or updates) a global GC barrier. @@ -394,7 +398,7 @@ func (c gcStatesClient) DeleteGlobalGCBarrier(ctx context.Context, barrierID str } // GetAllKeyspacesGCStates gets the GC states from all keyspaces. -func (c gcStatesClient) GetAllKeyspacesGCStates(ctx context.Context) (gc.ClusterGCStates, error) { +func (c gcStatesClient) GetAllKeyspacesGCStates(ctx context.Context, opts ...gc.GCStatesAPIOption) (gc.ClusterGCStates, error) { if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { span = span.Tracer().StartSpan("pdclient.GetAllKeyspacesGCState", opentracing.ChildOf(span.Context())) defer span.Finish() @@ -402,10 +406,17 @@ func (c gcStatesClient) GetAllKeyspacesGCStates(ctx context.Context) (gc.Cluster start := time.Now() defer func() { metrics.CmdDurationGetAllKeyspacesGCStates.Observe(time.Since(start).Seconds()) }() + options := gc.DefaultGCStatesAPIOptions() + for _, opt := range opts { + opt(&options) + } + ctx, cancel := context.WithTimeout(ctx, c.client.inner.option.Timeout) defer cancel() req := &pdpb.GetAllKeyspacesGCStatesRequest{ - Header: c.client.requestHeader(), + Header: c.client.requestHeader(), + ExcludeGcBarriers: options.ExcludeGCBarriers, + ExcludeGlobalGcBarriers: options.ExcludeGlobalGCBarriers, } protoClient, ctx := c.client.getClientAndContext(ctx) if protoClient == nil { @@ -417,8 +428,7 @@ func (c gcStatesClient) GetAllKeyspacesGCStates(ctx context.Context) (gc.Cluster return gc.ClusterGCStates{}, err } - var ret gc.ClusterGCStates - ret.GCStates = make(map[uint32]gc.GCState, len(resp.GetGcStates())) + gcStates := make(map[uint32]gc.GCState, len(resp.GetGcStates())) for _, state := range resp.GetGcStates() { var keyspaceID uint32 if state.KeyspaceScope == 
nil { @@ -426,10 +436,15 @@ func (c gcStatesClient) GetAllKeyspacesGCStates(ctx context.Context) (gc.Cluster } else { keyspaceID = state.KeyspaceScope.KeyspaceId } - ret.GCStates[keyspaceID] = pbToGCState(state, start) + gcStates[keyspaceID] = pbToGCState(state, start, options.ExcludeGCBarriers) } + if options.ExcludeGlobalGCBarriers { + return gc.NewClusterGCStatesWithoutGlobalGCBarriers(gcStates), nil + } + + globalGCBarriers := make([]*gc.GlobalGCBarrierInfo, 0, len(resp.GetGlobalGcBarriers())) for _, barrier := range resp.GetGlobalGcBarriers() { - ret.GlobalGCBarriers = append(ret.GlobalGCBarriers, pbToGlobalGCBarrierInfo(barrier, start)) + globalGCBarriers = append(globalGCBarriers, pbToGlobalGCBarrierInfo(barrier, start)) } - return ret, nil + return gc.NewClusterGCStatesWithGlobalGCBarriers(gcStates, globalGCBarriers), nil } diff --git a/client/go.mod b/client/go.mod index b8181aebbde..df7c208681a 100644 --- a/client/go.mod +++ b/client/go.mod @@ -10,7 +10,7 @@ require ( github.com/opentracing/opentracing-go v1.2.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 - github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359 + github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.20.5 github.com/stretchr/testify v1.9.0 diff --git a/client/go.sum b/client/go.sum index 6a12f558c40..2916b077851 100644 --- a/client/go.sum +++ b/client/go.sum @@ -53,8 +53,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod 
h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= -github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359 h1:oteLtLuoWZN3uvfH836U0IIJ+s3UOk11q7GaQ0Tk+wc= -github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE= +github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473 h1:n6QWAac97mv2NJhn17iFPFnsE5fMgtPLNmsGZeqq78o= +github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/go.mod b/go.mod index a3a993b884e..ff8905d4680 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 - github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359 + github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pingcap/metering_sdk v0.0.0-20260203082503-b9f282339654 github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21 diff --git a/go.sum b/go.sum index a8c6345a634..dcdd0d8a419 100644 --- a/go.sum +++ b/go.sum @@ -480,8 +480,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto 
v0.0.0-20260511034003-fc9e0458a359 h1:oteLtLuoWZN3uvfH836U0IIJ+s3UOk11q7GaQ0Tk+wc= -github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE= +github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473 h1:n6QWAac97mv2NJhn17iFPFnsE5fMgtPLNmsGZeqq78o= +github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/pkg/gc/gc_state_manager.go b/pkg/gc/gc_state_manager.go index 1aed4b98fab..42387e2d131 100644 --- a/pkg/gc/gc_state_manager.go +++ b/pkg/gc/gc_state_manager.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "slices" + "sync/atomic" "time" "go.uber.org/zap" @@ -102,9 +103,85 @@ import ( // 4. (weakened) For each GC barrier `b`, each advancement of the txn safe point should not push it to a new value // that is larger than `b.BarrierTS` (`t' <= max{t, min(b.BarrierTS for b in B)}` where `B` is the set of GC // barriers). -// -// TODO: Explicitly state the versions that GCStateManager starts to be functional and the old APIs/concepts/terms is -// deprecated when these work are all done. 
+ +type gcStateCacheEntry struct { + TxnSafePoint uint64 + GCSafePoint uint64 +} + +const gcStateCacheShardBits = 5 // 32 shards + +type gcStateCacheShard struct { + mu syncutil.RWMutex + gcStateCache map[uint32]gcStateCacheEntry +} + +type gcStateCache struct { + shards [1 << gcStateCacheShardBits]gcStateCacheShard +} + +func newGCStateCache() *gcStateCache { + c := &gcStateCache{} + for i := range len(c.shards) { + c.shards[i] = gcStateCacheShard{ + gcStateCache: make(map[uint32]gcStateCacheEntry), + } + } + return c +} + +func (c *gcStateCache) getShard(keyspaceID uint32) *gcStateCacheShard { + return &c.shards[keyspaceID&((1<" } +// OnNodeBecomesLeader marks the current PD node as leader for GC state watches. +func (m *GCStateManager) OnNodeBecomesLeader() { + m.mu.Lock() + defer m.mu.Unlock() + + m.nodeLeadership.Add(1) + + // Also trigger cache invalidation even when transitioning from follower to leader, as a protection against + // potential inconsistent cache state left from the last leadership. + m.gcStateCache.clearAll() +} + +// OnNodeBecomesFollower marks the current PD node as follower and closes all existing GC state watches. +func (m *GCStateManager) OnNodeBecomesFollower() { + m.mu.Lock() + defer m.mu.Unlock() + + m.nodeLeadership.Add(-1) + + // Invalidate the cache. + m.gcStateCache.clearAll() +} + +func (m *GCStateManager) nodeIsLeader() bool { + return m.nodeLeadership.Load() > 0 +} + // redirectKeyspace checks the given keyspaceID, and returns the actual keyspaceID to operate on. // // This function also returns the target keyspace name for diagnostic purpose. But note that it returns a user-friendly @@ -185,6 +301,10 @@ func (m *GCStateManager) CompatibleLoadGCSafePoint(keyspaceID uint32) (uint64, e return 0, err } + if cachedGCState, ok := m.gcStateCache.load(keyspaceID); ok { + return cachedGCState.GCSafePoint, nil + } + // No need to acquire the lock as a single-key read operation is atomic. 
return m.gcMetaStorage.LoadGCSafePoint(keyspaceID) } @@ -227,6 +347,7 @@ func (m *GCStateManager) CompatibleUpdateGCSafePoint(keyspaceID uint32, target u func (m *GCStateManager) advanceGCSafePointImpl(ctx context.Context, keyspaceID uint32, target uint64, compatible bool) (oldGCSafePoint uint64, newGCSafePoint uint64, err error) { newGCSafePoint = target + var txnSafePoint uint64 err = m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error { var err1 error @@ -247,7 +368,7 @@ func (m *GCStateManager) advanceGCSafePointImpl(ctx context.Context, keyspaceID // Otherwise, return error to reject the operation explicitly. return errs.ErrDecreasingGCSafePoint.GenWithStackByArgs(oldGCSafePoint, target) } - txnSafePoint, err1 := m.gcMetaStorage.LoadTxnSafePoint(keyspaceID) + txnSafePoint, err1 = m.gcMetaStorage.LoadTxnSafePoint(keyspaceID) if err1 != nil { return err1 } @@ -261,9 +382,17 @@ func (m *GCStateManager) advanceGCSafePointImpl(ctx context.Context, keyspaceID log.Error("failed to advance GC safe point", zap.Uint32("keyspace-id", keyspaceID), zap.String("keyspace-name", getKeyspaceNameFromCtx(ctx)), zap.Uint64("target", target), zap.Bool("compatible-mode", compatible), zap.Error(err)) + // Invalidate cache on error. + m.gcStateCache.remove(keyspaceID) return 0, 0, err } + // Update cache. 
+ m.gcStateCache.store(keyspaceID, gcStateCacheEntry{ + TxnSafePoint: txnSafePoint, + GCSafePoint: newGCSafePoint, + }) + if newGCSafePoint != oldGCSafePoint { log.Info("advanced GC safe point", zap.Uint32("keyspace-id", keyspaceID), zap.String("keyspace-name", getKeyspaceNameFromCtx(ctx)), @@ -328,6 +457,7 @@ func (m *GCStateManager) advanceTxnSafePointImpl(ctx context.Context, keyspaceID blockingBarrier *endpoint.GCBarrier blockingGlobalBarrier *endpoint.GlobalGCBarrier blockingMinStartTSOwner *string + gcSafePoint uint64 ) err := m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error { @@ -336,6 +466,10 @@ func (m *GCStateManager) advanceTxnSafePointImpl(ctx context.Context, keyspaceID if err1 != nil { return err1 } + gcSafePoint, err1 = m.gcMetaStorage.LoadGCSafePoint(keyspaceID) + if err1 != nil { + return err1 + } if target < oldTxnSafePoint { return errs.ErrDecreasingTxnSafePoint.GenWithStackByArgs(oldTxnSafePoint, target) @@ -420,9 +554,21 @@ func (m *GCStateManager) advanceTxnSafePointImpl(ctx context.Context, keyspaceID return wb.SetTxnSafePoint(keyspaceID, newTxnSafePoint) }) if err != nil { + log.Error("failed to advance GC safe point", + zap.Uint32("keyspace-id", keyspaceID), zap.String("keyspace-name", getKeyspaceNameFromCtx(ctx)), + zap.Uint64("old-txn-safe-point", oldTxnSafePoint), zap.Uint64("target", target), + zap.Uint64("new-txn-safe-point", newTxnSafePoint), zap.Bool("downgrade-compatible-mode", downgradeCompatibleMode), zap.Error(err)) + // Invalidate cache on error. + m.gcStateCache.remove(keyspaceID) return AdvanceTxnSafePointResult{}, err } + // Update cache. 
+ m.gcStateCache.store(keyspaceID, gcStateCacheEntry{ + TxnSafePoint: newTxnSafePoint, + GCSafePoint: gcSafePoint, + }) + blockerDesc := "" simulatedServiceID := "" // Note the order of check blockingGlobalBarrier/blockingMinStartTSOwner/blockingBarrier @@ -627,12 +773,82 @@ func (m *GCStateManager) deleteGCBarrierImpl(ctx context.Context, keyspaceID uin return deletedBarrier, nil } +func (m *GCStateManager) getGCStateImpl(keyspaceID uint32, excludeGCBarriers bool) (GCState, error) { + // Try getting from cache if possible. + if excludeGCBarriers && m.nodeIsLeader() { + if cachedGCState, ok := m.gcStateCache.load(keyspaceID); ok { + failpoint.InjectCall("getGCStateCacheAccess", "hit") + gcStateCacheAccessHitCounter.Inc() + return GCState{ + KeyspaceID: keyspaceID, + IsKeyspaceLevel: keyspaceID != constant.NullKeyspaceID, + TxnSafePoint: cachedGCState.TxnSafePoint, + GCSafePoint: cachedGCState.GCSafePoint, + }, nil + } + } + + return m.getGCStateImplSlow(keyspaceID, excludeGCBarriers) +} + +func (m *GCStateManager) getGCStateImplSlow(keyspaceID uint32, excludeGCBarriers bool) (GCState, error) { + // Keep this hook before taking the manager lock so leader transitions are + // not blocked while tests pin the request at the slow-path boundary. + failpoint.InjectCall("getGCStateBeforeSlowPath") + + m.mu.RLock() + defer m.mu.RUnlock() + + if excludeGCBarriers && m.nodeIsLeader() { + // Check cache again after entering the lock. + // When entering this slow path after a cache miss, it's possible that some other concurrent call has caused + // cache update before we acquire the mutex. 
+ if cachedGCState, ok := m.gcStateCache.load(keyspaceID); ok { + failpoint.InjectCall("getGCStateCacheAccess", "slow_hit") + gcStateCacheAccessSlowHitCounter.Inc() + return GCState{ + KeyspaceID: keyspaceID, + IsKeyspaceLevel: keyspaceID != constant.NullKeyspaceID, + TxnSafePoint: cachedGCState.TxnSafePoint, + GCSafePoint: cachedGCState.GCSafePoint, + }, nil + } + } + + failpoint.InjectCall("getGCStateCacheAccess", "miss") + gcStateCacheAccessMissCounter.Inc() + + var result GCState + err := m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error { + var err1 error + result, err1 = m.getGCStateInTransaction(keyspaceID, excludeGCBarriers, wb) + return err1 + }) + if err != nil { + return GCState{}, err + } + + // Update cache on successfully load GC states from IO. + // Note that we don't update cache when excludeGCBarriers is set to false. The reason is that, when + // excludeGCBarrier is set and the code executes to here, it must be because that there was a cache miss; but if + // it's not set, as it won't go through the cache, here it's likely that the cache is actually valid, and updating + // it causes unnecessary overhead and locking. + if excludeGCBarriers { + m.gcStateCache.store(keyspaceID, gcStateCacheEntry{ + TxnSafePoint: result.TxnSafePoint, + GCSafePoint: result.GCSafePoint, + }) + } + + return result, nil +} + // getGCStateInTransaction gets all properties in GC states within a context of gcMetaStorage.RunInGCStateTransaction. // This read only and won't write anything to the GCStateWriteBatch. It still receives a write batch to ensure // it's running in a in-transaction context. // The parameter `keyspaceID` is expected to be either the NullKeyspaceID or the ID of a keyspace that has // keyspace-level GC enabled. Otherwise, the result would be undefined. 
-func (m *GCStateManager) getGCStateInTransaction(keyspaceID uint32, _ *endpoint.GCStateWriteBatch) (GCState, error) { +func (m *GCStateManager) getGCStateInTransaction(keyspaceID uint32, excludeGCBarriers bool, _ *endpoint.GCStateWriteBatch) (GCState, error) { result := GCState{ KeyspaceID: keyspaceID, } @@ -653,16 +869,18 @@ func (m *GCStateManager) getGCStateInTransaction(keyspaceID uint32, _ *endpoint. return GCState{}, err } - result.GCBarriers, err = m.gcMetaStorage.LoadAllGCBarriers(keyspaceID) - if err != nil { - return GCState{}, err - } + if !excludeGCBarriers { + result.GCBarriers, err = m.gcMetaStorage.LoadAllGCBarriers(keyspaceID) + if err != nil { + return GCState{}, err + } - // Remove GC barrier whose barrierID is "gc_worker", which is only exists for providing compatibility with the old - // versions. - result.GCBarriers = slices.DeleteFunc(result.GCBarriers, func(b *endpoint.GCBarrier) bool { - return b.BarrierID == keypath.GCWorkerServiceSafePointID - }) + // Remove GC barrier whose barrierID is "gc_worker", which is only exists for providing compatibility with the old + // versions. + result.GCBarriers = slices.DeleteFunc(result.GCBarriers, func(b *endpoint.GCBarrier) bool { + return b.BarrierID == keypath.GCWorkerServiceSafePointID + }) + } return result, nil } @@ -671,22 +889,20 @@ func (m *GCStateManager) getGCStateInTransaction(keyspaceID uint32, _ *endpoint. // // When this method is called on a keyspace without keyspace-level GC enabled, it will be equivalent to calling it on // the NullKeyspace. 
-func (m *GCStateManager) GetGCState(keyspaceID uint32) (GCState, error) { - keyspaceID, _, err := m.redirectKeyspace(keyspaceID, true) +func (m *GCStateManager) GetGCState(keyspaceID uint32, excludeGCBarriers bool) (GCState, error) { + keyspaceID, keyspaceName, err := m.redirectKeyspace(keyspaceID, true) if err != nil { return GCState{}, err } - m.mu.Lock() - defer m.mu.Unlock() - var result GCState - err = m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error { - var err1 error - result, err1 = m.getGCStateInTransaction(keyspaceID, wb) - return err1 - }) + gcState, err := m.getGCStateImpl(keyspaceID, excludeGCBarriers) + + if err != nil { + log.Error("failed to get GC state", zap.Uint32("keyspace-id", keyspaceID), + zap.String("keyspace-name", keyspaceName), zap.Bool("exclude-gc-barriers", excludeGCBarriers), zap.Error(err)) + } - return result, err + return gcState, err } // GetAllKeyspacesGCStates returns the GC state of all keyspaces. @@ -698,96 +914,102 @@ func (m *GCStateManager) GetGCState(keyspaceID uint32) (GCState, error) { // the caller should NEVER change the content of the returned result and error. It's guaranteed that the returned result // must be fetched AFTER the beginning of the current invocation, and it never reuses the result of invocations that // started earlier than the current one. 
-func (m *GCStateManager) GetAllKeyspacesGCStates(ctx context.Context) (map[uint32]GCState, error) { - return m.allKeyspacesGCStatesSingleFlight.Do(ctx, func(execCtx context.Context) (map[uint32]GCState, error) { - result, err := m.getAllKeyspacesGCStatesImpl(execCtx) +func (m *GCStateManager) GetAllKeyspacesGCStates(ctx context.Context, excludeGCBarriers bool) (map[uint32]GCState, error) { + if !excludeGCBarriers { + return m.allKeyspacesGCStatesSingleFlight.Do(ctx, func(execCtx context.Context) (map[uint32]GCState, error) { + result := make(map[uint32]GCState) + err := m.iterateAllKeyspacesGCStates(execCtx, excludeGCBarriers, + func(_keyspaceID uint32) bool { return true }, + func(gcState GCState) { + result[gcState.KeyspaceID] = gcState + }) + failpoint.Inject("onGetAllKeyspacesGCStatesFinish", func() {}) + return result, err + }) + } + + // If excludeGCBarriers is set, most required information should be available in the cache. + // Note that the invocation with and without `excludeGCBarriers` should go to different singleflights, otherwise + // invocation with different parameters may share their results incorrectly. + return m.allKeyspacesGCStatesExcludeGCBarriersSingleFlight.Do(ctx, func(execCtx context.Context) (map[uint32]GCState, error) { + cachedGCStates := make(map[uint32]GCState) + if m.nodeIsLeader() { + cachedGCStates = m.gcStateCache.cloneAllAsGCStates() + } + err := m.iterateAllKeyspacesGCStates(execCtx, excludeGCBarriers, func(keyspaceID uint32) bool { + _, ok := cachedGCStates[keyspaceID] + return !ok + }, func(gcState GCState) { + cachedGCStates[gcState.KeyspaceID] = gcState + }) failpoint.Inject("onGetAllKeyspacesGCStatesFinish", func() {}) - return result, err + return cachedGCStates, err }) } -func (m *GCStateManager) getAllKeyspacesGCStatesImpl(ctx context.Context) (map[uint32]GCState, error) { +// iterateAllKeyspacesGCStates iterates GC states of all keyspaces, and calls the given callback on each of them. 
+func (m *GCStateManager) iterateAllKeyspacesGCStates( + ctx context.Context, + excludeGCBarriers bool, + keyspacePred func(keyspaceID uint32) bool, + cb func(GCState), +) error { failpoint.InjectCall("onGetAllKeyspacesGCStatesStart") - - mutexLocked := false - lock := func() { - m.mu.Lock() - mutexLocked = true - } - unlock := func() { - m.mu.Unlock() - mutexLocked = false - } - - ensureUnlocked := func() { - if mutexLocked { - unlock() + failpoint.Inject("iterateAllKeyspacesGCStatesError", func(val failpoint.Value) { + if errMsg, ok := val.(string); ok { + failpoint.Return(errors.New(errMsg)) } - } - defer ensureUnlocked() + failpoint.Return(errors.New("mock iterate all keyspaces gc states error")) + }) keyspaceIterator := m.keyspaceManager.IterateKeyspaces() // Do not guarantee atomicity among different keyspaces here. - results := make(map[uint32]GCState) - lock() - err := m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error { - nullKeyspaceState, err1 := m.getGCStateInTransaction(constant.NullKeyspaceID, wb) - if err1 != nil { - return err1 + + if keyspacePred(constant.NullKeyspaceID) { + nullKeyspaceGCState, err := m.getGCStateImpl(constant.NullKeyspaceID, excludeGCBarriers) + if err != nil { + return err } - results[constant.NullKeyspaceID] = nullKeyspaceState - return nil - }) - unlock() - if err != nil { - return nil, err + cb(nullKeyspaceGCState) } for { select { case <-ctx.Done(): - return nil, ctx.Err() + return ctx.Err() default: } keyspaceMeta, ok, err := keyspaceIterator.Next() if err != nil { - return nil, err + return err } if !ok { break } - // Just handle the active keyspace, leave the others up to keyspace management. 
- if keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED { + if !keyspacePred(keyspaceMeta.GetId()) || keyspaceMeta.State != keyspacepb.KeyspaceState_ENABLED { continue } if keyspaceMeta.Config[keyspace.GCManagementType] != keyspace.KeyspaceLevelGC { - results[keyspaceMeta.Id] = GCState{ + gcState := GCState{ KeyspaceID: keyspaceMeta.Id, IsKeyspaceLevel: false, } + cb(gcState) continue } - lock() - err = m.gcMetaStorage.RunInGCStateTransaction(func(wb *endpoint.GCStateWriteBatch) error { - state, err1 := m.getGCStateInTransaction(keyspaceMeta.Id, wb) - if err1 != nil { - return err1 - } - results[keyspaceMeta.Id] = state - return nil - }) - unlock() + gcState, err := m.getGCStateImpl(keyspaceMeta.GetId(), excludeGCBarriers) if err != nil { - return nil, err + return err } + cb(gcState) } - return results, nil + return nil } // LoadAllGlobalGCBarriers returns global GC barriers. diff --git a/pkg/gc/gc_state_manager_test.go b/pkg/gc/gc_state_manager_test.go index a35e113546f..287bb6fe36d 100644 --- a/pkg/gc/gc_state_manager_test.go +++ b/pkg/gc/gc_state_manager_test.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "math" + "math/rand/v2" "os" "slices" "strconv" @@ -29,8 +30,10 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/server/v3/embed" "go.uber.org/goleak" + "go.uber.org/zap/zapcore" "github.com/pingcap/errors" "github.com/pingcap/failpoint" @@ -91,7 +94,9 @@ type newGCStateManagerForTestOptions struct { // Nil for generating initial keyspaces by the default preset // Non-nil (including empty) for generating specified keyspaces specifyInitialKeyspaces []*keyspace.CreateKeyspaceByIDRequest + serverNodes int etcdServerCfgModifier func(cfg *embed.Config) + etcdClientCfgModifier etcdutil.CreateEtcdClientOpt } func (opt *newGCStateManagerForTestOptions) generateKeyspacesByCount(count int) { @@ -114,8 +119,13 @@ func newGCStateManagerForTest(t testing.TB, opt 
newGCStateManagerForTestOptions) var etcdClusterOpt etcdutil.TestEtcdClusterOptions etcdClusterOpt.ServerCfgModifier = opt.etcdServerCfgModifier + etcdClusterOpt.ClientCfgModifier = opt.etcdClientCfgModifier - _, client, clean := etcdutil.NewTestEtcdCluster(t, 1, &etcdClusterOpt) + numNodes := opt.serverNodes + if numNodes <= 0 { + numNodes = 1 + } + _, client, clean := etcdutil.NewTestEtcdCluster(t, numNodes, &etcdClusterOpt) kvBase := kv.NewEtcdKVBase(client) // Simulate a member which id.Allocator may need to check. @@ -198,6 +208,8 @@ func newGCStateManagerForTest(t testing.TB, opt newGCStateManagerForTestOptions) } } + gcStateManager.OnNodeBecomesLeader() + return s, s.GetGCStateProvider(), gcStateManager, clean, cancel } @@ -228,9 +240,64 @@ func (s *gcStateManagerTestSuite) TearDownTest() { s.clean() } +type gcStateCacheAccessCount struct { + hit int + slowHit int + miss int +} + +type gcStateCacheAccessCounters struct { + sync.Mutex + gcStateCacheAccessCount +} + +type gcStateCacheAccessCounterSnapshot struct { + gcStateCacheAccessCount +} + +func (s *gcStateManagerTestSuite) ensureMarkedLeader() { + if !s.manager.nodeIsLeader() { + s.manager.OnNodeBecomesLeader() + } +} + +func (s *gcStateManagerTestSuite) trackGCStateCacheAccessCounters() *gcStateCacheAccessCounters { + tracker := &gcStateCacheAccessCounters{} + failpointName := "github.com/tikv/pd/pkg/gc/getGCStateCacheAccess" + // Track cache access through a failpoint instead of reading Prometheus metrics directly. + // This keeps the test local to this suite and avoids adding a direct go.mod dependency + // on Prometheus' client_model package just for test assertions. 
+ s.Require().NoError(failpoint.EnableCall(failpointName, func(result string) { + tracker.Lock() + defer tracker.Unlock() + switch result { + case "hit": + tracker.hit++ + case "slow_hit": + tracker.slowHit++ + case "miss": + tracker.miss++ + default: + s.Require().FailNowf("unexpected GC state cache access result", "result: %s", result) + } + })) + s.T().Cleanup(func() { + s.Require().NoError(failpoint.Disable(failpointName)) + }) + return tracker +} + +func (c *gcStateCacheAccessCounters) snapshot() gcStateCacheAccessCounterSnapshot { + c.Lock() + defer c.Unlock() + return gcStateCacheAccessCounterSnapshot{ + gcStateCacheAccessCount: c.gcStateCacheAccessCount, + } +} + func (s *gcStateManagerTestSuite) checkTxnSafePoint(keyspaceID uint32, expectedTxnSafePoint uint64) { re := s.Require() - state, err := s.manager.GetGCState(keyspaceID) + state, err := s.manager.GetGCState(keyspaceID, true) re.NoError(err) re.Equal(expectedTxnSafePoint, state.TxnSafePoint) } @@ -358,7 +425,7 @@ func (s *gcStateManagerTestSuite) TestAdvanceGCSafePointBasic() { re := s.Require() checkGCSafePoint := func(keyspaceID uint32, expectedGCSafePoint uint64) { - state, err := s.manager.GetGCState(keyspaceID) + state, err := s.manager.GetGCState(keyspaceID, true) re.NoError(err) re.Equal(expectedGCSafePoint, state.GCSafePoint) } @@ -484,7 +551,7 @@ func (s *gcStateManagerTestSuite) TestCompatibleUpdateGCSafePointSequentiallyWit func (s *gcStateManagerTestSuite) TestCompatibleUpdateGCSafePointSequentiallyWithNewLoad() { for _, keyspaceID := range s.keyspacePresets.manageable { s.testCompatibleGCSafePointUpdateSequentiallyImpl(keyspaceID, func(keyspaceID uint32) (uint64, error) { - state, err := s.manager.GetGCState(keyspaceID) + state, err := s.manager.GetGCState(keyspaceID, true) if err != nil { return 0, err } @@ -661,7 +728,7 @@ func (s *gcStateManagerTestSuite) TestCompatibleServiceGCSafePointRoundingTTL() re.NoError(err) re.True(updated) - state, err := s.manager.GetGCState(keyspaceID) + 
state, err := s.manager.GetGCState(keyspaceID, false) re.NoError(err) re.Len(state.GCBarriers, 1) re.Equal("svc1", state.GCBarriers[0].BarrierID) @@ -673,7 +740,7 @@ func (s *gcStateManagerTestSuite) TestCompatibleServiceGCSafePointRoundingTTL() re.NoError(err) re.True(updated) - state, err = s.manager.GetGCState(keyspaceID) + state, err = s.manager.GetGCState(keyspaceID, false) re.NoError(err) re.Len(state.GCBarriers, 1) re.Equal("svc1", state.GCBarriers[0].BarrierID) @@ -685,7 +752,7 @@ func (s *gcStateManagerTestSuite) TestCompatibleServiceGCSafePointRoundingTTL() func (s *gcStateManagerTestSuite) getGCBarrier(keyspaceID uint32, barrierID string) *endpoint.GCBarrier { re := s.Require() - state, err := s.manager.GetGCState(keyspaceID) + state, err := s.manager.GetGCState(keyspaceID, false) re.NoError(err) idx := slices.IndexFunc(state.GCBarriers, func(b *endpoint.GCBarrier) bool { return b.BarrierID == barrierID @@ -698,7 +765,7 @@ func (s *gcStateManagerTestSuite) getGCBarrier(keyspaceID uint32, barrierID stri func (s *gcStateManagerTestSuite) getAllGCBarriers(keyspaceID uint32) []*endpoint.GCBarrier { re := s.Require() - state, err := s.manager.GetGCState(keyspaceID) + state, err := s.manager.GetGCState(keyspaceID, false) re.NoError(err) return state.GCBarriers } @@ -1733,7 +1800,7 @@ func (s *gcStateManagerTestSuite) TestRedirectKeyspace() { // Check all public methods that accepts keyspaceID are all correctly redirected. testedFunc := []func(keyspaceID uint32) error{ func(keyspaceID uint32) error { - _, err1 := s.manager.GetGCState(keyspaceID) + _, err1 := s.manager.GetGCState(keyspaceID, false) return errors.AddStack(err1) }, func(keyspaceID uint32) error { @@ -1789,14 +1856,14 @@ func (s *gcStateManagerTestSuite) TestGetGCState() { // Check the result of GetAllKeyspaceGCStates and GetGCState are matching. 
checkAllKeyspaceGCStates := func() { - allStates, err := s.manager.GetAllKeyspacesGCStates(context.Background()) + allStates, err := s.manager.GetAllKeyspacesGCStates(context.Background(), false) re.NoError(err) re.Len(allStates, len(s.keyspacePresets.all)) for keyspaceID, state := range allStates { if slices.Contains(s.keyspacePresets.manageable, keyspaceID) { re.Equal(keyspaceID, state.KeyspaceID) - s, err := s.manager.GetGCState(keyspaceID) + s, err := s.manager.GetGCState(keyspaceID, false) re.NoError(err) re.Equal(s, state) } else { @@ -1808,7 +1875,7 @@ func (s *gcStateManagerTestSuite) TestGetGCState() { } for _, keyspaceID := range s.keyspacePresets.manageable { - state, err := s.manager.GetGCState(keyspaceID) + state, err := s.manager.GetGCState(keyspaceID, false) re.NoError(err) re.Equal(keyspaceID, state.KeyspaceID) if keyspaceID == constant.NullKeyspaceID { @@ -1822,7 +1889,7 @@ func (s *gcStateManagerTestSuite) TestGetGCState() { } for _, keyspaceID := range slices.Concat(s.keyspacePresets.unmanageable, s.keyspacePresets.nullSynonyms) { - state, err := s.manager.GetGCState(keyspaceID) + state, err := s.manager.GetGCState(keyspaceID, false) re.NoError(err) re.Equal(constant.NullKeyspaceID, state.KeyspaceID) re.False(state.IsKeyspaceLevel) @@ -1832,7 +1899,7 @@ func (s *gcStateManagerTestSuite) TestGetGCState() { } for _, keyspaceID := range s.keyspacePresets.notExisting { - _, err := s.manager.GetGCState(keyspaceID) + _, err := s.manager.GetGCState(keyspaceID, false) re.Error(err) re.ErrorIs(err, errs.ErrKeyspaceNotFound) } @@ -1859,7 +1926,7 @@ func (s *gcStateManagerTestSuite) TestGetGCState() { _, err = s.manager.SetGCBarrier(2, "b3", 60, time.Duration(math.MaxInt64), now) re.NoError(err) - state, err := s.manager.GetGCState(constant.NullKeyspaceID) + state, err := s.manager.GetGCState(constant.NullKeyspaceID, false) re.NoError(err) re.Equal(constant.NullKeyspaceID, state.KeyspaceID) re.False(state.IsKeyspaceLevel) @@ -1870,7 +1937,7 @@ func (s 
*gcStateManagerTestSuite) TestGetGCState() { endpoint.NewGCBarrier("b2", 25, ptime(now.Add(time.Hour*2))), }, state.GCBarriers) - state, err = s.manager.GetGCState(2) + state, err = s.manager.GetGCState(2, false) re.NoError(err) re.Equal(uint32(2), state.KeyspaceID) re.True(state.IsKeyspaceLevel) @@ -1881,9 +1948,365 @@ func (s *gcStateManagerTestSuite) TestGetGCState() { endpoint.NewGCBarrier("b3", 60, nil), }, state.GCBarriers) + // Check excluding GC barriers + state, err = s.manager.GetGCState(2, true) + re.NoError(err) + re.Equal(uint32(2), state.KeyspaceID) + re.True(state.IsKeyspaceLevel) + re.Equal(uint64(50), state.TxnSafePoint) + re.Equal(uint64(45), state.GCSafePoint) + re.Empty(state.GCBarriers) + checkAllKeyspaceGCStates() } +func (s *gcStateManagerTestSuite) TestGetAllKeyspacesGCStatesExcludingGCBarriers() { + re := s.Require() + + for _, keyspaceID := range s.keyspacePresets.manageable { + _, err := s.manager.SetGCBarrier(keyspaceID, fmt.Sprintf("b1-%d", keyspaceID), 25, time.Hour, time.Now()) + re.NoError(err) + } + + allStates, err := s.manager.GetAllKeyspacesGCStates(context.Background(), false) + re.NoError(err) + re.Len(allStates, len(s.keyspacePresets.all)) + + for _, keyspaceID := range s.keyspacePresets.manageable { + state, ok := allStates[keyspaceID] + re.True(ok) + re.Len(state.GCBarriers, 1) + re.Equal(fmt.Sprintf("b1-%d", keyspaceID), state.GCBarriers[0].BarrierID) + re.Equal(uint64(25), state.GCBarriers[0].BarrierTS) + } + + allStates, err = s.manager.GetAllKeyspacesGCStates(context.Background(), true) + re.NoError(err) + re.Len(allStates, len(s.keyspacePresets.all)) + + for _, keyspaceID := range s.keyspacePresets.manageable { + state, ok := allStates[keyspaceID] + re.True(ok) + re.Empty(state.GCBarriers) + } +} + +func (s *gcStateManagerTestSuite) TestGetGCStateCacheMissConcurrent() { + re := s.Require() + s.ensureMarkedLeader() + tracker := s.trackGCStateCacheAccessCounters() + + const keyspaceID = uint32(2) + before := 
tracker.snapshot() + + // Make every worker stop after the fast-path cache miss but before it can + // continue the slow path. Once released, every request is already past the + // fast-path cache lookup, so each request can only end up as either a + // slow-path miss or a slow-path cache hit, depending on scheduler + // interleaving around the cache update. + reachedSlowPath := make(chan struct{}) + releaseSlowPath := make(chan struct{}) + var releaseOnce sync.Once + releaseWorkers := func() { + releaseOnce.Do(func() { + close(releaseSlowPath) + }) + } + defer releaseWorkers() + + const concurrency = 6 + var reachedCount atomic.Int32 + failpointName := "github.com/tikv/pd/pkg/gc/getGCStateBeforeSlowPath" + re.NoError(failpoint.EnableCall(failpointName, func() { + if reachedCount.Add(1) == concurrency { + close(reachedSlowPath) + } + <-releaseSlowPath + })) + defer func() { + re.NoError(failpoint.Disable(failpointName)) + }() + + type result struct { + state GCState + err error + } + results := make(chan result, concurrency) + var wg sync.WaitGroup + wg.Add(concurrency) + for range concurrency { + go func() { + defer wg.Done() + state, err := s.manager.GetGCState(keyspaceID, true) + results <- result{state: state, err: err} + }() + } + + select { + case <-reachedSlowPath: + case <-time.After(5 * time.Second): + re.FailNow("not all concurrent GetGCState calls reached the slow path") + } + releaseWorkers() + + wg.Wait() + close(results) + + var first GCState + for i := range concurrency { + res := <-results + re.NoError(res.err) + if i == 0 { + first = res.state + continue + } + re.Equal(first, res.state) + } + + after := tracker.snapshot() + re.Equal(before.hit, after.hit) + re.Greater(after.miss, before.miss) + re.Equal( + before.miss+before.slowHit+concurrency, + after.miss+after.slowHit, + ) + + cachedState, ok := s.manager.gcStateCache.load(keyspaceID) + re.True(ok) + re.Equal(first.TxnSafePoint, cachedState.TxnSafePoint) + re.Equal(first.GCSafePoint, 
cachedState.GCSafePoint) +} + +func (s *gcStateManagerTestSuite) TestGetGCStateCacheHitConcurrent() { + re := s.Require() + s.ensureMarkedLeader() + tracker := s.trackGCStateCacheAccessCounters() + + const keyspaceID = uint32(2) + // Warm the cache once. Every concurrent request below should then stay on + // the read-lock-protected fast path and avoid both slow hits and storage reads. + expected, err := s.manager.GetGCState(keyspaceID, true) + re.NoError(err) + + before := tracker.snapshot() + + const concurrency = 6 + type result struct { + state GCState + err error + } + results := make(chan result, concurrency) + var wg sync.WaitGroup + wg.Add(concurrency) + for range concurrency { + go func() { + defer wg.Done() + state, err := s.manager.GetGCState(keyspaceID, true) + results <- result{state: state, err: err} + }() + } + wg.Wait() + close(results) + + for res := range results { + re.NoError(res.err) + re.Equal(expected, res.state) + } + + after := tracker.snapshot() + re.Equal(before.hit+concurrency, after.hit) + re.Equal(before.slowHit, after.slowHit) + re.Equal(before.miss, after.miss) +} + +func (s *gcStateManagerTestSuite) TestGetGCStateReadFailureDoesNotPopulateCache() { + re := s.Require() + s.ensureMarkedLeader() + tracker := s.trackGCStateCacheAccessCounters() + + const keyspaceID = uint32(2) + before := tracker.snapshot() + + // Corrupt only the persisted txn safe point. A failed load must return an + // error and, more importantly, must not install a zero-valued partial state + // into the cache. 
+ re.NoError(s.storage.Save(keypath.TxnSafePointPath(keyspaceID), "invalid-txn-safe-point")) + + _, err := s.manager.GetGCState(keyspaceID, true) + re.Error(err) + re.Contains(err.Error(), "invalid syntax") + _, ok := s.manager.gcStateCache.load(keyspaceID) + re.False(ok) + + afterFailure := tracker.snapshot() + re.Equal(before.miss+1, afterFailure.miss) + re.Equal(before.hit, afterFailure.hit) + re.Equal(before.slowHit, afterFailure.slowHit) + + re.NoError(s.storage.Save(keypath.TxnSafePointPath(keyspaceID), "123")) + + // After repairing storage, GetGCState must go back to storage again instead + // of returning a stale/empty cached value from the previous failed attempt. + state, err := s.manager.GetGCState(keyspaceID, true) + re.NoError(err) + re.Equal(uint64(123), state.TxnSafePoint) + re.Empty(state.GCBarriers) + + cachedState, ok := s.manager.gcStateCache.load(keyspaceID) + re.True(ok) + re.Equal(uint64(123), cachedState.TxnSafePoint) + + afterRecovery := tracker.snapshot() + re.Equal(before.miss+2, afterRecovery.miss) + re.Equal(before.hit, afterRecovery.hit) + re.Equal(before.slowHit, afterRecovery.slowHit) +} + +func (s *gcStateManagerTestSuite) TestDeleteGCBarrierWithoutCacheTriggersFreshRead() { + re := s.Require() + s.ensureMarkedLeader() + tracker := s.trackGCStateCacheAccessCounters() + + const keyspaceID = uint32(2) + now := time.Now().Truncate(time.Second) + _, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 20, now) + re.NoError(err) + s.manager.gcStateCache.remove(keyspaceID) + _, err = s.manager.SetGCBarrier(keyspaceID, "b1", 30, time.Hour, now) + re.NoError(err) + + // GC barrier mutations do not populate the safe-point cache. 
+ _, ok := s.manager.gcStateCache.load(keyspaceID) + re.False(ok) + + _, err = s.manager.DeleteGCBarrier(keyspaceID, "b1") + re.NoError(err) + _, ok = s.manager.gcStateCache.load(keyspaceID) + re.False(ok) + + before := tracker.snapshot() + + // Since no cache exists, the first excludeGCBarriers read after deletion + // must miss and reload the safe points from storage. + state, err := s.manager.GetGCState(keyspaceID, true) + re.NoError(err) + re.Equal(uint64(20), state.TxnSafePoint) + + after := tracker.snapshot() + re.Equal(before.miss+1, after.miss) + re.Equal(before.hit, after.hit) + re.Equal(before.slowHit, after.slowHit) +} + +func (s *gcStateManagerTestSuite) TestDeleteGCBarrierKeepsWarmSafePointCacheUsable() { + re := s.Require() + s.ensureMarkedLeader() + tracker := s.trackGCStateCacheAccessCounters() + + const keyspaceID = uint32(2) + now := time.Now().Truncate(time.Second) + _, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 20, now) + re.NoError(err) + _, err = s.manager.SetGCBarrier(keyspaceID, "b1", 30, time.Hour, now) + re.NoError(err) + + _, err = s.manager.GetGCState(keyspaceID, true) + re.NoError(err) + + // The cache only stores safe points. Deleting a barrier does not change + // those values, so a warmed excludeGCBarriers cache should stay reusable. 
+ beforeDelete := tracker.snapshot() + _, err = s.manager.DeleteGCBarrier(keyspaceID, "b1") + re.NoError(err) + + cachedState, ok := s.manager.gcStateCache.load(keyspaceID) + re.True(ok) + re.Equal(uint64(20), cachedState.TxnSafePoint) + + state, err := s.manager.GetGCState(keyspaceID, true) + re.NoError(err) + re.Equal(uint64(20), state.TxnSafePoint) + + afterRead := tracker.snapshot() + re.Equal(beforeDelete.hit+1, afterRead.hit) + re.Equal(beforeDelete.slowHit, afterRead.slowHit) + re.Equal(beforeDelete.miss, afterRead.miss) +} + +func (s *gcStateManagerTestSuite) TestAdvanceGCSafePointUpdatesWarmCache() { + re := s.Require() + s.ensureMarkedLeader() + tracker := s.trackGCStateCacheAccessCounters() + + const keyspaceID = uint32(2) + _, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 50, time.Now()) + re.NoError(err) + + state, err := s.manager.GetGCState(keyspaceID, true) + re.NoError(err) + re.Equal(uint64(0), state.GCSafePoint) + + beforeAdvance := tracker.snapshot() + oldGCSafePoint, newGCSafePoint, err := s.manager.AdvanceGCSafePoint(keyspaceID, 40) + re.NoError(err) + re.Equal(uint64(0), oldGCSafePoint) + re.Equal(uint64(40), newGCSafePoint) + + state, err = s.manager.GetGCState(keyspaceID, true) + re.NoError(err) + re.Equal(uint64(40), state.GCSafePoint) + + // AdvanceGCSafePoint is expected to patch an existing cache entry rather + // than invalidate it or force the next read through storage. 
+ afterRead := tracker.snapshot() + re.Equal(beforeAdvance.hit+1, afterRead.hit) + re.Equal(beforeAdvance.slowHit, afterRead.slowHit) + re.Equal(beforeAdvance.miss, afterRead.miss) +} + +func (s *gcStateManagerTestSuite) TestSetGCBarrierKeepsWarmSafePointCacheUsable() { + re := s.Require() + s.ensureMarkedLeader() + tracker := s.trackGCStateCacheAccessCounters() + + const keyspaceID = uint32(2) + now := time.Now().Truncate(time.Second) + + _, err := s.manager.AdvanceTxnSafePoint(keyspaceID, 25, now) + re.NoError(err) + _, _, err = s.manager.AdvanceGCSafePoint(keyspaceID, 10) + re.NoError(err) + + state, err := s.manager.GetGCState(keyspaceID, true) + re.NoError(err) + re.Equal(uint64(25), state.TxnSafePoint) + re.Equal(uint64(10), state.GCSafePoint) + + // Setting a barrier should not invalidate or mutate the safe-point-only + // cache entry. excludeGCBarriers reads should keep hitting the warmed cache. + beforeSet := tracker.snapshot() + _, err = s.manager.SetGCBarrier(keyspaceID, "b1", 30, time.Hour, now) + re.NoError(err) + + state, err = s.manager.GetGCState(keyspaceID, true) + re.NoError(err) + re.Equal(uint64(25), state.TxnSafePoint) + re.Equal(uint64(10), state.GCSafePoint) + + afterFirstRead := tracker.snapshot() + re.Equal(beforeSet.hit+1, afterFirstRead.hit) + re.Equal(beforeSet.slowHit, afterFirstRead.slowHit) + re.Equal(beforeSet.miss, afterFirstRead.miss) + + // A full read still sees the barrier from storage, showing that the cache + // remains intentionally limited to safe points. 
+ state, err = s.manager.GetGCState(keyspaceID, false) + re.NoError(err) + re.Equal([]*endpoint.GCBarrier{ + endpoint.NewGCBarrier("b1", 30, ptime(now.Add(time.Hour))), + }, state.GCBarriers) +} + func (s *gcStateManagerTestSuite) TestGetAllKeyspacesMaxTxnSafePoint() { re := s.Require() @@ -1986,22 +2409,22 @@ func (s *gcStateManagerTestSuite) testDowngradeCompatibility(keyspaceID uint32) re.Equal(uint64(25), s.getLegacyGCWorkerServiceSafePoint(keyspaceID).SafePoint) // Not visible by GetGCStates or GetAllKeyspacesGCStates. - gcState, err := s.manager.GetGCState(keyspaceID) + gcState, err := s.manager.GetGCState(keyspaceID, false) re.NoError(err) re.Empty(gcState.GCBarriers) - allGCStates, err := s.manager.GetAllKeyspacesGCStates(context.Background()) + allGCStates, err := s.manager.GetAllKeyspacesGCStates(context.Background(), false) re.NoError(err) re.Empty(allGCStates[keyspaceID].GCBarriers) // And it works correctly when there are other valid GC barriers. _, err = s.manager.SetGCBarrier(keyspaceID, "b1", 40, time.Hour, now) re.NoError(err) - gcState, err = s.manager.GetGCState(keyspaceID) + gcState, err = s.manager.GetGCState(keyspaceID, false) re.NoError(err) re.Len(gcState.GCBarriers, 1) re.Equal("b1", gcState.GCBarriers[0].BarrierID) re.Equal(uint64(40), gcState.GCBarriers[0].BarrierTS) - allGCStates, err = s.manager.GetAllKeyspacesGCStates(context.Background()) + allGCStates, err = s.manager.GetAllKeyspacesGCStates(context.Background(), false) re.NoError(err) re.Len(allGCStates[keyspaceID].GCBarriers, 1) re.Equal("b1", allGCStates[keyspaceID].GCBarriers[0].BarrierID) @@ -2036,7 +2459,7 @@ func (s *gcStateManagerTestSuite) TestGetAllKeyspacesGCStatesConcurrentCallShari ch := make(chan result, 10) callOnce := func() { - gcStates, err := s.manager.GetAllKeyspacesGCStates(context.Background()) + gcStates, err := s.manager.GetAllKeyspacesGCStates(context.Background(), false) ch <- result{gcStates: gcStates, err: err} } @@ -2091,6 +2514,128 @@ func (s 
*gcStateManagerTestSuite) TestGetAllKeyspacesGCStatesConcurrentCallShari re.Equal(int64(2), executionCount.Load()) } +func (s *gcStateManagerTestSuite) TestGetAllKeyspacesGCStatesDifferentParametersCallsDoNotShareResult() { + re := s.Require() + + const keyspaceID = uint32(2) + _, err := s.manager.SetGCBarrier(keyspaceID, "b1", 25, time.Hour, time.Now()) + re.NoError(err) + + type result struct { + caller string + excludeGCBarriers bool + gcStates map[uint32]GCState + err error + } + + runScenario := func(firstExcludeGCBarriers bool) { + var executionCount atomic.Int64 + finishFailpointEnabled := true + + fullExecBefore := s.manager.allKeyspacesGCStatesSingleFlight.ExecCount() + excludeExecBefore := s.manager.allKeyspacesGCStatesExcludeGCBarriersSingleFlight.ExecCount() + + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/gc/onGetAllKeyspacesGCStatesFinish", "pause")) + re.NoError(failpoint.EnableCall("github.com/tikv/pd/pkg/gc/onGetAllKeyspacesGCStatesStart", func() { + executionCount.Add(1) + })) + defer func() { + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/gc/onGetAllKeyspacesGCStatesStart")) + if finishFailpointEnabled { + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/gc/onGetAllKeyspacesGCStatesFinish")) + } + }() + + ch := make(chan result, 3) + callOnce := func(caller string, excludeGCBarriers bool) { + gcStates, err := s.manager.GetAllKeyspacesGCStates(context.Background(), excludeGCBarriers) + ch <- result{ + caller: caller, + excludeGCBarriers: excludeGCBarriers, + gcStates: gcStates, + err: err, + } + } + + go callOnce("first", firstExcludeGCBarriers) + + select { + case res := <-ch: + re.FailNowf("failpoint not taking effect to block the first invocation to GetAllKeyspacesGCStates", "caller: %s, excludeGCBarriers: %v, result: %v, err: %v", res.caller, res.excludeGCBarriers, res.gcStates, res.err) + case <-time.After(200 * time.Millisecond): + } + re.Equal(int64(1), executionCount.Load()) + + // The second call uses the other 
excludeGCBarriers value, so with the + // correct implementation it must start a separate execution immediately. + go callOnce("second", !firstExcludeGCBarriers) + + // The third call uses the same parameter as the first one. If the two + // parameter variants were incorrectly routed to a single OrderedSingleFlight + // instance, this third call could be merged into the second call's pending + // batch and receive a result for the wrong parameter. + go callOnce("third", firstExcludeGCBarriers) + + deadline := time.Now().Add(time.Second) + for executionCount.Load() < 2 && time.Now().Before(deadline) { + time.Sleep(10 * time.Millisecond) + } + re.Equal( + int64(2), + executionCount.Load(), + "calls with different excludeGCBarriers values should use different OrderedSingleFlight instances", + ) + + select { + case res := <-ch: + re.FailNowf("expected all invocations to stay blocked before finish failpoint is released", "caller: %s, excludeGCBarriers: %v, result: %v, err: %v", res.caller, res.excludeGCBarriers, res.gcStates, res.err) + case <-time.After(100 * time.Millisecond): + } + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/gc/onGetAllKeyspacesGCStatesFinish")) + finishFailpointEnabled = false + + fullCallCount := 0 + excludeCallCount := 0 + for range 3 { + var res result + select { + case res = <-ch: + case <-time.After(time.Second): + re.FailNow("GetAllKeyspacesGCStates blocked while expected to return") + } + re.NoError(res.err) + if res.excludeGCBarriers { + excludeCallCount++ + re.Empty(res.gcStates[keyspaceID].GCBarriers) + } else { + fullCallCount++ + re.Len(res.gcStates[keyspaceID].GCBarriers, 1) + re.Equal("b1", res.gcStates[keyspaceID].GCBarriers[0].BarrierID) + re.Equal(uint64(25), res.gcStates[keyspaceID].GCBarriers[0].BarrierTS) + } + } + + expectedFullCalls := 1 + expectedExcludeCalls := 2 + expectedFullExecs := fullExecBefore + 1 + expectedExcludeExecs := excludeExecBefore + 2 + if !firstExcludeGCBarriers { + expectedFullCalls = 2 + 
expectedExcludeCalls = 1 + expectedFullExecs = fullExecBefore + 2 + expectedExcludeExecs = excludeExecBefore + 1 + } + re.Equal(expectedFullCalls, fullCallCount) + re.Equal(expectedExcludeCalls, excludeCallCount) + re.Equal(expectedFullExecs, s.manager.allKeyspacesGCStatesSingleFlight.ExecCount()) + re.Equal(expectedExcludeExecs, s.manager.allKeyspacesGCStatesExcludeGCBarriersSingleFlight.ExecCount()) + } + + runScenario(false) + runScenario(true) +} + func TestGetAllKeysapcesGCStatesOnTooManyKeyspaces(t *testing.T) { re := require.New(t) @@ -2107,7 +2652,7 @@ func TestGetAllKeysapcesGCStatesOnTooManyKeyspaces(t *testing.T) { clean() }() - gcStates, err := gcStateManager.GetAllKeyspacesGCStates(context.Background()) + gcStates, err := gcStateManager.GetAllKeyspacesGCStates(context.Background(), false) re.Len(gcStates, totalKeyspaces+2) // Including the null keyspace, the default keyspace or the system keyspace. re.NoError(err) @@ -2173,15 +2718,20 @@ func TestGetMaxTxnSafePointAmongAllKeyspacesOnTooManyKeyspaces(t *testing.T) { } } -func benchmarkGetAllKeyspacesGCStatesImpl(b *testing.B, keyspacesCount int, parallelism int) { +func benchmarkGetAllKeyspacesGCStatesImpl(b *testing.B, excludeGCBarriers bool, keyspacesCount int, parallelism int) { re := require.New(b) fname := testutil.InitTempFileLogger("info") defer os.Remove(fname) opt := newGCStateManagerForTestOptions{ specifyInitialKeyspaces: make([]*keyspace.CreateKeyspaceByIDRequest, 0, keyspacesCount), + serverNodes: 1, etcdServerCfgModifier: func(cfg *embed.Config) { cfg.LogOutputs = []string{fname} + cfg.LogLevel = "error" + }, + etcdClientCfgModifier: func(cfg *clientv3.Config) { + cfg.LogConfig.Level.SetLevel(zapcore.ErrorLevel) }, } createTime := time.Now().Unix() @@ -2206,52 +2756,230 @@ func benchmarkGetAllKeyspacesGCStatesImpl(b *testing.B, keyspacesCount int, para b.ResetTimer() if parallelism == 0 { for range b.N { - _, err := gcStateManager.GetAllKeyspacesGCStates(context.Background()) + _, err := 
gcStateManager.GetAllKeyspacesGCStates(context.Background(), excludeGCBarriers) re.NoError(err) } } else { b.SetParallelism(parallelism) b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, err := gcStateManager.GetAllKeyspacesGCStates(context.Background()) + _, err := gcStateManager.GetAllKeyspacesGCStates(context.Background(), excludeGCBarriers) re.NoError(err) } }) } b.StopTimer() execCount := gcStateManager.allKeyspacesGCStatesSingleFlight.ExecCount() + if excludeGCBarriers { + execCount = gcStateManager.allKeyspacesGCStatesExcludeGCBarriersSingleFlight.ExecCount() + } b.ReportMetric(float64(execCount), "exec/op") b.ReportMetric(1-float64(execCount)/float64(b.N), "reusing_rate") } func BenchmarkGetAllKeyspacesGCStates_KS1_SingleThread(b *testing.B) { - benchmarkGetAllKeyspacesGCStatesImpl(b, 1, 0) + benchmarkGetAllKeyspacesGCStatesImpl(b, false, 1, 0) } func BenchmarkGetAllKeyspacesGCStates_KS1_P1(b *testing.B) { - benchmarkGetAllKeyspacesGCStatesImpl(b, 1, 1) + benchmarkGetAllKeyspacesGCStatesImpl(b, false, 1, 1) } func BenchmarkGetAllKeyspacesGCStates_KS1_P8(b *testing.B) { - benchmarkGetAllKeyspacesGCStatesImpl(b, 1, 8) + benchmarkGetAllKeyspacesGCStatesImpl(b, false, 1, 8) } func BenchmarkGetAllKeyspacesGCStates_KS1_P128(b *testing.B) { - benchmarkGetAllKeyspacesGCStatesImpl(b, 1, 128) + benchmarkGetAllKeyspacesGCStatesImpl(b, false, 1, 128) +} + +func BenchmarkGetAllKeyspacesGCStates_KS100_SingleThread(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, false, 100, 0) +} + +func BenchmarkGetAllKeyspacesGCStates_KS100_P1(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, false, 100, 1) +} + +func BenchmarkGetAllKeyspacesGCStates_KS100_P8(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, false, 100, 8) +} + +func BenchmarkGetAllKeyspacesGCStates_KS100_P128(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, false, 100, 128) +} + +func BenchmarkGetAllKeyspacesGCStates_ExcludeGCBarriers_KS1_SingleThread(b *testing.B) { + 
benchmarkGetAllKeyspacesGCStatesImpl(b, true, 1, 0) +} + +func BenchmarkGetAllKeyspacesGCStates_ExcludeGCBarriers_KS1_P1(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, true, 1, 1) +} + +func BenchmarkGetAllKeyspacesGCStates_ExcludeGCBarriers_KS1_P8(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, true, 1, 8) +} + +func BenchmarkGetAllKeyspacesGCStates_ExcludeGCBarriers_KS1_P128(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, true, 1, 128) +} + +func BenchmarkGetAllKeyspacesGCStates_ExcludeGCBarriers_KS100_SingleThread(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, true, 100, 0) +} + +func BenchmarkGetAllKeyspacesGCStates_ExcludeGCBarriers_KS100_P1(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, true, 100, 1) +} + +func BenchmarkGetAllKeyspacesGCStates_ExcludeGCBarriers_KS100_P8(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, true, 100, 8) +} + +func BenchmarkGetAllKeyspacesGCStates_ExcludeGCBarriers_KS100_P128(b *testing.B) { + benchmarkGetAllKeyspacesGCStatesImpl(b, true, 100, 128) +} + +func benchmarkGetGCStateImpl(b *testing.B, excludeGCBarriers bool, keyspacesCount int, parallelism int, concurrentWriteThreads int) { + re := require.New(b) + + fname := testutil.InitTempFileLogger("info") + defer os.Remove(fname) + + opt := newGCStateManagerForTestOptions{ + specifyInitialKeyspaces: make([]*keyspace.CreateKeyspaceByIDRequest, 0, keyspacesCount), + serverNodes: 1, + etcdServerCfgModifier: func(cfg *embed.Config) { + cfg.LogOutputs = []string{fname} + cfg.LogLevel = "error" + }, + etcdClientCfgModifier: func(cfg *clientv3.Config) { + cfg.LogConfig.Level.SetLevel(zapcore.ErrorLevel) + }, + } + createTime := time.Now().Unix() + for i := range keyspacesCount { + id := new(uint32) + *id = uint32(i + 1) + opt.specifyInitialKeyspaces = append(opt.specifyInitialKeyspaces, &keyspace.CreateKeyspaceByIDRequest{ + ID: id, + Name: fmt.Sprintf("ks%d", *id), + Config: map[string]string{keyspace.GCManagementType: 
keyspace.KeyspaceLevelGC}, + CreateTime: createTime, + }) + } + + _, _, gcStateManager, clean, cancel := newGCStateManagerForTest(b, opt) + defer func() { + b.StopTimer() + cancel() + clean() + }() + + stopWriteCh := make(chan struct{}, 1) + var wg sync.WaitGroup + wg.Add(concurrentWriteThreads) + for range concurrentWriteThreads { + go func() { + defer wg.Done() + for i := uint64(1); ; i++ { + select { + case <-stopWriteCh: + return + default: + } + keyspaceID := rand.Uint32N(uint32(keyspacesCount)) + 1 + _, err := gcStateManager.AdvanceTxnSafePoint(keyspaceID, i, time.Now()) + re.NoError(err) + } + }() + } + defer func() { + close(stopWriteCh) + wg.Wait() + }() + + b.ResetTimer() + if parallelism == 0 { + for range b.N { + keyspaceID := rand.Uint32N(uint32(keyspacesCount)) + 1 + _, err := gcStateManager.GetGCState(keyspaceID, excludeGCBarriers) + re.NoError(err) + } + } else { + b.SetParallelism(parallelism) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + keyspaceID := rand.Uint32N(uint32(keyspacesCount)) + 1 + _, err := gcStateManager.GetGCState(keyspaceID, excludeGCBarriers) + re.NoError(err) + } + }) + } + b.StopTimer() +} + +func BenchmarkGetGCState_ExcludeGCBarriers_KS1_SingleThread(b *testing.B) { + benchmarkGetGCStateImpl(b, true, 1, 0, 0) +} + +func BenchmarkGetGCState_KS1_SingleThread(b *testing.B) { + benchmarkGetGCStateImpl(b, false, 1, 0, 0) +} + +func BenchmarkGetGCState_ExcludeGCBarriers_KS1_W10_SingleThread(b *testing.B) { + benchmarkGetGCStateImpl(b, true, 1, 0, 10) +} + +func BenchmarkGetGCState_KS1_W10_SingleThread(b *testing.B) { + benchmarkGetGCStateImpl(b, false, 1, 0, 10) +} + +func BenchmarkGetGCState_ExcludeGCBarriers_KS1_P64(b *testing.B) { + benchmarkGetGCStateImpl(b, true, 1, 64, 0) +} + +func BenchmarkGetGCState_KS1_P64(b *testing.B) { + benchmarkGetGCStateImpl(b, false, 1, 64, 0) +} + +func BenchmarkGetGCState_ExcludeGCBarriers_KS1_P64_W10(b *testing.B) { + benchmarkGetGCStateImpl(b, true, 1, 64, 10) +} + +func 
BenchmarkGetGCState_KS1_P64_W10(b *testing.B) { + benchmarkGetGCStateImpl(b, false, 1, 64, 10) +} + +func BenchmarkGetGCState_ExcludeGCBarriers_KS128_SingleThread(b *testing.B) { + benchmarkGetGCStateImpl(b, true, 128, 0, 0) +} + +func BenchmarkGetGCState_KS128_SingleThread(b *testing.B) { + benchmarkGetGCStateImpl(b, false, 128, 0, 0) +} + +func BenchmarkGetGCState_ExcludeGCBarriers_KS128_W10_SingleThread(b *testing.B) { + benchmarkGetGCStateImpl(b, true, 128, 0, 10) +} + +func BenchmarkGetGCState_KS128_W10_SingleThread(b *testing.B) { + benchmarkGetGCStateImpl(b, false, 128, 0, 10) } -func BenchmarkGetAllKeyspacesGCStates_KS10_SingleThread(b *testing.B) { - benchmarkGetAllKeyspacesGCStatesImpl(b, 10, 0) +func BenchmarkGetGCState_ExcludeGCBarriers_KS128_P64(b *testing.B) { + benchmarkGetGCStateImpl(b, true, 128, 64, 0) } -func BenchmarkGetAllKeyspacesGCStates_KS10_P1(b *testing.B) { - benchmarkGetAllKeyspacesGCStatesImpl(b, 10, 1) +func BenchmarkGetGCState_KS128_P64(b *testing.B) { + benchmarkGetGCStateImpl(b, false, 128, 64, 0) } -func BenchmarkGetAllKeyspacesGCStates_KS10_P8(b *testing.B) { - benchmarkGetAllKeyspacesGCStatesImpl(b, 10, 8) +func BenchmarkGetGCState_ExcludeGCBarriers_KS128_P64_W10(b *testing.B) { + benchmarkGetGCStateImpl(b, true, 128, 64, 10) } -func BenchmarkGetAllKeyspacesGCStates_KS10_P128(b *testing.B) { - benchmarkGetAllKeyspacesGCStatesImpl(b, 10, 128) +func BenchmarkGetGCState_KS128_P64_W10(b *testing.B) { + benchmarkGetGCStateImpl(b, false, 128, 64, 10) } diff --git a/pkg/gc/metrics.go b/pkg/gc/metrics.go index e70ecbf2209..1392f0aaee0 100644 --- a/pkg/gc/metrics.go +++ b/pkg/gc/metrics.go @@ -24,8 +24,20 @@ var ( Name: "gc_safepoint", Help: "The ts of gc safepoint", }, []string{"type"}) + gcStateCacheAccessCounter = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: "pd", + Subsystem: "gc", + Name: "gc_state_cache_access_total", + Help: "Counter of GC state cache accesses by result.", + }, []string{"result"}) + + 
gcStateCacheAccessHitCounter = gcStateCacheAccessCounter.WithLabelValues("hit") + gcStateCacheAccessSlowHitCounter = gcStateCacheAccessCounter.WithLabelValues("slow_hit") + gcStateCacheAccessMissCounter = gcStateCacheAccessCounter.WithLabelValues("miss") ) func init() { prometheus.MustRegister(gcSafePointGauge) + prometheus.MustRegister(gcStateCacheAccessCounter) } diff --git a/pkg/utils/etcdutil/etcdutil.go b/pkg/utils/etcdutil/etcdutil.go index 7ee357ce80e..aa9f729aa7a 100644 --- a/pkg/utils/etcdutil/etcdutil.go +++ b/pkg/utils/etcdutil/etcdutil.go @@ -277,30 +277,37 @@ const ( McsEtcdClientPurpose EtcdClientPurpose = "mcs-etcd-client" ) -func newClient(tlsConfig *tls.Config, endpoints ...string) (*clientv3.Client, error) { +// CreateEtcdClientOpt is an alias of the function that edits clientv3.Config. +type CreateEtcdClientOpt func(*clientv3.Config) + +func newClient(tlsConfig *tls.Config, endpoints []string, opts ...CreateEtcdClientOpt) (*clientv3.Client, error) { if len(endpoints) == 0 { return nil, errs.ErrNewEtcdClient.FastGenByArgs("empty etcd endpoints") } lgc := zap.NewProductionConfig() lgc.Encoding = log.ZapEncodingName - client, err := clientv3.New(clientv3.Config{ + cfg := clientv3.Config{ Endpoints: endpoints, DialTimeout: defaultEtcdClientTimeout, TLS: tlsConfig, LogConfig: &lgc, DialKeepAliveTime: defaultDialKeepAliveTime, DialKeepAliveTimeout: defaultDialKeepAliveTimeout, - }) + } + for _, opt := range opts { + opt(&cfg) + } + client, err := clientv3.New(cfg) return client, err } // CreateEtcdClient creates etcd v3 client with detecting endpoints. 
-func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL, purpose EtcdClientPurpose, enableChecker bool) (*clientv3.Client, error) { +func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL, purpose EtcdClientPurpose, enableChecker bool, opts ...CreateEtcdClientOpt) (*clientv3.Client, error) { urls := make([]string, 0, len(acURLs)) for _, u := range acURLs { urls = append(urls, u.String()) } - client, err := newClient(tlsConfig, urls...) + client, err := newClient(tlsConfig, urls, opts...) if err != nil { return nil, err } @@ -310,7 +317,7 @@ func CreateEtcdClient(tlsConfig *tls.Config, acURLs []url.URL, purpose EtcdClien tickerInterval = 100 * time.Millisecond }) if enableChecker { - initHealthChecker(tickerInterval, tlsConfig, client, purpose) + initHealthChecker(tickerInterval, tlsConfig, client, purpose, opts...) } return client, err diff --git a/pkg/utils/etcdutil/health_checker.go b/pkg/utils/etcdutil/health_checker.go index 5bb3ab5b8d3..87a22ccad13 100644 --- a/pkg/utils/etcdutil/health_checker.go +++ b/pkg/utils/etcdutil/health_checker.go @@ -62,6 +62,8 @@ type healthChecker struct { // the checked healthy endpoints dynamically and periodically. client *clientv3.Client + clientOpts []CreateEtcdClientOpt + endpointCountState prometheus.Gauge } @@ -71,12 +73,14 @@ func initHealthChecker( tlsConfig *tls.Config, client *clientv3.Client, purpose EtcdClientPurpose, + opts ...CreateEtcdClientOpt, ) { healthChecker := &healthChecker{ source: string(purpose), tickerInterval: tickerInterval, tlsConfig: tlsConfig, client: client, + clientOpts: opts, endpointCountState: etcdStateGauge.WithLabelValues(string(purpose), endpointLabel), } // A health checker has the same lifetime with the given etcd client. @@ -362,7 +366,7 @@ func (checker *healthChecker) update() { for ep := range epMap { client := checker.loadClient(ep) if client == nil { - checker.initClient(ep) + checker.initClient(ep, checker.clientOpts...) 
continue } since := time.Since(client.lastHealth) @@ -413,8 +417,8 @@ func (checker *healthChecker) loadClient(ep string) *healthyClient { return nil } -func (checker *healthChecker) initClient(ep string) { - client, err := newClient(checker.tlsConfig, ep) +func (checker *healthChecker) initClient(ep string, opts ...CreateEtcdClientOpt) { + client, err := newClient(checker.tlsConfig, []string{ep}, opts...) if err != nil { log.Error("failed to create etcd healthy client", zap.String("endpoint", ep), diff --git a/pkg/utils/etcdutil/testutil.go b/pkg/utils/etcdutil/testutil.go index e0199294e9a..e3e8664fbb0 100644 --- a/pkg/utils/etcdutil/testutil.go +++ b/pkg/utils/etcdutil/testutil.go @@ -66,6 +66,7 @@ func genRandName() string { // TestEtcdClusterOptions is the options for NewTestEtcdCluster. type TestEtcdClusterOptions struct { ServerCfgModifier func(cfg *embed.Config) + ClientCfgModifier CreateEtcdClientOpt } // NewTestEtcdCluster is used to create a etcd cluster for the unit test purpose. @@ -80,7 +81,11 @@ func NewTestEtcdCluster(t testing.TB, count int, opt *TestEtcdClusterOptions) (s } etcd, err := embed.StartEtcd(cfg) re.NoError(err) - etcdClient, err = CreateEtcdClient(nil, cfg.ListenClientUrls, TestEtcdClientPurpose, true) + var clientOpts []CreateEtcdClientOpt + if opt != nil && opt.ClientCfgModifier != nil { + clientOpts = append(clientOpts, opt.ClientCfgModifier) + } + etcdClient, err = CreateEtcdClient(nil, cfg.ListenClientUrls, TestEtcdClientPurpose, true, clientOpts...) 
re.NoError(err) <-etcd.Server.ReadyNotify() servers = append(servers, etcd) @@ -127,6 +132,7 @@ func MustAddEtcdMember(t testing.TB, cfg1 *embed.Config, client *clientv3.Client cfg2.Name = genRandName() cfg2.InitialCluster = cfg1.InitialCluster + fmt.Sprintf(",%s=%s", cfg2.Name, &cfg2.ListenPeerUrls[0]) cfg2.ClusterState = embed.ClusterStateFlagExisting + cfg2.LogLevel = cfg1.LogLevel peerURL := cfg2.ListenPeerUrls[0].String() addResp, err := AddEtcdMember(client, []string{peerURL}) re.NoError(err) diff --git a/server/api/service_gc_safepoint.go b/server/api/service_gc_safepoint.go index d978dabd3cd..cc8640be8a7 100644 --- a/server/api/service_gc_safepoint.go +++ b/server/api/service_gc_safepoint.go @@ -59,7 +59,7 @@ type ListServiceGCSafepoint struct { // @Router /gc/safepoint [get] func (h *serviceGCSafepointHandler) GetGCSafePoint(w http.ResponseWriter, _ *http.Request) { gcStateManager := h.svr.GetGCStateManager() - gcState, err := gcStateManager.GetGCState(constant.NullKeyspaceID) + gcState, err := gcStateManager.GetGCState(constant.NullKeyspaceID, false) if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return diff --git a/server/apiv2/handlers/safe_point.go b/server/apiv2/handlers/safe_point.go index a40e123a6ac..3c225409126 100644 --- a/server/apiv2/handlers/safe_point.go +++ b/server/apiv2/handlers/safe_point.go @@ -61,7 +61,7 @@ func LoadGCSafePoint(c *gin.Context) { c.AbortWithStatusJSON(http.StatusInternalServerError, managerUninitializedErr) return } - gcState, err := manager.GetGCState(uint32(keyspaceID)) + gcState, err := manager.GetGCState(uint32(keyspaceID), true) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 56ff919f27f..a10b49d1817 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -43,6 +43,7 @@ import ( "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/core/storelimit" 
"github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/gc" "github.com/tikv/pd/pkg/gctuner" "github.com/tikv/pd/pkg/id" "github.com/tikv/pd/pkg/keyspace" @@ -141,6 +142,7 @@ type Server interface { GetKeyspaceGroupManager() *keyspace.GroupManager IsKeyspaceGroupEnabled() bool GetMeteringWriter() *metering.Writer + GetGCStateManager() *gc.GCStateManager } // RaftCluster is used for cluster config management. @@ -202,6 +204,8 @@ type RaftCluster struct { logRunner ratelimit.Runner // syncRegionRunner is used to sync region asynchronously. syncRegionRunner ratelimit.Runner + + stopGCStateManager func() } // Status saves some state information. @@ -470,6 +474,10 @@ func (c *RaftCluster) Start(s Server, bootstrap bool) (err error) { go c.startGCTuner() go c.startProgressGC() go c.runStorageSizeCollector(s.GetMeteringWriter(), c.regionLabeler, s.GetKeyspaceManager()) + + s.GetGCStateManager().OnNodeBecomesLeader() + c.stopGCStateManager = s.GetGCStateManager().OnNodeBecomesFollower + log.Info("start background jobs completed", zap.Duration("cost", time.Since(backgroundJobsStart))) runnersStart := time.Now() c.running = true @@ -981,6 +989,9 @@ func (c *RaftCluster) Stop() { c.miscRunner.Stop() c.logRunner.Stop() c.syncRegionRunner.Stop() + if c.stopGCStateManager != nil { + c.stopGCStateManager() + } c.Unlock() c.wg.Wait() diff --git a/server/gc_service.go b/server/gc_service.go index 7c4149acc91..efb925f12e3 100644 --- a/server/gc_service.go +++ b/server/gc_service.go @@ -24,6 +24,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" @@ -440,7 +441,7 @@ func (s *GrpcServer) GetAllGCSafePointV2(ctx context.Context, request *pdpb.GetA return &pdpb.GetAllGCSafePointV2Response{Header: grpcutil.NotBootstrappedHeader()}, nil } - gcStates, err := s.gcStateManager.GetAllKeyspacesGCStates(ctx) + gcStates, err := s.gcStateManager.GetAllKeyspacesGCStates(ctx, 
true) if err != nil { return &pdpb.GetAllGCSafePointV2Response{ Header: grpcutil.WrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), @@ -704,13 +705,22 @@ func (s *GrpcServer) GetGCState(ctx context.Context, request *pdpb.GetGCStateReq return &pdpb.GetGCStateResponse{Header: grpcutil.NotBootstrappedHeader()}, nil } - gcState, err := s.gcStateManager.GetGCState(getKeyspaceID(request.GetKeyspaceScope())) + gcState, err := s.gcStateManager.GetGCState(getKeyspaceID(request.GetKeyspaceScope()), request.GetExcludeGcBarriers()) if err != nil { return &pdpb.GetGCStateResponse{ Header: grpcutil.WrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), }, nil } + // Keep this hook immediately before the second cluster check so tests can + // deterministically cover requests that lose leadership after GC state is + // loaded but before the response is committed. + failpoint.InjectCall("getGCStateBeforeSecondClusterCheck") + + rc = s.GetRaftCluster() + if rc == nil { + return &pdpb.GetGCStateResponse{Header: grpcutil.NotBootstrappedHeader()}, nil + } return &pdpb.GetGCStateResponse{ Header: grpcutil.WrapHeader(), GcState: gcStateToProto(gcState, time.Now()), @@ -740,7 +750,7 @@ func (s *GrpcServer) GetAllKeyspacesGCStates(ctx context.Context, request *pdpb. return &pdpb.GetAllKeyspacesGCStatesResponse{Header: grpcutil.NotBootstrappedHeader()}, nil } - gcStates, err := s.gcStateManager.GetAllKeyspacesGCStates(ctx) + gcStates, err := s.gcStateManager.GetAllKeyspacesGCStates(ctx, request.GetExcludeGcBarriers()) if err != nil { return &pdpb.GetAllKeyspacesGCStatesResponse{ Header: grpcutil.WrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), @@ -752,15 +762,18 @@ func (s *GrpcServer) GetAllKeyspacesGCStates(ctx context.Context, request *pdpb. 
gcStatesPb = append(gcStatesPb, gcStateToProto(gcState, now)) } - globalBarriers, err := s.gcStateManager.LoadAllGlobalGCBarriers() - if err != nil { - return &pdpb.GetAllKeyspacesGCStatesResponse{ - Header: grpcutil.WrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), - }, nil - } - gcBarriersPb := make([]*pdpb.GlobalGCBarrierInfo, 0, len(globalBarriers)) - for _, barrier := range globalBarriers { - gcBarriersPb = append(gcBarriersPb, globalGCBarrierToProto(barrier, now)) + var gcBarriersPb []*pdpb.GlobalGCBarrierInfo + if !request.GetExcludeGlobalGcBarriers() { + globalBarriers, err := s.gcStateManager.LoadAllGlobalGCBarriers() + if err != nil { + return &pdpb.GetAllKeyspacesGCStatesResponse{ + Header: grpcutil.WrapErrorToHeader(pdpb.ErrorType_UNKNOWN, err.Error()), + }, nil + } + gcBarriersPb = make([]*pdpb.GlobalGCBarrierInfo, 0, len(globalBarriers)) + for _, barrier := range globalBarriers { + gcBarriersPb = append(gcBarriersPb, globalGCBarrierToProto(barrier, now)) + } } return &pdpb.GetAllKeyspacesGCStatesResponse{ diff --git a/tests/integrations/client/client_test.go b/tests/integrations/client/client_test.go index 768b10c4a36..a7870afb00f 100644 --- a/tests/integrations/client/client_test.go +++ b/tests/integrations/client/client_test.go @@ -2070,10 +2070,12 @@ func (*clientStatefulTestSuite) waitForGCBarrierExpiring(re *require.Assertions, // checkGCBarrier checks whether the specified GC barrier has the specified barrier TS. This function assumes the // barrier TS is never 0, and passing 0 means asserting the GC barrier does not exist. 
func (s *clientStatefulTestSuite) checkGCBarrier(re *require.Assertions, keyspaceID uint32, barrierID string, expectedBarrierTS uint64) { - gcState, err := s.client.GetGCStatesClient(keyspaceID).GetGCState(context.Background()) + gcState, err := s.client.GetGCStatesClient(keyspaceID).GetGCState(context.Background(), gc.ExcludeGCBarriers(false)) + re.NoError(err) + gcBarriers, err := gcState.GetGCBarriers() re.NoError(err) found := false - for _, b := range gcState.GCBarriers { + for _, b := range gcBarriers { if b.BarrierID == barrierID { if found { re.Failf("duplicated barrier ID found in the GC states", "barrierID: %s, GC state: %+v", barrierID, gcState) @@ -2096,10 +2098,15 @@ func (s *clientStatefulTestSuite) checkGCBarrier(re *require.Assertions, keyspac // checkGlobalGCBarrier checks whether the specified global GC barrier has the specified barrier TS. // This function assumes the barrier TS is never 0, and passing 0 means asserting the GC barrier does not exist. func (s *clientStatefulTestSuite) checkGlobalGCBarrier(re *require.Assertions, barrierID string, expectedBarrierTS uint64) { - gcStates, err := s.client.GetGCStatesClient(constants.NullKeyspaceID).GetAllKeyspacesGCStates(context.Background()) + gcStates, err := s.client.GetGCStatesClient(constants.NullKeyspaceID).GetAllKeyspacesGCStates( + context.Background(), + gc.ExcludeGlobalGCBarriers(false), + ) + re.NoError(err) + globalGCBarriers, err := gcStates.GetGlobalGCBarriers() re.NoError(err) found := false - for _, b := range gcStates.GlobalGCBarriers { + for _, b := range globalGCBarriers { if b.BarrierID == barrierID { if found { re.Failf("duplicated barrier ID found in the global GC barriers", "barrierID: %s", barrierID) @@ -2172,7 +2179,7 @@ func (s *clientStatefulTestSuite) testUpdateServiceGCSafePointImpl(keyspaceID ui // Suppress the unuseful lint warning. 
//nolint:unparam loadServiceGCSafePointByServiceID := func(serviceID string) *endpoint.ServiceSafePoint { - gcStates, err := s.srv.GetGCStateManager().GetGCState(keyspaceID) + gcStates, err := s.srv.GetGCStateManager().GetGCState(keyspaceID, false) re.NoError(err) for _, b := range gcStates.GCBarriers { if b.BarrierID == serviceID { @@ -2454,6 +2461,12 @@ func (s *clientStatefulTestSuite) TestGCBarriers() { re.Equal("b1", b.BarrierID) re.Equal(uint64(10), b.BarrierTS) re.Equal(int64(math.MaxInt64), int64(b.TTL)) + // Test GetGCState's behavior of excluding GC barriers by default. + state, err := cli.GetGCState(ctx) + re.NoError(err) + re.False(state.HasGCBarriers()) + _, err = state.GetGCBarriers() + re.Error(err) s.checkGCBarrier(re, keyspaceID, "b1", 10) // Allows advancing to a value below the GC barrier. @@ -2706,21 +2719,31 @@ func (s *clientStatefulTestSuite) TestGetAllKeyspaceGCStates() { re.NoError(err) res, err := cli.GetAllKeyspacesGCStates(ctx) re.NoError(err) - re.Len(res.GlobalGCBarriers, 1) - re.Equal("b1", res.GlobalGCBarriers[0].BarrierID) - re.Equal(uint64(10), res.GlobalGCBarriers[0].BarrierTS) - re.Equal(time.Duration(math.MaxInt64), res.GlobalGCBarriers[0].TTL) + re.False(res.HasGlobalGCBarriers()) + _, err = res.GetGlobalGCBarriers() + re.Error(err) + + res, err = cli.GetAllKeyspacesGCStates(ctx, gc.ExcludeGlobalGCBarriers(false)) + re.NoError(err) + globalGCBarriers, err := res.GetGlobalGCBarriers() + re.NoError(err) + re.Len(globalGCBarriers, 1) + re.Equal("b1", globalGCBarriers[0].BarrierID) + re.Equal(uint64(10), globalGCBarriers[0].BarrierTS) + re.Equal(time.Duration(math.MaxInt64), globalGCBarriers[0].TTL) _, err = cli.SetGlobalGCBarrier(ctx, "b2", 12, 2*time.Second) re.NoError(err) - res, err = cli.GetAllKeyspacesGCStates(ctx) + res, err = cli.GetAllKeyspacesGCStates(ctx, gc.ExcludeGlobalGCBarriers(false)) + re.NoError(err) + globalGCBarriers, err = res.GetGlobalGCBarriers() re.NoError(err) - re.Len(res.GlobalGCBarriers, 2) - 
re.Equal("b2", res.GlobalGCBarriers[1].BarrierID) - re.Equal(uint64(12), res.GlobalGCBarriers[1].BarrierTS) + re.Len(globalGCBarriers, 2) + re.Equal("b2", globalGCBarriers[1].BarrierID) + re.Equal(uint64(12), globalGCBarriers[1].BarrierTS) // Returned TTL is rounded to seconds, so it can be exactly 1s here. - re.GreaterOrEqual(res.GlobalGCBarriers[1].TTL, time.Second) - re.LessOrEqual(res.GlobalGCBarriers[1].TTL, 2*time.Second) + re.GreaterOrEqual(globalGCBarriers[1].TTL, time.Second) + re.LessOrEqual(globalGCBarriers[1].TTL, 2*time.Second) cli1 := s.client.GetGCStatesClient(1) _, err = cli1.SetGCBarrier(ctx, "b3", 13, math.MaxInt64) @@ -2729,22 +2752,34 @@ func (s *clientStatefulTestSuite) TestGetAllKeyspaceGCStates() { re.NoError(err) state, ok := res.GCStates[1] re.True(ok) - re.Equal("b3", state.GCBarriers[0].BarrierID) - re.Equal(uint64(13), state.GCBarriers[0].BarrierTS) - re.Equal(time.Duration(math.MaxInt64), state.GCBarriers[0].TTL) + re.False(state.HasGCBarriers()) + _, err = state.GetGCBarriers() + re.Error(err) + + res, err = cli.GetAllKeyspacesGCStates(ctx, gc.ExcludeGCBarriers(false), gc.ExcludeGlobalGCBarriers(false)) + re.NoError(err) + state, ok = res.GCStates[1] + re.True(ok) + gcBarriers, err := state.GetGCBarriers() + re.NoError(err) + re.Equal("b3", gcBarriers[0].BarrierID) + re.Equal(uint64(13), gcBarriers[0].BarrierTS) + re.Equal(time.Duration(math.MaxInt64), gcBarriers[0].TTL) cli2 := s.client.GetGCStatesClient(2) _, err = cli2.SetGCBarrier(ctx, "b4", 14, 3*time.Second) re.NoError(err) - res, err = cli.GetAllKeyspacesGCStates(ctx) + res, err = cli.GetAllKeyspacesGCStates(ctx, gc.ExcludeGCBarriers(false), gc.ExcludeGlobalGCBarriers(false)) re.NoError(err) state, ok = res.GCStates[2] re.True(ok) - re.Equal("b4", state.GCBarriers[0].BarrierID) - re.Equal(uint64(14), state.GCBarriers[0].BarrierTS) + gcBarriers, err = state.GetGCBarriers() + re.NoError(err) + re.Equal("b4", gcBarriers[0].BarrierID) + re.Equal(uint64(14), gcBarriers[0].BarrierTS) 
// Returned TTL is rounded to seconds, so it can be exactly 2s here. - re.GreaterOrEqual(state.GCBarriers[0].TTL, 2*time.Second) - re.LessOrEqual(state.GCBarriers[0].TTL, 3*time.Second) + re.GreaterOrEqual(gcBarriers[0].TTL, 2*time.Second) + re.LessOrEqual(gcBarriers[0].TTL, 3*time.Second) } func TestDecodeHttpKeyRange(t *testing.T) { diff --git a/tests/integrations/go.mod b/tests/integrations/go.mod index f4a28d2de92..d7e9bbe9486 100644 --- a/tests/integrations/go.mod +++ b/tests/integrations/go.mod @@ -14,7 +14,7 @@ require ( github.com/go-sql-driver/mysql v1.7.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 - github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359 + github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/prometheus/client_golang v1.20.5 github.com/prometheus/client_model v0.6.1 diff --git a/tests/integrations/go.sum b/tests/integrations/go.sum index 002a419b646..77db71e9ab9 100644 --- a/tests/integrations/go.sum +++ b/tests/integrations/go.sum @@ -473,8 +473,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359 h1:oteLtLuoWZN3uvfH836U0IIJ+s3UOk11q7GaQ0Tk+wc= -github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE= +github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473 h1:n6QWAac97mv2NJhn17iFPFnsE5fMgtPLNmsGZeqq78o= +github.com/pingcap/kvproto 
v0.0.0-20260514102340-daa7c864b473/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/tests/server/api/service_gc_safepoint_test.go b/tests/server/api/service_gc_safepoint_test.go index fcf839d07b0..6d9c3732b80 100644 --- a/tests/server/api/service_gc_safepoint_test.go +++ b/tests/server/api/service_gc_safepoint_test.go @@ -115,10 +115,13 @@ func (suite *serviceGCSafepointTestSuite) checkServiceGCSafepoint(cluster *tests re.NoError(err) re.Equal(list, listResp) + // The following delete bypasses GCStateManager and writes storage directly. + // Subsequent reads through both the manager and the public HTTP endpoint + // should still reflect the deletion. err = testutil.CheckDelete(tests.TestDialClient, sspURL+"/a", testutil.StatusOK(re)) re.NoError(err) - state, err := gcStateManager.GetGCState(constant.NullKeyspaceID) + state, err := gcStateManager.GetGCState(constant.NullKeyspaceID, false) re.NoError(err) left := state.GCBarriers leftSsps := make([]*endpoint.ServiceSafePoint, 0, len(left)) @@ -127,4 +130,18 @@ func (suite *serviceGCSafepointTestSuite) checkServiceGCSafepoint(cluster *tests } // Exclude the gc_worker as it's not included in GetGCState's result. re.Equal(list.ServiceGCSafepoints[1:3], leftSsps) + + resAfterDelete, err := tests.TestDialClient.Get(sspURL) + re.NoError(err) + defer resAfterDelete.Body.Close() + listRespAfterDelete := &api.ListServiceGCSafepoint{} + err = apiutil.ReadJSON(resAfterDelete.Body, listRespAfterDelete) + re.NoError(err) + // Also verify the public HTTP view, not only the direct GCStateManager read. 
+ expectedAfterDelete := &api.ListServiceGCSafepoint{ + ServiceGCSafepoints: list.ServiceGCSafepoints[1:], + GCSafePoint: list.GCSafePoint, + MinServiceGcSafepoint: list.MinServiceGcSafepoint, + } + re.Equal(expectedAfterDelete, listRespAfterDelete) } diff --git a/tests/server/gc/gc_test.go b/tests/server/gc/gc_test.go index 3b261a205c8..e18d09c927c 100644 --- a/tests/server/gc/gc_test.go +++ b/tests/server/gc/gc_test.go @@ -18,12 +18,14 @@ import ( "context" "math" "slices" + "sync" "testing" "time" "github.com/stretchr/testify/require" "go.uber.org/goleak" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/pkg/keyspace" @@ -41,6 +43,95 @@ func TestMain(m *testing.M) { goleak.VerifyTestMain(m, testutil.LeakOptions...) } +const ( + getGCStateBeforeSecondClusterCheckFailpoint = "github.com/tikv/pd/server/getGCStateBeforeSecondClusterCheck" + getGCStateBeforeSlowPathFailpoint = "github.com/tikv/pd/pkg/gc/getGCStateBeforeSlowPath" + skipCampaignLeaderCheckFailpoint = "github.com/tikv/pd/pkg/member/skipCampaignLeaderCheck" +) + +func newGCStateLeaderTransitionCluster(t *testing.T) (*tests.TestCluster, *pdpb.GetGCStateRequest, func()) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + + // These tests need two PDs so the request can target a concrete old leader + // while another node takes over leadership. 
+ cluster, err := tests.NewTestCluster(ctx, 2, func(conf *config.Config, _ string) { + conf.Keyspace.WaitRegionSplit = false + }) + re.NoError(err) + re.NoError(cluster.RunInitialServers()) + re.NotEmpty(cluster.WaitLeader()) + re.NoError(failpoint.Enable(skipCampaignLeaderCheckFailpoint, "return(true)")) + + leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + re.NoError(leaderServer.BootstrapCluster()) + + req := &pdpb.GetGCStateRequest{ + Header: testutil.NewRequestHeader(leaderServer.GetClusterID()), + KeyspaceScope: &pdpb.KeyspaceScope{KeyspaceId: constant.NullKeyspaceID}, + } + cleanup := func() { + re.NoError(failpoint.Disable(skipCampaignLeaderCheckFailpoint)) + cancel() + cluster.Destroy() + } + return cluster, req, cleanup +} + +func getGCStateFromAddr(ctx context.Context, re *require.Assertions, addr string, req *pdpb.GetGCStateRequest) (*pdpb.GetGCStateResponse, error) { + grpcPDClient, conn := testutil.MustNewGrpcClient(re, addr) + defer conn.Close() + return grpcPDClient.GetGCState(ctx, req) +} + +type blockingFailpoint struct { + name string + reached chan struct{} + release chan struct{} + reachedOnce sync.Once + releaseOnce sync.Once + disableOnce sync.Once +} + +func enableBlockingFailpoint(re *require.Assertions, name string) *blockingFailpoint { + point := &blockingFailpoint{ + name: name, + reached: make(chan struct{}), + release: make(chan struct{}), + } + re.NoError(failpoint.EnableCall(name, func() { + point.reachedOnce.Do(func() { + close(point.reached) + }) + <-point.release + })) + return point +} + +func (p *blockingFailpoint) wait(re *require.Assertions, description string) { + // Always use a bounded wait: if a future refactor moves/removes the target + // failpoint, the test should fail quickly instead of hanging until the + // request context times out. 
+ select { + case <-p.reached: + case <-time.After(5 * time.Second): + re.FailNow("GetGCState did not reach the failpoint", description) + } +} + +func (p *blockingFailpoint) releaseAndDisable(re *require.Assertions) { + // Tests may release explicitly and still run deferred cleanup. Make both + // operations idempotent so failure paths do not panic on double close or + // double disable. + p.releaseOnce.Do(func() { + close(p.release) + }) + p.disableOnce.Do(func() { + re.NoError(failpoint.Disable(p.name)) + }) +} + func TestGCOperations(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) @@ -217,6 +308,13 @@ func TestGCOperations(t *testing.T) { re.Equal(uint64(15), resp.GetGcState().GetGcBarriers()[0].GetBarrierTs()) re.Greater(resp.GetGcState().GetGcBarriers()[0].GetTtlSeconds(), int64(3500)) re.Less(resp.GetGcState().GetGcBarriers()[0].GetTtlSeconds(), int64(3601)) + + req.ExcludeGcBarriers = true + resp, err = grpcPDClient.GetGCState(ctx, req) + re.NoError(err) + re.NotNil(resp.Header) + re.Nil(resp.Header.Error) + re.Empty(resp.GetGcState().GetGcBarriers()) } } @@ -267,6 +365,15 @@ func TestGCOperations(t *testing.T) { re.Less(gcState.GetGcBarriers()[0].GetTtlSeconds(), int64(3601)) } + req.ExcludeGcBarriers = true + resp, err = grpcPDClient.GetAllKeyspacesGCStates(ctx, req) + re.NoError(err) + re.NotNil(resp.Header) + re.Nil(resp.Header.Error) + for _, gcState := range resp.GetGcStates() { + re.Empty(gcState.GetGcBarriers()) + } + // Global GC Barrier API for _, keyspaceID := range []uint32{constant.NullKeyspaceID, ks1.Id} { // Cleanup before test @@ -322,6 +429,18 @@ func TestGCOperations(t *testing.T) { re.Equal(math.MaxInt64, int(resp.GetNewBarrierInfo().GetTtlSeconds())) } + { + req := &pdpb.GetAllKeyspacesGCStatesRequest{ + Header: header, + ExcludeGlobalGcBarriers: true, + } + resp, err := grpcPDClient.GetAllKeyspacesGCStates(ctx, req) + re.NoError(err) + re.NotNil(resp.Header) + re.Nil(resp.Header.Error) + 
re.Empty(resp.GetGlobalGcBarriers()) + } + { // Failed to set a global GC barrier (below txn safe point) req := &pdpb.SetGlobalGCBarrierRequest{ @@ -386,3 +505,251 @@ func TestGCOperations(t *testing.T) { re.Contains(resp.GetBlockerDescription(), "b2") } } + +func TestGetGCStateRejectsOldLeaderAfterTransfer(t *testing.T) { + re := require.New(t) + cluster, req, cleanup := newGCStateLeaderTransitionCluster(t) + defer cleanup() + + oldLeader := cluster.GetLeader() + re.NotEmpty(oldLeader) + oldLeaderServer := cluster.GetServer(oldLeader) + re.NotNil(oldLeaderServer) + + // Once the cluster agrees on a new PD leader, the old leader should reject a + // direct GetGCState request at the normal gRPC role check, regardless of any + // local GC state cache it may have had. + re.NoError(oldLeaderServer.ResignLeaderWithRetry()) + newLeader := cluster.WaitLeader() + re.NotEmpty(newLeader) + re.NotEqual(oldLeader, newLeader) + + _, err := getGCStateFromAddr(context.Background(), re, oldLeaderServer.GetAddr(), req) + re.ErrorContains(err, "not leader") +} + +func TestGetGCStateFailsIfLeaderLostBeforeReply(t *testing.T) { + re := require.New(t) + cluster, req, cleanup := newGCStateLeaderTransitionCluster(t) + defer cleanup() + + leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + req.ExcludeGcBarriers = true + + // Warm the local cache first so the request is guaranteed to pass the manager + // quickly and block only on the server-side recheck window. + _, err := leaderServer.GetServer().GetGCStateManager().AdvanceTxnSafePoint(constant.NullKeyspaceID, 10, time.Now()) + re.NoError(err) + resp, err := getGCStateFromAddr(context.Background(), re, leaderServer.GetAddr(), req) + re.NoError(err) + re.Nil(resp.GetHeader().GetError()) + + // Block after GCStateManager has returned but before GetGCState's second + // cluster check. This isolates the exact window where the handler has a + // state result but must still avoid replying as a non-leader. 
+ point := enableBlockingFailpoint(re, getGCStateBeforeSecondClusterCheckFailpoint) + defer point.releaseAndDisable(re) + + grpcPDClient, conn := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + defer conn.Close() + + type result struct { + resp *pdpb.GetGCStateResponse + err error + } + resultCh := make(chan result, 1) + reqCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + go func() { + resp, err := grpcPDClient.GetGCState(reqCtx, req) + resultCh <- result{resp: resp, err: err} + }() + + point.wait(re, "before the second cluster check") + oldLeader := leaderServer.GetConfig().Name + // The old leader does not recover before the blocked request continues, so + // the final cluster check should translate the leadership loss into a + // NOT_BOOTSTRAPPED response header. + re.NoError(leaderServer.ResignLeaderWithRetry()) + newLeader := cluster.WaitLeader() + re.NotEmpty(newLeader) + re.NotEqual(oldLeader, newLeader) + point.releaseAndDisable(re) + + res := <-resultCh + re.NoError(res.err) + re.NotNil(res.resp.GetHeader().GetError()) + re.Equal(pdpb.ErrorType_NOT_BOOTSTRAPPED, res.resp.GetHeader().GetError().GetType()) +} + +func TestGetGCStateReturnsCachedStateAfterLeadershipRecovery(t *testing.T) { + re := require.New(t) + cluster, req, cleanup := newGCStateLeaderTransitionCluster(t) + defer cleanup() + + leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + oldLeader := leaderServer.GetConfig().Name + req.ExcludeGcBarriers = true + + _, err := leaderServer.GetServer().GetGCStateManager().AdvanceTxnSafePoint(constant.NullKeyspaceID, 10, time.Now()) + re.NoError(err) + resp, err := getGCStateFromAddr(context.Background(), re, leaderServer.GetAddr(), req) + re.NoError(err) + re.Nil(resp.GetHeader().GetError()) + re.Equal(uint64(10), resp.GetGcState().GetTxnSafePoint()) + + // This test pins the current cache-hit behavior; it does not claim that this + // is the ideal long-term contract. 
Once the request has already obtained the + // cached GC state before leadership churn, regaining leadership before the + // final response check currently lets it return that earlier cached value. + // If GetGCState is intentionally tightened in the future, update this test + // together with the corresponding semantics documentation. + point := enableBlockingFailpoint(re, getGCStateBeforeSecondClusterCheckFailpoint) + defer point.releaseAndDisable(re) + + grpcPDClient, conn := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + defer conn.Close() + + type result struct { + resp *pdpb.GetGCStateResponse + err error + } + resultCh := make(chan result, 1) + reqCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + go func() { + resp, err := grpcPDClient.GetGCState(reqCtx, req) + resultCh <- result{resp: resp, err: err} + }() + + point.wait(re, "before the second cluster check") + re.NoError(leaderServer.ResignLeaderWithRetry()) + newLeader := cluster.WaitLeader() + re.NotEmpty(newLeader) + re.NotEqual(oldLeader, newLeader) + + // Let the temporary new leader advance the persisted state. The blocked + // request should still return its already-read cached value after the old + // leader regains leadership, while a later fresh request should see this + // newer value. 
+ _, err = cluster.GetLeaderServer().GetServer().GetGCStateManager().AdvanceTxnSafePoint(constant.NullKeyspaceID, 20, time.Now()) + re.NoError(err) + + re.NoError(cluster.GetServer(newLeader).ResignLeaderWithRetry()) + re.Equal(oldLeader, cluster.WaitLeader()) + + point.releaseAndDisable(re) + res := <-resultCh + re.NoError(res.err) + re.Nil(res.resp.GetHeader().GetError()) + re.Equal(uint64(10), res.resp.GetGcState().GetTxnSafePoint()) + + freshResp, err := getGCStateFromAddr(context.Background(), re, cluster.GetServer(oldLeader).GetAddr(), req) + re.NoError(err) + re.Nil(freshResp.GetHeader().GetError()) + re.Equal(uint64(20), freshResp.GetGcState().GetTxnSafePoint()) +} + +func TestGetGCStateSlowPathFailsIfLeaderLostBeforeRead(t *testing.T) { + re := require.New(t) + cluster, req, cleanup := newGCStateLeaderTransitionCluster(t) + defer cleanup() + + leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + oldLeader := leaderServer.GetConfig().Name + req.ExcludeGcBarriers = true + + // Unlike the cache-hit cases above, this request starts with no local cache. + // It will pass the first leader-gated cache check and then block before the + // slow path reads storage. + point := enableBlockingFailpoint(re, getGCStateBeforeSlowPathFailpoint) + defer point.releaseAndDisable(re) + + grpcPDClient, conn := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + defer conn.Close() + + type result struct { + resp *pdpb.GetGCStateResponse + err error + } + resultCh := make(chan result, 1) + reqCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + go func() { + resp, err := grpcPDClient.GetGCState(reqCtx, req) + resultCh <- result{resp: resp, err: err} + }() + + // This request has no cache. It already passed the first leader-gated cache + // check, then waits before serializing the slow path and reading storage. 
+ point.wait(re, "before the no-cache slow path reads from storage") + re.NoError(leaderServer.ResignLeaderWithRetry()) + newLeader := cluster.WaitLeader() + re.NotEmpty(newLeader) + re.NotEqual(oldLeader, newLeader) + + // The old leader is still not running the cluster when the slow path resumes. + // It may finish reading storage, but the handler's final cluster check must + // reject the response. + point.releaseAndDisable(re) + res := <-resultCh + re.NoError(res.err) + re.NotNil(res.resp.GetHeader().GetError()) + re.Equal(pdpb.ErrorType_NOT_BOOTSTRAPPED, res.resp.GetHeader().GetError().GetType()) +} + +func TestGetGCStateSlowPathReadsLatestStateAfterLeadershipRecovery(t *testing.T) { + re := require.New(t) + cluster, req, cleanup := newGCStateLeaderTransitionCluster(t) + defer cleanup() + + leaderServer := cluster.GetLeaderServer() + re.NotNil(leaderServer) + oldLeader := leaderServer.GetConfig().Name + req.ExcludeGcBarriers = true + + // Start with no local cache and stop immediately before the slow path. This + // ensures the request has not read storage yet, so it can observe updates + // made by the temporary new leader before the old leader resumes. 
+ point := enableBlockingFailpoint(re, getGCStateBeforeSlowPathFailpoint) + defer point.releaseAndDisable(re) + + grpcPDClient, conn := testutil.MustNewGrpcClient(re, leaderServer.GetAddr()) + defer conn.Close() + + type result struct { + resp *pdpb.GetGCStateResponse + err error + } + resultCh := make(chan result, 1) + reqCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + go func() { + resp, err := grpcPDClient.GetGCState(reqCtx, req) + resultCh <- result{resp: resp, err: err} + }() + + point.wait(re, "before the no-cache slow path reads from storage") + re.NoError(leaderServer.ResignLeaderWithRetry()) + newLeader := cluster.WaitLeader() + re.NotEmpty(newLeader) + re.NotEqual(oldLeader, newLeader) + + // The new leader writes a newer GC state while the old leader's request is + // blocked. Because the old request has not entered RunInGCStateTransaction + // yet, it should read this latest value after leadership returns. + _, err := cluster.GetLeaderServer().GetServer().GetGCStateManager().AdvanceTxnSafePoint(constant.NullKeyspaceID, 20, time.Now()) + re.NoError(err) + + re.NoError(cluster.GetServer(newLeader).ResignLeaderWithRetry()) + re.Equal(oldLeader, cluster.WaitLeader()) + + point.releaseAndDisable(re) + res := <-resultCh + re.NoError(res.err) + re.Nil(res.resp.GetHeader().GetError()) + re.Equal(uint64(20), res.resp.GetGcState().GetTxnSafePoint()) +} diff --git a/tools/go.mod b/tools/go.mod index 7571bba04da..5ce52bac8c0 100644 --- a/tools/go.mod +++ b/tools/go.mod @@ -23,7 +23,7 @@ require ( github.com/mattn/go-shellwords v1.0.12 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 - github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359 + github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473 github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 github.com/pmezard/go-difflib v1.0.0 github.com/prometheus/client_golang 
v1.20.5 diff --git a/tools/go.sum b/tools/go.sum index df33be33529..a4a08b87464 100644 --- a/tools/go.sum +++ b/tools/go.sum @@ -478,8 +478,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE= github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= -github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359 h1:oteLtLuoWZN3uvfH836U0IIJ+s3UOk11q7GaQ0Tk+wc= -github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE= +github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473 h1:n6QWAac97mv2NJhn17iFPFnsE5fMgtPLNmsGZeqq78o= +github.com/pingcap/kvproto v0.0.0-20260514102340-daa7c864b473/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw= github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=