diff --git a/pkg/mcs/router/server/sync.go b/pkg/mcs/router/server/sync.go index 761f24b22d3..faa5205d8d5 100644 --- a/pkg/mcs/router/server/sync.go +++ b/pkg/mcs/router/server/sync.go @@ -281,6 +281,8 @@ func (s *RegionSyncer) sync(ctx context.Context, leaderAddr string) { continue } log.Info("server starts to synchronize with leader", zap.String("server", s.name), zap.String("leader", leaderAddr), zap.Uint64("request-index", s.nextSyncIndex)) + syncingHistory := true + fullSyncing := false for { resp, err := stream.Recv() failpoint.Inject("syncMetError", func() { @@ -310,6 +312,10 @@ func (s *RegionSyncer) sync(ctx context.Context, leaderAddr string) { s.triggerMembershipCheck() return } + if syncingHistory && resp.GetStartIndex() == 0 && s.nextSyncIndex != 0 { + bc.ResetRegionCache() + fullSyncing = true + } // client maybe loss some region info, need to reset the nextSyncIndex if s.nextSyncIndex != resp.GetStartIndex() { log.Warn("server sync index not match the leader", @@ -353,6 +359,12 @@ func (s *RegionSyncer) sync(ctx context.Context, leaderAddr string) { s.nextSyncIndex++ } } + if fullSyncing && len(regions) == 0 { + fullSyncing = false + syncingHistory = false + } else if syncingHistory && !fullSyncing { + syncingHistory = false + } } } } diff --git a/pkg/syncer/client.go b/pkg/syncer/client.go index 6f164b777ba..4656637903f 100644 --- a/pkg/syncer/client.go +++ b/pkg/syncer/client.go @@ -85,7 +85,43 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C var regionGuide = core.GenerateRegionGuideFunc(false) -func (s *RegionSyncer) handleRegionSyncResponse(ctx context.Context, resp *pdpb.SyncRegionResponse, bc *core.BasicCluster, regionStorage storage.Storage) { +type regionSyncState struct { + syncingHistory bool + fullSyncing bool +} + +func (*RegionSyncer) resetRegionCacheAndStorage(ctx context.Context, bc *core.BasicCluster, regionStorage storage.Storage) error { + if err := regionStorage.Flush(); err != nil { + return err 
+ } + for _, region := range bc.GetRegions() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + if err := regionStorage.DeleteRegion(region.GetMeta()); err != nil { + return err + } + } + bc.ResetRegionCache() + return nil +} + +func (s *RegionSyncer) handleRegionSyncResponse( + ctx context.Context, + resp *pdpb.SyncRegionResponse, + bc *core.BasicCluster, + regionStorage storage.Storage, + state *regionSyncState, +) error { + if state.syncingHistory && resp.GetStartIndex() == 0 && s.history.getNextIndex() != 0 { + s.streamingRunning.Store(false) + if err := s.resetRegionCacheAndStorage(ctx, bc, regionStorage); err != nil { + return err + } + state.fullSyncing = true + } if s.history.getNextIndex() != resp.GetStartIndex() { log.Warn("server sync index not match the leader", zap.String("server", s.server.Name()), @@ -146,8 +182,20 @@ func (s *RegionSyncer) handleRegionSyncResponse(ctx context.Context, resp *pdpb. _ = regionStorage.DeleteRegion(old.GetMeta()) } } + if state.fullSyncing && len(regions) == 0 { + if err := regionStorage.Flush(); err != nil { + return err + } + state.fullSyncing = false + state.syncingHistory = false + } else if state.syncingHistory && !state.fullSyncing { + state.syncingHistory = false + } // mark the client as running status when it finished the first history region sync. - s.streamingRunning.Store(true) + if !state.fullSyncing { + s.streamingRunning.Store(true) + } + return nil } // IsRunning returns whether the region syncer client is running. 
@@ -233,6 +281,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { continue } log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.getNextIndex())) + syncState := &regionSyncState{syncingHistory: true} for { resp, err := stream.Recv() if err == io.EOF { @@ -260,7 +309,16 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) { } break } - s.handleRegionSyncResponse(ctx, resp, bc, regionStorage) + if err = s.handleRegionSyncResponse(ctx, resp, bc, regionStorage, syncState); err != nil { + log.Warn("failed to handle region sync response", + zap.String("server", s.server.Name()), + zap.String("leader", s.server.GetLeader().GetName()), + errs.ZapError(err)) + if err = stream.CloseSend(); err != nil { + log.Warn("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err)) + } + break + } } } }() diff --git a/pkg/syncer/client_test.go b/pkg/syncer/client_test.go index 2a9a181d729..b0be1256d9c 100644 --- a/pkg/syncer/client_test.go +++ b/pkg/syncer/client_test.go @@ -26,11 +26,13 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockserver" "github.com/tikv/pd/pkg/storage" "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/testutil" ) @@ -94,3 +96,127 @@ func TestErrorCode(t *testing.T) { re.True(ok) re.Equal(codes.Canceled, ev.Code()) } + +func TestResetRegionCacheAndStorage(t *testing.T) { + re := require.New(t) + tempDir := t.TempDir() + rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil) + re.NoError(err) + defer func() { + re.NoError(rs.Close()) + }() + storageWithRegionStorage := storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs) + 
storage.TrySwitchRegionStorage(storageWithRegionStorage, true) + bc := core.NewBasicCluster() + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storageWithRegionStorage, + bc, + ) + for i := range 2 { + region := &metapb.Region{ + Id: uint64(i + 1), + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + } + re.NoError(storageWithRegionStorage.SaveRegion(region)) + bc.PutRegion(core.NewRegionInfo(region, nil)) + } + re.NoError(storageWithRegionStorage.Flush()) + + rc := NewRegionSyncer(server) + re.NoError(rc.resetRegionCacheAndStorage(context.Background(), bc, storageWithRegionStorage)) + re.Empty(bc.GetRegions()) + for i := range 2 { + region := &metapb.Region{} + ok, err := storageWithRegionStorage.LoadRegion(uint64(i+1), region) + re.NoError(err) + re.False(ok) + } +} + +func TestHandleFullSyncResponseResetsStaleRegions(t *testing.T) { + re := require.New(t) + tempDir := t.TempDir() + rs, err := storage.NewRegionStorageWithLevelDBBackend(context.Background(), tempDir, nil) + re.NoError(err) + defer func() { + re.NoError(rs.Close()) + }() + storageWithRegionStorage := storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), rs) + storage.TrySwitchRegionStorage(storageWithRegionStorage, true) + bc := core.NewBasicCluster() + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storageWithRegionStorage, + bc, + ) + staleRegion1 := &metapb.Region{ + Id: 1, + StartKey: []byte{0}, + EndKey: []byte{1}, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + } + staleRegion2 := &metapb.Region{ + Id: 2, + StartKey: []byte{1}, + EndKey: []byte{2}, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + } + for _, region := range []*metapb.Region{staleRegion1, staleRegion2} { + re.NoError(storageWithRegionStorage.SaveRegion(region)) + bc.PutRegion(core.NewRegionInfo(region, nil)) + } + + rc := NewRegionSyncer(server) + rc.history.resetWithIndex(10) + state := 
&regionSyncState{syncingHistory: true} + latestRegion := &metapb.Region{ + Id: 1, + StartKey: []byte{0}, + EndKey: []byte{1}, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 2, + }, + } + re.NoError(rc.handleRegionSyncResponse(context.Background(), &pdpb.SyncRegionResponse{ + Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()}, + StartIndex: 0, + Regions: []*metapb.Region{latestRegion}, + }, bc, storageWithRegionStorage, state)) + re.False(rc.IsRunning()) + re.True(state.fullSyncing) + re.NoError(rc.handleRegionSyncResponse(context.Background(), &pdpb.SyncRegionResponse{ + Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()}, + StartIndex: 20, + }, bc, storageWithRegionStorage, state)) + + re.True(rc.IsRunning()) + re.False(state.fullSyncing) + re.False(state.syncingHistory) + re.Equal(uint64(20), rc.history.getNextIndex()) + historyIndex, err := rs.Load(historyKey) + re.NoError(err) + re.Equal("20", historyIndex) + re.Len(bc.GetRegions(), 1) + re.Equal(uint64(2), bc.GetRegion(1).GetRegionEpoch().GetVersion()) + storedRegion := &metapb.Region{} + ok, err := storageWithRegionStorage.LoadRegion(1, storedRegion) + re.NoError(err) + re.True(ok) + re.Equal(uint64(2), storedRegion.GetRegionEpoch().GetVersion()) + ok, err = storageWithRegionStorage.LoadRegion(2, storedRegion) + re.NoError(err) + re.False(ok) +} diff --git a/pkg/syncer/history_buffer.go b/pkg/syncer/history_buffer.go index a066762405f..80b6b6d0d3c 100644 --- a/pkg/syncer/history_buffer.go +++ b/pkg/syncer/history_buffer.go @@ -126,6 +126,7 @@ func (h *historyBuffer) resetWithIndex(index uint64) { h.head = 0 h.tail = 0 h.flushCount = defaultFlushCount + h.persist() } func (h *historyBuffer) getNextIndex() uint64 { @@ -134,6 +135,12 @@ func (h *historyBuffer) getNextIndex() uint64 { return h.index } +func (h *historyBuffer) getFirstIndex() uint64 { + h.RLock() + defer h.RUnlock() + return h.firstIndex() +} + func (h *historyBuffer) get(index uint64) *core.RegionInfo { h.RLock() 
defer h.RUnlock() diff --git a/pkg/syncer/server.go b/pkg/syncer/server.go index 415e0d62985..070c9e719a4 100644 --- a/pkg/syncer/server.go +++ b/pkg/syncer/server.go @@ -242,12 +242,14 @@ func (s *RegionSyncer) Sync(ctx context.Context, stream pdpb.PD_SyncRegionsServe zap.String("requested-server", request.GetMember().GetName()), zap.String("url", request.GetMember().GetClientUrls()[0])) - err = s.syncHistoryRegion(ctx, request, stream) + syncStream, err := s.syncHistoryRegion(ctx, request, stream) if err != nil { return err } name := request.GetMember().GetName() - syncStream := s.bindStream(name, stream) + if syncStream == nil { + syncStream = s.bindStream(name, stream) + } select { case <-ctx.Done(): s.unbindStream(name, syncStream) @@ -284,12 +286,29 @@ func recvSyncRegionRequest(ctx context.Context, stream pdpb.PD_SyncRegionsServer } } -func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.SyncRegionRequest, stream pdpb.PD_SyncRegionsServer) error { +func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.SyncRegionRequest, stream pdpb.PD_SyncRegionsServer) (*regionSyncStream, error) { startIndex := request.GetStartIndex() name := request.GetMember().GetName() + if startIndex == 0 { + log.Warn("requested server needs full synchronization", + zap.String("requested-server", name), + zap.String("server", s.server.Name())) + return s.syncFullRegions(ctx, name, stream) + } records := s.history.recordsFrom(startIndex) if len(records) == 0 { - if s.history.getNextIndex() == startIndex { + firstIndex := s.history.getFirstIndex() + nextIndex := s.history.getNextIndex() + if startIndex < firstIndex || startIndex > nextIndex { + log.Warn("requested server cannot catch up with history buffer, trigger full synchronization", + zap.String("requested-server", name), + zap.String("server", s.server.Name()), + zap.Uint64("request-index", startIndex), + zap.Uint64("first-index", firstIndex), + zap.Uint64("last-index", nextIndex)) + 
return s.syncFullRegions(ctx, name, stream) + } + if nextIndex == startIndex { log.Info("requested server has already in sync with server", zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Uint64("last-index", startIndex)) // still send a response to follower to show the history region sync. @@ -301,21 +320,17 @@ func (s *RegionSyncer) syncHistoryRegion(ctx context.Context, request *pdpb.Sync RegionLeaders: nil, Buckets: nil, } - return stream.Send(resp) - } - // do full synchronization - if startIndex == 0 { - return s.syncFullRegions(ctx, name, stream) + return nil, stream.Send(resp) } log.Warn("no history regions from index, the leader may be restarted", zap.Uint64("index", startIndex)) - return nil + return nil, nil } log.Info("sync the history regions with server", zap.String("server", name), zap.Uint64("from-index", startIndex), zap.Uint64("last-index", s.history.getNextIndex()), zap.Int("records-length", len(records))) - return s.syncHistoryRecords(startIndex, records, stream) + return nil, s.syncHistoryRecords(startIndex, records, stream) } func (*RegionSyncer) syncHistoryRecords(startIndex uint64, records []*core.RegionInfo, stream pdpb.PD_SyncRegionsServer) error { @@ -348,10 +363,21 @@ func (*RegionSyncer) syncHistoryRecords(startIndex uint64, records []*core.Regio return stream.Send(resp) } -func (s *RegionSyncer) syncFullRegions(ctx context.Context, name string, stream pdpb.PD_SyncRegionsServer) error { +func (s *RegionSyncer) syncFullRegions(ctx context.Context, name string, stream pdpb.PD_SyncRegionsServer) (*regionSyncStream, error) { + catchUpIndex := s.history.getNextIndex() regions := s.server.GetRegions() lastIndex := 0 start := time.Now() + if len(regions) == 0 { + resp := &pdpb.SyncRegionResponse{ + Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()}, + StartIndex: 0, + } + if err := stream.Send(resp); err != nil { + log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err)) 
+ return nil, err + } + } metas := make([]*metapb.Region, 0, maxSyncRegionBatchSize) stats := make([]*pdpb.RegionStat, 0, maxSyncRegionBatchSize) leaders := make([]*metapb.Peer, 0, maxSyncRegionBatchSize) @@ -363,7 +389,7 @@ func (s *RegionSyncer) syncFullRegions(ctx context.Context, name string, stream failpoint.Inject("noFastExitSync", func() { failpoint.Goto("doSync") }) - return nil + return nil, nil default: } failpoint.Label("doSync") @@ -392,21 +418,67 @@ func (s *RegionSyncer) syncFullRegions(ctx context.Context, name string, stream } if err := s.limit.WaitN(ctx, resp.Size()); err != nil { log.Error("failed to wait rate limit", errs.ZapError(err)) - return err + return nil, err } lastIndex += len(metas) if err := stream.Send(resp); err != nil { log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err)) - return err + return nil, err } metas = metas[:0] stats = stats[:0] leaders = leaders[:0] buckets = buckets[:0] } + for { + records := s.history.recordsFrom(catchUpIndex) + if len(records) == 0 { + if catchUpIndex < s.history.getFirstIndex() { + log.Warn("region history buffer overflow during full synchronization, restart full synchronization", + zap.String("requested-server", name), + zap.String("server", s.server.Name()), + zap.Uint64("catch-up-index", catchUpIndex), + zap.Uint64("first-index", s.history.getFirstIndex())) + return s.syncFullRegions(ctx, name, stream) + } + if catchUpIndex == s.history.getNextIndex() { + break + } + continue + } + if err := s.syncHistoryRecords(catchUpIndex, records, stream); err != nil { + return nil, err + } + catchUpIndex += uint64(len(records)) + } + syncStream := newRegionSyncStream(stream) + s.mu.Lock() + defer s.mu.Unlock() + for { + records := s.history.recordsFrom(catchUpIndex) + if len(records) == 0 { + if catchUpIndex < s.history.getFirstIndex() { + return nil, errors.Errorf("region history buffer overflow during full sync catch-up, catch-up-index %d, first-index %d", catchUpIndex, 
s.history.getFirstIndex()) + } + break + } + if err := s.syncHistoryRecords(catchUpIndex, records, stream); err != nil { + return nil, err + } + catchUpIndex += uint64(len(records)) + } + resp := &pdpb.SyncRegionResponse{ + Header: &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()}, + StartIndex: catchUpIndex, + } + if err := stream.Send(resp); err != nil { + log.Error("failed to send sync region response", errs.ZapError(errs.ErrGRPCSend, err)) + return nil, err + } + s.bindStreamLocked(name, syncStream) log.Info("requested server has completed full synchronization with server", zap.String("requested-server", name), zap.String("server", s.server.Name()), zap.Duration("cost", time.Since(start))) - return nil + return syncStream, nil } // bindStream binds the established server stream. @@ -414,11 +486,15 @@ func (s *RegionSyncer) bindStream(name string, stream ServerStream) *regionSyncS syncStream := newRegionSyncStream(stream) s.mu.Lock() defer s.mu.Unlock() + s.bindStreamLocked(name, syncStream) + return syncStream +} + +func (s *RegionSyncer) bindStreamLocked(name string, syncStream *regionSyncStream) { if oldStream := s.mu.streams[name]; oldStream != nil { oldStream.close() } s.mu.streams[name] = syncStream - return syncStream } func (s *RegionSyncer) unbindStream(name string, stream *regionSyncStream) { diff --git a/pkg/syncer/server_test.go b/pkg/syncer/server_test.go index 2b5398e841f..8d8f50aa11e 100644 --- a/pkg/syncer/server_test.go +++ b/pkg/syncer/server_test.go @@ -23,19 +23,289 @@ import ( "time" "github.com/stretchr/testify/require" + "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/tikv/pd/pkg/core" "github.com/tikv/pd/pkg/mock/mockserver" "github.com/tikv/pd/pkg/storage" + "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/keypath" "github.com/tikv/pd/pkg/utils/testutil" 
) +type testSyncRegionsServer struct { + grpc.ServerStream + responses []*pdpb.SyncRegionResponse + onSend func(resp *pdpb.SyncRegionResponse) +} + +func (s *testSyncRegionsServer) Send(resp *pdpb.SyncRegionResponse) error { + s.responses = append(s.responses, resp) + if s.onSend != nil { + s.onSend(resp) + } + return nil +} + +func (*testSyncRegionsServer) Recv() (*pdpb.SyncRegionRequest, error) { + return nil, nil +} + +func TestSyncHistoryRegionFallsBackToFullSyncWhenHistoryGapIsTooLarge(t *testing.T) { + re := require.New(t) + bc := core.NewBasicCluster() + for i := range 3 { + bc.PutRegion(core.NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + }, nil)) + } + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewStorageWithMemoryBackend(), + bc, + ) + rc := NewRegionSyncer(server) + rc.history = newHistoryBuffer(2, kv.NewMemoryKV()) + for i := range 4 { + rc.history.record(core.NewRegionInfo(&metapb.Region{Id: uint64(i + 10)}, nil)) + } + re.Equal(uint64(2), rc.history.getFirstIndex()) + + stream := &testSyncRegionsServer{} + req := &pdpb.SyncRegionRequest{ + Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}, + Member: &pdpb.Member{ + Name: "pd-follower", + }, + StartIndex: 1, + } + syncStream, err := rc.syncHistoryRegion(context.Background(), req, stream) + re.NoError(err) + re.NotNil(syncStream) + re.Len(stream.responses, 2) + re.Equal(uint64(0), stream.responses[0].GetStartIndex()) + re.Len(stream.responses[0].GetRegions(), 3) + re.Equal(uint64(4), stream.responses[1].GetStartIndex()) + re.Empty(stream.responses[1].GetRegions()) +} + +func TestSyncHistoryRegionFallsBackToFullSyncWhenRequestedIndexIsZero(t *testing.T) { + re := require.New(t) + bc := core.NewBasicCluster() + for i := range 3 { + bc.PutRegion(core.NewRegionInfo(&metapb.Region{ + Id: uint64(i + 1), + StartKey: 
[]byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + RegionEpoch: &metapb.RegionEpoch{ + ConfVer: 1, + Version: 1, + }, + }, nil)) + } + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewStorageWithMemoryBackend(), + bc, + ) + rc := NewRegionSyncer(server) + rc.history = newHistoryBuffer(4, kv.NewMemoryKV()) + for i := range 2 { + rc.history.record(core.NewRegionInfo(&metapb.Region{Id: uint64(i + 10)}, nil)) + } + + stream := &testSyncRegionsServer{} + req := &pdpb.SyncRegionRequest{ + Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}, + Member: &pdpb.Member{ + Name: "pd-follower", + }, + StartIndex: 0, + } + syncStream, err := rc.syncHistoryRegion(context.Background(), req, stream) + re.NoError(err) + re.NotNil(syncStream) + re.Len(stream.responses, 2) + re.Equal(uint64(0), stream.responses[0].GetStartIndex()) + re.Len(stream.responses[0].GetRegions(), 3) + re.Equal(uint64(2), stream.responses[1].GetStartIndex()) + re.Empty(stream.responses[1].GetRegions()) +} + +func TestSyncHistoryRegionFallsBackToFullSyncWhenRequestedIndexIsAhead(t *testing.T) { + re := require.New(t) + bc := core.NewBasicCluster() + bc.PutRegion(core.NewRegionInfo(&metapb.Region{ + Id: 1, + StartKey: []byte{0}, + EndKey: []byte{1}, + }, nil)) + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewStorageWithMemoryBackend(), + bc, + ) + rc := NewRegionSyncer(server) + rc.history = newHistoryBuffer(2, kv.NewMemoryKV()) + for i := range 2 { + rc.history.record(core.NewRegionInfo(&metapb.Region{Id: uint64(i + 10)}, nil)) + } + + stream := &testSyncRegionsServer{} + req := &pdpb.SyncRegionRequest{ + Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}, + Member: &pdpb.Member{ + Name: "pd-follower", + }, + StartIndex: rc.history.getNextIndex() + 1, + } + syncStream, err := rc.syncHistoryRegion(context.Background(), req, stream) + re.NoError(err) + re.NotNil(syncStream) + re.Len(stream.responses, 2) + 
re.Equal(uint64(0), stream.responses[0].GetStartIndex()) + re.Len(stream.responses[0].GetRegions(), 1) + re.Equal(uint64(2), stream.responses[1].GetStartIndex()) + re.Empty(stream.responses[1].GetRegions()) +} + +func TestSyncHistoryRegionSendsIncrementalRecords(t *testing.T) { + re := require.New(t) + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewStorageWithMemoryBackend(), + core.NewBasicCluster(), + ) + rc := NewRegionSyncer(server) + rc.history = newHistoryBuffer(2, kv.NewMemoryKV()) + for i := range 4 { + rc.history.record(core.NewRegionInfo(&metapb.Region{Id: uint64(i + 10)}, nil)) + } + re.Equal(uint64(2), rc.history.getFirstIndex()) + + stream := &testSyncRegionsServer{} + req := &pdpb.SyncRegionRequest{ + Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}, + Member: &pdpb.Member{ + Name: "pd-follower", + }, + StartIndex: rc.history.getFirstIndex(), + } + syncStream, err := rc.syncHistoryRegion(context.Background(), req, stream) + re.NoError(err) + re.Nil(syncStream) + re.Len(stream.responses, 1) + re.Equal(uint64(2), stream.responses[0].GetStartIndex()) + re.Len(stream.responses[0].GetRegions(), 2) +} + +func TestSyncHistoryRegionSendsInSyncResponse(t *testing.T) { + re := require.New(t) + bc := core.NewBasicCluster() + for i := range 3 { + bc.PutRegion(core.NewRegionInfo(&metapb.Region{Id: uint64(i + 1)}, nil)) + } + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewStorageWithMemoryBackend(), + bc, + ) + rc := NewRegionSyncer(server) + rc.history = newHistoryBuffer(2, kv.NewMemoryKV()) + for i := range 4 { + rc.history.record(core.NewRegionInfo(&metapb.Region{Id: uint64(i + 10)}, nil)) + } + + stream := &testSyncRegionsServer{} + req := &pdpb.SyncRegionRequest{ + Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}, + Member: &pdpb.Member{ + Name: "pd-follower", + }, + StartIndex: rc.history.getNextIndex(), + } + syncStream, err := 
rc.syncHistoryRegion(context.Background(), req, stream) + re.NoError(err) + re.Nil(syncStream) + re.Len(stream.responses, 1) + re.Equal(uint64(4), stream.responses[0].GetStartIndex()) + re.Empty(stream.responses[0].GetRegions()) +} + +func TestSyncFullRegionsCatchesUpHistoryBeforeBindingStream(t *testing.T) { + re := require.New(t) + bc := core.NewBasicCluster() + bc.PutRegion(core.NewRegionInfo(&metapb.Region{ + Id: 1, + StartKey: []byte{0}, + EndKey: []byte{1}, + }, nil)) + server := mockserver.NewMockServer( + context.Background(), + nil, + nil, + storage.NewStorageWithMemoryBackend(), + bc, + ) + rc := NewRegionSyncer(server) + rc.history = newHistoryBuffer(4, kv.NewMemoryKV()) + for i := range 6 { + rc.history.record(core.NewRegionInfo(&metapb.Region{Id: uint64(i + 10)}, nil)) + } + stream := &testSyncRegionsServer{} + stream.onSend = func(resp *pdpb.SyncRegionResponse) { + if len(resp.GetRegions()) == 1 && resp.GetRegions()[0].GetId() == 1 { + rc.history.record(core.NewRegionInfo(&metapb.Region{ + Id: 2, + StartKey: []byte{1}, + EndKey: []byte{2}, + }, nil)) + stream.onSend = nil + } + } + req := &pdpb.SyncRegionRequest{ + Header: &pdpb.RequestHeader{ClusterId: keypath.ClusterID()}, + Member: &pdpb.Member{ + Name: "pd-follower", + }, + StartIndex: 1, + } + + syncStream, err := rc.syncHistoryRegion(context.Background(), req, stream) + re.NoError(err) + re.NotNil(syncStream) + re.Len(stream.responses, 3) + re.Equal(uint64(0), stream.responses[0].GetStartIndex()) + re.Len(stream.responses[0].GetRegions(), 1) + re.Equal(uint64(6), stream.responses[1].GetStartIndex()) + re.Len(stream.responses[1].GetRegions(), 1) + re.Equal(uint64(7), stream.responses[2].GetStartIndex()) + re.Empty(stream.responses[2].GetRegions()) + re.ElementsMatch([]string{"pd-follower"}, rc.GetAllDownstreamNames()) +} + func TestSyncExitsWhenRegionSyncerStops(t *testing.T) { re := require.New(t) tempDir := t.TempDir()