diff --git a/metrics/grafana/pd.json b/metrics/grafana/pd.json index 164d8568917..30f3acd7c15 100644 --- a/metrics/grafana/pd.json +++ b/metrics/grafana/pd.json @@ -13672,7 +13672,7 @@ "steppedLine": false, "targets": [ { - "expr": "pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"sync_index\"}", + "expr": "pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*pd.*\", type=\"sync_index\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{instance}}", @@ -13755,7 +13755,7 @@ "steppedLine": false, "targets": [ { - "expr": "pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", type=\"last_index\"}", + "expr": "pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*pd.*\", type=\"last_index\"}", "format": "time_series", "intervalFactor": 2, "legendFormat": "{{instance}}", @@ -13798,6 +13798,104 @@ "show": true } ] + }, + { + "aliasColors": {}, + "bars": false, + "dashLength": 10, + "dashes": false, + "datasource": "${DS_TEST-CLUSTER}", + "fill": 1, + "gridPos": { + "h": 7, + "w": 24, + "x": 0, + "y": 220 + }, + "id": 1503, + "legend": { + "alignAsTable": true, + "avg": false, + "current": false, + "max": false, + "min": false, + "rightSide": true, + "show": true, + "total": false, + "values": false + }, + "lines": true, + "linewidth": 1, + "links": [], + "nullPointMode": "null", + "percentage": false, + "pointradius": 5, + "points": false, + "renderer": "flot", + "seriesOverrides": [ + { + "alias": "history buffer size", + "color": "#E24D42", + "dashes": true, + "fill": 0, + "linewidth": 3 + } + ], + "spaceLength": 10, + "stack": false, + "steppedLine": false, + "targets": [ + { + "expr": "max(pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*pd.*\", type=\"last_index\"}) - min(pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", 
tidb_cluster=\"$tidb_cluster\", job=~\".*pd.*\", type=\"sync_index\"})", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "index gap", + "refId": "A" + }, + { + "expr": "max(pd_region_syncer_status{k8s_cluster=\"$k8s_cluster\", tidb_cluster=\"$tidb_cluster\", job=~\".*pd.*\", type=\"history_buffer_size\"})", + "format": "time_series", + "intervalFactor": 2, + "legendFormat": "history buffer size", + "refId": "B" + } + ], + "thresholds": [], + "timeFrom": null, + "timeRegions": [], + "timeShift": null, + "title": "History index gap", + "tooltip": { + "shared": true, + "sort": 0, + "value_type": "individual" + }, + "type": "graph", + "xaxis": { + "buckets": null, + "mode": "time", + "name": null, + "show": true, + "values": [] + }, + "yaxes": [ + { + "format": "none", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + }, + { + "format": "short", + "label": null, + "logBase": 1, + "max": null, + "min": null, + "show": true + } + ] } ], "repeat": null, diff --git a/pkg/syncer/history_buffer.go b/pkg/syncer/history_buffer.go index a066762405f..4e5c57260f2 100644 --- a/pkg/syncer/history_buffer.go +++ b/pkg/syncer/history_buffer.go @@ -34,9 +34,10 @@ const ( var ( // WithLabelValues is a heavy operation, define variable to avoid call it every time. 
-	syncIndexGauge  = regionSyncerStatus.WithLabelValues("sync_index")
-	firstIndexGauge = regionSyncerStatus.WithLabelValues("first_index")
-	lastIndexGauge  = regionSyncerStatus.WithLabelValues("last_index")
+	syncIndexGauge         = regionSyncerStatus.WithLabelValues("sync_index")
+	firstIndexGauge        = regionSyncerStatus.WithLabelValues("first_index")
+	lastIndexGauge         = regionSyncerStatus.WithLabelValues("last_index")
+	historyBufferSizeGauge = regionSyncerStatus.WithLabelValues("history_buffer_size")
 )
 
 type historyBuffer struct {
@@ -56,6 +57,7 @@ func newHistoryBuffer(size int, kv kv.Base) *historyBuffer {
 	if size < 2 {
 		size = 2
 	}
+	historyBufferSizeGauge.Set(float64(size - 1))
 	records := make([]*core.RegionInfo, size)
 	h := &historyBuffer{
 		records:    records,
@@ -64,6 +66,7 @@ func newHistoryBuffer(size int, kv kv.Base) *historyBuffer {
 		flushCount: defaultFlushCount,
 	}
 	h.reload()
+	h.updateHistoryIndexMetrics()
 	return h
 }
 
@@ -96,6 +99,7 @@ func (h *historyBuffer) record(r *core.RegionInfo) {
 		h.head = (h.head + 1) % h.size
 	}
 	h.index++
+	h.updateHistoryIndexMetrics()
 	h.flushCount--
 	if h.flushCount <= 0 {
 		h.persist()
@@ -126,6 +130,7 @@ func (h *historyBuffer) resetWithIndex(index uint64) {
 	h.head = 0
 	h.tail = 0
 	h.flushCount = defaultFlushCount
+	h.updateHistoryIndexMetrics()
 }
 
 func (h *historyBuffer) getNextIndex() uint64 {
@@ -159,10 +164,17 @@ func (h *historyBuffer) reload() {
 }
 
 func (h *historyBuffer) persist() {
-	firstIndexGauge.Set(float64(h.firstIndex()))
-	lastIndexGauge.Set(float64(h.nextIndex()))
 	err := h.kv.Save(historyKey, strconv.FormatUint(h.nextIndex(), 10))
 	if err != nil {
 		log.Warn("persist history index failed", zap.Uint64("persist-index", h.nextIndex()), errs.ZapError(err))
 	}
 }
+
+// updateHistoryIndexMetrics publishes the buffer's current first index and
+// next index through the first_index and last_index gauges.
+func (h *historyBuffer) updateHistoryIndexMetrics() {
+	first := float64(h.firstIndex())
+	firstIndexGauge.Set(first)
+	next := float64(h.nextIndex())
+	lastIndexGauge.Set(next)
+}
diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go
index 415e0d62985..4fe7438f72e 100644
--- a/pkg/syncer/server.go
+++ 
b/pkg/syncer/server.go
@@ -34,6 +34,7 @@ import (
 
 	"github.com/tikv/pd/pkg/core"
 	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/memory"
 	"github.com/tikv/pd/pkg/ratelimit"
 	"github.com/tikv/pd/pkg/storage"
 	"github.com/tikv/pd/pkg/storage/kv"
@@ -49,6 +50,8 @@ const (
 	maxSyncRegionBatchSize   = 1000
 	syncerKeepAliveInterval  = 10 * time.Second
 	defaultHistoryBufferSize = 10000
+	maxHistoryBufferSize     = 100000
+	historyBufferMemoryStep  = 4 * 1024 * 1024 * 1024
 )
 
 // ClientStream is the client side of the region syncer.
@@ -118,9 +121,10 @@ func NewRegionSyncer(s Server) *RegionSyncer {
 	if regionStorage == nil {
 		return nil
 	}
+	historyBufferSize := historyBufferSizeFromMemory(memory.GetMemTotalIgnoreErr())
 	syncer := &RegionSyncer{
 		server:      s,
-		history:     newHistoryBuffer(defaultHistoryBufferSize, regionStorage.(kv.Base)),
+		history:     newHistoryBuffer(historyBufferSize, regionStorage.(kv.Base)),
 		limit:       ratelimit.NewRateLimiter(defaultBucketRate, defaultBucketCapacity),
 		sendTimeout: syncerKeepAliveInterval,
 		tlsConfig:   s.GetTLSConfig(),
@@ -129,6 +133,27 @@ func NewRegionSyncer(s Server) *RegionSyncer {
 	return syncer
 }
 
+// historyBufferSizeFromMemory derives the history buffer size from the total
+// memory in bytes. It returns defaultHistoryBufferSize when totalMemory is at
+// most historyBufferMemoryStep (0 included), scales the size linearly with
+// totalMemory above that, and never returns more than maxHistoryBufferSize.
+func historyBufferSizeFromMemory(totalMemory uint64) int {
+	if totalMemory <= historyBufferMemoryStep {
+		return defaultHistoryBufferSize
+	}
+	// Clamp before scaling: the product below wraps around uint64 for very
+	// large totalMemory, and an unclamped quotient can be truncated by the
+	// conversion to int on 32-bit platforms.
+	if totalMemory/historyBufferMemoryStep > maxHistoryBufferSize/defaultHistoryBufferSize {
+		return maxHistoryBufferSize
+	}
+	size := uint64(defaultHistoryBufferSize) * totalMemory / historyBufferMemoryStep
+	if size > maxHistoryBufferSize {
+		return maxHistoryBufferSize
+	}
+	return int(size)
+}
+
 // RunServer runs the server of the region syncer.
 // regionNotifier is used to get the changed regions.
func (s *RegionSyncer) RunServer(ctx context.Context, regionNotifier <-chan *core.RegionInfo) { diff --git a/pkg/syncer/server_test.go b/pkg/syncer/server_test.go index 2b5398e841f..06b79a3e6ba 100644 --- a/pkg/syncer/server_test.go +++ b/pkg/syncer/server_test.go @@ -36,6 +36,26 @@ import ( "github.com/tikv/pd/pkg/utils/testutil" ) +func TestHistoryBufferSizeFromMemory(t *testing.T) { + testCases := []struct { + name string + totalMemory uint64 + expected int + }{ + {name: "zero-memory", totalMemory: 0, expected: defaultHistoryBufferSize}, + {name: "below-minimum", totalMemory: historyBufferMemoryStep / 2, expected: defaultHistoryBufferSize}, + {name: "base-step", totalMemory: historyBufferMemoryStep, expected: defaultHistoryBufferSize}, + {name: "scaled-linearly", totalMemory: historyBufferMemoryStep * 6 / 4, expected: 15000}, + {name: "max-clamped", totalMemory: historyBufferMemoryStep * 64 / 4, expected: maxHistoryBufferSize}, + } + + for _, testCase := range testCases { + t.Run(testCase.name, func(t *testing.T) { + require.Equal(t, testCase.expected, historyBufferSizeFromMemory(testCase.totalMemory)) + }) + } +} + func TestSyncExitsWhenRegionSyncerStops(t *testing.T) { re := require.New(t) tempDir := t.TempDir()