Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 81 additions & 6 deletions pkg/schedule/checker/affinity_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package checker
import (
"bytes"
"context"
"math/rand"
"slices"
"time"

Expand All @@ -34,6 +35,7 @@ import (
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
"github.com/tikv/pd/pkg/schedule/types"
"github.com/tikv/pd/pkg/statistics"
"github.com/tikv/pd/pkg/utils/logutil"
)

Expand All @@ -44,20 +46,24 @@ const recentMergeTTL = time.Minute
type AffinityChecker struct {
PauseController
cluster sche.CheckerCluster
ruleManager *placement.RuleManager
affinityManager *affinity.Manager
conf config.CheckerConfigProvider
recentMergeCache *cache.TTLUint64
startTime time.Time
r *rand.Rand
}

// NewAffinityChecker create an affinity checker.
func NewAffinityChecker(ctx context.Context, cluster sche.CheckerCluster, conf config.CheckerConfigProvider) *AffinityChecker {
return &AffinityChecker{
cluster: cluster,
ruleManager: cluster.GetRuleManager(),
affinityManager: cluster.GetAffinityManager(),
conf: conf,
recentMergeCache: cache.NewIDTTL(ctx, gcInterval, recentMergeTTL),
startTime: time.Now(),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

Expand All @@ -83,6 +89,10 @@ func (c *AffinityChecker) Check(region *core.RegionInfo) []*operator.Operator {
affinityCheckerPausedCounter.Inc()
return nil
}
if !c.cluster.GetSharedConfig().IsPlacementRulesEnabled() {
affinityCheckerPlacementRulesDisabledCounter.Inc()
return nil
}

// Check region state
if region.GetLeader() == nil {
Expand All @@ -93,15 +103,21 @@ func (c *AffinityChecker) Check(region *core.RegionInfo) []*operator.Operator {
affinityCheckerUnhealthyRegionCounter.Inc()
return nil
}
if !filter.IsRegionReplicated(c.cluster, region) {
affinityCheckerAbnormalReplicaCounter.Inc()

if !c.hasAffinityGroups() {
return nil
}

// Get the affinity group for this region
// Check the affinity group before the heavier best-location gate. Most regions
// are not affinity regions.
group, isAffinity := c.affinityManager.GetAndCacheRegionAffinityGroupState(region)
if group == nil {
// Region doesn't belong to any affinity group
// Region doesn't belong to any affinity group.
return nil
}

if !c.isRegionPlacementRuleSatisfiedWithBestLocation(region, true /* isExistingRegion */) {
affinityCheckerAbnormalReplicaCounter.Inc()
return nil
}

Expand All @@ -115,8 +131,9 @@ func (c *AffinityChecker) Check(region *core.RegionInfo) []*operator.Operator {
// so expire the group first, then provide the available Region information and fetch the Group state again.
if !isAffinity {
targetRegion := cloneRegionWithReplacePeerStores(region, group.LeaderStoreID, group.VoterStoreIDs...)
if targetRegion == nil || !filter.IsRegionReplicated(c.cluster, targetRegion) {
if targetRegion == nil || !c.isRegionPlacementRuleSatisfiedWithBestLocation(targetRegion, false /* isExistingRegion */) {
c.affinityManager.ExpireAffinityGroup(group.ID)
group = c.affinityManager.GetAffinityGroupState(group.ID)
needRefetch = true
}
}
Expand Down Expand Up @@ -383,7 +400,7 @@ func (c *AffinityChecker) checkAffinityMergeTarget(region, adjacent *core.Region
return false
}

if !filter.IsRegionReplicated(c.cluster, adjacent) {
if !c.isRegionPlacementRuleSatisfiedWithBestLocation(adjacent, true /* isExistingRegion */) {
affinityMergeCheckerAdjAbnormalReplicaCounter.Inc()
return false
}
Expand Down Expand Up @@ -493,3 +510,61 @@ func (c *AffinityChecker) RecordOpSuccess(op *operator.Operator) {
c.recentMergeCache.PutWithTTL(op.RegionID(), nil, recentMergeTTL)
c.recentMergeCache.PutWithTTL(relatedID, nil, recentMergeTTL)
}

// isRegionPlacementRuleSatisfiedWithBestLocation is an affinity-specific
// scheduling gate. Besides requiring the region to satisfy placement rules, it
// also requires that affinity scheduling cannot find a strictly better location
// under the current topology and that the matched rule's isolation level is
// satisfied. This is intentionally stricter than filter.IsRegionReplicated and
// should not be used as a general replicated-state check.
func (c *AffinityChecker) isRegionPlacementRuleSatisfiedWithBestLocation(region *core.RegionInfo, isExistingRegion bool) bool {
// Get the RegionFit for the given Region. If the Region is not an existing Region but a virtual target state,
// use FitRegionWithoutCache to bypass the cache.
var fit *placement.RegionFit
if isExistingRegion {
fit = c.ruleManager.FitRegion(c.cluster, region)
} else {
fit = c.ruleManager.FitRegionWithoutCache(c.cluster, region)
}

// Check region is satisfied
if fit == nil || !fit.IsSatisfied() {
return false
}

// Check whether all peers covered by the rules are at the best isolation level.
// This logic is based on `RuleChecker.fixBetterLocation`.
for _, rf := range fit.RuleFits {
if len(rf.Rule.LocationLabels) == 0 {
continue
}
isWitness := rf.Rule.IsWitness && isWitnessEnabled(c.cluster)
// If the peer to be moved is a witness, since no snapshot is needed, we also reuse the fast failover logic.
strategy := c.strategy(c.r, region, rf.Rule, isWitness)
_, newStoreID, filterByTempState := strategy.getBetterLocation(c.cluster, region, fit, rf)
// filterByTempState being true means a better placement exists but is temporarily unschedulable.
// This is also considered not satisfied.
if newStoreID != 0 || filterByTempState {
return false
}
// If the isolation level does not meet the requirement, it is also considered not to be at the best location.
if !statistics.IsRegionLabelIsolationSatisfied(rf.Stores, rf.Rule.LocationLabels, rf.Rule.IsolationLevel) {
return false
}
}

return true
}

func (c *AffinityChecker) strategy(r *rand.Rand, region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
return &ReplicaStrategy{
checkerName: c.Name(),
cluster: c.cluster,
isolationLevel: rule.IsolationLevel,
locationLabels: rule.LocationLabels,
region: region,
extraFilters: []filter.Filter{filter.NewLabelConstraintFilter(c.Name(), rule.LabelConstraints)},
fastFailover: fastFailover,
r: r,
}
}
158 changes: 150 additions & 8 deletions pkg/schedule/checker/affinity_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/schedule/affinity"
scheconfig "github.com/tikv/pd/pkg/schedule/config"
sche "github.com/tikv/pd/pkg/schedule/core"
"github.com/tikv/pd/pkg/schedule/filter"
"github.com/tikv/pd/pkg/schedule/labeler"
"github.com/tikv/pd/pkg/schedule/operator"
"github.com/tikv/pd/pkg/schedule/placement"
Expand Down Expand Up @@ -2014,9 +2015,9 @@ func TestAffinityCheckerGroupScheduleDisallowed(t *testing.T) {
re.Nil(ops, "Affinity scheduling should be blocked when group is not allowed")
}

// TestAffinityCheckerExpireGroupWhenPlacementRuleMismatch verifies that when the affinity group peers
// don't satisfy placement rules, the group will be expired and scheduling is disabled.
func TestAffinityCheckerExpireGroupWhenPlacementRuleMismatch(t *testing.T) {
// TestAffinityCheckerNormalizeGroupWhenPlacementRuleMismatch verifies that when the affinity group peers
// don't satisfy placement rules, the checker keeps the group schedulable by normalizing it to the valid Region peers.
func TestAffinityCheckerNormalizeGroupWhenPlacementRuleMismatch(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -2037,8 +2038,8 @@ func TestAffinityCheckerExpireGroupWhenPlacementRuleMismatch(t *testing.T) {
affinityManager := tc.GetAffinityManager()
checker := newTestAffinityChecker(ctx, tc, opt)

// Affinity group expects 4 voters while placement rules (and the region) only allow 3,
// so isGroupReplicated should be false and the group should be expired/refreshed.
// Affinity group expects 4 voters while placement rules (and the region) only allow 3.
// The checker should normalize the group to the valid Region peers and keep it schedulable.
group := &affinity.Group{
ID: "test_group",
LeaderStoreID: 1,
Expand All @@ -2052,9 +2053,9 @@ func TestAffinityCheckerExpireGroupWhenPlacementRuleMismatch(t *testing.T) {

groupState := affinityManager.GetAffinityGroupState("test_group")
re.NotNil(groupState)
re.False(groupState.AffinitySchedulingAllowed, "Group should be expired when peer count violates placement rules")
re.Equal(affinity.PhasePending, groupState.Phase)
re.Equal([]uint64{1, 2, 3, 4}, groupState.VoterStoreIDs, "Peers remain as configured until a valid available region is observed")
re.True(groupState.AffinitySchedulingAllowed)
re.Equal(affinity.PhaseStable, groupState.Phase)
re.Equal([]uint64{1, 2, 3}, groupState.VoterStoreIDs)
}

// TestAffinityCheckerTargetStoreEvictLeader tests that operator is not created when target store has evict-leader.
Expand Down Expand Up @@ -2124,6 +2125,147 @@ func TestAffinityCheckerTargetStoreRejectLeader(t *testing.T) {
re.Nil(ops, "Should not create operator when target store has reject-leader label")
}

// TestAffinityCheckerRegionHasBetterLocation tests how the checker handles a Region where isRegionPlacementRuleSatisfiedWithBestLocation returns false.
func TestAffinityCheckerRegionHasBetterLocation(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opt := newAffinityTestOptions()
opt.SetLocationLabels([]string{"region", "zone", "host"})
tc := mockcluster.NewCluster(ctx, opt)

tc.AddLabelsStore(1, 0, map[string]string{"region": "r1", "zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 0, map[string]string{"region": "r1", "zone": "z1", "host": "h1"})
tc.AddLabelsStore(3, 0, map[string]string{"region": "r1", "zone": "z1", "host": "h2"})
tc.AddLabelsStore(4, 0, map[string]string{"region": "r1", "zone": "z2", "host": "h1"})
tc.AddLabelsStore(5, 0, map[string]string{"region": "r1", "zone": "z3", "host": "h1"})
tc.AddLabelsStore(6, 0, map[string]string{"region": "r1", "zone": "z3", "host": "h2"})

affinityManager := tc.GetAffinityManager()
checker := newTestAffinityChecker(ctx, tc, opt)

// Create affinity group without best location
group := &affinity.Group{
ID: "test_group",
LeaderStoreID: 2,
VoterStoreIDs: []uint64{1, 2, 3},
}
err := createAffinityGroupForTest(affinityManager, group, []byte(""), []byte(""))
re.NoError(err)

// No scheduling is generated because the source Region is not at the best location.
tc.AddLeaderRegion(100, 1, 3, 4)
region := tc.GetRegion(100)
re.False(checker.isRegionPlacementRuleSatisfiedWithBestLocation(region, true))
ops := checker.Check(region)
re.Nil(ops)
groupState := affinityManager.GetAffinityGroupState("test_group")
re.NotNil(groupState)
re.Equal([]uint64{1, 2, 3}, groupState.VoterStoreIDs)

// Because the Group’s currently specified Peers are not at the best location,
// they are cleared and replaced with the source Region’s Peers.
// In this case, no scheduling is generated, but the Peers are updated.
tc.AddLeaderRegion(200, 1, 4, 5)
region = tc.GetRegion(200)
re.True(checker.isRegionPlacementRuleSatisfiedWithBestLocation(region, false))
ops = checker.Check(region)
re.Nil(ops)
groupState = affinityManager.GetAffinityGroupState("test_group")
re.NotNil(groupState)
re.Equal([]uint64{1, 4, 5}, groupState.VoterStoreIDs)

// When both the source Region and the target Region are at the best location, scheduling is generated.
tc.AddLeaderRegion(300, 1, 4, 6)
region = tc.GetRegion(300)
re.True(checker.isRegionPlacementRuleSatisfiedWithBestLocation(region, true))
ops = checker.Check(region)
re.NotNil(ops)
re.Len(ops, 1)

// No scheduling is generated when Placement Rules are disabled.
tc.SetEnablePlacementRules(false)
ops = checker.Check(region)
re.Nil(ops)

// If an IsolationLevel is configured, no scheduling is generated when the required isolation level cannot be met.
tc.SetEnablePlacementRules(true)
rule := tc.GetRuleManager().GetRule(placement.DefaultGroupID, placement.DefaultRuleID)
re.NotNil(rule)
rule.IsolationLevel = "region"
re.NoError(tc.GetRuleManager().SetRule(rule))
ops = checker.Check(region)
re.Nil(ops)
}

func TestAffinityBestLocationFilteredByTemporarySourceState(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opt := newAffinityTestOptions()
opt.SetLocationLabels([]string{"region", "zone", "host"})
tc := mockcluster.NewCluster(ctx, opt)

tc.AddLabelsStore(1, 0, map[string]string{"region": "r1", "zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 0, map[string]string{"region": "r1", "zone": "z1", "host": "h1"})
tc.AddLabelsStore(3, 0, map[string]string{"region": "r1", "zone": "z1", "host": "h2"})
tc.AddLabelsStore(4, 0, map[string]string{"region": "r1", "zone": "z2", "host": "h1"})
tc.AddLabelsStore(5, 0, map[string]string{"region": "r1", "zone": "z3", "host": "h1"})
tc.AddLabelsStore(6, 0, map[string]string{"region": "r1", "zone": "z3", "host": "h2"})

checker := newTestAffinityChecker(ctx, tc, opt)
tc.AddLeaderRegion(1, 1, 3, 4)
region := tc.GetRegion(1)

fit := checker.ruleManager.FitRegion(tc, region)
re.NotNil(fit)
re.True(fit.IsSatisfied())
re.NotEmpty(fit.RuleFits)
strategy := checker.strategy(checker.r, region, fit.RuleFits[0].Rule, false)
oldStoreID, newStoreID, filterByTempState := strategy.getBetterLocation(tc, region, fit, fit.RuleFits[0])
re.NotZero(oldStoreID)
re.NotZero(newStoreID)
re.False(filterByTempState)

tc.SetStoreBusy(1, true)
tc.SetStoreBusy(3, true)
oldStoreID, newStoreID, filterByTempState = strategy.getBetterLocation(tc, region, fit, fit.RuleFits[0])
re.Zero(oldStoreID)
re.Zero(newStoreID)
re.True(filterByTempState)
re.False(checker.isRegionPlacementRuleSatisfiedWithBestLocation(region, true))
}

// TestAffinityStrictCheckDivergesFromIsRegionReplicated documents the intended
// semantic split: affinity scheduling may reject a region that is already
// placement-rule replicated when a strictly better location still exists.
func TestAffinityStrictCheckDivergesFromIsRegionReplicated(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

opt := mockconfig.NewTestOptions()
opt.SetLocationLabels([]string{"zone", "host"})
tc := mockcluster.NewCluster(ctx, opt)
tc.SetEnablePlacementRules(true)
tc.SetMaxReplicas(3)

tc.AddLabelsStore(1, 0, map[string]string{"zone": "z1", "host": "h1"})
tc.AddLabelsStore(2, 0, map[string]string{"zone": "z1", "host": "h1"})
tc.AddLabelsStore(3, 0, map[string]string{"zone": "z2", "host": "h2"})
tc.AddLabelsStore(4, 0, map[string]string{"zone": "z3", "host": "h3"})

checker := newTestAffinityChecker(ctx, tc, opt)

tc.AddLeaderRegion(1, 1, 2, 3)
region := tc.GetRegion(1)

re.True(filter.IsRegionReplicated(tc, region))
re.False(checker.isRegionPlacementRuleSatisfiedWithBestLocation(region, true))
}

// TestAffinityMergeCheckPeerStoreMismatch tests that merge is rejected when peer stores don't match.
func TestAffinityMergeCheckPeerStoreMismatch(t *testing.T) {
re := require.New(t)
Expand Down
1 change: 1 addition & 0 deletions pkg/schedule/checker/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ var (

affinityCheckerCounter = checkerCounter.WithLabelValues(affinityChecker, "check")
affinityCheckerPausedCounter = checkerCounter.WithLabelValues(affinityChecker, "paused")
affinityCheckerPlacementRulesDisabledCounter = checkerCounter.WithLabelValues(affinityChecker, "placement-rules-disabled")
affinityCheckerRegionNoLeaderCounter = checkerCounter.WithLabelValues(affinityChecker, "region-no-leader")
affinityCheckerGroupSchedulingDisabledCounter = checkerCounter.WithLabelValues(affinityChecker, "group-scheduling-disabled")
affinityCheckerNewOpCounter = checkerCounter.WithLabelValues(affinityChecker, "new-operator")
Expand Down
Loading
Loading