Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,7 @@ func (rw *Watcher) initializeRegionLabelWatcher() error {
func (rw *Watcher) Close() {
rw.cancel()
rw.wg.Wait()
if rw.checkerController != nil {
rw.checkerController.ClearSuspectKeyRanges()
}
Comment on lines +283 to +285
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This cleanup order can still leave stale suspect key ranges. Close cancels the watcher and immediately calls ClearSuspectKeyRanges, but an already-running watcher callback is not preempted by cancellation and may still run postEventsFn/AddSuspectKeyRange afterward, re-adding suspect ranges after they were cleared.

Could we move ClearSuspectKeyRanges after rw.wg.Wait(), so cleanup happens after all watcher callbacks have exited?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func TestCloseClearsSuspectKeyRangesAfterInFlightCallback(t *testing.T) {
        re := require.New(t)

        checkerCtx, checkerCancel := context.WithCancel(context.Background())
        defer checkerCancel()
        cluster := mockcluster.NewCluster(checkerCtx, mockconfig.NewTestOptions())
        checkerController := checker.NewController(checkerCtx, cluster, cluster.GetCheckerConfig(), nil)

        watcherCtx, watcherCancel := context.WithCancel(context.Background())
        rw := &Watcher{
                ctx:               watcherCtx,
                cancel:            watcherCancel,
                checkerController: checkerController,
        }

        callbackStarted := make(chan struct{})
        releaseCallback := make(chan struct{})
        rw.wg.Add(1)
        go func() {
                defer rw.wg.Done()
                <-watcherCtx.Done()
                close(callbackStarted)
                <-releaseCallback
                checkerController.AddSuspectKeyRange([]byte("a"), []byte("z"))
        }()

        closeDone := make(chan struct{})
        go func() {
                rw.Close()
                close(closeDone)
        }()

        select {
        case <-callbackStarted:
        case <-time.After(time.Second):
                re.FailNow("expected in-flight watcher callback to observe cancellation")
        }
        select {
        case <-closeDone:
                re.FailNow("Close returned before the in-flight callback finished")
        case <-time.After(50 * time.Millisecond):
        }

        close(releaseCallback)
        select {
        case <-closeDone:
        case <-time.After(time.Second):
                re.FailNow("Close did not return after the in-flight callback finished")
        }

        _, ok := checkerController.PopOneSuspectKeyRange()
        re.False(ok, "Close should not leave suspect key ranges added by in-flight callbacks")
  }

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how about this

 func (rw *Watcher) Close() {
      rw.cancel()
      rw.wg.Wait()
      if rw.checkerController != nil {
          rw.checkerController.ClearSuspectKeyRanges()
      }
  }

}
76 changes: 55 additions & 21 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,70 +506,104 @@ func (s *Server) startServer() (err error) {
return nil
}

func (s *Server) startCluster(context.Context) error {
s.basicCluster = core.NewBasicCluster()
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
err := s.startMetaConfWatcher()
func (s *Server) startCluster(ctx context.Context) error {
basicCluster := core.NewBasicCluster()
storage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
metaWatcher, configWatcher, err := s.startMetaConfWatcher(ctx, basicCluster, storage)
if err != nil {
return err
}
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), constant.SchedulingServiceName, s.basicCluster)
cluster, err := NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.checkMembershipCh, s.GetHTTPClient(), s.GetBackendEndpoints())
hbStreams := hbstream.NewHeartbeatStreams(ctx, constant.SchedulingServiceName, basicCluster)
cluster, err := NewCluster(ctx, s.persistConfig, storage, basicCluster, hbStreams, s.checkMembershipCh, s.GetHTTPClient(), s.GetBackendEndpoints())
if err != nil {
hbStreams.Close()
configWatcher.Close()
metaWatcher.Close()
return err
}
s.cluster.Store(cluster)
// Inject the cluster components into the config watcher after the scheduler controller is created.
s.configWatcher.SetSchedulersController(cluster.GetCoordinator().GetSchedulersController())
// Start the rule watcher after the cluster is created.
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.storage,
configWatcher.SetSchedulersController(cluster.GetCoordinator().GetSchedulersController())
ruleWatcher, err := rule.NewWatcher(ctx, s.GetClient(), storage,
cluster.GetCoordinator().GetCheckerController(), cluster.GetRuleManager(), cluster.GetRegionLabeler())
if err != nil {
hbStreams.Close()
configWatcher.Close()
metaWatcher.Close()
return err
}
// Start the affinity watcher after the cluster is created.
s.affinityWatcher, err = affinity.NewWatcher(s.Context(), s.GetClient(), cluster.GetAffinityManager())
affinityWatcher, err := affinity.NewWatcher(ctx, s.GetClient(), cluster.GetAffinityManager())
if err != nil {
ruleWatcher.Close()
hbStreams.Close()
configWatcher.Close()
metaWatcher.Close()
return err
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

s.basicCluster = basicCluster
Comment thread
lhy1024 marked this conversation as resolved.
Outdated
s.storage = storage
s.metaWatcher = metaWatcher
s.configWatcher = configWatcher
s.hbStreams = hbStreams
s.ruleWatcher = ruleWatcher
s.affinityWatcher = affinityWatcher
s.cluster.Store(cluster)
cluster.StartBackgroundJobs()
return nil
}

func (s *Server) stopCluster() {
cluster := s.GetCluster()
if cluster != nil {
s.cluster.Store((*Cluster)(nil))
cluster.StopBackgroundJobs()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the shutdown visibility order in a risky way. The old cluster remains published via s.GetCluster() while StopBackgroundJobs() is stopping the coordinator/runners. During this window, RPC/API handlers can still enter the old cluster and operate on resources that are already stopping. The previous code unpublished the cluster before stopping watchers. Could we first atomically unpublish the serving resources, then stop the captured old cluster/resources?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, remove the basicCluster from the service struct; all callers should get the basic cluster from the Cluster field.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for moving hbStreams and watchers into Cluster; that fixes the separately mutable Server field issue.

I think the shutdown visibility issue in this thread is still valid, though. stopCluster() still calls cluster.StopBackgroundJobs() before cleanupClusterResources(cluster), and cleanupClusterResources is where s.cluster.Store(nil) happens. So while StopBackgroundJobs() is stopping coordinator/runners and canceling the cluster context, the old cluster is still visible through GetCluster() and handlers can still enter it.

Could we unpublish the cluster first, then stop and clean the captured old cluster? For example: capture old := s.GetCluster(), store nil into s.cluster, then call old.StopBackgroundJobs() and old.cleanupRuntimeResources().

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a regression test to verify this window.

  func TestStopClusterUnpublishesClusterBeforeStoppingBackgroundJobs(t *testing.T) {
        re := require.New(t)
        s, cluster := newServerWithClusterForStopTest(t)

        cluster.heartbeatRunner = syncRunner
        cluster.miscRunner = syncRunner
        cluster.logRunner = syncRunner
        cluster.running.Store(true)
        // Keep StopBackgroundJobs blocked so the test can observe the shutdown window.
        cluster.wg.Add(1)

        done := make(chan struct{})
        released := false
        defer func() {
                if !released {
                        cluster.wg.Done()
                }
                select {
                case <-done:
                case <-time.After(5 * time.Second):
                        t.Log("stopCluster did not return after releasing the test gate")
                }
        }()

        go func() {
                s.stopCluster()
                close(done)
        }()

        re.Eventually(func() bool {
                return !cluster.IsBackgroundJobsRunning()
        }, 5*time.Second, 10*time.Millisecond)
        select {
        case <-done:
                t.Fatal("stopCluster returned before the test observed the shutdown window")
        default:
        }

        if got := s.GetCluster(); got != nil {
                t.Errorf("expected stopCluster to make cluster invisible before waiting for background jobs to stop, got %p", got)
        }

        cluster.wg.Done()
        released = true
        select {
        case <-done:
        case <-time.After(5 * time.Second):
                t.Fatal("stopCluster did not return")
        }
  }
--- FAIL: TestStopClusterUnpublishesClusterBeforeStoppingBackgroundJobs
      server_test.go:100: expected stopCluster to make cluster invisible before waiting for background jobs to stop, got 0xc0002dedc0

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, fixed

}
s.stopWatcher()
s.cleanupClusterResources()
Comment thread
bufferflies marked this conversation as resolved.
Outdated
}

func (s *Server) startMetaConfWatcher() (err error) {
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.basicCluster)
func (s *Server) startMetaConfWatcher(
ctx context.Context,
basicCluster *core.BasicCluster,
storage *endpoint.StorageEndpoint,
) (metaWatcher *meta.Watcher, configWatcher *config.Watcher, err error) {
metaWatcher, err = meta.NewWatcher(ctx, s.GetClient(), basicCluster)
if err != nil {
return err
return nil, nil, err
}
s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.persistConfig, s.storage)
configWatcher, err = config.NewWatcher(ctx, s.GetClient(), s.persistConfig, storage)
if err != nil {
return err
metaWatcher.Close()
return nil, nil, err
}
return err
return metaWatcher, configWatcher, nil
}

func (s *Server) stopWatcher() {
if s.affinityWatcher != nil {
s.affinityWatcher.Close()
s.affinityWatcher = nil
}
if s.ruleWatcher != nil {
s.ruleWatcher.Close()
s.ruleWatcher = nil
}
if s.metaWatcher != nil {
s.metaWatcher.Close()
s.metaWatcher = nil
}
if s.configWatcher != nil {
s.configWatcher.Close()
s.configWatcher = nil
}
}

func (s *Server) cleanupClusterResources() {
s.stopWatcher()
if s.hbStreams != nil {
s.hbStreams.Close()
s.hbStreams = nil
}
s.cluster.Store((*Cluster)(nil))
Comment thread
bufferflies marked this conversation as resolved.
Outdated
s.basicCluster = nil
s.storage = nil
}
Comment thread
lhy1024 marked this conversation as resolved.
Outdated

// GetPersistConfig returns the persist config.
Expand Down
54 changes: 54 additions & 0 deletions pkg/mcs/scheduling/server/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright 2026 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
)

func TestCleanupClusterResources(t *testing.T) {

Check failure on line 30 in pkg/mcs/scheduling/server/server_test.go

View workflow job for this annotation

GitHub Actions / statics

test function TestCleanupClusterResources is not covered by goleak (goleak not imported)

Check failure on line 30 in pkg/mcs/scheduling/server/server_test.go

View workflow job for this annotation

GitHub Actions / statics

test function TestCleanupClusterResources is not covered by goleak (goleak not imported)
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

hbStreams := hbstream.NewHeartbeatStreams(ctx, constant.SchedulingServiceName, core.NewBasicCluster())
basicCluster := core.NewBasicCluster()
storage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
cluster := &Cluster{}

s := &Server{
basicCluster: basicCluster,
hbStreams: hbStreams,
storage: storage,
}
s.cluster.Store(cluster)

s.cleanupClusterResources()
s.cleanupClusterResources()

re.Nil(s.GetCluster())
re.Nil(s.basicCluster)
re.Nil(s.hbStreams)
re.Nil(s.storage)
}
Loading