mcs/scheduling: clean primary resources on exit#10645
Conversation
Signed-off-by: tongjian <1045931706@qq.com>
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThis PR refactors the scheduling server's cluster lifecycle: context-driven ChangesCluster Lifecycle Cleanup
Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Tip 💬 Introducing Slack Agent: The best way for teams to turn conversations into code.Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.
Built for teams:
One agent for your entire SDLC. Right inside Slack. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
/check-issue-triage-complete |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/mcs/scheduling/server/server.go`:
- Around line 525-539: NewCluster allocates a Cluster whose NewRegionLabeler
starts a background goroutine (doGC) that listens on the cluster context; when
rule.NewWatcher or affinity.NewWatcher fails the code closes
hbStreams/configWatcher/metaWatcher but never cancels the cluster context so
goroutines leak. Fix by ensuring cluster's context is cancelled on all error
paths: after NewCluster returns, register a short-lived defer or explicit
cleanup to call the cluster cancellation/unconditional shutdown routine (either
call a new public Cluster method that unconditionally cancels the context or
directly invoke the cluster's cancel function) before returning on errors from
rule.NewWatcher and affinity.NewWatcher; update references around NewCluster,
rule.NewWatcher, affinity.NewWatcher, StartBackgroundJobs, StopBackgroundJobs,
and any cluster.cancel/StopBackgroundJobs logic so cleanup runs even if
StartBackgroundJobs was never called.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8eda0be5-dbf6-4c9e-bc86-bd595fded54a
📒 Files selected for processing (3)
pkg/mcs/scheduling/server/rule/watcher.gopkg/mcs/scheduling/server/server.gopkg/mcs/scheduling/server/server_test.go
Signed-off-by: tongjian <1045931706@qq.com>
| s.configWatcher.SetSchedulersController(cluster.GetCoordinator().GetSchedulersController()) | ||
| // Start the rule watcher after the cluster is created. | ||
| s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.storage, | ||
| defer func() { |
There was a problem hiding this comment.
StopBackgroundJobs returns immediately when StartBackgroundJobs has not run, so this rollback does not cancel the cluster context on partial startup failure. NewCluster already starts context-bound goroutines such as the labeler GC / affinity check loop; please use a cleanup path that always cancels the cluster context.
There was a problem hiding this comment.
This still does not fully address @rleungx's comment, and the latest defer also introduces a successful-path cleanup issue.
StopBackgroundJobs() still returns immediately when running=false, so if NewCluster() succeeds but a later watcher creation fails, the cluster context is still not cancelled.
Also, the successful path only sets cluster = nil, while hbStreams / watchers remain non-nil. The defer will close the runtime resources that were just transferred to Cluster before startCluster() returns successfully.
Could we either disarm those local variables after ownership is transferred, or centralize cleanup in a method that always cancels the cluster context?
There was a problem hiding this comment.
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/mcs/scheduling/server/apis/v1/api_test.go`:
- Around line 29-42: The test
TestGetAllStoresReturnsNotBootstrappedWhenBasicClusterMissing is failing because
getBasicCluster currently returns errs.ErrNotLeader.Error() instead of an error
that produces "not bootstrapped" in getAllStores' response; update the
getBasicCluster function so it returns the appropriate not-bootstrapped error
(or wraps/returns the actual error value that contains "not bootstrapped")
instead of returning errs.ErrNotLeader.Error(), ensuring getAllStores sees and
propagates the not-bootstrapped condition so the test assertion on resp.Body
containing "not bootstrapped" passes.
In `@pkg/mcs/scheduling/server/apis/v1/api.go`:
- Around line 1657-1664: The getBasicCluster helper currently returns
errs.ErrNotLeader when svr.GetBasicCluster() is nil; change it to return
errs.ErrNotBootstrapped instead so handlers relying on getBasicCluster (function
getBasicCluster and call site using svr.GetBasicCluster()) produce the correct
"not bootstrapped" response expected by tests and consistent with other
handlers.
In `@pkg/mcs/scheduling/server/grpc_service.go`:
- Around line 246-248: The current nil-check branch in the StoreHeartbeat
handler returns a StoreHeartbeatResponse with notLeaderHeader() which sets
ErrorType_INVALID_VALUE; instead, change the returned header to a
transient/unavailable error so clients will retry: update the nil-branch that
checks s.metaWatcher (in the StoreHeartbeat method referencing s.metaWatcher and
notLeaderHeader()) to return a header using NOT_BOOTSTRAPPED or a new "leader
unavailable" error header (create a helper if needed, e.g.,
notBootstrappedHeader() or notLeaderAvailableHeader()) rather than
ErrorType_INVALID_VALUE so the semantics match initialization/stepdown transient
state.
- Around line 475-480: Change the error type returned by notLeaderHeader() from
schedulingpb.ErrorType_INVALID_VALUE to schedulingpb.ErrorType_UNKNOWN so the
"not leader" operational/state error is categorized correctly; locate the
notLeaderHeader function and update the schedulingpb.Error construction to set
Type to schedulingpb.ErrorType_UNKNOWN while keeping the Message "not leader"
and returning via errorHeader(&schedulingpb.Error{...}).
In `@pkg/mcs/scheduling/server/server_test.go`:
- Around line 88-99: The test
TestStoreHeartbeatReturnsNotBootstrappedWhenMetaWatcherMissing expects
schedulingpb.ErrorType_NOT_BOOTSTRAPPED but the current implementation in
grpc_service.go uses notLeaderHeader() which returns
schedulingpb.ErrorType_INVALID_VALUE; update the test to assert
schedulingpb.ErrorType_INVALID_VALUE for the response header error (or rename
the test to reflect the "not leader/invalid value" behavior) and ensure the
assertion checks resp.GetHeader().GetError().GetType() equals
schedulingpb.ErrorType_INVALID_VALUE after calling Service.StoreHeartbeat.
- Around line 67-86: The test
TestRegionHeartbeatReturnsNotBootstrappedWhenHeartbeatStreamsMissing asserts
NOT_BOOTSTRAPPED but the Service.RegionHeartbeat implementation returns
notLeaderHeader() which uses schedulingpb.ErrorType_INVALID_VALUE; update the
test to expect schedulingpb.ErrorType_INVALID_VALUE (change the re.Equal
assertion) and optionally rename the test to reflect the actual behavior (e.g.,
TestRegionHeartbeatReturnsInvalidValueWhenHeartbeatStreamsMissing) so the test
name matches the returned error; reference Service.RegionHeartbeat and
notLeaderHeader() when making the change.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e13d367b-25bc-4c60-ac16-5a6deb1d1876
📥 Commits
Reviewing files that changed from the base of the PR and between 23174e5 and e24a808d7089758d8e97f90f046efa6eb99e8a2a.
📒 Files selected for processing (5)
pkg/mcs/scheduling/server/apis/v1/api.gopkg/mcs/scheduling/server/apis/v1/api_test.gopkg/mcs/scheduling/server/grpc_service.gopkg/mcs/scheduling/server/server.gopkg/mcs/scheduling/server/server_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- pkg/mcs/scheduling/server/server.go
Signed-off-by: tongjian <1045931706@qq.com>
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/mcs/scheduling/server/cluster.go (1)
658-670:⚠️ Potential issue | 🔴 Critical | ⚡ Quick winRace condition: use CompareAndSwap for atomic test-and-set.
The current Load-then-Store pattern has a TOCTOU race. Two concurrent calls could both pass the
running.Load()check on line 659, then both execute the stop sequence, leading to duplicatecoordinator.Stop(),cancel(), andwg.Wait()calls with undefined behavior.🔒 Proposed fix using CompareAndSwap
func (c *Cluster) StopBackgroundJobs() bool { - if !c.running.Load() { + if !c.running.CompareAndSwap(true, false) { return false } - c.running.Store(false) c.coordinator.Stop() c.heartbeatRunner.Stop() c.miscRunner.Stop()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/mcs/scheduling/server/cluster.go` around lines 658 - 670, Replace the non-atomic Load-then-Store in Cluster.StopBackgroundJobs with an atomic CompareAndSwap so only the goroutine that flips the running flag from true to false executes the shutdown sequence; specifically, attempt c.running.CompareAndSwap(true, false) at the start of StopBackgroundJobs and return false unless the CAS succeeds, and only then call c.coordinator.Stop(), c.heartbeatRunner.Stop(), c.miscRunner.Stop(), c.logRunner.Stop(), c.cancel(), and c.wg.Wait().
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/mcs/scheduling/server/apis/v1/api_test.go`:
- Around line 35-48: Rename the test function
TestStoreHeartbeatReturnsNotLeaderWhenMetaWatcherMissing to a name that reflects
it's testing the HTTP handler for listing stores (e.g.,
TestGetAllStoresReturnsNotBootstrapped) and update any references; the body
exercises getAllStores using gin test context and sets
multiservicesapi.ServiceContextKey with &scheserver.Server{}, so ensure the new
test name mentions getAllStores (or "GetAllStores") to avoid confusion with the
gRPC StoreHeartbeat method.
---
Outside diff comments:
In `@pkg/mcs/scheduling/server/cluster.go`:
- Around line 658-670: Replace the non-atomic Load-then-Store in
Cluster.StopBackgroundJobs with an atomic CompareAndSwap so only the goroutine
that flips the running flag from true to false executes the shutdown sequence;
specifically, attempt c.running.CompareAndSwap(true, false) at the start of
StopBackgroundJobs and return false unless the CAS succeeds, and only then call
c.coordinator.Stop(), c.heartbeatRunner.Stop(), c.miscRunner.Stop(),
c.logRunner.Stop(), c.cancel(), and c.wg.Wait().
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: dc3fd4ed-4660-4c49-a883-4f135390b24c
📥 Commits
Reviewing files that changed from the base of the PR and between e7bea36 and faa8c7108303db0a0f271fa4d3f026e2823a8329.
📒 Files selected for processing (7)
pkg/mcs/scheduling/server/apis/v1/api.gopkg/mcs/scheduling/server/apis/v1/api_test.gopkg/mcs/scheduling/server/cluster.gopkg/mcs/scheduling/server/grpc_service.gopkg/mcs/scheduling/server/server.gopkg/mcs/scheduling/server/server_test.gopkg/schedule/affinity/manager.go
💤 Files with no reviewable changes (1)
- pkg/schedule/affinity/manager.go
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/mcs/scheduling/server/apis/v1/api.go
- pkg/mcs/scheduling/server/server.go
faa8c71 to
a1468c7
Compare
| return nil | ||
| } | ||
|
|
||
| func (s *Server) stopCluster() { | ||
| cluster := s.GetCluster() | ||
| if cluster != nil { | ||
| s.cluster.Store((*Cluster)(nil)) | ||
| cluster.StopBackgroundJobs() |
There was a problem hiding this comment.
This changes the shutdown visibility order in a risky way. The old cluster remains published via s.GetCluster() while StopBackgroundJobs() is stopping the coordinator/runners. During this window, RPC/API handlers can still enter the old cluster and operate on resources that are already stopping. The previous code unpublished the cluster before stopping watchers. Could we first atomically unpublish the serving resources, then stop the captured old cluster/resources?
There was a problem hiding this comment.
Yes, remove the basicCluster from the service struct; all callers should get the basic cluster from the Cluster field.
There was a problem hiding this comment.
Thanks for moving hbStreams and watchers into Cluster; that fixes the separately mutable Server field issue.
I think the shutdown visibility issue in this thread is still valid, though. stopCluster() still calls cluster.StopBackgroundJobs() before cleanupClusterResources(cluster), and cleanupClusterResources is where s.cluster.Store(nil) happens. So while StopBackgroundJobs() is stopping coordinator/runners and canceling the cluster context, the old cluster is still visible through GetCluster() and handlers can still enter it.
Could we unpublish the cluster first, then stop and clean the captured old cluster? For example: capture old := s.GetCluster(), store nil into s.cluster, then call old.StopBackgroundJobs() and old.cleanupRuntimeResources().
There was a problem hiding this comment.
I added a regression test to verify this window.
func TestStopClusterUnpublishesClusterBeforeStoppingBackgroundJobs(t *testing.T) {
re := require.New(t)
s, cluster := newServerWithClusterForStopTest(t)
cluster.heartbeatRunner = syncRunner
cluster.miscRunner = syncRunner
cluster.logRunner = syncRunner
cluster.running.Store(true)
// Keep StopBackgroundJobs blocked so the test can observe the shutdown window.
cluster.wg.Add(1)
done := make(chan struct{})
released := false
defer func() {
if !released {
cluster.wg.Done()
}
select {
case <-done:
case <-time.After(5 * time.Second):
t.Log("stopCluster did not return after releasing the test gate")
}
}()
go func() {
s.stopCluster()
close(done)
}()
re.Eventually(func() bool {
return !cluster.IsBackgroundJobsRunning()
}, 5*time.Second, 10*time.Millisecond)
select {
case <-done:
t.Fatal("stopCluster returned before the test observed the shutdown window")
default:
}
if got := s.GetCluster(); got != nil {
t.Errorf("expected stopCluster to make cluster invisible before waiting for background jobs to stop, got %p", got)
}
cluster.wg.Done()
released = true
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("stopCluster did not return")
}
}
--- FAIL: TestStopClusterUnpublishesClusterBeforeStoppingBackgroundJobs
server_test.go:100: expected stopCluster to make cluster invisible before waiting for background jobs to stop, got 0xc0002dedc0
There was a problem hiding this comment.
good catch, fixed
|
|
||
| s := &Server{} | ||
| s.cluster.Store(&Cluster{BasicCluster: basicCluster}) | ||
| s.hbStreams = nil // cleanupClusterResources has nilled hbStreams before clearing cluster. |
There was a problem hiding this comment.
This test state does not match the current cleanup order. cleanupClusterResources() clears cluster before niling hbStreams, so cluster != nil && hbStreams == nil is not the real shutdown window. The real risky window is in stopCluster(): cluster is still published while StopBackgroundJobs() is running. Could we add a test that exercises the actual stopCluster() order, preferably with a concurrent heartbeat/API read?
There was a problem hiding this comment.
This thread is outdated, but I don't think the underlying test concern is fully resolved yet. The current test still manually constructs cluster != nil && hbStreams == nil instead of exercising the actual stopCluster() order. The real window is still that stopCluster() keeps the old cluster published while StopBackgroundJobs() is running.
Also, after removing Server.basicCluster, this test file currently does not compile because TestCleanupClusterResources still references basicCluster in the Server literal and assertion.
|
/retest |
Signed-off-by: tongjian <1045931706@qq.com>
a1468c7 to
b6baff6
Compare
Signed-off-by: tongjian <1045931706@qq.com>
02b93c7 to
6fff238
Compare
Signed-off-by: tongjian <1045931706@qq.com>
Signed-off-by: tongjian <1045931706@qq.com>
|
@bufferflies: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
| // StopBackgroundJobs stops background jobs. | ||
| // StopBackgroundJobs stops background jobs, these jobs is created by NewCluster. | ||
| func (c *Cluster) StopBackgroundJobs() { | ||
| if !c.running.Load() { |
There was a problem hiding this comment.
On startup rollback, NewCluster may have already started goroutines while StartBackgroundJobs was never called, so running is false and this returns without canceling c.ctx. Please add a cleanup path that cancels the cluster context even before background jobs start.
| cluster.SetRuntimeResources(metaWatcher, configWatcher, ruleWatcher, affinityWatcher) | ||
| s.cluster.Store(cluster) | ||
| cluster.StartBackgroundJobs() | ||
| cluster = nil // defer cleanup no longer needed |
There was a problem hiding this comment.
This only skips cluster.StopBackgroundJobs; the defer will still close hbStreams and all watchers after a successful start because those locals remain non-nil. Please run the defer only on error, or nil out the resources after ownership is transferred to cluster.
| ) | ||
| metaWatcher, configWatcher, err = s.startMetaConfWatcher(ctx, basicCluster, storage) | ||
| if err != nil { | ||
| configWatcher.Close() |
There was a problem hiding this comment.
startMetaConfWatcher can return nil watchers with an error, so this can panic and hide the real startup error. Please remove these closes here or guard them; the helper already cleans up partial watcher state.
What problem does this PR solve?
Issue Number: Close #10644
What is changed and how does it work?
Check List
Tests
Release note
Summary by CodeRabbit
Bug Fixes
Improvements
Tests