Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20260326084500-678ff92b1edd
github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.20.5
github.com/stretchr/testify v1.9.0
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20260326084500-678ff92b1edd h1:FA2DzGly3tuBWFjktkJxmqeOVEqgrsUvKMQXAw9xvWE=
github.com/pingcap/kvproto v0.0.0-20260326084500-678ff92b1edd/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE=
github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359 h1:oteLtLuoWZN3uvfH836U0IIJ+s3UOk11q7GaQ0Tk+wc=
github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ require (
github.com/pingcap/errcode v0.3.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86
github.com/pingcap/kvproto v0.0.0-20260326084500-678ff92b1edd
github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/pingcap/metering_sdk v0.0.0-20260203082503-b9f282339654
github.com/pingcap/sysutil v1.0.1-0.20230407040306-fb007c5aff21
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -480,8 +480,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ue
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86 h1:tdMsjOqUR7YXHoBitzdebTvOjs/swniBTOLy5XiMtuE=
github.com/pingcap/failpoint v0.0.0-20240528011301-b51a646c7c86/go.mod h1:exzhVYca3WRtd6gclGNErRWb1qEgff3LYta0LvRmON4=
github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w=
github.com/pingcap/kvproto v0.0.0-20260326084500-678ff92b1edd h1:FA2DzGly3tuBWFjktkJxmqeOVEqgrsUvKMQXAw9xvWE=
github.com/pingcap/kvproto v0.0.0-20260326084500-678ff92b1edd/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE=
github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359 h1:oteLtLuoWZN3uvfH836U0IIJ+s3UOk11q7GaQ0Tk+wc=
github.com/pingcap/kvproto v0.0.0-20260511034003-fc9e0458a359/go.mod h1:z6+aAHB7dBkA+LyinEX+48/ImRJ3jag0Hg0c7wkhEvE=
github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
Expand Down
5 changes: 5 additions & 0 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,11 @@ func (c *Cluster) GetCoordinator() *schedule.Coordinator {
return c.coordinator
}

// SendRegionHeartbeatMessage sends an operation to the target region through heartbeat stream.
func (c *Cluster) SendRegionHeartbeatMessage(region *core.RegionInfo, op *hbstream.Operation) {
c.coordinator.GetHeartbeatStreams().SendMsg(region, op)
}

// GetHotStat gets hot stat.
func (c *Cluster) GetHotStat() *statistics.HotStat {
return c.hotStat
Expand Down
23 changes: 19 additions & 4 deletions pkg/mcs/scheduling/server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/registry"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/logutil"
Expand Down Expand Up @@ -372,7 +373,16 @@ func (s *Service) AskBatchSplit(_ context.Context, request *schedulingpb.AskBatc
splitCount := request.GetSplitCount()
err := c.ValidRegion(reqRegion)
if err != nil {
return nil, err
return &schedulingpb.AskBatchSplitResponse{
Header: wrapErrorToHeader(schedulingpb.ErrorType_UNKNOWN, err.Error()),
}, nil
}
region := c.GetRegion(reqRegion.GetId())
if affinityManager := c.GetAffinityManager(); affinityManager != nil && !affinityManager.AllowSplit(region, request.GetReason()) {
c.SendRegionHeartbeatMessage(region, &hbstream.Operation{ChangeSplit: &pdpb.ChangeSplit{AutoSplitEnabled: false}})
return &schedulingpb.AskBatchSplitResponse{
Header: wrapErrorToHeader(schedulingpb.ErrorType_UNKNOWN, "cannot split affinity region"),
}, nil
}
splitIDs := make([]*pdpb.SplitID, 0, splitCount)
recordRegions := make([]uint64, 0, splitCount+1)
Expand Down Expand Up @@ -440,9 +450,14 @@ func (s *Service) AskBatchSplit(_ context.Context, request *schedulingpb.AskBatc
// status may be left, and these regions need to be checked with higher
// priority.
c.GetCoordinator().GetCheckerController().AddPendingProcessedRegions(false, recordRegions...)
// TODO(split-scatter): if load-based split-scatter is supported on the
// scheduling-service/NEXT_GEN path, record split-scatter batches here and
// plumb SplitReason through the forwarding request.
if request.GetReason() == pdpb.SplitReason_LOAD {
newRegionIDs := recordRegions[:len(recordRegions)-1]
c.GetCoordinator().GetCheckerController().RecordSplitScatterBatch(
reqRegion.GetId(),
reqRegion.GetRegionEpoch().GetVersion()+1,
newRegionIDs,
)
}

return &schedulingpb.AskBatchSplitResponse{
Header: wrapHeader(),
Expand Down
255 changes: 255 additions & 0 deletions pkg/mcs/scheduling/server/grpc_service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
// Copyright 2026 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"fmt"
"net/http"
"testing"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"google.golang.org/grpc"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/kvproto/pkg/schedulingpb"

"github.com/tikv/pd/pkg/cache"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mcs/scheduling/server/config"
"github.com/tikv/pd/pkg/mcs/utils/constant"
"github.com/tikv/pd/pkg/schedule/affinity"
sc "github.com/tikv/pd/pkg/schedule/config"
"github.com/tikv/pd/pkg/schedule/hbstream"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/keyutil"
"github.com/tikv/pd/pkg/utils/testutil"
)

type captureHeartbeatStream struct {
ch chan core.RegionHeartbeatResponse
}

func (s *captureHeartbeatStream) Send(resp core.RegionHeartbeatResponse) error {
s.ch <- resp
return nil
}

type splitScatterPDClient struct {
pdpb.PDClient
next uint64
}

func (c *splitScatterPDClient) AllocID(_ context.Context, req *pdpb.AllocIDRequest, _ ...grpc.CallOption) (*pdpb.AllocIDResponse, error) {
count := req.GetCount()
if count == 0 {
count = 1
}
c.next += uint64(count)
return &pdpb.AllocIDResponse{
Header: &pdpb.ResponseHeader{},
Id: c.next - 1,
Count: count,
}, nil
}

func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.LeakOptions...)
}

func waitHeartbeatStreamBound(t *testing.T, hbStreams *hbstream.HeartbeatStreams, region *core.RegionInfo, stream *captureHeartbeatStream) {
t.Helper()

ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
timeout := time.After(time.Second)
for {
hbStreams.SendMsg(region, &hbstream.Operation{})
select {
case <-stream.ch:
for {
select {
case <-stream.ch:
default:
return
}
}
case <-ticker.C:
case <-timeout:
require.FailNow(t, "expected heartbeat stream to be bound")
}
}
}

func TestAskBatchSplitRejectsAffinityAutoSplit(t *testing.T) {
re := require.New(t)

svc, cluster, stream := newTestSchedulingServiceForSplit(t)

region := newAffinitySplitTestRegion()
cluster.PutRegion(region)
err := cluster.GetAffinityManager().CreateAffinityGroups([]affinity.GroupKeyRanges{
{
GroupID: "g1",
KeyRanges: []keyutil.KeyRange{
{StartKey: []byte("a"), EndKey: []byte("z")},
},
},
})
re.NoError(err)
_, err = cluster.GetAffinityManager().UpdateAffinityGroupPeers("g1", 1, []uint64{1, 2, 3})
re.NoError(err)
hbStreams := cluster.GetCoordinator().GetHeartbeatStreams()
hbStreams.BindStream(1, stream)
waitHeartbeatStreamBound(t, hbStreams, region, stream)

resp, err := svc.AskBatchSplit(context.Background(), &schedulingpb.AskBatchSplitRequest{
Region: region.GetMeta(),
SplitCount: 1,
Reason: pdpb.SplitReason_LOAD,
})
re.NoError(err)
re.Equal(schedulingpb.ErrorType_UNKNOWN, resp.GetHeader().GetError().GetType())
re.Contains(resp.GetHeader().GetError().GetMessage(), "cannot split affinity region")
re.Empty(resp.GetIds())

select {
case msg := <-stream.ch:
heartbeatResp, ok := msg.(*schedulingpb.RegionHeartbeatResponse)
re.True(ok)
re.Equal(region.GetID(), heartbeatResp.GetRegionId())
re.NotNil(heartbeatResp.GetChangeSplit())
re.False(heartbeatResp.GetChangeSplit().GetAutoSplitEnabled())
case <-time.After(time.Second):
Comment thread
coderabbitai[bot] marked this conversation as resolved.
re.Fail("expected ChangeSplit heartbeat response")
}
}

func TestAskBatchSplitRecordsSplitScatterInSchedulingService(t *testing.T) {
re := require.New(t)

svc, cluster, _ := newTestSchedulingServiceForSplit(t)
cluster.SwitchPDLeader(&splitScatterPDClient{next: 1000})

region := newAffinitySplitTestRegion()
cluster.PutRegion(region)
resp, err := svc.AskBatchSplit(context.Background(), &schedulingpb.AskBatchSplitRequest{
Region: region.GetMeta(),
SplitCount: 1,
Reason: pdpb.SplitReason_LOAD,
})
re.NoError(err)
re.Empty(resp.GetHeader().GetError())
re.Len(resp.GetIds(), 1)

re.Equal(float64(2), splitScatterPendingMetricValue(t))
}

func newTestSchedulingServiceForSplit(
t *testing.T,
) (*Service, *Cluster, *captureHeartbeatStream) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
oldClusterID := keypath.ClusterID()
keypath.SetClusterID(1)
t.Cleanup(func() {
keypath.SetClusterID(oldClusterID)
})
cfg, err := GenerateConfig(&config.Config{
Name: "test-scheduling",
ListenAddr: "http://127.0.0.1:0",
AdvertiseListenAddr: "http://127.0.0.1:0",
BackendEndpoints: "http://127.0.0.1:2379",
})
re.NoError(err)
cfg.Schedule.AffinityScheduleLimit = 4
cfg.Schedule.MaxAffinityMergeRegionSize = 10

basicCluster := core.NewBasicCluster()
for i := uint64(1); i <= 3; i++ {
basicCluster.PutStore(core.NewStoreInfo(&metapb.Store{
Id: i,
Address: fmt.Sprintf("mock://tikv-%d", i),
State: metapb.StoreState_Up,
}))
}

storage := endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
hbStreams := hbstream.NewHeartbeatStreams(ctx, constant.SchedulingServiceName, basicCluster)
persistConfig := config.NewPersistConfig(
cfg,
cache.NewStringTTL(ctx, sc.DefaultGCInterval, sc.DefaultTTL),
)
cluster, err := NewCluster(
ctx,
persistConfig,
storage,
basicCluster,
hbStreams,
make(chan struct{}),
http.DefaultClient,
"",
)
re.NoError(err)

server := &Server{}
server.cluster.Store(cluster)
stream := &captureHeartbeatStream{ch: make(chan core.RegionHeartbeatResponse, 16)}
t.Cleanup(hbStreams.Close)
return &Service{Server: server}, cluster, stream
}

func newAffinitySplitTestRegion() *core.RegionInfo {
peers := []*metapb.Peer{
{Id: 11, StoreId: 1},
{Id: 22, StoreId: 2},
{Id: 33, StoreId: 3},
}
return core.NewRegionInfo(&metapb.Region{
Id: 100,
StartKey: []byte("b"),
EndKey: []byte("y"),
Peers: peers,
RegionEpoch: &metapb.RegionEpoch{ConfVer: 1, Version: 1},
}, peers[0],
core.SetApproximateSize(5),
core.SetApproximateKeys(int64(5*sc.RegionSizeToKeysRatio)),
)
}

func splitScatterPendingMetricValue(t *testing.T) float64 {
t.Helper()
metricFamilies, err := prometheus.DefaultGatherer.Gather()
require.NoError(t, err)
for _, metricFamily := range metricFamilies {
if metricFamily.GetName() != "pd_checker_split_scatter_pending" {
continue
}
var value float64
for _, metric := range metricFamily.GetMetric() {
value += metric.GetGauge().GetValue()
}
return value
}
require.FailNow(t, "split scatter pending metric not found")
return 0
}
Loading
Loading