Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,7 @@ func (rw *Watcher) initializeRegionLabelWatcher() error {
func (rw *Watcher) Close() {
rw.cancel()
rw.wg.Wait()
if rw.checkerController != nil {
rw.checkerController.ClearSuspectKeyRanges()
}
Comment on lines +283 to +285
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleanup order can still leave stale suspect key ranges. Close cancels the watcher and immediately calls ClearSuspectKeyRanges, but an already-running watcher callback is not preempted by cancellation and may still run postEventsFn/AddSuspectKeyRange afterward, re-adding suspect ranges after they were cleared.

Could we move ClearSuspectKeyRanges after rw.wg.Wait(), so cleanup happens after all watcher callbacks have exited?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// TestCloseClearsSuspectKeyRangesAfterInFlightCallback verifies that
// Watcher.Close leaves no suspect key range behind when a goroutine tracked by
// rw.wg adds one after the watcher context has already been cancelled.
func TestCloseClearsSuspectKeyRangesAfterInFlightCallback(t *testing.T) {
        re := require.New(t)

        // The checker controller gets its own context so that cancelling the
        // watcher context below does not affect it.
        checkerCtx, checkerCancel := context.WithCancel(context.Background())
        defer checkerCancel()
        cluster := mockcluster.NewCluster(checkerCtx, mockconfig.NewTestOptions())
        checkerController := checker.NewController(checkerCtx, cluster, cluster.GetCheckerConfig(), nil)

        // Build the watcher by hand with only the fields this test sets.
        watcherCtx, watcherCancel := context.WithCancel(context.Background())
        rw := &Watcher{
                ctx:               watcherCtx,
                cancel:            watcherCancel,
                checkerController: checkerController,
        }

        // Stand-in for an in-flight watcher callback: it is registered on rw.wg,
        // waits for the watcher context to be cancelled, signals the test, and
        // only adds a suspect key range once the test releases it.
        callbackStarted := make(chan struct{})
        releaseCallback := make(chan struct{})
        rw.wg.Add(1)
        go func() {
                defer rw.wg.Done()
                <-watcherCtx.Done()
                close(callbackStarted)
                <-releaseCallback
                checkerController.AddSuspectKeyRange([]byte("a"), []byte("z"))
        }()

        // Run Close in the background so the test can observe it blocking.
        closeDone := make(chan struct{})
        go func() {
                rw.Close()
                close(closeDone)
        }()

        // callbackStarted is closed only after watcherCtx is done, i.e. after
        // Close has called rw.cancel().
        select {
        case <-callbackStarted:
        case <-time.After(time.Second):
                re.FailNow("expected in-flight watcher callback to observe cancellation")
        }
        // The callback is still parked on releaseCallback, so Close must not have
        // returned yet.
        select {
        case <-closeDone:
                re.FailNow("Close returned before the in-flight callback finished")
        case <-time.After(50 * time.Millisecond):
        }

        // Let the callback add its key range and finish; Close should then return.
        close(releaseCallback)
        select {
        case <-closeDone:
        case <-time.After(time.Second):
                re.FailNow("Close did not return after the in-flight callback finished")
        }

        // The range added by the late callback must not be left behind after Close.
        _, ok := checkerController.PopOneSuspectKeyRange()
        re.False(ok, "Close should not leave suspect key ranges added by in-flight callbacks")
  }

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about this?

 // Close shuts the watcher down. It cancels the watcher via rw.cancel, blocks
 // until everything registered on rw.wg has finished, and finally clears the
 // suspect key ranges on the checker controller when one is attached.
 func (rw *Watcher) Close() {
      rw.cancel()
      // Clearing happens only after the wait, so that goroutines tracked by
      // rw.wg cannot add a suspect key range once the ranges have been cleared.
      rw.wg.Wait()
      if rw.checkerController == nil {
          return
      }
      rw.checkerController.ClearSuspectKeyRanges()
 }

}
82 changes: 61 additions & 21 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,70 +506,110 @@ func (s *Server) startServer() (err error) {
return nil
}

func (s *Server) startCluster(context.Context) error {
s.basicCluster = core.NewBasicCluster()
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
err := s.startMetaConfWatcher()
func (s *Server) startCluster(ctx context.Context) error {
basicCluster := core.NewBasicCluster()
storage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
metaWatcher, configWatcher, err := s.startMetaConfWatcher(ctx, basicCluster, storage)
if err != nil {
return err
}
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), constant.SchedulingServiceName, s.basicCluster)
cluster, err := NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.checkMembershipCh, s.GetHTTPClient(), s.GetBackendEndpoints())
hbStreams := hbstream.NewHeartbeatStreams(ctx, constant.SchedulingServiceName, basicCluster)
cluster, err := NewCluster(ctx, s.persistConfig, storage, basicCluster, hbStreams, s.checkMembershipCh, s.GetHTTPClient(), s.GetBackendEndpoints())
if err != nil {
hbStreams.Close()
configWatcher.Close()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startMetaConfWatcher can return nil watchers with an error, so this can panic and hide the real startup error. Please remove these closes here or guard them; the helper already cleans up partial watcher state.

metaWatcher.Close()
return err
}
s.cluster.Store(cluster)
// Inject the cluster components into the config watcher after the scheduler controller is created.
s.configWatcher.SetSchedulersController(cluster.GetCoordinator().GetSchedulersController())
// Start the rule watcher after the cluster is created.
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.storage,
defer func() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StopBackgroundJobs returns immediately when StartBackgroundJobs has not run, so this rollback does not cancel the cluster context on partial startup failure. NewCluster already starts context-bound goroutines such as the labeler GC / affinity check loop; please use a cleanup path that always cancels the cluster context.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This still does not fully address @rleungx's comment, and the latest defer also introduces a successful-path cleanup issue.

StopBackgroundJobs() still returns immediately when running=false, so if NewCluster() succeeds but a later watcher creation fails, the cluster context is still not cancelled.

Also, the successful path only sets cluster = nil, while hbStreams / watchers remain non-nil. The defer will close the runtime resources that were just transferred to Cluster before startCluster() returns successfully.

Could we either disarm those local variables after ownership is transferred, or centralize cleanup in a method that always cancels the cluster context?

if cluster != nil {
cluster.StopBackgroundJobs()
}
}()
configWatcher.SetSchedulersController(cluster.GetCoordinator().GetSchedulersController())
ruleWatcher, err := rule.NewWatcher(ctx, s.GetClient(), storage,
cluster.GetCoordinator().GetCheckerController(), cluster.GetRuleManager(), cluster.GetRegionLabeler())
if err != nil {
hbStreams.Close()
configWatcher.Close()
metaWatcher.Close()
return err
}
// Start the affinity watcher after the cluster is created.
s.affinityWatcher, err = affinity.NewWatcher(s.Context(), s.GetClient(), cluster.GetAffinityManager())
affinityWatcher, err := affinity.NewWatcher(ctx, s.GetClient(), cluster.GetAffinityManager())
if err != nil {
ruleWatcher.Close()
hbStreams.Close()
configWatcher.Close()
metaWatcher.Close()
return err
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

s.basicCluster = basicCluster
Comment thread
lhy1024 marked this conversation as resolved.
Outdated
s.storage = storage
s.metaWatcher = metaWatcher
s.configWatcher = configWatcher
s.hbStreams = hbStreams
s.ruleWatcher = ruleWatcher
s.affinityWatcher = affinityWatcher
s.cluster.Store(cluster)
cluster.StartBackgroundJobs()
cluster = nil // defer cleanup no longer needed
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only skips cluster.StopBackgroundJobs; the defer will still close hbStreams and all watchers after a successful start because those locals remain non-nil. Please run the defer only on error, or nil out the resources after ownership is transferred to cluster.

return nil
}

func (s *Server) stopCluster() {
cluster := s.GetCluster()
if cluster != nil {
s.cluster.Store((*Cluster)(nil))
cluster.StopBackgroundJobs()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the shutdown visibility order in a risky way. The old cluster remains published via s.GetCluster() while StopBackgroundJobs() is stopping the coordinator/runners. During this window, RPC/API handlers can still enter the old cluster and operate on resources that are already stopping. The previous code unpublished the cluster before stopping watchers. Could we first atomically unpublish the serving resources, then stop the captured old cluster/resources?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, remove basicCluster from the Server struct; all callers should get the basic cluster from the Cluster field.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for moving hbStreams and watchers into Cluster; that fixes the separately mutable Server field issue.

I think the shutdown visibility issue in this thread is still valid, though. stopCluster() still calls cluster.StopBackgroundJobs() before cleanupClusterResources(cluster), and cleanupClusterResources is where s.cluster.Store(nil) happens. So while StopBackgroundJobs() is stopping coordinator/runners and canceling the cluster context, the old cluster is still visible through GetCluster() and handlers can still enter it.

Could we unpublish the cluster first, then stop and clean the captured old cluster? For example: capture old := s.GetCluster(), store nil into s.cluster, then call old.StopBackgroundJobs() and old.cleanupRuntimeResources().

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a regression test to verify this window.

  // TestStopClusterUnpublishesClusterBeforeStoppingBackgroundJobs checks that
  // s.GetCluster() already returns nil while stopCluster is still blocked
  // waiting for the cluster's background jobs to stop.
  func TestStopClusterUnpublishesClusterBeforeStoppingBackgroundJobs(t *testing.T) {
        re := require.New(t)
        // NOTE(review): newServerWithClusterForStopTest and syncRunner are defined
        // outside this snippet — presumably a server with a published cluster and
        // a synchronous runner; confirm against the test helpers.
        s, cluster := newServerWithClusterForStopTest(t)

        cluster.heartbeatRunner = syncRunner
        cluster.miscRunner = syncRunner
        cluster.logRunner = syncRunner
        cluster.running.Store(true)
        // Keep StopBackgroundJobs blocked so the test can observe the shutdown window.
        cluster.wg.Add(1)

        done := make(chan struct{})
        released := false
        // On any exit path, drop the extra wg count if it is still held and give
        // stopCluster up to five seconds to return, so the background goroutine
        // is not left blocked after a failed assertion.
        defer func() {
                if !released {
                        cluster.wg.Done()
                }
                select {
                case <-done:
                case <-time.After(5 * time.Second):
                        t.Log("stopCluster did not return after releasing the test gate")
                }
        }()

        // Run stopCluster in the background; done is closed once it returns.
        go func() {
                s.stopCluster()
                close(done)
        }()

        // Wait until the cluster reports that its background jobs are no longer
        // running, which is taken as the sign that the stop sequence has begun.
        re.Eventually(func() bool {
                return !cluster.IsBackgroundJobsRunning()
        }, 5*time.Second, 10*time.Millisecond)
        // stopCluster must still be in progress, otherwise there is no window to inspect.
        select {
        case <-done:
                t.Fatal("stopCluster returned before the test observed the shutdown window")
        default:
        }

        // Core assertion: the cluster must no longer be reachable through
        // GetCluster during this window. Errorf (not Fatal) lets the test go on to
        // release the gate below.
        if got := s.GetCluster(); got != nil {
                t.Errorf("expected stopCluster to make cluster invisible before waiting for background jobs to stop, got %p", got)
        }

        // Release the gate and make sure stopCluster completes.
        cluster.wg.Done()
        released = true
        select {
        case <-done:
        case <-time.After(5 * time.Second):
                t.Fatal("stopCluster did not return")
        }
  }
--- FAIL: TestStopClusterUnpublishesClusterBeforeStoppingBackgroundJobs
      server_test.go:100: expected stopCluster to make cluster invisible before waiting for background jobs to stop, got 0xc0002dedc0

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, fixed.

}
s.stopWatcher()
s.cleanupClusterResources()
Comment thread
bufferflies marked this conversation as resolved.
Outdated
}

func (s *Server) startMetaConfWatcher() (err error) {
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.basicCluster)
func (s *Server) startMetaConfWatcher(
ctx context.Context,
basicCluster *core.BasicCluster,
storage *endpoint.StorageEndpoint,
) (metaWatcher *meta.Watcher, configWatcher *config.Watcher, err error) {
metaWatcher, err = meta.NewWatcher(ctx, s.GetClient(), basicCluster)
if err != nil {
return err
return nil, nil, err
}
s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.persistConfig, s.storage)
configWatcher, err = config.NewWatcher(ctx, s.GetClient(), s.persistConfig, storage)
if err != nil {
return err
metaWatcher.Close()
return nil, nil, err
}
return err
return metaWatcher, configWatcher, nil
}

// stopWatcher closes every watcher the server currently holds and resets the
// matching field to nil. Watchers are handled in the order affinity, rule,
// meta, config; a field that is already nil is skipped, so a repeated call
// finds nothing left to close.
func (s *Server) stopWatcher() {
	if w := s.affinityWatcher; w != nil {
		w.Close()
		s.affinityWatcher = nil
	}
	if w := s.ruleWatcher; w != nil {
		w.Close()
		s.ruleWatcher = nil
	}
	if w := s.metaWatcher; w != nil {
		w.Close()
		s.metaWatcher = nil
	}
	if w := s.configWatcher; w != nil {
		w.Close()
		s.configWatcher = nil
	}
}

func (s *Server) cleanupClusterResources() {
s.stopWatcher()
if s.hbStreams != nil {
s.hbStreams.Close()
s.hbStreams = nil
}
s.cluster.Store((*Cluster)(nil))
Comment thread
bufferflies marked this conversation as resolved.
Outdated
s.basicCluster = nil
s.storage = nil
}
Comment thread
lhy1024 marked this conversation as resolved.
Outdated

// GetPersistConfig returns the persist config.
Expand Down
60 changes: 60 additions & 0 deletions pkg/mcs/scheduling/server/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2026 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/testutil"
)

// TestMain runs this package's tests through goleak.VerifyTestMain, passing
// the shared testutil.LeakOptions.
func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.LeakOptions...)
}

// TestCleanupClusterResources populates the cluster-scoped fields of a Server,
// runs cleanupClusterResources twice in a row, and asserts that the cluster,
// basic cluster, heartbeat streams and storage are all nil afterwards.
func TestCleanupClusterResources(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The heartbeat streams are built on their own BasicCluster, separate from
	// the one stored in the basicCluster field.
	srv := &Server{
		hbStreams:    hbstream.NewHeartbeatStreams(ctx, constant.SchedulingServiceName, core.NewBasicCluster()),
		basicCluster: core.NewBasicCluster(),
		storage:      endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil),
	}
	srv.cluster.Store(&Cluster{})

	// The second pass runs against already-cleared state and must not fail.
	for i := 0; i < 2; i++ {
		srv.cleanupClusterResources()
	}

	re.Nil(srv.GetCluster())
	re.Nil(srv.basicCluster)
	re.Nil(srv.hbStreams)
	re.Nil(srv.storage)
}
Loading