Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config
func (c *Controller) PatrolRegions() {
c.patrolRegionContext.init(c.ctx)
c.patrolRegionContext.startPatrolRegionWorkers(c)
defer c.splitScatter.clearPendingSplitScatter()
defer c.patrolRegionContext.stop()
ticker := time.NewTicker(c.interval)
defer ticker.Stop()
Expand Down
74 changes: 69 additions & 5 deletions pkg/schedule/checker/split_scatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ type splitScatterController struct {
// pending maps a pending region ID to its latest split-scatter batch item.
// The item keeps its batch group so stale snapshots cannot mutate a newer
// pending entry for the same region.
pending map[uint64]splitScatterPendingItem
pending map[uint64]splitScatterPendingItem
nextDispatchAt time.Time
}

func newSplitScatterController(
Expand Down Expand Up @@ -111,17 +112,20 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli
c.pendingMu.RUnlock()

candidates := make([]splitScatterPendingItem, 0, len(pendingSnapshot))
missingSnapshot := make([]splitScatterPendingItem, 0)
for _, pending := range pendingSnapshot {
if !pending.retryAt.IsZero() && now.Before(pending.retryAt) {
continue
}
regionID := pending.regionID
region := c.cluster.GetRegion(regionID)
if region == nil {
continue
}
if !pending.retryAt.IsZero() && now.Before(pending.retryAt) {
missingSnapshot = append(missingSnapshot, pending)
continue
}
sourceRegion := c.cluster.GetRegion(pending.sourceRegionID)
if sourceRegion == nil {
missingSnapshot = append(missingSnapshot, pending)
continue
}
sourceVersion := uint64(0)
Expand Down Expand Up @@ -162,6 +166,7 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli
candidates = selected
c.pendingMu.Unlock()
}
c.delayMissingPendingSplitScatter(missingSnapshot, now)

if len(expiredSnapshot) > 0 {
attemptedExpiredCount := 0
Expand All @@ -188,6 +193,30 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli
return candidates
}

func (c *splitScatterController) delayMissingPendingSplitScatter(missing []splitScatterPendingItem, now time.Time) {
if len(missing) == 0 {
return
}
missingCount := 0
c.pendingMu.Lock()
for _, expected := range missing {
pending, ok := c.pending[expected.regionID]
if !ok || pending.group != expected.group || !pending.expireAt.Equal(expected.expireAt) {
continue
}
if !pending.retryAt.IsZero() && now.Before(pending.retryAt) {
continue
}
pending.retryAt = now.Add(splitScatterRetryBackoff)
c.pending[expected.regionID] = pending
missingCount++
}
c.pendingMu.Unlock()
if missingCount > 0 {
splitScatterDispatchRegionMissingCounter.Add(float64(missingCount))
}
}

func (c *splitScatterController) delayPendingSplitScatter(expected splitScatterPendingItem) {
c.pendingMu.Lock()
defer c.pendingMu.Unlock()
Expand Down Expand Up @@ -250,6 +279,26 @@ func (c *splitScatterController) cleanupExpiredPendingSplitScatter() int {
return pendingCount
}

func (c *splitScatterController) clearPendingSplitScatter() {
c.pendingMu.Lock()
defer c.pendingMu.Unlock()
c.pending = make(map[uint64]splitScatterPendingItem)
c.updatePendingGaugeLocked()
c.nextDispatchAt = time.Time{}
}

func (c *splitScatterController) skipDispatchUntil(now time.Time) bool {
c.pendingMu.RLock()
defer c.pendingMu.RUnlock()
return !c.nextDispatchAt.IsZero() && now.Before(c.nextDispatchAt)
}

func (c *splitScatterController) delayNextDispatch(now time.Time) {
c.pendingMu.Lock()
defer c.pendingMu.Unlock()
c.nextDispatchAt = now.Add(splitScatterRetryBackoff)
}

func makeSplitScatterGroup(sourceRegionID, firstNewRegionID uint64) string {
return fmt.Sprintf("split-scatter-%d-%d", sourceRegionID, firstNewRegionID)
}
Expand All @@ -263,6 +312,9 @@ func (c *splitScatterController) recordSplitScatterBatch(sourceRegionID, sourceW
if len(newRegionIDs) == 0 {
return
}
if c.cluster.GetCheckerConfig().GetSplitScatterScheduleLimit() == 0 {
return
}
group := makeSplitScatterGroup(sourceRegionID, newRegionIDs[0])
expireAt := time.Now().Add(splitScatterPendingTTL)
if sourceWaitVersion == 0 {
Expand Down Expand Up @@ -317,27 +369,39 @@ func (c *splitScatterController) recordSplitScatterBatch(sourceRegionID, sourceW
expireAt: expireAt,
}
c.updatePendingGaugeLocked()
c.nextDispatchAt = time.Time{}
}

func (c *splitScatterController) dispatchSplitScatterRegions() {
now := time.Now()
if c.cleanupExpiredPendingSplitScatter() == 0 {
return
}
limit := c.cluster.GetCheckerConfig().GetSplitScatterScheduleLimit()
if limit == 0 {
splitScatterDispatchDisabledCounter.Inc()
c.clearPendingSplitScatter()
return
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
if c.skipDispatchUntil(now) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we needs to add new metrics recording why the split controller doesn't work?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the existing checker event metrics already cover the main reasons why split-scatter dispatch does not make progress. They are reported through pd_checker_event_count{type="split_scatter_checker", name=...}, including dispatch-disabled, dispatch-schedule-limit, dispatch-region-missing, dispatch-schedule-disabled, dispatch-not-fully-replicated, dispatch-scatter-failed, dispatch-store-limit, and dispatch-add-operator-failed. The skipDispatchUntil path is only the retry backoff after one of those reasons has already been recorded, so I do not add another metric for it here.

return
}
running := c.opController.OperatorCount(operator.OpSplitScatter)
if running >= limit {
splitScatterDispatchScheduleLimitCounter.Inc()
operator.IncOperatorLimitCounter(types.SplitScatterChecker, operator.OpSplitScatter)
c.delayNextDispatch(now)
return
}
dispatchLimit := int(limit - running)
// Dispatch sequentially so operators added for earlier pending items in this pass
// are visible to later ScatterInternal calls through the running-operator delta.
for _, pending := range c.collectTopPendingSplitScatter(dispatchLimit) {
pendingItems := c.collectTopPendingSplitScatter(dispatchLimit)
if len(pendingItems) == 0 {
c.delayNextDispatch(now)
return
}
for _, pending := range pendingItems {
region := c.cluster.GetRegion(pending.regionID)
if region == nil {
splitScatterDispatchRegionMissingCounter.Inc()
Expand Down
121 changes: 115 additions & 6 deletions pkg/schedule/checker/split_scatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ func (c *Controller) dispatchSplitScatterRegions() {
c.splitScatter.dispatchSplitScatterRegions()
}

func TestSplitScatterControllerCleanupResetsPendingGauge(t *testing.T) {
re := require.New(t)
splitScatterPendingGauge.Set(7)

controller, _, _, cleanup := newTestSplitScatterController(t)
cleanup()

re.Equal(0, splitScatterPendingCount(controller))
re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingGauge))
}

func TestRecordSplitScatterBatchCollectsPendingRegions(t *testing.T) {
re := require.New(t)
controller, tc, _, cleanup := newTestSplitScatterController(t)
Expand Down Expand Up @@ -121,8 +132,10 @@ func TestDispatchSplitScatterKeepsPendingUntilSplitHeartbeat(t *testing.T) {

putSplitScatterRegion(tc, 101, "m", "", splitScatterReportedCPUUsage)

retrySplitScatterPendingAt(t, controller, 101, time.Now().Add(-time.Second))
re.Empty(controller.collectTopPendingSplitScatter(2))
advanceSplitScatterSourceVersion(t, tc)
setSplitScatterNextDispatchAt(t, controller, time.Now().Add(-time.Second))
re.ElementsMatch([]uint64{100, 101}, pendingRegionIDs(controller.collectTopPendingSplitScatter(2)))

controller.dispatchSplitScatterRegions()
Expand Down Expand Up @@ -151,6 +164,7 @@ func TestDispatchSplitScatterUsesRequestWaitVersionWhenCacheLags(t *testing.T) {
re.Equal(2, splitScatterPendingCount(controller))

advanceSplitScatterRegionVersion(t, tc, 100)
setSplitScatterNextDispatchAt(t, controller, time.Now().Add(-time.Second))
controller.dispatchSplitScatterRegions()

re.NotNil(oc.GetOperator(101))
Expand All @@ -161,17 +175,11 @@ func TestDispatchSplitScatterRespectsScheduleLimit(t *testing.T) {
controller, tc, oc, cleanup := newTestSplitScatterController(t)
defer cleanup()

tc.SetSplitScatterScheduleLimit(0)
controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101, 102})
putSplitScatterRegion(tc, 101, "m", "t", splitScatterReportedCPUUsage)
putSplitScatterRegion(tc, 102, "t", "", splitScatterReportedCPUUsage)
advanceSplitScatterSourceVersion(t, tc)

controller.dispatchSplitScatterRegions()

re.Empty(oc.GetOperators())
re.Equal(3, splitScatterPendingCount(controller))

tc.SetSplitScatterScheduleLimit(1)
controller.dispatchSplitScatterRegions()

Expand All @@ -183,6 +191,40 @@ func TestDispatchSplitScatterRespectsScheduleLimit(t *testing.T) {
re.Len(oc.GetOperators(), 1)
}

func TestRecordSplitScatterBatchSkipsWhenDisabled(t *testing.T) {
re := require.New(t)
controller, tc, _, cleanup := newTestSplitScatterController(t)
defer cleanup()

tc.SetSplitScatterScheduleLimit(0)

droppedBefore := promtestutil.ToFloat64(splitScatterPendingDroppedCounter)
controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101, 102})

re.Equal(0, splitScatterPendingCount(controller))
re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingGauge))
re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingDroppedCounter)-droppedBefore)
}

func TestDispatchSplitScatterClearsPendingWhenDisabled(t *testing.T) {
re := require.New(t)
controller, tc, oc, cleanup := newTestSplitScatterController(t)
defer cleanup()

controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101, 102})
re.Equal(3, splitScatterPendingCount(controller))

tc.SetSplitScatterScheduleLimit(0)
setSplitScatterNextDispatchAt(t, controller, time.Now().Add(splitScatterRetryBackoff))
disabledBefore := promtestutil.ToFloat64(splitScatterDispatchDisabledCounter)
controller.dispatchSplitScatterRegions()

re.Empty(oc.GetOperators())
re.Equal(0, splitScatterPendingCount(controller))
re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingGauge))
re.Equal(float64(1), promtestutil.ToFloat64(splitScatterDispatchDisabledCounter)-disabledBefore)
}

func TestDispatchSplitScatterCleansExpiredPendingBeforeEarlyReturn(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -236,6 +278,48 @@ func TestDispatchSplitScatterCleansExpiredPendingBeforeEarlyReturn(t *testing.T)
}
}

func TestCollectTopPendingDelaysMissingRegions(t *testing.T) {
re := require.New(t)
controller, _, _, cleanup := newTestSplitScatterController(t)
defer cleanup()

controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101})

missingBefore := promtestutil.ToFloat64(splitScatterDispatchRegionMissingCounter)
re.Empty(controller.collectTopPendingSplitScatter(2))

re.Equal(float64(1), promtestutil.ToFloat64(splitScatterDispatchRegionMissingCounter)-missingBefore)
pending := splitScatterPending(t, controller, 101)
re.True(pending.retryAt.After(time.Now()))

re.Empty(controller.collectTopPendingSplitScatter(2))
re.Equal(float64(1), promtestutil.ToFloat64(splitScatterDispatchRegionMissingCounter)-missingBefore)
}

func TestDispatchSplitScatterBacksOffWhenNoCandidates(t *testing.T) {
re := require.New(t)
controller, tc, oc, cleanup := newTestSplitScatterController(t)
defer cleanup()

controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101})

controller.dispatchSplitScatterRegions()

re.True(splitScatterNextDispatchAt(t, controller).After(time.Now()))
putSplitScatterRegion(tc, 101, "m", "", splitScatterReportedCPUUsage)
advanceSplitScatterSourceVersion(t, tc)

controller.dispatchSplitScatterRegions()

re.Empty(oc.GetOperators())

retrySplitScatterPendingAt(t, controller, 101, time.Now().Add(-time.Second))
setSplitScatterNextDispatchAt(t, controller, time.Now().Add(-time.Second))
controller.dispatchSplitScatterRegions()

re.NotNil(oc.GetOperator(101))
}

func TestDispatchSplitScatterRespectsScheduleDeny(t *testing.T) {
re := require.New(t)
controller, tc, oc, cleanup := newTestSplitScatterController(t)
Expand Down Expand Up @@ -604,6 +688,7 @@ func newTestSplitScatterController(t *testing.T) (*Controller, *mockcluster.Clus
controller := NewController(ctx, tc, tc.GetCheckerConfig(), oc)

cleanup := func() {
controller.splitScatter.clearPendingSplitScatter()
stream.Close()
cancel()
}
Expand Down Expand Up @@ -752,6 +837,30 @@ func expireSplitScatterPendingAt(t *testing.T, controller *Controller, regionID
controller.splitScatter.pending[regionID] = pending
}

func retrySplitScatterPendingAt(t *testing.T, controller *Controller, regionID uint64, retryAt time.Time) {
t.Helper()
controller.splitScatter.pendingMu.Lock()
defer controller.splitScatter.pendingMu.Unlock()
pending, ok := controller.splitScatter.pending[regionID]
require.True(t, ok)
pending.retryAt = retryAt
controller.splitScatter.pending[regionID] = pending
}

func setSplitScatterNextDispatchAt(t *testing.T, controller *Controller, nextDispatchAt time.Time) {
t.Helper()
controller.splitScatter.pendingMu.Lock()
defer controller.splitScatter.pendingMu.Unlock()
controller.splitScatter.nextDispatchAt = nextDispatchAt
}

func splitScatterNextDispatchAt(t *testing.T, controller *Controller) time.Time {
t.Helper()
controller.splitScatter.pendingMu.RLock()
defer controller.splitScatter.pendingMu.RUnlock()
return controller.splitScatter.nextDispatchAt
}

func requireInternalScatterOpsUseGroups(t *testing.T, oc *operator.Controller, scatterGroup, batchGroup string) {
t.Helper()
re := require.New(t)
Expand Down
Loading