syncer: trigger full sync on history gap#10670
Conversation
|
Skipping CI for Draft Pull Request. |
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
📝 WalkthroughWalkthroughLeader detects unrecoverable history gaps and falls back to streaming a full region snapshot; followers track history vs full-sync phases, reset cache/storage on transition, persist rebuilt regions, and resume streaming with consistent region state. ChangesRegion syncer history-overflow recovery
sequenceDiagram
participant Leader as Leader (history buffer)
participant Server as PD Leader / sync server
participant Client as PD Follower / RegionSyncer
participant Storage as Follower region storage & cache
Leader->>Server: follower StartIndex request
alt StartIndex within retained history
Server->>Client: send incremental history batch (StartIndex>0)
Client->>Storage: apply incremental updates
else StartIndex outside history or StartIndex==0
Server->>Client: send full-sync initial batch (StartIndex=0)
Client->>Client: detect history→full transition (regionSyncState)
Client->>Storage: reset cache and flush/clear region storage
Server->>Client: stream full snapshot and remaining regions
Client->>Storage: persist snapshot and replay catch-up history
Server->>Client: send completion (StartIndex = catchUpIndex)
Client->>Client: clear fullSyncing/syncingHistory and enable streaming
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Possibly related issues
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
8fb3d34 to
8771077
Compare
8771077 to
457178c
Compare
There was a problem hiding this comment.
🧹 Nitpick comments (1)
pkg/syncer/server_test.go (1)
40-45: ⚡ Quick winSnapshot responses in the test double to avoid pointer aliasing across sends.
Sendappends the original pointer; if the caller reuses/mutates the same proto between sends, earlier assertions can read mutated data. Store a cloned message inresponses.Proposed patch
import ( "context" "testing" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "google.golang.org/protobuf/proto" @@ func (s *testSyncRegionsServer) Send(resp *pdpb.SyncRegionResponse) error { - s.responses = append(s.responses, resp) + if resp == nil { + s.responses = append(s.responses, nil) + return nil + } + s.responses = append(s.responses, proto.Clone(resp).(*pdpb.SyncRegionResponse)) if s.onSend != nil { s.onSend(resp) } return nil }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/syncer/server_test.go` around lines 40 - 45, testSyncRegionsServer.Send currently appends the incoming *pdpb.SyncRegionResponse pointer directly into responses which allows later mutations to affect earlier snapshots; instead, deep-copy (clone) the resp before storing it and invoke onSend with the cloned copy as well. Update testSyncRegionsServer.Send to create a clone of the *pdpb.SyncRegionResponse (using the appropriate protobuf clone utility for your proto package), append that clone to the responses slice, and pass the clone to s.onSend if set.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@pkg/syncer/server_test.go`:
- Around line 40-45: testSyncRegionsServer.Send currently appends the incoming
*pdpb.SyncRegionResponse pointer directly into responses which allows later
mutations to affect earlier snapshots; instead, deep-copy (clone) the resp
before storing it and invoke onSend with the cloned copy as well. Update
testSyncRegionsServer.Send to create a clone of the *pdpb.SyncRegionResponse
(using the appropriate protobuf clone utility for your proto package), append
that clone to the responses slice, and pass the clone to s.onSend if set.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 32ad935d-e2a3-46b9-94c9-7e8f0ad8eb7d
📥 Commits
Reviewing files that changed from the base of the PR and between 87710779c704a48e87a472e1031844823c7d3475 and 457178c.
📒 Files selected for processing (6)
pkg/mcs/router/server/sync.gopkg/syncer/client.gopkg/syncer/client_test.gopkg/syncer/history_buffer.gopkg/syncer/server.gopkg/syncer/server_test.go
🚧 Files skipped from review as they are similar to previous changes (5)
- pkg/syncer/history_buffer.go
- pkg/mcs/router/server/sync.go
- pkg/syncer/client_test.go
- pkg/syncer/server.go
- pkg/syncer/client.go
Signed-off-by: okjiang <[email protected]>
457178c to
6b56372
Compare
|
@okJiang: The following tests failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
There was a problem hiding this comment.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
pkg/syncer/server.go (1)
612-625:⚠️ Potential issue | 🟠 Major | ⚡ Quick win
closeAllClientcan still hang shutdown on a blocked send.Line 623 still does a synchronous gRPC
Sendaftersender.close(). ClosingdonewakesSync, but it does not interrupt a blocked transport write, soRunServercan wait forever incloseAllClient. Please reuse the bounded-send path here, or make the close notification best-effort and non-blocking.🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@pkg/syncer/server.go` around lines 612 - 625, The loop in closeAllClient still performs a blocking gRPC send (sender.stream.Send) after calling sender.close(), which can hang shutdown; replace the direct synchronous Send with the existing bounded/non-blocking send path used elsewhere (e.g., the sender.enqueue/sendCh/trySend helper or the sender.asyncSend method) so the close notification is best-effort and does not block RunServer; specifically, stop calling sender.stream.Send directly in closeAllClient and instead push the close response into the sender's bounded channel or use its non-blocking try-send helper (or spawn a goroutine with a select+timeout fallback) to ensure shutdown cannot be blocked by a stuck transport write.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@pkg/syncer/server.go`:
- Around line 455-477: You are holding s.mu across network I/O
(syncHistoryRecords and stream.Send), which blocks broadcast() and other
readers; change the loop so you only lock to read/validate history and make a
local copy of the records, then unlock before calling syncHistoryRecords or
stream.Send. Specifically, in the loop around
s.history.recordsFrom(catchUpIndex) only hold s.mu to call recordsFrom, check
the overflow condition against s.history.getFirstIndex(), and make a copy of the
returned slice; then release s.mu and call s.syncHistoryRecords(catchUpIndex,
copiedRecords, stream); after that, re-acquire the lock to advance/inspect
shared state as needed and continue. Also move the final stream.Send of resp
outside the s.mu lock so no gRPC send occurs while s.mu is held (this will avoid
blocking broadcast() and RLock() callers).
- Around line 433-443: The code currently recurses by calling
s.syncFullRegions(ctx, name, stream) when catchUpIndex <
s.history.getFirstIndex(), which can cause unbounded goroutine/stack growth
under churn; instead refactor syncFullRegions to use an outer retry loop: remove
the recursive call and continue the outer for/while loop that surrounds the
chunk-fetching logic (the loop that uses s.history.recordsFrom(catchUpIndex)),
resetting any per-attempt state (e.g. catchUpIndex and any temp buffers) as
needed so the function retries from the top without recursion; ensure symbols
mentioned (syncFullRegions, catchUpIndex, s.history.recordsFrom,
s.history.getFirstIndex, stream, ctx, name) are used to locate and update the
logic.
---
Outside diff comments:
In `@pkg/syncer/server.go`:
- Around line 612-625: The loop in closeAllClient still performs a blocking gRPC
send (sender.stream.Send) after calling sender.close(), which can hang shutdown;
replace the direct synchronous Send with the existing bounded/non-blocking send
path used elsewhere (e.g., the sender.enqueue/sendCh/trySend helper or the
sender.asyncSend method) so the close notification is best-effort and does not
block RunServer; specifically, stop calling sender.stream.Send directly in
closeAllClient and instead push the close response into the sender's bounded
channel or use its non-blocking try-send helper (or spawn a goroutine with a
select+timeout fallback) to ensure shutdown cannot be blocked by a stuck
transport write.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e8bc49a1-7d9d-48db-8068-ee60f4d46585
📒 Files selected for processing (6)
pkg/mcs/router/server/sync.gopkg/syncer/client.gopkg/syncer/client_test.gopkg/syncer/history_buffer.gopkg/syncer/server.gopkg/syncer/server_test.go
| for { | ||
| records := s.history.recordsFrom(catchUpIndex) | ||
| if len(records) == 0 { | ||
| if catchUpIndex < s.history.getFirstIndex() { | ||
| log.Warn("region history buffer overflow during full synchronization, restart full synchronization", | ||
| zap.String("requested-server", name), | ||
| zap.String("server", s.server.Name()), | ||
| zap.Uint64("catch-up-index", catchUpIndex), | ||
| zap.Uint64("first-index", s.history.getFirstIndex())) | ||
| return s.syncFullRegions(ctx, name, stream) | ||
| } |
There was a problem hiding this comment.
Replace recursive full-sync retries with an outer loop.
Line 442 re-enters syncFullRegions recursively when catch-up overflows. Under sustained churn this path can repeat many times, so recovery starts growing the goroutine stack instead of simply retrying.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/syncer/server.go` around lines 433 - 443, The code currently recurses by
calling s.syncFullRegions(ctx, name, stream) when catchUpIndex <
s.history.getFirstIndex(), which can cause unbounded goroutine/stack growth
under churn; instead refactor syncFullRegions to use an outer retry loop: remove
the recursive call and continue the outer for/while loop that surrounds the
chunk-fetching logic (the loop that uses s.history.recordsFrom(catchUpIndex)),
resetting any per-attempt state (e.g. catchUpIndex and any temp buffers) as
needed so the function retries from the top without recursion; ensure symbols
mentioned (syncFullRegions, catchUpIndex, s.history.recordsFrom,
s.history.getFirstIndex, stream, ctx, name) are used to locate and update the
logic.
| s.mu.Lock() | ||
| defer s.mu.Unlock() | ||
| for { | ||
| records := s.history.recordsFrom(catchUpIndex) | ||
| if len(records) == 0 { | ||
| if catchUpIndex < s.history.getFirstIndex() { | ||
| return nil, errors.Errorf("region history buffer overflow during full sync catch-up, catch-up-index %d, first-index %d", catchUpIndex, s.history.getFirstIndex()) | ||
| } | ||
| break | ||
| } | ||
| if err := s.syncHistoryRecords(catchUpIndex, records, stream); err != nil { | ||
| return nil, err | ||
| } | ||
| catchUpIndex += uint64(len(records)) | ||
| } | ||
| resp := &pdpb.SyncRegionResponse{ | ||
| Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()}, | ||
| StartIndex: catchUpIndex, | ||
| } | ||
| if err := stream.Send(resp); err != nil { | ||
| log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err)) | ||
| return nil, err | ||
| } |
There was a problem hiding this comment.
Avoid doing gRPC sends while holding s.mu.
From Line 455, the global stream lock stays held while syncHistoryRecords and the completion stream.Send perform network I/O. If that follower stalls here, broadcast() blocks on RLock() and every other downstream stops receiving updates.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@pkg/syncer/server.go` around lines 455 - 477, You are holding s.mu across
network I/O (syncHistoryRecords and stream.Send), which blocks broadcast() and
other readers; change the loop so you only lock to read/validate history and
make a local copy of the records, then unlock before calling syncHistoryRecords
or stream.Send. Specifically, in the loop around
s.history.recordsFrom(catchUpIndex) only hold s.mu to call recordsFrom, check
the overflow condition against s.history.getFirstIndex(), and make a copy of the
returned slice; then release s.mu and call s.syncHistoryRecords(catchUpIndex,
copiedRecords, stream); after that, re-acquire the lock to advance/inspect
shared state as needed and continue. Also move the final stream.Send of resp
outside the s.mu lock so no gRPC send occurs while s.mu is held (this will avoid
blocking broadcast() and RLock() callers).
What problem does this PR solve?
Issue Number: Close #10668
When a PD follower requests a region sync index that is no longer available in the leader's history buffer, the leader used to return no records and keep the stream alive. The follower could then keep stale region metadata in memory and in local region storage.
What is changed and how does it work?
Check List
Tests
Release note
Summary by CodeRabbit
Bug Fixes
Refactor
Tests