Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 36 additions & 6 deletions pkg/mcs/scheduling/server/apis/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"

pdcore "github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
Expand Down Expand Up @@ -1481,13 +1482,17 @@ func checkRegionsReplicated(c *gin.Context) {
// @Router /stores/{id} [get]
func getStoreByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
basicCluster, ok := getBasicCluster(c, svr)
if !ok {
return
}
idStr := c.Param("id")
storeID, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
c.String(http.StatusBadRequest, err.Error())
return
}
store := svr.GetBasicCluster().GetStore(storeID)
store := basicCluster.GetStore(storeID)
if store == nil {
c.String(http.StatusNotFound, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error())
return
Expand All @@ -1505,14 +1510,18 @@ func getStoreByID(c *gin.Context) {
// @Router /stores [get]
func getAllStores(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
stores := svr.GetBasicCluster().GetMetaStores()
basicCluster, ok := getBasicCluster(c, svr)
if !ok {
return
}
stores := basicCluster.GetMetaStores()
StoresInfo := &response.StoresInfo{
Stores: make([]*response.StoreInfo, 0, len(stores)),
}

for _, s := range stores {
storeID := s.GetId()
store := svr.GetBasicCluster().GetStore(storeID)
store := basicCluster.GetStore(storeID)
if store == nil {
c.String(http.StatusInternalServerError, errs.ErrStoreNotFound.FastGenByArgs(storeID).Error())
return
Expand All @@ -1534,7 +1543,11 @@ func getAllStores(c *gin.Context) {
// @Router /regions [get]
func getAllRegions(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
regions := svr.GetBasicCluster().GetRegions()
basicCluster, ok := getBasicCluster(c, svr)
if !ok {
return
}
regions := basicCluster.GetRegions()
b, err := response.MarshalRegionsInfoJSON(c.Request.Context(), regions)
if err != nil {
c.String(http.StatusInternalServerError, err.Error())
Expand All @@ -1550,7 +1563,11 @@ func getAllRegions(c *gin.Context) {
// @Router /regions/count [get]
func getRegionCount(c *gin.Context) {
	svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
	// getBasicCluster writes the error response itself when the basic cluster
	// is unavailable, so the handler only has to stop here.
	basicCluster, ok := getBasicCluster(c, svr)
	if !ok {
		return
	}
	// Read the count through the nil-checked basicCluster only; calling
	// svr.GetBasicCluster() directly could dereference a nil cluster.
	count := basicCluster.GetTotalRegionCount()
	c.IndentedJSON(http.StatusOK, &response.RegionsInfo{Count: count})
}

Expand All @@ -1563,6 +1580,10 @@ func getRegionCount(c *gin.Context) {
// @Router /regions/{id} [get]
func getRegionByID(c *gin.Context) {
svr := c.MustGet(multiservicesapi.ServiceContextKey).(*scheserver.Server)
basicCluster, ok := getBasicCluster(c, svr)
if !ok {
return
}
idStr := c.Param("id")
regionID, err := strconv.ParseUint(idStr, 10, 64)
if err != nil {
Expand All @@ -1573,7 +1594,7 @@ func getRegionByID(c *gin.Context) {
c.String(http.StatusBadRequest, errs.ErrRegionInvalidID.FastGenByArgs().Error())
return
}
regionInfo := svr.GetBasicCluster().GetRegion(regionID)
regionInfo := basicCluster.GetRegion(regionID)
if regionInfo == nil {
c.String(http.StatusNotFound, errs.ErrRegionNotFound.FastGenByArgs(regionID).Error())
return
Expand Down Expand Up @@ -1633,6 +1654,15 @@ func getAffinityManager(c *gin.Context) (*affinity.Manager, bool) {
return manager, true
}

func getBasicCluster(c *gin.Context, svr *scheserver.Server) (*pdcore.BasicCluster, bool) {
basicCluster := svr.GetBasicCluster()
Comment thread
lhy1024 marked this conversation as resolved.
if basicCluster == nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, errs.ErrNotBootstrapped.GenWithStackByArgs().Error())
return nil, false
}
return basicCluster, true
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

// @Tags affinity-groups
// @Summary List all affinity groups.
// @Param ids query []string false "Optional affinity group IDs. Repeat as ids=a&ids=b."
Expand Down
48 changes: 48 additions & 0 deletions pkg/mcs/scheduling/server/apis/v1/api_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
// Copyright 2026 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package apis

import (
"net/http"
"net/http/httptest"
"testing"

"github.com/gin-gonic/gin"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

scheserver "github.com/tikv/pd/pkg/mcs/scheduling/server"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/testutil"
)

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.LeakOptions...)
}

// TestGetAllStoresReturnsNotBootstrappedWhenBasicClusterMissing calls getAllStores with a
// zero-value scheduling server in the gin context and expects a 500 response whose body
// mentions "not bootstrapped".
func TestGetAllStoresReturnsNotBootstrappedWhenBasicClusterMissing(t *testing.T) {
	gin.SetMode(gin.TestMode)

	recorder := httptest.NewRecorder()
	ginCtx, _ := gin.CreateTestContext(recorder)
	ginCtx.Request = httptest.NewRequest(http.MethodGet, "/stores", nil)
	// A zero-value server is stored under the service key the handler reads.
	ginCtx.Set(multiservicesapi.ServiceContextKey, &scheserver.Server{})

	getAllStores(ginCtx)

	re := require.New(t)
	re.Equal(http.StatusInternalServerError, recorder.Code)
	re.Contains(recorder.Body.String(), "not bootstrapped")
}
Comment thread
bufferflies marked this conversation as resolved.
6 changes: 4 additions & 2 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,9 +654,10 @@ func (c *Cluster) StartBackgroundJobs() {
}

// StopBackgroundJobs stops background jobs.
func (c *Cluster) StopBackgroundJobs() {
// It returns false if the cluster isn't running; otherwise it stops the background jobs and returns true.
func (c *Cluster) StopBackgroundJobs() bool {
if !c.running.Load() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On startup rollback, NewCluster may have already started goroutines while StartBackgroundJobs was never called, so running is false and this returns without canceling c.ctx. Please add a cleanup path that cancels the cluster context even before background jobs start.

return
return false
}
c.running.Store(false)
c.coordinator.Stop()
Expand All @@ -665,6 +666,7 @@ func (c *Cluster) StopBackgroundJobs() {
c.logRunner.Stop()
c.cancel()
c.wg.Wait()
return true
}

// IsBackgroundJobsRunning returns whether the background jobs are running. Only for test purpose.
Expand Down
4 changes: 2 additions & 2 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
}

c := s.GetCluster()
if c == nil {
if c == nil || s.hbStreams == nil {
Comment thread
lhy1024 marked this conversation as resolved.
Outdated
resp := &schedulingpb.RegionHeartbeatResponse{Header: notBootstrappedHeader()}
err := server.Send(resp)
return errors.WithStack(err)
Expand Down Expand Up @@ -243,7 +243,7 @@ func (s *Service) RegionBuckets(stream schedulingpb.Scheduling_RegionBucketsServ
// StoreHeartbeat implements gRPC SchedulingServer.
func (s *Service) StoreHeartbeat(_ context.Context, request *schedulingpb.StoreHeartbeatRequest) (*schedulingpb.StoreHeartbeatResponse, error) {
c := s.GetCluster()
if c == nil {
if c == nil || s.metaWatcher == nil {
return &schedulingpb.StoreHeartbeatResponse{Header: notBootstrappedHeader()}, nil
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/mcs/scheduling/server/rule/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,4 +281,7 @@ func (rw *Watcher) initializeRegionLabelWatcher() error {
func (rw *Watcher) Close() {
	// Cancel first, then block on the wait group before touching the checker controller.
	rw.cancel()
	rw.wg.Wait()
	if rw.checkerController == nil {
		return
	}
	// Clear the suspect key ranges only after the wait above has returned.
	rw.checkerController.ClearSuspectKeyRanges()
}
90 changes: 67 additions & 23 deletions pkg/mcs/scheduling/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,6 @@ type Server struct {

cfg *config.Config
persistConfig *config.PersistConfig
basicCluster *core.BasicCluster

// for the primary election of scheduling
participant *member.Participant
Expand Down Expand Up @@ -426,7 +425,10 @@ func (s *Server) GetCluster() *Cluster {

// GetBasicCluster returns the basic cluster held by the current cluster.
// It returns nil when GetCluster returns nil, so callers must check the result before use.
func (s *Server) GetBasicCluster() *core.BasicCluster {
	if cluster := s.GetCluster(); cluster != nil {
		return cluster.GetBasicCluster()
	}
	return nil
}

// GetCoordinator returns the coordinator.
Expand Down Expand Up @@ -506,70 +508,112 @@ func (s *Server) startServer() (err error) {
return nil
}

func (s *Server) startCluster(context.Context) error {
s.basicCluster = core.NewBasicCluster()
s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
err := s.startMetaConfWatcher()
func (s *Server) startCluster(ctx context.Context) (err error) {
basicCluster := core.NewBasicCluster()
storage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
metaWatcher, configWatcher, err := s.startMetaConfWatcher(ctx, basicCluster, storage)
if err != nil {
return err
}
s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), constant.SchedulingServiceName, s.basicCluster)
cluster, err := NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.checkMembershipCh, s.GetHTTPClient(), s.GetBackendEndpoints())
hbStreams := hbstream.NewHeartbeatStreams(ctx, constant.SchedulingServiceName, basicCluster)
cluster, err := NewCluster(ctx, s.persistConfig, storage, basicCluster, hbStreams, s.checkMembershipCh, s.GetHTTPClient(), s.GetBackendEndpoints())
defer func() {
// Make sure the cluster is stopped if any error occurs.
// If StopBackgroundJobs returns false, the cluster is not running, so we need to cancel its context to make the
// other goroutines exit.
if cluster != nil && !cluster.StopBackgroundJobs() {
cluster.cancel()
}
}()
if err != nil {
hbStreams.Close()
configWatcher.Close()
metaWatcher.Close()
return err
}
s.cluster.Store(cluster)
// Inject the cluster components into the config watcher after the scheduler controller is created.
s.configWatcher.SetSchedulersController(cluster.GetCoordinator().GetSchedulersController())
// Start the rule watcher after the cluster is created.
s.ruleWatcher, err = rule.NewWatcher(s.Context(), s.GetClient(), s.storage,

configWatcher.SetSchedulersController(cluster.GetCoordinator().GetSchedulersController())
ruleWatcher, err := rule.NewWatcher(ctx, s.GetClient(), storage,
cluster.GetCoordinator().GetCheckerController(), cluster.GetRuleManager(), cluster.GetRegionLabeler())
if err != nil {
hbStreams.Close()
configWatcher.Close()
metaWatcher.Close()
return err
}
// Start the affinity watcher after the cluster is created.
s.affinityWatcher, err = affinity.NewWatcher(s.Context(), s.GetClient(), cluster.GetAffinityManager())
affinityWatcher, err := affinity.NewWatcher(ctx, s.GetClient(), cluster.GetAffinityManager())
if err != nil {
ruleWatcher.Close()
hbStreams.Close()
configWatcher.Close()
metaWatcher.Close()
return err
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}

s.storage = storage
s.metaWatcher = metaWatcher
s.configWatcher = configWatcher
s.hbStreams = hbStreams
s.ruleWatcher = ruleWatcher
s.affinityWatcher = affinityWatcher
s.cluster.Store(cluster)
cluster.StartBackgroundJobs()
cluster = nil // defer cleanup no longer needed
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only skips cluster.StopBackgroundJobs; the defer will still close hbStreams and all watchers after a successful start because those locals remain non-nil. Please run the defer only on error, or nil out the resources after ownership is transferred to cluster.

return nil
}

func (s *Server) stopCluster() {
cluster := s.GetCluster()
if cluster != nil {
s.cluster.Store((*Cluster)(nil))
cluster.StopBackgroundJobs()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This changes the shutdown visibility order in a risky way. The old cluster remains published via s.GetCluster() while StopBackgroundJobs() is stopping the coordinator/runners. During this window, RPC/API handlers can still enter the old cluster and operate on resources that are already stopping. The previous code unpublished the cluster before stopping watchers. Could we first atomically unpublish the serving resources, then stop the captured old cluster/resources?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, remove the basicCluster from the service struct; all callers should get the basic cluster from the Cluster field.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for moving hbStreams and watchers into Cluster; that fixes the separately mutable Server field issue.

I think the shutdown visibility issue in this thread is still valid, though. stopCluster() still calls cluster.StopBackgroundJobs() before cleanupClusterResources(cluster), and cleanupClusterResources is where s.cluster.Store(nil) happens. So while StopBackgroundJobs() is stopping coordinator/runners and canceling the cluster context, the old cluster is still visible through GetCluster() and handlers can still enter it.

Could we unpublish the cluster first, then stop and clean the captured old cluster? For example: capture old := s.GetCluster(), store nil into s.cluster, then call old.StopBackgroundJobs() and old.cleanupRuntimeResources().

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a regression test to verify this window.

  func TestStopClusterUnpublishesClusterBeforeStoppingBackgroundJobs(t *testing.T) {
        re := require.New(t)
        s, cluster := newServerWithClusterForStopTest(t)

        cluster.heartbeatRunner = syncRunner
        cluster.miscRunner = syncRunner
        cluster.logRunner = syncRunner
        cluster.running.Store(true)
        // Keep StopBackgroundJobs blocked so the test can observe the shutdown window.
        cluster.wg.Add(1)

        done := make(chan struct{})
        released := false
        defer func() {
                if !released {
                        cluster.wg.Done()
                }
                select {
                case <-done:
                case <-time.After(5 * time.Second):
                        t.Log("stopCluster did not return after releasing the test gate")
                }
        }()

        go func() {
                s.stopCluster()
                close(done)
        }()

        re.Eventually(func() bool {
                return !cluster.IsBackgroundJobsRunning()
        }, 5*time.Second, 10*time.Millisecond)
        select {
        case <-done:
                t.Fatal("stopCluster returned before the test observed the shutdown window")
        default:
        }

        if got := s.GetCluster(); got != nil {
                t.Errorf("expected stopCluster to make cluster invisible before waiting for background jobs to stop, got %p", got)
        }

        cluster.wg.Done()
        released = true
        select {
        case <-done:
        case <-time.After(5 * time.Second):
                t.Fatal("stopCluster did not return")
        }
  }
--- FAIL: TestStopClusterUnpublishesClusterBeforeStoppingBackgroundJobs
      server_test.go:100: expected stopCluster to make cluster invisible before waiting for background jobs to stop, got 0xc0002dedc0

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, fixed

}
s.stopWatcher()
s.cleanupClusterResources()
Comment thread
bufferflies marked this conversation as resolved.
Outdated
}

func (s *Server) startMetaConfWatcher() (err error) {
s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.basicCluster)
func (s *Server) startMetaConfWatcher(
ctx context.Context,
basicCluster *core.BasicCluster,
storage *endpoint.StorageEndpoint,
) (metaWatcher *meta.Watcher, configWatcher *config.Watcher, err error) {
metaWatcher, err = meta.NewWatcher(ctx, s.GetClient(), basicCluster)
if err != nil {
return err
return nil, nil, err
}
s.configWatcher, err = config.NewWatcher(s.Context(), s.GetClient(), s.persistConfig, s.storage)
configWatcher, err = config.NewWatcher(ctx, s.GetClient(), s.persistConfig, storage)
if err != nil {
return err
metaWatcher.Close()
return nil, nil, err
}
return err
return metaWatcher, configWatcher, nil
}

// stopWatcher closes the affinity, rule, meta and config watchers in that order.
// Watchers that are nil are skipped; each closed watcher's field is reset to nil.
func (s *Server) stopWatcher() {
	if w := s.affinityWatcher; w != nil {
		w.Close()
		s.affinityWatcher = nil
	}
	if w := s.ruleWatcher; w != nil {
		w.Close()
		s.ruleWatcher = nil
	}
	if w := s.metaWatcher; w != nil {
		w.Close()
		s.metaWatcher = nil
	}
	if w := s.configWatcher; w != nil {
		w.Close()
		s.configWatcher = nil
	}
}

// cleanupClusterResources stores a nil cluster on the server, stops the watchers,
// closes the heartbeat streams when present, and drops the storage reference.
func (s *Server) cleanupClusterResources() {
	s.cluster.Store((*Cluster)(nil))
	s.stopWatcher()
	if streams := s.hbStreams; streams != nil {
		streams.Close()
		s.hbStreams = nil
	}
	s.storage = nil
}

// GetPersistConfig returns the persist config.
Expand Down
Loading
Loading