Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 73 additions & 5 deletions pkg/schedule/checker/split_scatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ type splitScatterController struct {
// pending maps a pending region ID to its latest split-scatter batch item.
// The item keeps its batch group so stale snapshots cannot mutate a newer
// pending entry for the same region.
pending map[uint64]splitScatterPendingItem
pending map[uint64]splitScatterPendingItem
nextDispatchAt time.Time
}

func newSplitScatterController(
Expand All @@ -72,6 +73,7 @@ func newSplitScatterController(
opController *operator.Controller,
addPendingProcessedRegions func(needCheckLen bool, ids ...uint64),
) *splitScatterController {
splitScatterPendingGauge.Set(0)
return &splitScatterController{
cluster: cluster,
opController: opController,
Expand Down Expand Up @@ -111,17 +113,20 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli
c.pendingMu.RUnlock()

candidates := make([]splitScatterPendingItem, 0, len(pendingSnapshot))
missingSnapshot := make([]splitScatterPendingItem, 0)
for _, pending := range pendingSnapshot {
if !pending.retryAt.IsZero() && now.Before(pending.retryAt) {
continue
}
regionID := pending.regionID
region := c.cluster.GetRegion(regionID)
if region == nil {
continue
}
if !pending.retryAt.IsZero() && now.Before(pending.retryAt) {
missingSnapshot = append(missingSnapshot, pending)
continue
}
sourceRegion := c.cluster.GetRegion(pending.sourceRegionID)
if sourceRegion == nil {
missingSnapshot = append(missingSnapshot, pending)
continue
}
sourceVersion := uint64(0)
Expand Down Expand Up @@ -162,6 +167,7 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli
candidates = selected
c.pendingMu.Unlock()
}
c.delayMissingPendingSplitScatter(missingSnapshot, now)

if len(expiredSnapshot) > 0 {
attemptedExpiredCount := 0
Expand All @@ -188,6 +194,30 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli
return candidates
}

// delayMissingPendingSplitScatter postpones the next retry of the given
// snapshot items and counts them as "region missing" dispatch events.
//
// For every item in missing, the current pending entry for the same region ID
// is looked up under the write lock. The entry is left untouched when it no
// longer exists, when its group or expireAt differs from the snapshot (i.e. it
// was replaced by a different batch item), or when it already carries a
// retryAt that is still in the future relative to now. Otherwise its retryAt
// is set to now plus splitScatterRetryBackoff. The number of entries actually
// delayed is added to splitScatterDispatchRegionMissingCounter after the lock
// has been released.
func (c *splitScatterController) delayMissingPendingSplitScatter(missing []splitScatterPendingItem, now time.Time) {
	if len(missing) == 0 {
		return
	}

	retryAt := now.Add(splitScatterRetryBackoff)
	delayed := 0

	c.pendingMu.Lock()
	for i := range missing {
		snapshot := &missing[i]
		current, found := c.pending[snapshot.regionID]
		switch {
		case !found:
			// The entry was removed since the snapshot was taken.
		case current.group != snapshot.group:
			// The entry now belongs to a different batch group.
		case !current.expireAt.Equal(snapshot.expireAt):
			// The entry was re-recorded with a different expiry.
		case !current.retryAt.IsZero() && now.Before(current.retryAt):
			// The entry is already waiting for a later retry.
		default:
			current.retryAt = retryAt
			c.pending[snapshot.regionID] = current
			delayed++
		}
	}
	c.pendingMu.Unlock()

	if delayed == 0 {
		return
	}
	splitScatterDispatchRegionMissingCounter.Add(float64(delayed))
}

func (c *splitScatterController) delayPendingSplitScatter(expected splitScatterPendingItem) {
c.pendingMu.Lock()
defer c.pendingMu.Unlock()
Expand Down Expand Up @@ -250,6 +280,29 @@ func (c *splitScatterController) cleanupExpiredPendingSplitScatter() int {
return pendingCount
}

func (c *splitScatterController) clearPendingSplitScatter() {
c.pendingMu.Lock()
defer c.pendingMu.Unlock()
pendingCount := len(c.pending)
if pendingCount > 0 {
c.pending = make(map[uint64]splitScatterPendingItem)
c.updatePendingGaugeLocked()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will the metric be reset to zero if the pending count is already zero?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 64529c3. clearPendingSplitScatter now always refreshes splitScatterPendingGauge after clearing pending, so the metric is reset to 0 even when the pending count is already 0. PatrolRegions also calls the same cleanup path when it exits.

}
c.nextDispatchAt = time.Time{}
}

// skipDispatchUntil reports whether dispatching should be skipped at time now:
// it returns true only when nextDispatchAt is set (non-zero) and now is still
// before it. nextDispatchAt is read under the read lock.
func (c *splitScatterController) skipDispatchUntil(now time.Time) bool {
	c.pendingMu.RLock()
	resumeAt := c.nextDispatchAt
	c.pendingMu.RUnlock()

	if resumeAt.IsZero() {
		return false
	}
	return now.Before(resumeAt)
}

// delayNextDispatch sets nextDispatchAt to now plus splitScatterRetryBackoff
// while holding the write lock.
func (c *splitScatterController) delayNextDispatch(now time.Time) {
	resumeAt := now.Add(splitScatterRetryBackoff)

	c.pendingMu.Lock()
	c.nextDispatchAt = resumeAt
	c.pendingMu.Unlock()
}

// makeSplitScatterGroup builds the group name for a split-scatter batch in the
// form "split-scatter-<sourceRegionID>-<firstNewRegionID>", with both IDs
// rendered in decimal.
func makeSplitScatterGroup(sourceRegionID, firstNewRegionID uint64) string {
	const groupFormat = "split-scatter-%d-%d"
	group := fmt.Sprintf(groupFormat, sourceRegionID, firstNewRegionID)
	return group
}
Expand All @@ -263,6 +316,9 @@ func (c *splitScatterController) recordSplitScatterBatch(sourceRegionID, sourceW
if len(newRegionIDs) == 0 {
return
}
if c.cluster.GetCheckerConfig().GetSplitScatterScheduleLimit() == 0 {
return
}
group := makeSplitScatterGroup(sourceRegionID, newRegionIDs[0])
expireAt := time.Now().Add(splitScatterPendingTTL)
if sourceWaitVersion == 0 {
Expand Down Expand Up @@ -317,27 +373,39 @@ func (c *splitScatterController) recordSplitScatterBatch(sourceRegionID, sourceW
expireAt: expireAt,
}
c.updatePendingGaugeLocked()
c.nextDispatchAt = time.Time{}
}

func (c *splitScatterController) dispatchSplitScatterRegions() {
now := time.Now()
if c.cleanupExpiredPendingSplitScatter() == 0 {
return
}
limit := c.cluster.GetCheckerConfig().GetSplitScatterScheduleLimit()
if limit == 0 {
splitScatterDispatchDisabledCounter.Inc()
c.clearPendingSplitScatter()
return
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}
if c.skipDispatchUntil(now) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to add new metrics recording why the split-scatter controller doesn't work?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the existing checker event metrics already cover the main reasons why split-scatter dispatch does not make progress. They are reported through pd_checker_event_count{type="split_scatter_checker", name=...}, including dispatch-disabled, dispatch-schedule-limit, dispatch-region-missing, dispatch-schedule-disabled, dispatch-not-fully-replicated, dispatch-scatter-failed, dispatch-store-limit, and dispatch-add-operator-failed. The skipDispatchUntil path is only the retry backoff after one of those reasons has already been recorded, so I do not add another metric for it here.

return
}
running := c.opController.OperatorCount(operator.OpSplitScatter)
if running >= limit {
splitScatterDispatchScheduleLimitCounter.Inc()
operator.IncOperatorLimitCounter(types.SplitScatterChecker, operator.OpSplitScatter)
c.delayNextDispatch(now)
return
}
dispatchLimit := int(limit - running)
// Dispatch sequentially so operators added for earlier pending items in this pass
// are visible to later ScatterInternal calls through the running-operator delta.
for _, pending := range c.collectTopPendingSplitScatter(dispatchLimit) {
pendingItems := c.collectTopPendingSplitScatter(dispatchLimit)
if len(pendingItems) == 0 {
c.delayNextDispatch(now)
return
}
for _, pending := range pendingItems {
region := c.cluster.GetRegion(pending.regionID)
if region == nil {
splitScatterDispatchRegionMissingCounter.Inc()
Expand Down
120 changes: 114 additions & 6 deletions pkg/schedule/checker/split_scatter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,17 @@ func (c *Controller) dispatchSplitScatterRegions() {
c.splitScatter.dispatchSplitScatterRegions()
}

func TestNewSplitScatterControllerResetsPendingGauge(t *testing.T) {
re := require.New(t)
splitScatterPendingGauge.Set(7)

controller, _, _, cleanup := newTestSplitScatterController(t)
defer cleanup()

re.Equal(0, splitScatterPendingCount(controller))
re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingGauge))
}

func TestRecordSplitScatterBatchCollectsPendingRegions(t *testing.T) {
re := require.New(t)
controller, tc, _, cleanup := newTestSplitScatterController(t)
Expand Down Expand Up @@ -121,8 +132,10 @@ func TestDispatchSplitScatterKeepsPendingUntilSplitHeartbeat(t *testing.T) {

putSplitScatterRegion(tc, 101, "m", "", splitScatterReportedCPUUsage)

retrySplitScatterPendingAt(t, controller, 101, time.Now().Add(-time.Second))
re.Empty(controller.collectTopPendingSplitScatter(2))
advanceSplitScatterSourceVersion(t, tc)
setSplitScatterNextDispatchAt(t, controller, time.Now().Add(-time.Second))
re.ElementsMatch([]uint64{100, 101}, pendingRegionIDs(controller.collectTopPendingSplitScatter(2)))

controller.dispatchSplitScatterRegions()
Expand Down Expand Up @@ -151,6 +164,7 @@ func TestDispatchSplitScatterUsesRequestWaitVersionWhenCacheLags(t *testing.T) {
re.Equal(2, splitScatterPendingCount(controller))

advanceSplitScatterRegionVersion(t, tc, 100)
setSplitScatterNextDispatchAt(t, controller, time.Now().Add(-time.Second))
controller.dispatchSplitScatterRegions()

re.NotNil(oc.GetOperator(101))
Expand All @@ -161,17 +175,11 @@ func TestDispatchSplitScatterRespectsScheduleLimit(t *testing.T) {
controller, tc, oc, cleanup := newTestSplitScatterController(t)
defer cleanup()

tc.SetSplitScatterScheduleLimit(0)
controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101, 102})
putSplitScatterRegion(tc, 101, "m", "t", splitScatterReportedCPUUsage)
putSplitScatterRegion(tc, 102, "t", "", splitScatterReportedCPUUsage)
advanceSplitScatterSourceVersion(t, tc)

controller.dispatchSplitScatterRegions()

re.Empty(oc.GetOperators())
re.Equal(3, splitScatterPendingCount(controller))

tc.SetSplitScatterScheduleLimit(1)
controller.dispatchSplitScatterRegions()

Expand All @@ -183,6 +191,40 @@ func TestDispatchSplitScatterRespectsScheduleLimit(t *testing.T) {
re.Len(oc.GetOperators(), 1)
}

func TestRecordSplitScatterBatchSkipsWhenDisabled(t *testing.T) {
re := require.New(t)
controller, tc, _, cleanup := newTestSplitScatterController(t)
defer cleanup()

tc.SetSplitScatterScheduleLimit(0)

droppedBefore := promtestutil.ToFloat64(splitScatterPendingDroppedCounter)
controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101, 102})

re.Equal(0, splitScatterPendingCount(controller))
re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingGauge))
re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingDroppedCounter)-droppedBefore)
}

func TestDispatchSplitScatterClearsPendingWhenDisabled(t *testing.T) {
re := require.New(t)
controller, tc, oc, cleanup := newTestSplitScatterController(t)
defer cleanup()

controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101, 102})
re.Equal(3, splitScatterPendingCount(controller))

tc.SetSplitScatterScheduleLimit(0)
setSplitScatterNextDispatchAt(t, controller, time.Now().Add(splitScatterRetryBackoff))
disabledBefore := promtestutil.ToFloat64(splitScatterDispatchDisabledCounter)
controller.dispatchSplitScatterRegions()

re.Empty(oc.GetOperators())
re.Equal(0, splitScatterPendingCount(controller))
re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingGauge))
re.Equal(float64(1), promtestutil.ToFloat64(splitScatterDispatchDisabledCounter)-disabledBefore)
}

func TestDispatchSplitScatterCleansExpiredPendingBeforeEarlyReturn(t *testing.T) {
testCases := []struct {
name string
Expand Down Expand Up @@ -236,6 +278,48 @@ func TestDispatchSplitScatterCleansExpiredPendingBeforeEarlyReturn(t *testing.T)
}
}

func TestCollectTopPendingDelaysMissingRegions(t *testing.T) {
re := require.New(t)
controller, _, _, cleanup := newTestSplitScatterController(t)
defer cleanup()

controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101})

missingBefore := promtestutil.ToFloat64(splitScatterDispatchRegionMissingCounter)
re.Empty(controller.collectTopPendingSplitScatter(2))

re.Equal(float64(1), promtestutil.ToFloat64(splitScatterDispatchRegionMissingCounter)-missingBefore)
pending := splitScatterPending(t, controller, 101)
re.True(pending.retryAt.After(time.Now()))

re.Empty(controller.collectTopPendingSplitScatter(2))
re.Equal(float64(1), promtestutil.ToFloat64(splitScatterDispatchRegionMissingCounter)-missingBefore)
}

func TestDispatchSplitScatterBacksOffWhenNoCandidates(t *testing.T) {
re := require.New(t)
controller, tc, oc, cleanup := newTestSplitScatterController(t)
defer cleanup()

controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101})

controller.dispatchSplitScatterRegions()

re.True(splitScatterNextDispatchAt(t, controller).After(time.Now()))
putSplitScatterRegion(tc, 101, "m", "", splitScatterReportedCPUUsage)
advanceSplitScatterSourceVersion(t, tc)

controller.dispatchSplitScatterRegions()

re.Empty(oc.GetOperators())

retrySplitScatterPendingAt(t, controller, 101, time.Now().Add(-time.Second))
setSplitScatterNextDispatchAt(t, controller, time.Now().Add(-time.Second))
controller.dispatchSplitScatterRegions()

re.NotNil(oc.GetOperator(101))
}

func TestDispatchSplitScatterRespectsScheduleDeny(t *testing.T) {
re := require.New(t)
controller, tc, oc, cleanup := newTestSplitScatterController(t)
Expand Down Expand Up @@ -752,6 +836,30 @@ func expireSplitScatterPendingAt(t *testing.T, controller *Controller, regionID
controller.splitScatter.pending[regionID] = pending
}

// retrySplitScatterPendingAt is a test helper that overwrites the retryAt of
// the pending entry for regionID while holding the controller's pending lock.
// It fails the test immediately if no such pending entry exists.
func retrySplitScatterPendingAt(t *testing.T, controller *Controller, regionID uint64, retryAt time.Time) {
	t.Helper()

	controller.splitScatter.pendingMu.Lock()
	defer controller.splitScatter.pendingMu.Unlock()

	item, found := controller.splitScatter.pending[regionID]
	require.True(t, found)

	item.retryAt = retryAt
	controller.splitScatter.pending[regionID] = item
}

// setSplitScatterNextDispatchAt is a test helper that assigns nextDispatchAt
// on the controller's split-scatter state while holding the pending write lock.
func setSplitScatterNextDispatchAt(t *testing.T, controller *Controller, nextDispatchAt time.Time) {
	t.Helper()

	controller.splitScatter.pendingMu.Lock()
	controller.splitScatter.nextDispatchAt = nextDispatchAt
	controller.splitScatter.pendingMu.Unlock()
}

// splitScatterNextDispatchAt is a test helper that returns the controller's
// current nextDispatchAt, read while holding the pending read lock.
func splitScatterNextDispatchAt(t *testing.T, controller *Controller) time.Time {
	t.Helper()

	controller.splitScatter.pendingMu.RLock()
	resumeAt := controller.splitScatter.nextDispatchAt
	controller.splitScatter.pendingMu.RUnlock()

	return resumeAt
}

func requireInternalScatterOpsUseGroups(t *testing.T, oc *operator.Controller, scatterGroup, batchGroup string) {
t.Helper()
re := require.New(t)
Expand Down
Loading