diff --git a/pkg/unsaferecovery/unsafe_recovery_controller.go b/pkg/unsaferecovery/unsafe_recovery_controller.go index a2e3eec284e..4ec822c085d 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller.go @@ -21,6 +21,7 @@ import ( "sort" "strconv" "strings" + "sync/atomic" "time" "go.uber.org/zap" @@ -47,6 +48,12 @@ const ( storeRequestInterval = time.Second * 40 ) +var globalRecoveryStep = uint64(time.Now().UnixNano()) + +func nextRecoveryStep() uint64 { + return atomic.AddUint64(&globalRecoveryStep, 1) +} + // Stage transition graph: for more details, please check `Controller.HandleStoreHeartbeat()` // // +-----------+ +-----------+ @@ -122,10 +129,11 @@ type Controller struct { cluster cluster stage stage // the round of recovery, which is an increasing number to identify the reports of each round - step uint64 - failedStores map[uint64]struct{} - timeout time.Time - autoDetect bool + step uint64 + recoveryStartStep uint64 + failedStores map[uint64]struct{} + timeout time.Time + autoDetect bool // collected reports from store, if not reported yet, it would be nil storeReports map[uint64]*pdpb.StoreReport @@ -169,6 +177,7 @@ func NewController(cluster cluster) *Controller { func (u *Controller) reset() { u.stage = Idle u.step = 0 + u.recoveryStartStep = 0 u.failedStores = make(map[uint64]struct{}) u.storeReports = make(map[uint64]*pdpb.StoreReport) u.numStoresReported = 0 @@ -237,12 +246,29 @@ func (u *Controller) RemoveFailedStores(failedStores map[uint64]struct{}, timeou } u.timeout = time.Now().Add(time.Duration(timeout) * time.Second) + u.step = nextRecoveryStep() + u.recoveryStartStep = u.step u.failedStores = failedStores u.autoDetect = autoDetect u.changeStage(CollectReport) return nil } +// AbortFailedStoresRemoval aborts the current unsafe recovery process in a best-effort way. +// It asks TiKV to exit force leader by dispatching empty recovery plans, but any plan that +// has already been delivered to TiKV may keep running until TiKV finishes or times it out. +func (u *Controller) AbortFailedStoresRemoval() error { + u.Lock() + defer u.Unlock() + + if !isRunning(u.stage) { + return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no ongoing unsafe recovery") + } + + u.handleErr(errors.New("aborted by operator")) + return nil +} + // Show returns the current status of ongoing unsafe recover operation. func (u *Controller) Show() []StageOutput { u.Lock() @@ -550,8 +576,9 @@ func (u *Controller) changeStage(stage stage) { output.Details = append(output.Details, fmt.Sprintf("triggered by error: %v", u.err.Error())) } case Finished: - if u.step > 1 { - // == 1 means no operation has done, no need to invalid cache + if u.step > u.recoveryStartStep+1 { + // Only CollectReport has finished when step == recoveryStartStep+1, + // which means no operation has done and no cache invalidation is needed. u.cluster.ResetRegionCache() } output.Info = "Unsafe recovery Finished" diff --git a/pkg/unsaferecovery/unsafe_recovery_controller_test.go b/pkg/unsaferecovery/unsafe_recovery_controller_test.go index d6653894879..93a97d9513c 100644 --- a/pkg/unsaferecovery/unsafe_recovery_controller_test.go +++ b/pkg/unsaferecovery/unsafe_recovery_controller_test.go @@ -242,7 +242,7 @@ func TestFinished(t *testing.T) { re.Empty(resp.RecoveryPlan.Creates) re.Empty(resp.RecoveryPlan.Demotes) re.Nil(resp.RecoveryPlan.ForceLeader) - re.Equal(uint64(1), resp.RecoveryPlan.Step) + re.Equal(recoveryController.step, resp.RecoveryPlan.Step) applyRecoveryPlan(re, storeID, reports, resp) } @@ -426,11 +426,11 @@ func TestForceLeaderFail(t *testing.T) { req1 := newStoreHeartbeat(1, reports[1]) resp1 := &pdpb.StoreHeartbeatResponse{} - req1.StoreReport.Step = 1 + req1.StoreReport.Step = recoveryController.step recoveryController.HandleStoreHeartbeat(req1, resp1) req2 := newStoreHeartbeat(2, reports[2]) resp2 := &pdpb.StoreHeartbeatResponse{} - req2.StoreReport.Step = 1 + req2.StoreReport.Step = recoveryController.step recoveryController.HandleStoreHeartbeat(req2, resp2) re.Equal(ForceLeader, recoveryController.GetStage()) recoveryController.HandleStoreHeartbeat(req1, resp1) @@ -545,7 +545,7 @@ func TestForceLeaderForCommitMerge(t *testing.T) { req := newStoreHeartbeat(1, reports[1]) resp := &pdpb.StoreHeartbeatResponse{} - req.StoreReport.Step = 1 + req.StoreReport.Step = recoveryController.step recoveryController.HandleStoreHeartbeat(req, resp) re.Equal(ForceLeaderForCommitMerge, recoveryController.GetStage()) @@ -653,7 +653,7 @@ func TestAutoDetectWithOneLearner(t *testing.T) { }, } req := newStoreHeartbeat(1, &storeReport) - req.StoreReport.Step = 1 + req.StoreReport.Step = recoveryController.step resp := &pdpb.StoreHeartbeatResponse{} recoveryController.HandleStoreHeartbeat(req, resp) hasStore3AsFailedStore := false @@ -1167,7 +1167,7 @@ func TestExecutionTimeout(t *testing.T) { resp := &pdpb.StoreHeartbeatResponse{} recoveryController.HandleStoreHeartbeat(req, resp) re.Equal(ExitForceLeader, recoveryController.GetStage()) - req.StoreReport = &pdpb.StoreReport{Step: 2} + req.StoreReport = &pdpb.StoreReport{Step: recoveryController.step} recoveryController.HandleStoreHeartbeat(req, resp) re.Equal(Failed, recoveryController.GetStage()) @@ -1231,7 +1231,7 @@ func TestExitForceLeader(t *testing.T) { IsForceLeader: true, }, }, - Step: 1, + Step: recoveryController.step, }, } @@ -1317,7 +1317,7 @@ func TestStep(t *testing.T) { re.Equal(CollectReport, recoveryController.GetStage()) // valid store report - req.StoreReport.Step = 1 + req.StoreReport.Step = recoveryController.step recoveryController.HandleStoreHeartbeat(req, resp) re.Equal(ForceLeader, recoveryController.GetStage()) @@ -1547,7 +1547,7 @@ func TestRangeOverlap1(t *testing.T) { RegionEpoch: &metapb.RegionEpoch{ConfVer: 7, Version: 10}, Peers: []*metapb.Peer{ {Id: 11, StoreId: 1}, {Id: 12, StoreId: 4}, {Id: 13, StoreId: 5}}}}}, - }, Step: 1}, + }, Step: recoveryController.step}, 2: {PeerReports: []*pdpb.PeerReport{ { RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10, HardState: &eraftpb.HardState{Term: 1, Commit: 10}}, @@ -1559,7 +1559,7 @@ func TestRangeOverlap1(t *testing.T) { RegionEpoch: &metapb.RegionEpoch{ConfVer: 5, Version: 8}, Peers: []*metapb.Peer{ {Id: 21, StoreId: 2}, {Id: 22, StoreId: 4}, {Id: 23, StoreId: 5}}}}}, - }, Step: 1}, + }, Step: recoveryController.step}, 3: {PeerReports: []*pdpb.PeerReport{ { RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10, HardState: &eraftpb.HardState{Term: 1, Commit: 10}}, @@ -1571,7 +1571,7 @@ func TestRangeOverlap1(t *testing.T) { RegionEpoch: &metapb.RegionEpoch{ConfVer: 4, Version: 6}, Peers: []*metapb.Peer{ {Id: 31, StoreId: 3}, {Id: 32, StoreId: 4}, {Id: 33, StoreId: 5}}}}}, - }, Step: 1}, + }, Step: recoveryController.step}, } advanceUntilFinished(re, recoveryController, reports) @@ -1747,6 +1747,67 @@ func TestRemoveFailedStores(t *testing.T) { }, 60, false)) } +func TestAbortFailedStoresRemoval(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(ctx, opts) + coordinator := schedule.NewCoordinator(ctx, cluster, hbstream.NewTestHeartbeatStreams(ctx, cluster, true)) + coordinator.Run() + for _, store := range newTestStores(2, "6.0.0") { + cluster.PutStore(store) + } + recoveryController := NewController(cluster) + re.Error(recoveryController.AbortFailedStoresRemoval()) + + re.NoError(recoveryController.RemoveFailedStores(map[uint64]struct{}{ + 1: {}, + }, 60, false)) + re.NoError(recoveryController.AbortFailedStoresRemoval()) + re.Equal(ExitForceLeader, recoveryController.GetStage()) + re.Contains(recoveryController.output[1].Details[0], "aborted by operator") + + resp := &pdpb.StoreHeartbeatResponse{} + recoveryController.HandleStoreHeartbeat(newStoreHeartbeat(2, nil), resp) + re.NotNil(resp.GetRecoveryPlan()) + re.Empty(resp.GetRecoveryPlan().GetForceLeader().GetEnterForceLeaders()) + re.Empty(resp.GetRecoveryPlan().GetCreates()) + re.Empty(resp.GetRecoveryPlan().GetDemotes()) + re.Empty(resp.GetRecoveryPlan().GetTombstones()) + + report := &pdpb.StoreReport{Step: resp.GetRecoveryPlan().GetStep()} + recoveryController.HandleStoreHeartbeat(newStoreHeartbeat(2, report), &pdpb.StoreHeartbeatResponse{}) + re.Equal(Failed, recoveryController.GetStage()) +} + +func TestUnsafeRecoveryStepIsUniqueAcrossRuns(t *testing.T) { + re := require.New(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + opts := mockconfig.NewTestOptions() + cluster := mockcluster.NewCluster(ctx, opts) + coordinator := schedule.NewCoordinator(ctx, cluster, hbstream.NewTestHeartbeatStreams(ctx, cluster, true)) + coordinator.Run() + for _, store := range newTestStores(1, "6.0.0") { + cluster.PutStore(store) + } + recoveryController := NewController(cluster) + re.NoError(recoveryController.RemoveFailedStores(nil, 60, true)) + oldStep := recoveryController.step + recoveryController.changeStage(Failed) + + re.NoError(recoveryController.RemoveFailedStores(nil, 60, true)) + newStep := recoveryController.step + re.NotEqual(oldStep, newStep) + + recoveryController.HandleStoreHeartbeat(newStoreHeartbeat(1, &pdpb.StoreReport{Step: oldStep}), &pdpb.StoreHeartbeatResponse{}) + re.Equal(CollectReport, recoveryController.GetStage()) + re.Nil(recoveryController.storeReports[1]) +} + func TestRunning(t *testing.T) { re := require.New(t) ctx, cancel := context.WithCancel(context.Background()) diff --git a/server/api/router.go b/server/api/router.go index 7533fa9e4b0..3cff254c52e 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -366,6 +366,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { unsafeOperationHandler := newUnsafeOperationHandler(svr, rd) registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores", unsafeOperationHandler.RemoveFailedStores, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) + registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores/abort", + unsafeOperationHandler.AbortFailedStoresRemoval, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus)) registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores/show", unsafeOperationHandler.GetFailedStoresRemovalStatus, setMethods(http.MethodGet), setAuditBackend(prometheus)) diff --git a/server/api/unsafe_operation.go b/server/api/unsafe_operation.go index efcae3fa086..063e11f9a49 100644 --- a/server/api/unsafe_operation.go +++ b/server/api/unsafe_operation.go @@ -82,6 +82,25 @@ func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *ht h.rd.JSON(w, http.StatusOK, "Request has been accepted.") } +// AbortFailedStoresRemoval aborts the current failed stores removal. +// +// @Tags unsafe +// @Summary Abort the current failed stores removal. +// @Produce json +// +// Success 200 {string} string "Request has been accepted." +// Failure 500 {string} string "PD server failed to proceed the request." +// +// @Router /admin/unsafe/remove-failed-stores/abort [post] +func (h *unsafeOperationHandler) AbortFailedStoresRemoval(w http.ResponseWriter, r *http.Request) { + rc := getCluster(r) + if err := rc.GetUnsafeRecoveryController().AbortFailedStoresRemoval(); err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + h.rd.JSON(w, http.StatusOK, "Request has been accepted.") +} + // GetFailedStoresRemovalStatus gets the current status of failed stores removal. // // @Tags unsafe diff --git a/tests/server/api/unsafe_operation_test.go b/tests/server/api/unsafe_operation_test.go index 680473151e3..2bb2befac69 100644 --- a/tests/server/api/unsafe_operation_test.go +++ b/tests/server/api/unsafe_operation_test.go @@ -93,6 +93,9 @@ func (suite *unsafeOperationTestSuite) checkRemoveFailedStores(cluster *tests.Te err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix+"/remove-failed-stores", data, testutil.StatusOK(re)) re.NoError(err) + err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix+"/remove-failed-stores/abort", nil, testutil.StatusOK(re)) + re.NoError(err) + // Test show var output []unsaferecovery.StageOutput err = testutil.ReadGetJSON(re, tests.TestDialClient, urlPrefix+"/remove-failed-stores/show", &output) diff --git a/tools/pd-ctl/pdctl/command/unsafe_command.go b/tools/pd-ctl/pdctl/command/unsafe_command.go index 04d272385e7..383195b4634 100644 --- a/tools/pd-ctl/pdctl/command/unsafe_command.go +++ b/tools/pd-ctl/pdctl/command/unsafe_command.go @@ -47,6 +47,7 @@ func NewRemoveFailedStoresCommand() *cobra.Command { Note: DO NOT RECOMMEND to use this flag for general use, it's used only for case that PD doesn't have the store information of failed stores after pd-recover; Note: Do it with caution to make sure all live stores's heartbeats has been reported PD already, otherwise it may regarded some stores as failed mistakenly.`) cmd.AddCommand(NewRemoveFailedStoresShowCommand()) + cmd.AddCommand(NewRemoveFailedStoresAbortCommand()) return cmd } @@ -59,6 +60,15 @@ func NewRemoveFailedStoresShowCommand() *cobra.Command { } } +// NewRemoveFailedStoresAbortCommand returns the unsafe remove failed stores abort command. +func NewRemoveFailedStoresAbortCommand() *cobra.Command { + return &cobra.Command{ + Use: "abort", + Short: "Abort the current failed stores removal", + Run: removeFailedStoresAbortCommandFunc, + } +} + func removeFailedStoresCommandFunc(cmd *cobra.Command, args []string) { prefix := fmt.Sprintf("%s/remove-failed-stores", unsafePrefix) postInput := make(map[string]any, 3) @@ -117,3 +127,8 @@ func removeFailedStoresShowCommandFunc(cmd *cobra.Command, _ []string) { } cmd.Println(resp) } + +func removeFailedStoresAbortCommandFunc(cmd *cobra.Command, _ []string) { + prefix := fmt.Sprintf("%s/remove-failed-stores/abort", unsafePrefix) + postJSON(cmd, prefix, nil) +} diff --git a/tools/pd-ctl/tests/unsafe/unsafe_operation_test.go b/tools/pd-ctl/tests/unsafe/unsafe_operation_test.go index 86f11f43647..f999b9e6796 100644 --- a/tools/pd-ctl/tests/unsafe/unsafe_operation_test.go +++ b/tools/pd-ctl/tests/unsafe/unsafe_operation_test.go @@ -49,6 +49,9 @@ func TestRemoveFailedStores(t *testing.T) { args = []string{"-u", pdAddr, "unsafe", "remove-failed-stores", "1,2,3", "--timeout", "abc"} _, err = tests.ExecuteCommand(cmd, args...) re.Error(err) + args = []string{"-u", pdAddr, "unsafe", "remove-failed-stores", "abort"} + _, err = tests.ExecuteCommand(cmd, args...) + re.NoError(err) args = []string{"-u", pdAddr, "unsafe", "remove-failed-stores", "show"} _, err = tests.ExecuteCommand(cmd, args...) re.NoError(err)