Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/kv"
Expand All @@ -49,6 +50,8 @@ const (
maxSyncRegionBatchSize = 1000
syncerKeepAliveInterval = 10 * time.Second
defaultHistoryBufferSize = 10000
maxHistoryBufferSize = 100000
historyBufferMemoryStep = 4 * 1024 * 1024 * 1024
)

// ClientStream is the client side of the region syncer.
Expand Down Expand Up @@ -118,9 +121,10 @@ func NewRegionSyncer(s Server) *RegionSyncer {
if regionStorage == nil {
return nil
}
historyBufferSize := historyBufferSizeFromMemory(memory.GetMemTotalIgnoreErr())
Comment thread
okJiang marked this conversation as resolved.
syncer := &RegionSyncer{
server: s,
history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)),
history: newHistoryBuffer(historyBufferSize, regionStorage.(kv.Base)),
limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity),
sendTimeout: syncerKeepAliveInterval,
tlsConfig: s.GetTLSConfig(),
Expand All @@ -129,6 +133,20 @@ func NewRegionSyncer(s Server) *RegionSyncer {
return syncer
}

func historyBufferSizeFromMemory(totalMemory uint64) int {
if totalMemory == 0 {
return defaultHistoryBufferSize
}
size := int(uint64(defaultHistoryBufferSize) * totalMemory / historyBufferMemoryStep)
if size < defaultHistoryBufferSize {
return defaultHistoryBufferSize
}
if size > maxHistoryBufferSize {
return maxHistoryBufferSize
}
return size
}

// RunServer runs the server of the region syncer.
// regionNotifier is used to get the changed regions.
func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *core.RegionInfo) {
Expand Down
21 changes: 21 additions & 0 deletions pkg/syncer/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,27 @@ import (
"github.com/tikv/pd/pkg/utils/testutil"
)

func TestHistoryBufferSizeFromMemory(t *testing.T) {
re := require.New(t)
testCases := []struct {
name string
totalMemory uint64
expected int
}{
{name: "zero-memory", totalMemory: 0, expected: defaultHistoryBufferSize},
{name: "below-minimum", totalMemory: historyBufferMemoryStep / 2, expected: defaultHistoryBufferSize},
{name: "base-step", totalMemory: historyBufferMemoryStep, expected: defaultHistoryBufferSize},
{name: "scaled-linearly", totalMemory: historyBufferMemoryStep * 6 / 4, expected: 15000},
{name: "max-clamped", totalMemory: historyBufferMemoryStep * 64 / 4, expected: maxHistoryBufferSize},
}

for _, testCase := range testCases {
t.Run(testCase.name, func(_ *testing.T) {
re.Equal(testCase.expected, historyBufferSizeFromMemory(testCase.totalMemory))
})
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Outdated
}
}

func TestSyncExitsWhenRegionSyncerStops(t *testing.T) {
re := require.New(t)
tempDir := t.TempDir()
Expand Down
Loading