Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 100 additions & 2 deletions metrics/grafana/pd.json
Comment thread
okJiang marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -13672,7 +13672,7 @@
"steppedLine": false,
"targets": [
{
"expr": "pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"sync_index\"}",
"expr": "pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*pd.*\", type=\"sync_index\"}",
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we just delete the Sync index and History index directly? They don't seem to have much of a reason to exist.

What do you think? @rleungx

"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
Expand Down Expand Up @@ -13755,7 +13755,7 @@
"steppedLine": false,
"targets": [
{
"expr": "pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"last_index\"}",
"expr": "pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*pd.*\", type=\"last_index\"}",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "{{instance}}",
Expand Down Expand Up @@ -13798,6 +13798,104 @@
"show": true
}
]
},
{
"aliasColors": {},
"bars": false,
"dashLength": 10,
"dashes": false,
"datasource": "${DS_TEST-CLUSTER}",
"fill": 1,
"gridPos": {
"h": 7,
"w": 24,
"x": 0,
"y": 220
},
"id": 1503,
"legend": {
"alignAsTable": true,
"avg": false,
"current": false,
"max": false,
"min": false,
"rightSide": true,
"show": true,
"total": false,
"values": false
},
"lines": true,
"linewidth": 1,
"links": [],
"nullPointMode": "null",
"percentage": false,
"pointradius": 5,
"points": false,
"renderer": "flot",
"seriesOverrides": [
{
"alias": "history buffer size",
"color": "#E24D42",
"dashes": true,
"fill": 0,
"linewidth": 3
}
],
"spaceLength": 10,
"stack": false,
"steppedLine": false,
"targets": [
{
"expr": "max(pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*pd.*\", type=\"last_index\"}) - min(pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*pd.*\", type=\"sync_index\"})",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "index gap",
"refId": "A"
},
{
"expr": "max(pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*pd.*\", type=\"history_buffer_size\"})",
"format": "time_series",
"intervalFactor": 2,
"legendFormat": "history buffer size",
Comment thread
okJiang marked this conversation as resolved.
"refId": "B"
}
],
"thresholds": [],
"timeFrom": null,
"timeRegions": [],
"timeShift": null,
"title": "History index gap",
"tooltip": {
"shared": true,
"sort": 0,
"value_type": "individual"
},
"type": "graph",
"xaxis": {
"buckets": null,
"mode": "time",
"name": null,
"show": true,
"values": []
},
"yaxes": [
{
"format": "none",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
},
{
"format": "short",
"label": null,
"logBase": 1,
"max": null,
"min": null,
"show": true
}
]
}
],
"repeat": null,
Expand Down
18 changes: 13 additions & 5 deletions pkg/syncer/history_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ const (

var (
// WithLabelValues is a heavy operation, define variable to avoid call it every time.
syncIndexGauge = regionSyncerStatus.WithLabelValues("sync_index")
firstIndexGauge = regionSyncerStatus.WithLabelValues("first_index")
lastIndexGauge = regionSyncerStatus.WithLabelValues("last_index")
syncIndexGauge = regionSyncerStatus.WithLabelValues("sync_index")
firstIndexGauge = regionSyncerStatus.WithLabelValues("first_index")
lastIndexGauge = regionSyncerStatus.WithLabelValues("last_index")
historyBufferSizeGauge = regionSyncerStatus.WithLabelValues("history_buffer_size")
)

type historyBuffer struct {
Expand All @@ -56,6 +57,7 @@ func newHistoryBuffer(size int, kv kv.Base) *historyBuffer {
if size < 2 {
size = 2
}
historyBufferSizeGauge.Set(float64(size - 1))
records := make([]*core.RegionInfo, size)
h := &historyBuffer{
records: records,
Expand All @@ -64,6 +66,7 @@ func newHistoryBuffer(size int, kv kv.Base) *historyBuffer {
flushCount: defaultFlushCount,
}
h.reload()
h.updateHistoryIndexMetrics()
return h
}

Expand Down Expand Up @@ -96,6 +99,7 @@ func (h *historyBuffer) record(r *core.RegionInfo) {
h.head = (h.head + 1) % h.size
}
h.index++
h.updateHistoryIndexMetrics()
h.flushCount--
if h.flushCount <= 0 {
h.persist()
Expand Down Expand Up @@ -126,6 +130,7 @@ func (h *historyBuffer) resetWithIndex(index uint64) {
h.head = 0
h.tail = 0
h.flushCount = defaultFlushCount
h.updateHistoryIndexMetrics()
}

func (h *historyBuffer) getNextIndex() uint64 {
Expand Down Expand Up @@ -159,10 +164,13 @@ func (h *historyBuffer) reload() {
}

// persist publishes the buffer's first and next index to the status gauges and
// saves the next index, formatted as a base-10 string, under historyKey.
// A failed save is logged as a warning and is not returned to the caller.
func (h *historyBuffer) persist() {
	firstIndexGauge.Set(float64(h.firstIndex()))
	lastIndexGauge.Set(float64(h.nextIndex()))

	encoded := strconv.FormatUint(h.nextIndex(), 10)
	if err := h.kv.Save(historyKey, encoded); err != nil {
		log.Warn("persist history index failed", zap.Uint64("persist-index", h.nextIndex()), errs.ZapError(err))
	}
}

// updateHistoryIndexMetrics copies the buffer's first index and next index
// into firstIndexGauge and lastIndexGauge respectively.
func (h *historyBuffer) updateHistoryIndexMetrics() {
	first := float64(h.firstIndex())
	firstIndexGauge.Set(first)

	next := float64(h.nextIndex())
	lastIndexGauge.Set(next)
}
20 changes: 19 additions & 1 deletion pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/memory"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/storage/kv"
Expand All @@ -49,6 +50,8 @@ const (
maxSyncRegionBatchSize = 1000
syncerKeepAliveInterval = 10 * time.Second
defaultHistoryBufferSize = 10000
maxHistoryBufferSize = 100000
historyBufferMemoryStep = 4 * 1024 * 1024 * 1024
)

// ClientStream is the client side of the region syncer.
Expand Down Expand Up @@ -118,9 +121,10 @@ func NewRegionSyncer(s Server) *RegionSyncer {
if regionStorage == nil {
return nil
}
historyBufferSize := historyBufferSizeFromMemory(memory.GetMemTotalIgnoreErr())
Comment thread
okJiang marked this conversation as resolved.
syncer := &RegionSyncer{
server: s,
history: newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)),
history: newHistoryBuffer(historyBufferSize, regionStorage.(kv.Base)),
limit: ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity),
sendTimeout: syncerKeepAliveInterval,
tlsConfig: s.GetTLSConfig(),
Expand All @@ -129,6 +133,20 @@ func NewRegionSyncer(s Server) *RegionSyncer {
return syncer
}

// historyBufferSizeFromMemory derives the history buffer size from the total
// memory of the machine, in bytes.
//
// The size scales linearly: defaultHistoryBufferSize entries for every
// historyBufferMemoryStep bytes of memory, rounded down. The result is clamped
// to the range [defaultHistoryBufferSize, maxHistoryBufferSize]. A totalMemory
// of 0 yields defaultHistoryBufferSize.
func historyBufferSizeFromMemory(totalMemory uint64) int {
	if totalMemory == 0 {
		return defaultHistoryBufferSize
	}

	const (
		base = uint64(defaultHistoryBufferSize)
		step = uint64(historyBufferMemoryStep)
	)
	// Compute floor(base * totalMemory / step) from the quotient and remainder
	// so the intermediate product cannot wrap around uint64 for very large
	// totalMemory values (base*rest < base*step, and base*whole stays small
	// because whole <= MaxUint64/step).
	whole, rest := totalMemory/step, totalMemory%step
	size := base*whole + base*rest/step

	// Clamp while still in uint64 so the final conversion to int never
	// truncates, even where int is 32 bits wide.
	switch {
	case size < base:
		return defaultHistoryBufferSize
	case size > uint64(maxHistoryBufferSize):
		return maxHistoryBufferSize
	default:
		return int(size)
	}
}

// RunServer runs the server of the region syncer.
// regionNotifier is used to get the changed regions.
func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *core.RegionInfo) {
Expand Down
20 changes: 20 additions & 0 deletions pkg/syncer/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ import (
"github.com/tikv/pd/pkg/utils/testutil"
)

// TestHistoryBufferSizeFromMemory checks historyBufferSizeFromMemory at its
// boundaries: zero memory, below and at the base step, linear scaling between
// the bounds, and clamping at the maximum.
func TestHistoryBufferSizeFromMemory(t *testing.T) {
	type sizeCase struct {
		name        string
		totalMemory uint64
		expected    int
	}
	cases := []sizeCase{
		{"zero-memory", 0, defaultHistoryBufferSize},
		{"below-minimum", historyBufferMemoryStep / 2, defaultHistoryBufferSize},
		{"base-step", historyBufferMemoryStep, defaultHistoryBufferSize},
		{"scaled-linearly", historyBufferMemoryStep * 6 / 4, 15000},
		{"max-clamped", historyBufferMemoryStep * 64 / 4, maxHistoryBufferSize},
	}

	for i := range cases {
		tc := cases[i]
		t.Run(tc.name, func(t *testing.T) {
			got := historyBufferSizeFromMemory(tc.totalMemory)
			require.Equal(t, tc.expected, got)
		})
	}
}

func TestSyncExitsWhenRegionSyncerStops(t *testing.T) {
re := require.New(t)
tempDir := t.TempDir()
Expand Down
Loading