Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions pkg/mcs/router/server/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ func (s *RegionSyncer) sync(ctx context.Context, leaderAddr string) {
continue
}
log.Info("server starts to synchronize with leader", zap.String("server", s.name), zap.String("leader", leaderAddr), zap.Uint64("request-index", s.nextSyncIndex))
syncingHistory := true
fullSyncing := false
for {
resp, err := stream.Recv()
failpoint.Inject("syncMetError", func() {
Expand Down Expand Up @@ -310,6 +312,10 @@ func (s *RegionSyncer) sync(ctx context.Context, leaderAddr string) {
s.triggerMembershipCheck()
return
}
if syncingHistory && resp.GetStartIndex() == 0 && s.nextSyncIndex != 0 {
bc.ResetRegionCache()
fullSyncing = true
}
// The client may have lost some region info, so nextSyncIndex needs to be reset.
if s.nextSyncIndex != resp.GetStartIndex() {
log.Warn("server sync index not match the leader",
Expand Down Expand Up @@ -353,6 +359,12 @@ func (s *RegionSyncer) sync(ctx context.Context, leaderAddr string) {
s.nextSyncIndex++
}
}
if fullSyncing && len(regions) == 0 {
fullSyncing = false
syncingHistory = false
} else if syncingHistory && !fullSyncing {
syncingHistory = false
}
}
}
}
Expand Down
64 changes: 61 additions & 3 deletions pkg/syncer/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,43 @@ func (s *RegionSyncer) syncRegion(ctx context.Context, conn *grpc.ClientConn) (C

var regionGuide = core.GenerateRegionGuideFunc(false)

func (s *RegionSyncer) handleRegionSyncResponse(ctx context.Context, resp *pdpb.SyncRegionResponse, bc *core.BasicCluster, regionStorage storage.Storage) {
// regionSyncState carries the per-stream progress flags that
// handleRegionSyncResponse reads and updates across successive responses.
// StartSyncWithLeader creates a fresh state with syncingHistory set to true
// right before it starts receiving from a stream.
type regionSyncState struct {
	// syncingHistory is true while the stream is still in its initial phase.
	// It is cleared after a response is handled outside of a full sync, or
	// together with fullSyncing when a full sync completes.
	syncingHistory bool
	// fullSyncing is set when, during the initial phase, a response arrives
	// with start index 0 while the local history index is non-zero. It is
	// cleared once a response carrying no regions is handled.
	fullSyncing bool
}

// resetRegionCacheAndStorage removes every region currently held in bc from
// regionStorage and then resets bc's region cache.
//
// regionStorage.Flush is called first (presumably so buffered writes cannot
// land after the deletions; confirm against the storage implementation).
// ctx is checked before each deletion so a cancelled context stops the loop.
//
// It returns the first error from Flush, DeleteRegion or ctx. On any error
// the region cache is left untouched, although some regions may already
// have been deleted from regionStorage.
func (*RegionSyncer) resetRegionCacheAndStorage(ctx context.Context, bc *core.BasicCluster, regionStorage storage.Storage) error {
	flushErr := regionStorage.Flush()
	if flushErr != nil {
		return flushErr
	}
	cached := bc.GetRegions()
	for idx := range cached {
		// Bail out promptly once the caller's context is done.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return ctxErr
		}
		delErr := regionStorage.DeleteRegion(cached[idx].GetMeta())
		if delErr != nil {
			return delErr
		}
	}
	// Only drop the in-memory cache after storage has been fully cleaned.
	bc.ResetRegionCache()
	return nil
}

func (s *RegionSyncer) handleRegionSyncResponse(
ctx context.Context,
resp *pdpb.SyncRegionResponse,
bc *core.BasicCluster,
regionStorage storage.Storage,
state *regionSyncState,
) error {
if state.syncingHistory && resp.GetStartIndex() == 0 && s.history.getNextIndex() != 0 {
s.streamingRunning.Store(false)
if err := s.resetRegionCacheAndStorage(ctx, bc, regionStorage); err != nil {
return err
}
state.fullSyncing = true
}
if s.history.getNextIndex() != resp.GetStartIndex() {
log.Warn("server sync index not match the leader",
zap.String("server", s.server.Name()),
Expand Down Expand Up @@ -146,8 +182,20 @@ func (s *RegionSyncer) handleRegionSyncResponse(ctx context.Context, resp *pdpb.
_ = regionStorage.DeleteRegion(old.GetMeta())
}
}
if state.fullSyncing && len(regions) == 0 {
if err := regionStorage.Flush(); err != nil {
return err
}
state.fullSyncing = false
state.syncingHistory = false
} else if state.syncingHistory && !state.fullSyncing {
state.syncingHistory = false
}
// Mark the client as running once it has finished the first history region sync.
s.streamingRunning.Store(true)
if !state.fullSyncing {
s.streamingRunning.Store(true)
}
return nil
}

// IsRunning returns whether the region syncer client is running.
Expand Down Expand Up @@ -233,6 +281,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
continue
}
log.Info("server starts to synchronize with leader", zap.String("server", s.server.Name()), zap.String("leader", s.server.GetLeader().GetName()), zap.Uint64("request-index", s.history.getNextIndex()))
syncState := &regionSyncState{syncingHistory: true}
for {
resp, err := stream.Recv()
if err == io.EOF {
Expand Down Expand Up @@ -260,7 +309,16 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
}
break
}
s.handleRegionSyncResponse(ctx, resp, bc, regionStorage)
if err = s.handleRegionSyncResponse(ctx, resp, bc, regionStorage, syncState); err != nil {
log.Warn("failed to handle region sync response",
zap.String("server", s.server.Name()),
zap.String("leader", s.server.GetLeader().GetName()),
errs.ZapError(err))
if err = stream.CloseSend(); err != nil {
log.Warn("failed to terminate client stream", errs.ZapError(errs.ErrGRPCCloseSend, err))
}
break
}
}
}
}()
Expand Down
126 changes: 126 additions & 0 deletions pkg/syncer/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ import (

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/mock/mockserver"
"github.com/tikv/pd/pkg/storage"
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/keypath"
"github.com/tikv/pd/pkg/utils/testutil"
)

Expand Down Expand Up @@ -94,3 +96,127 @@ func TestErrorCode(t *testing.T) {
re.True(ok)
re.Equal(codes.Canceled, ev.Code())
}

// TestResetRegionCacheAndStorage checks that resetRegionCacheAndStorage
// removes previously saved regions from both the BasicCluster cache and the
// LevelDB-backed region storage.
func TestResetRegionCacheAndStorage(t *testing.T) {
	re := require.New(t)
	ctx := context.Background()
	dir := t.TempDir()
	regionStore, err := storage.NewRegionStorageWithLevelDBBackend(ctx, dir, nil)
	re.NoError(err)
	defer func() {
		re.NoError(regionStore.Close())
	}()
	coreStorage := storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), regionStore)
	storage.TrySwitchRegionStorage(coreStorage, true)
	cluster := core.NewBasicCluster()
	mockSrv := mockserver.NewMockServer(ctx, nil, nil, coreStorage, cluster)

	// Seed two adjacent regions (IDs 1 and 2) into storage and the cache.
	const regionCount = 2
	for id := uint64(1); id <= regionCount; id++ {
		meta := &metapb.Region{
			Id:       id,
			StartKey: []byte{byte(id - 1)},
			EndKey:   []byte{byte(id)},
		}
		re.NoError(coreStorage.SaveRegion(meta))
		cluster.PutRegion(core.NewRegionInfo(meta, nil))
	}
	re.NoError(coreStorage.Flush())

	syncer := NewRegionSyncer(mockSrv)
	re.NoError(syncer.resetRegionCacheAndStorage(ctx, cluster, coreStorage))

	// The cache must be empty and no seeded region may be loadable any more.
	re.Empty(cluster.GetRegions())
	for id := uint64(1); id <= regionCount; id++ {
		loaded := &metapb.Region{}
		found, loadErr := coreStorage.LoadRegion(id, loaded)
		re.NoError(loadErr)
		re.False(found)
	}
}

// TestHandleFullSyncResponseResetsStaleRegions drives handleRegionSyncResponse
// through a full sync: a response with start index 0 arrives while the local
// history index is 10, followed by a response without regions at index 20.
// It asserts that stale regions are gone from cache and storage, that the
// refreshed region is kept, and that the history index is persisted.
func TestHandleFullSyncResponseResetsStaleRegions(t *testing.T) {
	re := require.New(t)
	ctx := context.Background()
	dir := t.TempDir()
	regionStore, err := storage.NewRegionStorageWithLevelDBBackend(ctx, dir, nil)
	re.NoError(err)
	defer func() {
		re.NoError(regionStore.Close())
	}()
	coreStorage := storage.NewCoreStorage(storage.NewStorageWithMemoryBackend(), regionStore)
	storage.TrySwitchRegionStorage(coreStorage, true)
	cluster := core.NewBasicCluster()
	mockSrv := mockserver.NewMockServer(ctx, nil, nil, coreStorage, cluster)

	// makeRegion builds a region meta with ConfVer 1 and single-byte keys.
	makeRegion := func(id uint64, start, end byte, version uint64) *metapb.Region {
		return &metapb.Region{
			Id:       id,
			StartKey: []byte{start},
			EndKey:   []byte{end},
			RegionEpoch: &metapb.RegionEpoch{
				ConfVer: 1,
				Version: version,
			},
		}
	}

	// Pre-populate two stale regions at epoch version 1.
	staleMetas := []*metapb.Region{
		makeRegion(1, 0, 1, 1),
		makeRegion(2, 1, 2, 1),
	}
	for _, meta := range staleMetas {
		re.NoError(coreStorage.SaveRegion(meta))
		cluster.PutRegion(core.NewRegionInfo(meta, nil))
	}

	syncer := NewRegionSyncer(mockSrv)
	syncer.history.resetWithIndex(10)
	state := &regionSyncState{syncingHistory: true}

	// First response: start index 0 with a newer version of region 1 only.
	refreshed := makeRegion(1, 0, 1, 2)
	fullSyncResp := &pdpb.SyncRegionResponse{
		Header:     &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
		StartIndex: 0,
		Regions:    []*metapb.Region{refreshed},
	}
	re.NoError(syncer.handleRegionSyncResponse(ctx, fullSyncResp, cluster, coreStorage, state))
	re.False(syncer.IsRunning())
	re.True(state.fullSyncing)

	// Second response: no regions, which ends the full sync.
	finishResp := &pdpb.SyncRegionResponse{
		Header:     &pdpb.ResponseHeader{ClusterId: keypath.ClusterID()},
		StartIndex: 20,
	}
	re.NoError(syncer.handleRegionSyncResponse(ctx, finishResp, cluster, coreStorage, state))

	re.True(syncer.IsRunning())
	re.False(state.fullSyncing)
	re.False(state.syncingHistory)
	re.Equal(uint64(20), syncer.history.getNextIndex())
	persistedIndex, err := regionStore.Load(historyKey)
	re.NoError(err)
	re.Equal("20", persistedIndex)

	// Only the refreshed region 1 remains in the cache.
	re.Len(cluster.GetRegions(), 1)
	re.Equal(uint64(2), cluster.GetRegion(1).GetRegionEpoch().GetVersion())

	// Storage holds the refreshed region 1 and no longer holds region 2.
	fromStorage := &metapb.Region{}
	found, err := coreStorage.LoadRegion(1, fromStorage)
	re.NoError(err)
	re.True(found)
	re.Equal(uint64(2), fromStorage.GetRegionEpoch().GetVersion())
	found, err = coreStorage.LoadRegion(2, fromStorage)
	re.NoError(err)
	re.False(found)
}
7 changes: 7 additions & 0 deletions pkg/syncer/history_buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ func (h *historyBuffer) resetWithIndex(index uint64) {
h.head = 0
h.tail = 0
h.flushCount = defaultFlushCount
h.persist()
}

func (h *historyBuffer) getNextIndex() uint64 {
Expand All @@ -134,6 +135,12 @@ func (h *historyBuffer) getNextIndex() uint64 {
return h.index
}

// getFirstIndex returns the result of h.firstIndex(), holding the buffer's
// read lock for the duration of the call.
func (h *historyBuffer) getFirstIndex() uint64 {
	h.RLock()
	defer h.RUnlock()
	first := h.firstIndex()
	return first
}

func (h *historyBuffer) get(index uint64) *core.RegionInfo {
h.RLock()
defer h.RUnlock()
Expand Down
Loading
Loading