diff --git a/pkg/schedule/checker/checker_controller.go b/pkg/schedule/checker/checker_controller.go index a589dd088bc..4124ab5d3b6 100644 --- a/pkg/schedule/checker/checker_controller.go +++ b/pkg/schedule/checker/checker_controller.go @@ -131,6 +131,7 @@ func NewController(ctx context.Context, cluster sche.CheckerCluster, conf config func (c *Controller) PatrolRegions() { c.patrolRegionContext.init(c.ctx) c.patrolRegionContext.startPatrolRegionWorkers(c) + defer c.splitScatter.clearPendingSplitScatter() defer c.patrolRegionContext.stop() ticker := time.NewTicker(c.interval) defer ticker.Stop() diff --git a/pkg/schedule/checker/split_scatter.go b/pkg/schedule/checker/split_scatter.go index d3044b67ddb..975e0a7662a 100644 --- a/pkg/schedule/checker/split_scatter.go +++ b/pkg/schedule/checker/split_scatter.go @@ -63,7 +63,8 @@ type splitScatterController struct { // pending maps a pending region ID to its latest split-scatter batch item. // The item keeps its batch group so stale snapshots cannot mutate a newer // pending entry for the same region. - pending map[uint64]splitScatterPendingItem + pending map[uint64]splitScatterPendingItem + nextDispatchAt time.Time } func newSplitScatterController( @@ -111,17 +112,20 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli c.pendingMu.RUnlock() candidates := make([]splitScatterPendingItem, 0, len(pendingSnapshot)) + missingSnapshot := make([]splitScatterPendingItem, 0) for _, pending := range pendingSnapshot { + if !pending.retryAt.IsZero() && now.Before(pending.retryAt) { + continue + } regionID := pending.regionID region := c.cluster.GetRegion(regionID) if region == nil { - continue - } - if !pending.retryAt.IsZero() && now.Before(pending.retryAt) { + missingSnapshot = append(missingSnapshot, pending) continue } sourceRegion := c.cluster.GetRegion(pending.sourceRegionID) if sourceRegion == nil { + missingSnapshot = append(missingSnapshot, pending) continue } sourceVersion := uint64(0) @@ -162,6 +166,7 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli candidates = selected c.pendingMu.Unlock() } + c.delayMissingPendingSplitScatter(missingSnapshot, now) if len(expiredSnapshot) > 0 { attemptedExpiredCount := 0 @@ -188,6 +193,30 @@ func (c *splitScatterController) collectTopPendingSplitScatter(limit int) []spli return candidates } +func (c *splitScatterController) delayMissingPendingSplitScatter(missing []splitScatterPendingItem, now time.Time) { + if len(missing) == 0 { + return + } + missingCount := 0 + c.pendingMu.Lock() + for _, expected := range missing { + pending, ok := c.pending[expected.regionID] + if !ok || pending.group != expected.group || !pending.expireAt.Equal(expected.expireAt) { + continue + } + if !pending.retryAt.IsZero() && now.Before(pending.retryAt) { + continue + } + pending.retryAt = now.Add(splitScatterRetryBackoff) + c.pending[expected.regionID] = pending + missingCount++ + } + c.pendingMu.Unlock() + if missingCount > 0 { + splitScatterDispatchRegionMissingCounter.Add(float64(missingCount)) + } +} + func (c *splitScatterController) delayPendingSplitScatter(expected splitScatterPendingItem) { c.pendingMu.Lock() defer c.pendingMu.Unlock() @@ -250,6 +279,26 @@ func (c *splitScatterController) cleanupExpiredPendingSplitScatter() int { return pendingCount } +func (c *splitScatterController) clearPendingSplitScatter() { + c.pendingMu.Lock() + defer c.pendingMu.Unlock() + c.pending = make(map[uint64]splitScatterPendingItem) + c.updatePendingGaugeLocked() + c.nextDispatchAt = time.Time{} +} + +func (c *splitScatterController) skipDispatchUntil(now time.Time) bool { + c.pendingMu.RLock() + defer c.pendingMu.RUnlock() + return !c.nextDispatchAt.IsZero() && now.Before(c.nextDispatchAt) +} + +func (c *splitScatterController) delayNextDispatch(now time.Time) { + c.pendingMu.Lock() + defer c.pendingMu.Unlock() + c.nextDispatchAt = now.Add(splitScatterRetryBackoff) +} + func makeSplitScatterGroup(sourceRegionID, firstNewRegionID uint64) string { return fmt.Sprintf("split-scatter-%d-%d", sourceRegionID, firstNewRegionID) } @@ -263,6 +312,9 @@ func (c *splitScatterController) recordSplitScatterBatch(sourceRegionID, sourceW if len(newRegionIDs) == 0 { return } + if c.cluster.GetCheckerConfig().GetSplitScatterScheduleLimit() == 0 { + return + } group := makeSplitScatterGroup(sourceRegionID, newRegionIDs[0]) expireAt := time.Now().Add(splitScatterPendingTTL) if sourceWaitVersion == 0 { @@ -317,27 +369,39 @@ func (c *splitScatterController) recordSplitScatterBatch(sourceRegionID, sourceW expireAt: expireAt, } c.updatePendingGaugeLocked() + c.nextDispatchAt = time.Time{} } func (c *splitScatterController) dispatchSplitScatterRegions() { + now := time.Now() if c.cleanupExpiredPendingSplitScatter() == 0 { return } limit := c.cluster.GetCheckerConfig().GetSplitScatterScheduleLimit() if limit == 0 { splitScatterDispatchDisabledCounter.Inc() + c.clearPendingSplitScatter() + return + } + if c.skipDispatchUntil(now) { return } running := c.opController.OperatorCount(operator.OpSplitScatter) if running >= limit { splitScatterDispatchScheduleLimitCounter.Inc() operator.IncOperatorLimitCounter(types.SplitScatterChecker, operator.OpSplitScatter) + c.delayNextDispatch(now) return } dispatchLimit := int(limit - running) // Dispatch sequentially so operators added for earlier pending items in this pass // are visible to later ScatterInternal calls through the running-operator delta. - for _, pending := range c.collectTopPendingSplitScatter(dispatchLimit) { + pendingItems := c.collectTopPendingSplitScatter(dispatchLimit) + if len(pendingItems) == 0 { + c.delayNextDispatch(now) + return + } + for _, pending := range pendingItems { region := c.cluster.GetRegion(pending.regionID) if region == nil { splitScatterDispatchRegionMissingCounter.Inc() diff --git a/pkg/schedule/checker/split_scatter_test.go b/pkg/schedule/checker/split_scatter_test.go index c2b74471620..cb48b7eb2d2 100644 --- a/pkg/schedule/checker/split_scatter_test.go +++ b/pkg/schedule/checker/split_scatter_test.go @@ -54,6 +54,17 @@ func (c *Controller) dispatchSplitScatterRegions() { c.splitScatter.dispatchSplitScatterRegions() } +func TestSplitScatterControllerCleanupResetsPendingGauge(t *testing.T) { + re := require.New(t) + splitScatterPendingGauge.Set(7) + + controller, _, _, cleanup := newTestSplitScatterController(t) + cleanup() + + re.Equal(0, splitScatterPendingCount(controller)) + re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingGauge)) +} + func TestRecordSplitScatterBatchCollectsPendingRegions(t *testing.T) { re := require.New(t) controller, tc, _, cleanup := newTestSplitScatterController(t) @@ -121,8 +132,10 @@ func TestDispatchSplitScatterKeepsPendingUntilSplitHeartbeat(t *testing.T) { putSplitScatterRegion(tc, 101, "m", "", splitScatterReportedCPUUsage) + retrySplitScatterPendingAt(t, controller, 101, time.Now().Add(-time.Second)) re.Empty(controller.collectTopPendingSplitScatter(2)) advanceSplitScatterSourceVersion(t, tc) + setSplitScatterNextDispatchAt(t, controller, time.Now().Add(-time.Second)) re.ElementsMatch([]uint64{100, 101}, pendingRegionIDs(controller.collectTopPendingSplitScatter(2))) controller.dispatchSplitScatterRegions() @@ -151,6 +164,7 @@ func TestDispatchSplitScatterUsesRequestWaitVersionWhenCacheLags(t *testing.T) { re.Equal(2, splitScatterPendingCount(controller)) advanceSplitScatterRegionVersion(t, tc, 100) + setSplitScatterNextDispatchAt(t, controller, time.Now().Add(-time.Second)) controller.dispatchSplitScatterRegions() re.NotNil(oc.GetOperator(101)) @@ -161,17 +175,11 @@ func TestDispatchSplitScatterRespectsScheduleLimit(t *testing.T) { controller, tc, oc, cleanup := newTestSplitScatterController(t) defer cleanup() - tc.SetSplitScatterScheduleLimit(0) controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101, 102}) putSplitScatterRegion(tc, 101, "m", "t", splitScatterReportedCPUUsage) putSplitScatterRegion(tc, 102, "t", "", splitScatterReportedCPUUsage) advanceSplitScatterSourceVersion(t, tc) - controller.dispatchSplitScatterRegions() - - re.Empty(oc.GetOperators()) - re.Equal(3, splitScatterPendingCount(controller)) - tc.SetSplitScatterScheduleLimit(1) controller.dispatchSplitScatterRegions() @@ -183,6 +191,40 @@ func TestDispatchSplitScatterRespectsScheduleLimit(t *testing.T) { re.Len(oc.GetOperators(), 1) } +func TestRecordSplitScatterBatchSkipsWhenDisabled(t *testing.T) { + re := require.New(t) + controller, tc, _, cleanup := newTestSplitScatterController(t) + defer cleanup() + + tc.SetSplitScatterScheduleLimit(0) + + droppedBefore := promtestutil.ToFloat64(splitScatterPendingDroppedCounter) + controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101, 102}) + + re.Equal(0, splitScatterPendingCount(controller)) + re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingGauge)) + re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingDroppedCounter)-droppedBefore) +} + +func TestDispatchSplitScatterClearsPendingWhenDisabled(t *testing.T) { + re := require.New(t) + controller, tc, oc, cleanup := newTestSplitScatterController(t) + defer cleanup() + + controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101, 102}) + re.Equal(3, splitScatterPendingCount(controller)) + + tc.SetSplitScatterScheduleLimit(0) + setSplitScatterNextDispatchAt(t, controller, time.Now().Add(splitScatterRetryBackoff)) + disabledBefore := promtestutil.ToFloat64(splitScatterDispatchDisabledCounter) + controller.dispatchSplitScatterRegions() + + re.Empty(oc.GetOperators()) + re.Equal(0, splitScatterPendingCount(controller)) + re.Equal(float64(0), promtestutil.ToFloat64(splitScatterPendingGauge)) + re.Equal(float64(1), promtestutil.ToFloat64(splitScatterDispatchDisabledCounter)-disabledBefore) +} + func TestDispatchSplitScatterCleansExpiredPendingBeforeEarlyReturn(t *testing.T) { testCases := []struct { name string @@ -236,6 +278,48 @@ func TestDispatchSplitScatterCleansExpiredPendingBeforeEarlyReturn(t *testing.T) } } +func TestCollectTopPendingDelaysMissingRegions(t *testing.T) { + re := require.New(t) + controller, _, _, cleanup := newTestSplitScatterController(t) + defer cleanup() + + controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101}) + + missingBefore := promtestutil.ToFloat64(splitScatterDispatchRegionMissingCounter) + re.Empty(controller.collectTopPendingSplitScatter(2)) + + re.Equal(float64(1), promtestutil.ToFloat64(splitScatterDispatchRegionMissingCounter)-missingBefore) + pending := splitScatterPending(t, controller, 101) + re.True(pending.retryAt.After(time.Now())) + + re.Empty(controller.collectTopPendingSplitScatter(2)) + re.Equal(float64(1), promtestutil.ToFloat64(splitScatterDispatchRegionMissingCounter)-missingBefore) +} + +func TestDispatchSplitScatterBacksOffWhenNoCandidates(t *testing.T) { + re := require.New(t) + controller, tc, oc, cleanup := newTestSplitScatterController(t) + defer cleanup() + + controller.RecordSplitScatterBatch(100, splitScatterTestSourceWaitVersion, []uint64{101}) + + controller.dispatchSplitScatterRegions() + + re.True(splitScatterNextDispatchAt(t, controller).After(time.Now())) + putSplitScatterRegion(tc, 101, "m", "", splitScatterReportedCPUUsage) + advanceSplitScatterSourceVersion(t, tc) + + controller.dispatchSplitScatterRegions() + + re.Empty(oc.GetOperators()) + + retrySplitScatterPendingAt(t, controller, 101, time.Now().Add(-time.Second)) + setSplitScatterNextDispatchAt(t, controller, time.Now().Add(-time.Second)) + controller.dispatchSplitScatterRegions() + + re.NotNil(oc.GetOperator(101)) +} + func TestDispatchSplitScatterRespectsScheduleDeny(t *testing.T) { re := require.New(t) controller, tc, oc, cleanup := newTestSplitScatterController(t) @@ -604,6 +688,7 @@ func newTestSplitScatterController(t *testing.T) (*Controller, *mockcluster.Clus controller := NewController(ctx, tc, tc.GetCheckerConfig(), oc) cleanup := func() { + controller.splitScatter.clearPendingSplitScatter() stream.Close() cancel() } @@ -752,6 +837,30 @@ func expireSplitScatterPendingAt(t *testing.T, controller *Controller, regionID controller.splitScatter.pending[regionID] = pending } +func retrySplitScatterPendingAt(t *testing.T, controller *Controller, regionID uint64, retryAt time.Time) { + t.Helper() + controller.splitScatter.pendingMu.Lock() + defer controller.splitScatter.pendingMu.Unlock() + pending, ok := controller.splitScatter.pending[regionID] + require.True(t, ok) + pending.retryAt = retryAt + controller.splitScatter.pending[regionID] = pending +} + +func setSplitScatterNextDispatchAt(t *testing.T, controller *Controller, nextDispatchAt time.Time) { + t.Helper() + controller.splitScatter.pendingMu.Lock() + defer controller.splitScatter.pendingMu.Unlock() + controller.splitScatter.nextDispatchAt = nextDispatchAt +} + +func splitScatterNextDispatchAt(t *testing.T, controller *Controller) time.Time { + t.Helper() + controller.splitScatter.pendingMu.RLock() + defer controller.splitScatter.pendingMu.RUnlock() + return controller.splitScatter.nextDispatchAt +} + func requireInternalScatterOpsUseGroups(t *testing.T, oc *operator.Controller, scatterGroup, batchGroup string) { t.Helper() re := require.New(t)