Skip to content
20 changes: 18 additions & 2 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ func (s *RegionSyncer) reset() {
s.mu.clientCancel, s.mu.clientCtx = nil, nil
}

// ResetHistoryIndex resets and persists the next region sync history index.
//
// index is the value the history will report as its next index afterwards.
// All of the work is delegated to s.history.resetWithIndexAndPersist.
func (s *RegionSyncer) ResetHistoryIndex(index uint64) {
s.history.resetWithIndexAndPersist(index)
}

func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (ClientStream, error) {
cli := pdpb.NewPDClient(conn)
syncStream, err := cli.SyncRegions(ctx)
Expand All @@ -85,7 +90,7 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C

var regionGuide = core.GenerateRegionGuideFunc(false)

func (s *RegionSyncer) handleRegionSyncResponse(ctx context.Context, resp *pdpb.SyncRegionResponse, bc *core.BasicCluster, regionStorage storage.Storage) {
func (s *RegionSyncer) handleRegionSyncResponse(ctx context.Context, resp *pdpb.SyncRegionResponse, bc *core.BasicCluster, regionStorage storage.Storage, fullSyncing *bool) {
if s.history.getNextIndex() != resp.GetStartIndex() {
log.Warn("server sync index not match the leader",
zap.String("server", s.server.Name()),
Expand All @@ -99,6 +104,9 @@ func (s *RegionSyncer) handleRegionSyncResponse(ctx context.Context, resp *pdpb.
regions := resp.GetRegions()
buckets := resp.GetBuckets()
regionLeaders := resp.GetRegionLeaders()
if !s.IsRunning() && resp.GetStartIndex() == 0 && len(regions) > 0 {
*fullSyncing = true
}
hasStats := len(stats) == len(regions)
hasBuckets := len(buckets) == len(regions)
for i, r := range regions {
Expand Down Expand Up @@ -146,6 +154,13 @@ func (s *RegionSyncer) handleRegionSyncResponse(ctx context.Context, resp *pdpb.
_ = regionStorage.DeleteRegion(old.GetMeta())
}
}
if *fullSyncing {
if len(regions) == 0 {
*fullSyncing = false
s.streamingRunning.Store(true)
}
return
}
// mark the client as running status when it finished the first history region sync.
s.streamingRunning.Store(true)
}
Expand Down Expand Up @@ -233,6 +248,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
continue
}
log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.getNextIndex()))
fullSyncing := false
for {
resp, err := stream.Recv()
if err == io.EOF {
Expand Down Expand Up @@ -260,7 +276,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
}
break
}
s.handleRegionSyncResponse(ctx, resp, bc, regionStorage)
s.handleRegionSyncResponse(ctx, resp, bc, regionStorage, &fullSyncing)
}
}
}()
Expand Down
17 changes: 17 additions & 0 deletions pkg/syncer/history_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,35 @@ func (h *historyBuffer) recordsFrom(index uint64) []*core.RegionInfo {
// resetWithIndex resets the buffer so that index becomes its next index.
// It takes the exclusive lock (h.Lock) for the duration of the reset; the
// field updates themselves are done by resetWithIndexLocked.
func (h *historyBuffer) resetWithIndex(index uint64) {
h.Lock()
defer h.Unlock()
h.resetWithIndexLocked(index)
}

// resetWithIndexLocked rewinds the buffer: head and tail go back to zero, the
// flush counter is restored to defaultFlushCount, and index becomes the
// buffer's next index.
//
// It performs no locking of its own; resetWithIndex and
// resetWithIndexAndPersist both call it while holding h.Lock().
func (h *historyBuffer) resetWithIndexLocked(index uint64) {
	h.head, h.tail = 0, 0
	h.flushCount = defaultFlushCount
	h.index = index
}

// resetWithIndexAndPersist resets the buffer so that index becomes its next
// index (via resetWithIndexLocked) and then calls h.persist().
// Both steps run under the exclusive lock, which is released on return.
func (h *historyBuffer) resetWithIndexAndPersist(index uint64) {
h.Lock()
defer h.Unlock()
h.resetWithIndexLocked(index)
// persist runs before the deferred Unlock, i.e. while the lock is still held.
h.persist()
}

// getNextIndex reports the buffer's next index, i.e. the current value of
// h.index, read while holding the shared (read) lock.
func (h *historyBuffer) getNextIndex() uint64 {
	h.RLock()
	next := h.index
	h.RUnlock()
	return next
}

// getFirstIndex returns the result of h.firstIndex(), evaluated while holding
// the shared (read) lock.
// NOTE(review): firstIndex is not visible here; presumably it does no locking
// itself and this wrapper exists for callers that do not hold the lock — confirm.
func (h *historyBuffer) getFirstIndex() uint64 {
h.RLock()
defer h.RUnlock()
return h.firstIndex()
}

func (h *historyBuffer) get(index uint64) *core.RegionInfo {
h.RLock()
defer h.RUnlock()
Expand Down
98 changes: 80 additions & 18 deletions pkg/syncer/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,14 @@ func (s *RegionSyncer) Sync(ctx context.Context, stream pdpb.PD_SyncRegionsServe
zap.String("requested-server", request.GetMember().GetName()),
zap.String("url", request.GetMember().GetClientUrls()[0]))

err = s.syncHistoryRegion(ctx, request, stream)
syncStream, err := s.syncHistoryRegion(ctx, request, stream)
if err != nil {
return err
}
name := request.GetMember().GetName()
syncStream := s.bindStream(name, stream)
if syncStream == nil {
syncStream = s.bindStream(name, stream)
}
select {
case <-ctx.Done():
s.unbindStream(name, syncStream)
Expand Down Expand Up @@ -284,9 +286,12 @@ func recvSyncRegionRequest(ctx context.Context, stream pdpb.PD_SyncRegionsServer
}
}

func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.SyncRegionRequest, stream pdpb.PD_SyncRegionsServer) error {
func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.SyncRegionRequest, stream pdpb.PD_SyncRegionsServer) (*regionSyncStream, error) {
startIndex := request.GetStartIndex()
name := request.GetMember().GetName()
if startIndex == 0 {
return s.syncFullRegions(ctx, name, stream)
}
records := s.history.recordsFrom(startIndex)
if len(records) == 0 {
if s.history.getNextIndex() == startIndex {
Expand All @@ -301,21 +306,17 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync
RegionLeaders: nil,
Buckets: nil,
}
return stream.Send(resp)
}
// do full synchronization
if startIndex == 0 {
return s.syncFullRegions(ctx, name, stream)
return nil, stream.Send(resp)
}
log.Warn("no history regions from index, the leader may be restarted", zap.Uint64("index", startIndex))
return nil
log.Warn("no history regions from index, fall back to full sync", zap.Uint64("index", startIndex))
return s.syncFullRegions(ctx, name, stream)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this falls back to full sync, the follower can mark the syncer as running after the first response batch. For large clusters, follower reads may see a partial region cache before full sync finishes. Can we add an explicit completion signal or keep follower local reads disabled until the full sync is done?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in b3c130e. Full sync now sends an empty completion response after all full-sync batches, and the follower client keeps local follower reads disabled until that completion response is received.

Coverage added in TestClientWaitsForFullSyncCompletionBeforeRunning and the existing fallback full sync test. Verified with:

  • make gotest GOTEST_ARGS="./pkg/syncer -run 'TestSyncFallsBackToFullSyncWhenHistoryMissing|TestClientWaitsForFullSyncCompletionBeforeRunning' -count=1"
  • make gotest GOTEST_ARGS="./tests/server/api -tags without_dashboard -run TestFollowerRegionResetCacheWithNoForward -count=1"

}
log.Info("sync the history regions with server",
zap.String("server", name),
zap.Uint64("from-index", startIndex),
zap.Uint64("last-index", s.history.getNextIndex()),
zap.Int("records-length", len(records)))
return s.syncHistoryRecords(startIndex, records, stream)
return nil, s.syncHistoryRecords(startIndex, records, stream)
}

func (*RegionSyncer) syncHistoryRecords(startIndex uint64, records []*core.RegionInfo, stream pdpb.PD_SyncRegionsServer) error {
Expand Down Expand Up @@ -348,10 +349,21 @@ func (*RegionSyncer) syncHistoryRecords(startIndex uint64, records []*core.Regio
return stream.Send(resp)
}

func (s *RegionSyncer) syncFullRegions(ctx context.Context, name string, stream pdpb.PD_SyncRegionsServer) error {
func (s *RegionSyncer) syncFullRegions(ctx context.Context, name string, stream pdpb.PD_SyncRegionsServer) (*regionSyncStream, error) {
catchUpIndex := s.history.getNextIndex()
regions := s.server.GetRegions()
lastIndex := 0
start := time.Now()
if len(regions) == 0 {
resp := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
StartIndex: 0,
}
if err := stream.Send(resp); err != nil {
log.Warn("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err))
return nil, err
}
}
lastIndex := 0
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the FollowerRegion begins a reset and the server calls Sync4Regions, the LastIndex becomes inconsistent with the actual index in the server's history buffer.

How should we handle this?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the broader recovery problem covered by #10670. To make LastIndex fully consistent after a full snapshot, the server needs to catch up history generated during full sync and then send a final empty response with the catch-up index; the client also needs to keep streamingRunning false while it is still in full-sync mode. I would keep that full index-recovery flow in #10670 rather than expanding this reset-cache PR further.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't fix this issue, won't there be a problem once this PR is merged?

Even after the region cache is reset, subsequent region synchronization will still have issues. Isn't this effectively a bug?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 2a3f13d. The leader now records the history index before full sync, sends any region history generated while the snapshot is being sent, and only completes the full sync after the catch-up records are sent and the downstream stream is bound.

Verified with:

  • git diff --check
  • make gotest GOTEST_ARGS="./pkg/syncer -count=1"

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if the leader's local history buffer overflows during a full synchronization — that is, if the number of region changes recorded during that period exceeds the buffer's capacity? How should that case be handled?

Copy link
Copy Markdown
Member Author

@okJiang okJiang May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Covered in b47a9eb.

If the leader loses the catch-up history that was recorded at the start of a full sync, it must not bind the follower stream with a partial cache. The current path detects catchUpIndex < history.firstIndex() and restarts full sync from a fresh snapshot, so the follower only marks local reads as running after it receives the final completion response from a complete sync attempt.

I added TestFullSyncRestartsWhenHistoryBufferOverflowsDuringCatchUp with a one-entry history buffer. The test forces history overflow while the first full-sync response is blocked, then verifies that the leader sends a restarted full snapshot and only then sends the completion response.

Verified with:

  • git diff --check
  • make gotest GOTEST_ARGS='./pkg/syncer -run "TestSyncFallsBackToFullSyncWhenHistoryMissing|TestFullSyncRestartsWhenHistoryBufferOverflowsDuringCatchUp|TestClientWaitsForFullSyncCompletionBeforeRunning" -count=1'

metas := make([]*metapb.Region, 0, maxSyncRegionBatchSize)
stats := make([]*pdpb.RegionStat, 0, maxSyncRegionBatchSize)
leaders := make([]*metapb.Peer, 0, maxSyncRegionBatchSize)
Expand All @@ -363,7 +375,7 @@ func (s *RegionSyncer) syncFullRegions(ctx context.Context, name string, stream
failpoint.Inject("noFastExitSync", func() {
failpoint.Goto("doSync")
})
return nil
return nil, nil
default:
}
failpoint.Label("doSync")
Expand Down Expand Up @@ -392,33 +404,83 @@ func (s *RegionSyncer) syncFullRegions(ctx context.Context, name string, stream
}
if err := s.limit.WaitN(ctx, resp.Size()); err != nil {
log.Error("failed to wait rate limit", errs.ZapError(err))
return err
return nil, err
}
lastIndex += len(metas)
if err := stream.Send(resp); err != nil {
log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err))
return err
return nil, err
}
metas = metas[:0]
stats = stats[:0]
leaders = leaders[:0]
buckets = buckets[:0]
}
for {
records := s.history.recordsFrom(catchUpIndex)
if len(records) == 0 {
if catchUpIndex < s.history.getFirstIndex() {
log.Warn("region history buffer overflow during full synchronization, restart full synchronization",
zap.String("requested-server", name),
zap.String("server", s.server.Name()),
zap.Uint64("catch-up-index", catchUpIndex),
zap.Uint64("first-index", s.history.getFirstIndex()))
return s.syncFullRegions(ctx, name, stream)
}
if catchUpIndex == s.history.getNextIndex() {
break
}
continue
}
if err := s.syncHistoryRecords(catchUpIndex, records, stream); err != nil {
return nil, err
}
catchUpIndex += uint64(len(records))
}
syncStream := newRegionSyncStream(stream)
s.mu.Lock()
defer s.mu.Unlock()
for {
records := s.history.recordsFrom(catchUpIndex)
if len(records) == 0 {
if catchUpIndex < s.history.getFirstIndex() {
return nil, errors.Errorf("region history buffer overflow during full sync catch-up, catch-up-index %d, first-index %d", catchUpIndex, s.history.getFirstIndex())
}
break
}
if err := s.syncHistoryRecords(catchUpIndex, records, stream); err != nil {
return nil, err
}
catchUpIndex += uint64(len(records))
}
log.Info("requested server has completed full synchronization with server",
zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Duration("cost", time.Since(start)))
return nil
resp := &pdpb.SyncRegionResponse{
Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
StartIndex: catchUpIndex,
}
if err := stream.Send(resp); err != nil {
log.Warn("failed to send sync region completion response", errs.ZapError(errs.ErrGRPCSend, err))
return nil, err
}
s.bindStreamLocked(name, syncStream)
return syncStream, nil
}

// bindStream binds the established server stream.
//
// It wraps stream in a new regionSyncStream, registers that wrapper under
// name while holding s.mu (registration is done by bindStreamLocked), and
// returns the wrapper.
func (s *RegionSyncer) bindStream(name string, stream ServerStream) *regionSyncStream {
syncStream := newRegionSyncStream(stream)
s.mu.Lock()
defer s.mu.Unlock()
s.bindStreamLocked(name, syncStream)
return syncStream
}

// bindStreamLocked registers syncStream as the stream for name. If another
// stream is already registered under that name, it is closed before being
// replaced.
//
// The function does no locking itself; the caller must already hold s.mu.
// It has no result: callers keep their own reference to syncStream.
func (s *RegionSyncer) bindStreamLocked(name string, syncStream *regionSyncStream) {
	if oldStream := s.mu.streams[name]; oldStream != nil {
		oldStream.close()
	}
	s.mu.streams[name] = syncStream
}

func (s *RegionSyncer) unbindStream(name string, stream *regionSyncStream) {
Expand Down
Loading
Loading