Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions pkg/unsaferecovery/unsafe_recovery_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sort"
"strconv"
"strings"
"sync/atomic"
"time"

"go.uber.org/zap"
Expand All @@ -47,6 +48,12 @@ const (
storeRequestInterval = time.Second * 40
)

// globalRecoveryStep is the process-wide recovery step counter. Its initial
// value is the wall-clock time in nanoseconds at package initialization.
var globalRecoveryStep = uint64(time.Now().UnixNano())

// nextRecoveryStep atomically advances globalRecoveryStep by one and returns
// the value after the increment.
func nextRecoveryStep() uint64 {
	allocated := atomic.AddUint64(&globalRecoveryStep, 1)
	return allocated
}
Comment on lines +51 to +55
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Allocate recovery steps from the process-wide counter for every stage, not just once per run.

This only reserves a unique value for the run’s initial step. After that, the controller still advances u.step locally, so a later run can reuse step values that an earlier run already emitted in later stages. A delayed StoreReport from the previous run can then pass the GetStep() == u.step check in the new run, which is exactly the collision this PR is trying to prevent.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/unsaferecovery/unsafe_recovery_controller.go` around lines 51 - 55, The
controller currently only seeds globalRecoveryStep once and then advances
per-run state via u.step locally, allowing later runs to reuse step numbers;
change all places that advance or set u.step (including initialization) to
allocate a fresh value from the process-wide counter by calling
nextRecoveryStep() and storing that into u.step (replace any u.step++ or local
increments with atomic nextRecoveryStep() assignments), and ensure comparisons
that use GetStep() continue to compare against the updated u.step so every stage
in every run uses a unique process-wide step value (symbols to update:
globalRecoveryStep, nextRecoveryStep(), u.step, GetStep(), and any code paths
that call StoreReport or increment step).


// Stage transition graph: for more details, please check `Controller.HandleStoreHeartbeat()`
//
// +-----------+ +-----------+
Expand Down Expand Up @@ -122,10 +129,11 @@ type Controller struct {
cluster cluster
stage stage
// the round of recovery, which is an increasing number to identify the reports of each round
step uint64
failedStores map[uint64]struct{}
timeout time.Time
autoDetect bool
step uint64
recoveryStartStep uint64
failedStores map[uint64]struct{}
timeout time.Time
autoDetect bool

// collected reports from store, if not reported yet, it would be nil
storeReports map[uint64]*pdpb.StoreReport
Expand Down Expand Up @@ -169,6 +177,7 @@ func NewController(cluster cluster) *Controller {
func (u *Controller) reset() {
u.stage = Idle
u.step = 0
u.recoveryStartStep = 0
u.failedStores = make(map[uint64]struct{})
u.storeReports = make(map[uint64]*pdpb.StoreReport)
u.numStoresReported = 0
Expand Down Expand Up @@ -237,12 +246,29 @@ func (u *Controller) RemoveFailedStores(failedStores map[uint64]struct{}, timeou
}

u.timeout = time.Now().Add(time.Duration(timeout) * time.Second)
u.step = nextRecoveryStep()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we allocate the step from the global monotonic counter on every stage transition, or add a recovery epoch, so delayed reports from a previous run cannot pass the new run's step check?

u.recoveryStartStep = u.step
u.failedStores = failedStores
u.autoDetect = autoDetect
u.changeStage(CollectReport)
return nil
}

// AbortFailedStoresRemoval aborts the current unsafe recovery process in a best-effort way.
// It asks TiKV to exit force leader by dispatching empty recovery plans, but any plan that
// has already been delivered to TiKV may keep running until TiKV finishes or times it out.
func (u *Controller) AbortFailedStoresRemoval() error {
u.Lock()
defer u.Unlock()

if !isRunning(u.stage) {
return errs.ErrUnsafeRecoveryInvalidInput.FastGenByArgs("no ongoing unsafe recovery")
}

u.handleErr(errors.New("aborted by operator"))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make abort a no-op when stage == ExitForceLeader, or keep the controller in ExitForceLeader and add a repeated-abort test?

return nil
}

// Show returns the current status of ongoing unsafe recover operation.
func (u *Controller) Show() []StageOutput {
u.Lock()
Expand Down Expand Up @@ -550,8 +576,9 @@ func (u *Controller) changeStage(stage stage) {
output.Details = append(output.Details, fmt.Sprintf("triggered by error: %v", u.err.Error()))
}
case Finished:
if u.step > 1 {
// == 1 means no operation has done, no need to invalid cache
if u.step > u.recoveryStartStep+1 {
// Only CollectReport has finished when step == recoveryStartStep+1,
// which means no operation has been done and no cache invalidation is needed.
u.cluster.ResetRegionCache()
}
output.Info = "Unsafe recovery Finished"
Expand Down
83 changes: 72 additions & 11 deletions pkg/unsaferecovery/unsafe_recovery_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ func TestFinished(t *testing.T) {
re.Empty(resp.RecoveryPlan.Creates)
re.Empty(resp.RecoveryPlan.Demotes)
re.Nil(resp.RecoveryPlan.ForceLeader)
re.Equal(uint64(1), resp.RecoveryPlan.Step)
re.Equal(recoveryController.step, resp.RecoveryPlan.Step)
applyRecoveryPlan(re, storeID, reports, resp)
}

Expand Down Expand Up @@ -426,11 +426,11 @@ func TestForceLeaderFail(t *testing.T) {

req1 := newStoreHeartbeat(1, reports[1])
resp1 := &pdpb.StoreHeartbeatResponse{}
req1.StoreReport.Step = 1
req1.StoreReport.Step = recoveryController.step
recoveryController.HandleStoreHeartbeat(req1, resp1)
req2 := newStoreHeartbeat(2, reports[2])
resp2 := &pdpb.StoreHeartbeatResponse{}
req2.StoreReport.Step = 1
req2.StoreReport.Step = recoveryController.step
recoveryController.HandleStoreHeartbeat(req2, resp2)
re.Equal(ForceLeader, recoveryController.GetStage())
recoveryController.HandleStoreHeartbeat(req1, resp1)
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestForceLeaderForCommitMerge(t *testing.T) {

req := newStoreHeartbeat(1, reports[1])
resp := &pdpb.StoreHeartbeatResponse{}
req.StoreReport.Step = 1
req.StoreReport.Step = recoveryController.step
recoveryController.HandleStoreHeartbeat(req, resp)
re.Equal(ForceLeaderForCommitMerge, recoveryController.GetStage())

Expand Down Expand Up @@ -653,7 +653,7 @@ func TestAutoDetectWithOneLearner(t *testing.T) {
},
}
req := newStoreHeartbeat(1, &storeReport)
req.StoreReport.Step = 1
req.StoreReport.Step = recoveryController.step
resp := &pdpb.StoreHeartbeatResponse{}
recoveryController.HandleStoreHeartbeat(req, resp)
hasStore3AsFailedStore := false
Expand Down Expand Up @@ -1167,7 +1167,7 @@ func TestExecutionTimeout(t *testing.T) {
resp := &pdpb.StoreHeartbeatResponse{}
recoveryController.HandleStoreHeartbeat(req, resp)
re.Equal(ExitForceLeader, recoveryController.GetStage())
req.StoreReport = &pdpb.StoreReport{Step: 2}
req.StoreReport = &pdpb.StoreReport{Step: recoveryController.step}
recoveryController.HandleStoreHeartbeat(req, resp)
re.Equal(Failed, recoveryController.GetStage())

Expand Down Expand Up @@ -1231,7 +1231,7 @@ func TestExitForceLeader(t *testing.T) {
IsForceLeader: true,
},
},
Step: 1,
Step: recoveryController.step,
},
}

Expand Down Expand Up @@ -1317,7 +1317,7 @@ func TestStep(t *testing.T) {
re.Equal(CollectReport, recoveryController.GetStage())

// valid store report
req.StoreReport.Step = 1
req.StoreReport.Step = recoveryController.step
recoveryController.HandleStoreHeartbeat(req, resp)
re.Equal(ForceLeader, recoveryController.GetStage())

Expand Down Expand Up @@ -1547,7 +1547,7 @@ func TestRangeOverlap1(t *testing.T) {
RegionEpoch: &metapb.RegionEpoch{ConfVer: 7, Version: 10},
Peers: []*metapb.Peer{
{Id: 11, StoreId: 1}, {Id: 12, StoreId: 4}, {Id: 13, StoreId: 5}}}}},
}, Step: 1},
}, Step: recoveryController.step},
2: {PeerReports: []*pdpb.PeerReport{
{
RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10, HardState: &eraftpb.HardState{Term: 1, Commit: 10}},
Expand All @@ -1559,7 +1559,7 @@ func TestRangeOverlap1(t *testing.T) {
RegionEpoch: &metapb.RegionEpoch{ConfVer: 5, Version: 8},
Peers: []*metapb.Peer{
{Id: 21, StoreId: 2}, {Id: 22, StoreId: 4}, {Id: 23, StoreId: 5}}}}},
}, Step: 1},
}, Step: recoveryController.step},
3: {PeerReports: []*pdpb.PeerReport{
{
RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10, HardState: &eraftpb.HardState{Term: 1, Commit: 10}},
Expand All @@ -1571,7 +1571,7 @@ func TestRangeOverlap1(t *testing.T) {
RegionEpoch: &metapb.RegionEpoch{ConfVer: 4, Version: 6},
Peers: []*metapb.Peer{
{Id: 31, StoreId: 3}, {Id: 32, StoreId: 4}, {Id: 33, StoreId: 5}}}}},
}, Step: 1},
}, Step: recoveryController.step},
}

advanceUntilFinished(re, recoveryController, reports)
Expand Down Expand Up @@ -1747,6 +1747,67 @@ func TestRemoveFailedStores(t *testing.T) {
}, 60, false))
}

// TestAbortFailedStoresRemoval checks that aborting is rejected when no
// recovery is running, and that aborting a running recovery moves the
// controller to ExitForceLeader, dispatches an empty recovery plan, and ends
// in Failed once the store reports back with the plan's step.
func TestAbortFailedStoresRemoval(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	mockCluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
	hbStreams := hbstream.NewTestHeartbeatStreams(ctx, mockCluster, true)
	schedule.NewCoordinator(ctx, mockCluster, hbStreams).Run()
	for _, s := range newTestStores(2, "6.0.0") {
		mockCluster.PutStore(s)
	}
	controller := NewController(mockCluster)

	// Nothing is running yet, so the abort request must return an error.
	re.Error(controller.AbortFailedStoresRemoval())

	// Start a recovery and abort it right away.
	failed := map[uint64]struct{}{
		1: {},
	}
	re.NoError(controller.RemoveFailedStores(failed, 60, false))
	re.NoError(controller.AbortFailedStoresRemoval())
	re.Equal(ExitForceLeader, controller.GetStage())
	re.Contains(controller.output[1].Details[0], "aborted by operator")

	// The next heartbeat from store 2 must receive a plan with nothing to do.
	planResp := &pdpb.StoreHeartbeatResponse{}
	controller.HandleStoreHeartbeat(newStoreHeartbeat(2, nil), planResp)
	plan := planResp.GetRecoveryPlan()
	re.NotNil(plan)
	re.Empty(plan.GetForceLeader().GetEnterForceLeaders())
	re.Empty(plan.GetCreates())
	re.Empty(plan.GetDemotes())
	re.Empty(plan.GetTombstones())

	// Reporting back with the plan's step finishes the abort in the Failed stage.
	ack := &pdpb.StoreReport{Step: plan.GetStep()}
	controller.HandleStoreHeartbeat(newStoreHeartbeat(2, ack), &pdpb.StoreHeartbeatResponse{})
	re.Equal(Failed, controller.GetStage())
}

// TestUnsafeRecoveryStepIsUniqueAcrossRuns checks that two consecutive
// recovery runs start from different step values, and that a store report
// carrying the first run's step is not accepted by the second run.
func TestUnsafeRecoveryStepIsUniqueAcrossRuns(t *testing.T) {
	re := require.New(t)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	mockCluster := mockcluster.NewCluster(ctx, mockconfig.NewTestOptions())
	hbStreams := hbstream.NewTestHeartbeatStreams(ctx, mockCluster, true)
	schedule.NewCoordinator(ctx, mockCluster, hbStreams).Run()
	for _, s := range newTestStores(1, "6.0.0") {
		mockCluster.PutStore(s)
	}
	controller := NewController(mockCluster)

	// First run: remember its step, then force the run into the Failed stage.
	re.NoError(controller.RemoveFailedStores(nil, 60, true))
	firstRunStep := controller.step
	controller.changeStage(Failed)

	// Second run must start from a different step.
	re.NoError(controller.RemoveFailedStores(nil, 60, true))
	secondRunStep := controller.step
	re.NotEqual(firstRunStep, secondRunStep)

	// A report stamped with the first run's step must leave the second run untouched.
	staleReport := &pdpb.StoreReport{Step: firstRunStep}
	controller.HandleStoreHeartbeat(newStoreHeartbeat(1, staleReport), &pdpb.StoreHeartbeatResponse{})
	re.Equal(CollectReport, controller.GetStage())
	re.Nil(controller.storeReports[1])
}

func TestRunning(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
Expand Down
2 changes: 2 additions & 0 deletions server/api/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router {
unsafeOperationHandler := newUnsafeOperationHandler(svr, rd)
registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores",
unsafeOperationHandler.RemoveFailedStores, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores/abort",
unsafeOperationHandler.AbortFailedStoresRemoval, setMethods(http.MethodPost), setAuditBackend(localLog, prometheus))
registerFunc(clusterRouter, "/admin/unsafe/remove-failed-stores/show",
unsafeOperationHandler.GetFailedStoresRemovalStatus, setMethods(http.MethodGet), setAuditBackend(prometheus))

Expand Down
19 changes: 19 additions & 0 deletions server/api/unsafe_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,25 @@ func (h *unsafeOperationHandler) RemoveFailedStores(w http.ResponseWriter, r *ht
h.rd.JSON(w, http.StatusOK, "Request has been accepted.")
}

// AbortFailedStoresRemoval aborts the current failed stores removal.
//
// @Tags unsafe
// @Summary Abort the current failed stores removal.
// @Produce json
//
// @Success 200 {string} string "Request has been accepted."
// @Failure 500 {string} string "PD server failed to proceed the request."
//
// @Router /admin/unsafe/remove-failed-stores/abort [post]
func (h *unsafeOperationHandler) AbortFailedStoresRemoval(w http.ResponseWriter, r *http.Request) {
rc := getCluster(r)
if err := rc.GetUnsafeRecoveryController().AbortFailedStoresRemoval(); err != nil {
h.rd.JSON(w, http.StatusInternalServerError, err.Error())
return
}
h.rd.JSON(w, http.StatusOK, "Request has been accepted.")
Comment on lines +85 to +101
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Return a 4xx for “no ongoing unsafe recovery” instead of 500.

AbortFailedStoresRemoval() can fail because the request is invalid in the current state, but this handler always turns that into an internal-server-error response. That misclassifies operator misuse as a PD fault and makes the new endpoint harder to automate correctly. The Swagger annotation should be updated alongside the handler.

As per coding guidelines, HTTP handlers must validate payloads and return proper status codes; avoid panics.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@server/api/unsafe_operation.go` around lines 85 - 101, The handler
AbortFailedStoresRemoval currently maps all errors from
rc.GetUnsafeRecoveryController().AbortFailedStoresRemoval() to 500; update the
handler to detect the controller's specific "no ongoing unsafe recovery" /
invalid-state error (e.g. compare with a sentinel error like
ErrNoOngoingUnsafeRecovery or an exported invalid-request error via errors.Is)
and return a 4xx (400 or 409) for that case while keeping 500 for real server
failures, and also update the Swagger comment above AbortFailedStoresRemoval to
include the new 4xx Failure case; locate the logic in AbortFailedStoresRemoval
(server/api/unsafe_operation.go) and the controller method
AbortFailedStoresRemoval() to identify the appropriate error value to match.

}

// GetFailedStoresRemovalStatus gets the current status of failed stores removal.
//
// @Tags unsafe
Expand Down
3 changes: 3 additions & 0 deletions tests/server/api/unsafe_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ func (suite *unsafeOperationTestSuite) checkRemoveFailedStores(cluster *tests.Te
err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix+"/remove-failed-stores", data, testutil.StatusOK(re))
re.NoError(err)

err = testutil.CheckPostJSON(tests.TestDialClient, urlPrefix+"/remove-failed-stores/abort", nil, testutil.StatusOK(re))
re.NoError(err)

// Test show
var output []unsaferecovery.StageOutput
err = testutil.ReadGetJSON(re, tests.TestDialClient, urlPrefix+"/remove-failed-stores/show", &output)
Expand Down
15 changes: 15 additions & 0 deletions tools/pd-ctl/pdctl/command/unsafe_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func NewRemoveFailedStoresCommand() *cobra.Command {
Note: DO NOT RECOMMEND to use this flag for general use, it's used only for case that PD doesn't have the store information of failed stores after pd-recover;
Note: Do it with caution to make sure all live stores's heartbeats has been reported PD already, otherwise it may regarded some stores as failed mistakenly.`)
cmd.AddCommand(NewRemoveFailedStoresShowCommand())
cmd.AddCommand(NewRemoveFailedStoresAbortCommand())
return cmd
}

Expand All @@ -59,6 +60,15 @@ func NewRemoveFailedStoresShowCommand() *cobra.Command {
}
}

// NewRemoveFailedStoresAbortCommand builds the "abort" subcommand of the
// unsafe remove-failed-stores command; running it invokes
// removeFailedStoresAbortCommandFunc.
func NewRemoveFailedStoresAbortCommand() *cobra.Command {
	abortCmd := &cobra.Command{
		Use:   "abort",
		Short: "Abort the current failed stores removal",
		Run:   removeFailedStoresAbortCommandFunc,
	}
	return abortCmd
}

func removeFailedStoresCommandFunc(cmd *cobra.Command, args []string) {
prefix := fmt.Sprintf("%s/remove-failed-stores", unsafePrefix)
postInput := make(map[string]any, 3)
Expand Down Expand Up @@ -117,3 +127,8 @@ func removeFailedStoresShowCommandFunc(cmd *cobra.Command, _ []string) {
}
cmd.Println(resp)
}

// removeFailedStoresAbortCommandFunc posts a request with a nil body to the
// remove-failed-stores abort endpoint under unsafePrefix.
func removeFailedStoresAbortCommandFunc(cmd *cobra.Command, _ []string) {
	abortPath := fmt.Sprintf("%s/remove-failed-stores/abort", unsafePrefix)
	postJSON(cmd, abortPath, nil)
}
3 changes: 3 additions & 0 deletions tools/pd-ctl/tests/unsafe/unsafe_operation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ func TestRemoveFailedStores(t *testing.T) {
args = []string{"-u", pdAddr, "unsafe", "remove-failed-stores", "1,2,3", "--timeout", "abc"}
_, err = tests.ExecuteCommand(cmd, args...)
re.Error(err)
args = []string{"-u", pdAddr, "unsafe", "remove-failed-stores", "abort"}
_, err = tests.ExecuteCommand(cmd, args...)
re.NoError(err)
args = []string{"-u", pdAddr, "unsafe", "remove-failed-stores", "show"}
_, err = tests.ExecuteCommand(cmd, args...)
re.NoError(err)
Expand Down
Loading