From 3d82cab5585de133b9fd6e1530337373e88ff071 Mon Sep 17 00:00:00 2001
From: Zhe Wu
Date: Thu, 11 Jun 2026 23:43:14 -0700
Subject: [PATCH 1/3] fix: do not cancel in-progress blob metadata recovery when
 gaining new shards

When a node joins the committee, it enters RecoverMetadata status and
starts a sync-shards task that first recovers all certified blob metadata
before starting the individual shard syncs. If metadata recovery takes
longer than an epoch and the node gains another shard in a subsequent
epoch, the new start_sync_shards call aborts the in-flight task. Since the
node is no longer "newly joining", the replacement task skipped metadata
recovery, only started syncs for the newly gained shards, and incorrectly
flipped the node status from RecoverMetadata to Active. As a result,
metadata recovery was silently lost and the shards gained in the earlier
epoch were orphaned at status None, with no repair on restart because the
node status was already Active.

Fix: sync_shards_task no longer takes recover_metadata from the caller and
instead derives it from the persisted node status. When it observes
RecoverMetadata, it runs the metadata recovery and then starts syncs for
all shards that the node owns in the current committee and stores locally
(already-active shards are skipped), so a task that aborts its predecessor
adopts the predecessor's unstarted work. Stored shards the node does not
own (for example, shards locked for transfer to another node in the same
epoch change) are filtered out so their status is not clobbered. The
RecoverMetadata -> Active transition now only happens in the task that
actually performed the metadata recovery.

Also adds a pause fail point in sync_certified_blob_metadata and simtests
that reproduce both scenarios.
--- crates/walrus-service/src/node.rs | 179 +++++++++++++++++-- crates/walrus-service/src/node/shard_sync.rs | 85 ++++----- 2 files changed, 209 insertions(+), 55 deletions(-) diff --git a/crates/walrus-service/src/node.rs b/crates/walrus-service/src/node.rs index 9386e3887d..bdc6ed099c 100644 --- a/crates/walrus-service/src/node.rs +++ b/crates/walrus-service/src/node.rs @@ -2513,7 +2513,7 @@ impl StorageNode { self.inner.set_node_status(NodeStatus::RecoverMetadata)?; } self.shard_sync_handler - .start_sync_shards(shards_gained.to_vec(), new_node_joining_committee) + .start_sync_shards(shards_gained.to_vec()) .await?; } @@ -8128,7 +8128,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(shard_indices, wipe_metadata_before_transfer_in_dst) + .start_sync_shards(shard_indices) .await?; // Waits for the shard to be synced. @@ -8258,7 +8258,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], wipe_metadata_before_transfer_in_dst) + .start_sync_shards(vec![ShardIndex(0)]) .await?; wait_for_shard_in_active_state(shard_storage_dst.as_ref()).await?; check_all_blobs_are_synced( @@ -8337,7 +8337,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], wipe_metadata_before_transfer_in_dst) + .start_sync_shards(vec![ShardIndex(0)]) .await?; wait_for_shard_in_active_state(shard_storage_dst.as_ref()).await?; check_all_blobs_are_synced( @@ -8439,7 +8439,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], false) + .start_sync_shards(vec![ShardIndex(0)]) .await?; // Waits for the shard sync process to stop. @@ -8524,7 +8524,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], false) + .start_sync_shards(vec![ShardIndex(0)]) .await?; // Waits for the shard sync process to stop. 
@@ -8641,7 +8641,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], false) + .start_sync_shards(vec![ShardIndex(0)]) .await?; // Waits for the shard sync process to stop. @@ -8692,7 +8692,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], false) + .start_sync_shards(vec![ShardIndex(0)]) .await?; // Waits for the shard sync process to stop. @@ -8822,7 +8822,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], false) + .start_sync_shards(vec![ShardIndex(0)]) .await?; // Waits for the shard sync process to stop. @@ -8905,7 +8905,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], false) + .start_sync_shards(vec![ShardIndex(0)]) .await?; // Waits for the shard sync process to stop. wait_until_no_sync_tasks(&cluster.nodes[1].storage_node.shard_sync_handler).await?; @@ -8998,7 +8998,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], true) + .start_sync_shards(vec![ShardIndex(0)]) .await?; // Waits for the shard sync process to stop. @@ -9042,6 +9042,159 @@ mod tests { Ok(()) } + // Tests that gaining a new shard in a later epoch while blob metadata recovery from a + // previous epoch is still in progress neither cancels the metadata recovery nor orphans + // the shards gained in the previous epoch. 
+ #[walrus_simtest] + async fn sync_shard_new_epoch_does_not_cancel_metadata_recovery() -> TestResult { + use std::sync::atomic::{AtomicBool, Ordering}; + + let assignment: &[&[u16]] = &[&[0, 1], &[2, 3]]; + let (cluster, blob_details, storage_dst, shard_storage_set) = + setup_cluster_for_shard_sync_tests(Some(assignment), None, false).await?; + + assert_eq!(shard_storage_set.shard_storage.len(), 2); + let shard_storage_dst_0 = shard_storage_set.shard_storage[0].clone(); + let shard_storage_dst_1 = shard_storage_set.shard_storage[1].clone(); + + // Pause metadata recovery so that it is still in progress when the node gains + // another shard in the next epoch. + let metadata_sync_entered = Arc::new(Notify::new()); + let metadata_sync_entered_clone = metadata_sync_entered.clone(); + let release_metadata_sync = Arc::new(AtomicBool::new(false)); + let release_metadata_sync_clone = release_metadata_sync.clone(); + register_fail_point_async("fail_point_shard_sync_recovery_metadata_pause", move || { + let entered = metadata_sync_entered_clone.clone(); + let release = release_metadata_sync_clone.clone(); + async move { + entered.notify_one(); + while !release.load(Ordering::SeqCst) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + }); + + storage_dst.clear_metadata_in_test()?; + storage_dst.set_node_status(NodeStatus::RecoverMetadata)?; + + let shard_sync_handler = &cluster.nodes[1].storage_node.shard_sync_handler; + + // The node joins the committee and gains shard 0; metadata must be recovered first. + shard_sync_handler + .start_sync_shards(vec![ShardIndex(0)]) + .await?; + + // Wait until metadata recovery is in progress, parked at the pause fail point. 
+ tokio::time::timeout(Duration::from_secs(30), metadata_sync_entered.notified()) + .await + .map_err(|_| anyhow::anyhow!("metadata recovery did not start"))?; + + // Node 1 no longer owns its original shards 2 and 3 in the current committee (they + // were handed to node 0 when shards 0 and 1 were reassigned), but their storages + // still exist. Lock shard 3 as if it were being transferred out, to verify that the + // restarted sync-shards task does not clobber shards the node does not own. + let shard_storage_dst_3 = storage_dst + .shard_storage(ShardIndex(3)) + .await + .expect("shard storage should exist"); + shard_storage_dst_3.lock_shard_for_epoch_change().await?; + + // The node gains shard 1 in the next epoch while metadata recovery is still in + // progress; this aborts and replaces the previous sync-shards task. + shard_sync_handler + .start_sync_shards(vec![ShardIndex(1)]) + .await?; + + // Release the paused metadata recovery. + release_metadata_sync.store(true, Ordering::SeqCst); + + wait_until_no_sync_tasks(shard_sync_handler).await?; + + // Metadata recovery must have completed and both shards must be fully synced. + assert_eq!(storage_dst.node_status()?, NodeStatus::Active); + assert_eq!(shard_storage_dst_0.status().await?, ShardStatus::Active); + assert_eq!(shard_storage_dst_1.status().await?, ShardStatus::Active); + + check_all_blobs_are_synced(&blob_details, &storage_dst, &shard_storage_dst_0, &[])?; + assert_eq!( + shard_storage_dst_1.sliver_count(SliverType::Primary), + Ok(23) + ); + assert_eq!( + shard_storage_dst_1.sliver_count(SliverType::Secondary), + Ok(23) + ); + + // The unowned, locked shard must not have been touched by the sync-shards task. + assert_eq!( + shard_storage_dst_3.status().await?, + ShardStatus::LockedToMove + ); + + clear_fail_point("fail_point_shard_sync_recovery_metadata_pause"); + + Ok(()) + } + + // Tests that the sync-shards task in metadata recovery only syncs shards the node owns + // in the current committee. 
The node may still hold storages for shards it no longer + // owns (for example, shards locked for transfer to another node during the same epoch + // change); those must not have their status clobbered or be synced. + #[walrus_simtest] + async fn sync_shard_recover_metadata_skips_unowned_shards() -> TestResult { + let assignment: &[&[u16]] = &[&[0, 1], &[2, 3]]; + let (cluster, blob_details, storage_dst, shard_storage_set) = + setup_cluster_for_shard_sync_tests(Some(assignment), None, false).await?; + + assert_eq!(shard_storage_set.shard_storage.len(), 2); + let shard_storage_dst_0 = shard_storage_set.shard_storage[0].clone(); + let shard_storage_dst_1 = shard_storage_set.shard_storage[1].clone(); + + // Node 1 no longer owns its original shards 2 and 3 in the current committee (they + // were handed to node 0 when shards 0 and 1 were reassigned), but their storages + // still exist. Lock shard 3 as if it were being transferred out. + let shard_storage_dst_3 = storage_dst + .shard_storage(ShardIndex(3)) + .await + .expect("shard storage should exist"); + shard_storage_dst_3.lock_shard_for_epoch_change().await?; + + storage_dst.clear_metadata_in_test()?; + storage_dst.set_node_status(NodeStatus::RecoverMetadata)?; + + cluster.nodes[1] + .storage_node + .shard_sync_handler + .start_sync_shards(vec![ShardIndex(0), ShardIndex(1)]) + .await?; + + wait_until_no_sync_tasks(&cluster.nodes[1].storage_node.shard_sync_handler).await?; + + // The owned shards must be synced and the node must be active. + assert_eq!(storage_dst.node_status()?, NodeStatus::Active); + assert_eq!(shard_storage_dst_0.status().await?, ShardStatus::Active); + assert_eq!(shard_storage_dst_1.status().await?, ShardStatus::Active); + check_all_blobs_are_synced(&blob_details, &storage_dst, &shard_storage_dst_0, &[])?; + + // The unowned shards must not have been touched: shard 3 stays locked, and shard 2 + // keeps its status. 
+ assert_eq!( + shard_storage_dst_3.status().await?, + ShardStatus::LockedToMove + ); + assert_eq!( + storage_dst + .shard_storage(ShardIndex(2)) + .await + .expect("shard storage should exist") + .status() + .await?, + ShardStatus::Active + ); + + Ok(()) + } + #[walrus_simtest] async fn finish_epoch_change_start_should_not_block_event_processing() -> TestResult { walrus_test_utils::init_tracing(); @@ -9314,7 +9467,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], false) + .start_sync_shards(vec![ShardIndex(0)]) .await?; // Shard recovery should be completed, and all the data should be synced. @@ -9386,7 +9539,7 @@ mod tests { cluster.nodes[1] .storage_node .shard_sync_handler - .start_sync_shards(vec![ShardIndex(0)], true) + .start_sync_shards(vec![ShardIndex(0)]) .await?; tokio::time::sleep(Duration::from_secs(1)).await; diff --git a/crates/walrus-service/src/node/shard_sync.rs b/crates/walrus-service/src/node/shard_sync.rs index c6f50b1916..ae60c99895 100644 --- a/crates/walrus-service/src/node/shard_sync.rs +++ b/crates/walrus-service/src/node/shard_sync.rs @@ -9,7 +9,7 @@ use std::{ use futures::{StreamExt, stream::FuturesUnordered}; #[cfg(msim)] -use sui_macros::{fail_point_arg, fail_point_if}; +use sui_macros::{fail_point_arg, fail_point_async, fail_point_if}; use tokio::{ sync::{Mutex, Semaphore}, time::Instant, @@ -67,46 +67,65 @@ impl ShardSyncHandler { } } - /// Starts sync shards. If `recover_metadata` is true, syncs certified blob metadata before - /// syncing shards. + /// Starts sync shards. If the node status is [`NodeStatus::RecoverMetadata`], syncs certified + /// blob metadata before syncing shards. pub async fn start_sync_shards( &self, shards: Vec, - recover_metadata: bool, ) -> Result<(), SyncShardClientError> { let mut task_handle = self.task_handle.lock().await; let sync_handler = self.clone(); // If there is an existing task, we need to abort it first before starting a new one. 
+ // Aborting is safe: the task derives its work from the persisted node status and shard + // statuses, so the new task picks up everything the old task has not finished yet, + // including blob metadata recovery. Note that aborting the task does not cancel the + // individual shard sync tasks it has already started; those are tracked separately in + // `shard_sync_in_progress`. if let Some(old_task) = task_handle.take() { old_task.abort(); } - let new_task = tokio::spawn(async move { - sync_handler - .sync_shards_task(shards, recover_metadata) - .await - }); + let new_task = tokio::spawn(async move { sync_handler.sync_shards_task(shards).await }); *task_handle = Some(new_task); Ok(()) } - async fn sync_shards_task(&self, shards: Vec, recover_metadata: bool) { - if recover_metadata { - let node_status = self - .node - .storage - .node_status() - .expect("failed to read node status from db"); - assert_eq!(node_status, NodeStatus::RecoverMetadata); + async fn sync_shards_task(&self, shards: Vec) { + let node_status = self + .node + .storage + .node_status() + .expect("failed to read node status from db"); + let shards = if node_status == NodeStatus::RecoverMetadata { if let Err(err) = self.sync_certified_blob_metadata().await { tracing::error!(?err, "failed to sync blob metadata; aborting shard sync"); return; } - } + + // While the node is recovering metadata, sync all shards the node currently owns + // instead of only the shards passed by the caller: this task may have aborted a + // previous sync-shards task (for example, when gaining shards in a subsequent epoch + // while metadata recovery is still in progress) before that task could start the + // syncs for its own shards. Shards that are already active are skipped in + // `start_new_shard_sync`. Stored shards that the node does not own in the current + // committee (for example, shards locked for transfer to another node) must not be + // synced and are filtered out here. 
+ let owned_shards = self.node.owned_shards_at_latest_epoch(); + self.node + .storage + .existing_shard_storages() + .await + .iter() + .map(|s| s.id()) + .filter(|shard| owned_shards.contains(shard)) + .collect() + } else { + shards + }; // Start sync for each shard for shard in shards { @@ -118,12 +137,8 @@ impl ShardSyncHandler { // Once we have started the shard sync task, the shard status has been persisted to // disk, so we can mark the node as active. Any restart from this point will re-start - // the shard sync tasks only without syncing metadata again. - let node_status = self - .node - .storage - .node_status() - .expect("failed to read node status from db"); + // the shard sync tasks only without syncing metadata again. Only the task that performed + // the metadata recovery (observed `RecoverMetadata` above) may flip the status. if node_status == NodeStatus::RecoverMetadata { self.node .set_node_status(NodeStatus::Active) @@ -147,6 +162,7 @@ impl ShardSyncHandler { #[cfg(msim)] { inject_recovery_metadata_failure_before_fetch()?; + fail_point_async!("fail_point_shard_sync_recovery_metadata_pause"); } let mut futures = FuturesUnordered::new(); @@ -283,24 +299,9 @@ impl ShardSyncHandler { pub async fn restart_syncs(&self) -> Result<(), anyhow::Error> { let current_node_status = self.node.storage.node_status()?; if current_node_status == NodeStatus::RecoverMetadata { - let shards_to_sync = self - .node - .storage - .existing_shard_storages() - .await - .iter() - .map(|s| s.id()) - .collect::>(); - - let sync_handler_clone = self.clone(); - self.task_handle - .lock() - .await - .replace(tokio::spawn(async move { - sync_handler_clone - .sync_shards_task(shards_to_sync, true) - .await - })); + // The task observes the `RecoverMetadata` status and derives the shards to sync from + // the existing shard storages. 
+ self.start_sync_shards(vec![]).await?; } else { for shard_storage in self.node.storage.existing_shard_storages().await { let shard_status = shard_storage From 40ecd7fc4da94f3d4d9e1b2db64a8cc88f84f5cd Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Thu, 18 Jun 2026 22:18:30 -0700 Subject: [PATCH 2/3] fix: re-read node status before flipping to Active after metadata recovery Metadata recovery can run for a long time, during which a concurrent path (for example, entering recovery or dropping out of the committee at an epoch change) may move the node out of RecoverMetadata. Reusing the node status read at the start of sync_shards_task could therefore clobber such a transition back to Active. Re-read the status from the db immediately before the compare-and-set so we only flip to Active when the node is still recovering metadata. Adds a simtest that pauses metadata recovery, changes the node status concurrently, and asserts the change is not clobbered. --- crates/walrus-service/src/node.rs | 61 ++++++++++++++++++++ crates/walrus-service/src/node/shard_sync.rs | 14 ++++- 2 files changed, 73 insertions(+), 2 deletions(-) diff --git a/crates/walrus-service/src/node.rs b/crates/walrus-service/src/node.rs index bdc6ed099c..e143ab5881 100644 --- a/crates/walrus-service/src/node.rs +++ b/crates/walrus-service/src/node.rs @@ -9195,6 +9195,67 @@ mod tests { Ok(()) } + // Tests that the sync-shards task does not flip the node to `Active` if the node has + // moved out of `RecoverMetadata` while the metadata recovery was in progress. Metadata + // recovery can run for a long time, during which a concurrent path (for example, + // entering recovery) may change the node status; the task must re-read the status before + // the flip rather than reuse the value it read when it started. 
+ #[walrus_simtest] + async fn sync_shard_recover_metadata_does_not_clobber_concurrent_status_change() + -> TestResult { + use std::sync::atomic::{AtomicBool, Ordering}; + + let (cluster, _blob_details, storage_dst, _shard_storage_set) = + setup_cluster_for_shard_sync_tests(None, None, false).await?; + + // Pause metadata recovery so we can change the node status while it is in progress. + let metadata_sync_entered = Arc::new(Notify::new()); + let metadata_sync_entered_clone = metadata_sync_entered.clone(); + let release_metadata_sync = Arc::new(AtomicBool::new(false)); + let release_metadata_sync_clone = release_metadata_sync.clone(); + register_fail_point_async("fail_point_shard_sync_recovery_metadata_pause", move || { + let entered = metadata_sync_entered_clone.clone(); + let release = release_metadata_sync_clone.clone(); + async move { + entered.notify_one(); + while !release.load(Ordering::SeqCst) { + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + }); + + storage_dst.clear_metadata_in_test()?; + storage_dst.set_node_status(NodeStatus::RecoverMetadata)?; + + let shard_sync_handler = &cluster.nodes[1].storage_node.shard_sync_handler; + shard_sync_handler + .start_sync_shards(vec![ShardIndex(0)]) + .await?; + + // Wait until metadata recovery is in progress, parked at the pause fail point. + tokio::time::timeout(Duration::from_secs(30), metadata_sync_entered.notified()) + .await + .map_err(|_| anyhow::anyhow!("metadata recovery did not start"))?; + + // Simulate a concurrent path moving the node out of `RecoverMetadata` while the + // metadata recovery task is still running. + cluster.nodes[1] + .storage_node + .inner + .set_node_status(NodeStatus::RecoveryCatchUp)?; + + // Release the paused metadata recovery and let the task finish. + release_metadata_sync.store(true, Ordering::SeqCst); + wait_until_no_sync_tasks(shard_sync_handler).await?; + + // The task must not have clobbered the concurrent status change back to `Active`. 
+ assert_eq!(storage_dst.node_status()?, NodeStatus::RecoveryCatchUp); + + clear_fail_point("fail_point_shard_sync_recovery_metadata_pause"); + + Ok(()) + } + #[walrus_simtest] async fn finish_epoch_change_start_should_not_block_event_processing() -> TestResult { walrus_test_utils::init_tracing(); diff --git a/crates/walrus-service/src/node/shard_sync.rs b/crates/walrus-service/src/node/shard_sync.rs index ae60c99895..be2c17deaf 100644 --- a/crates/walrus-service/src/node/shard_sync.rs +++ b/crates/walrus-service/src/node/shard_sync.rs @@ -137,8 +137,18 @@ impl ShardSyncHandler { // Once we have started the shard sync task, the shard status has been persisted to // disk, so we can mark the node as active. Any restart from this point will re-start - // the shard sync tasks only without syncing metadata again. Only the task that performed - // the metadata recovery (observed `RecoverMetadata` above) may flip the status. + // the shard sync tasks only without syncing metadata again. + // + // Re-read the node status from the db instead of reusing the value read at the start of + // this task: metadata recovery can run for a long time, during which a concurrent path + // (for example, entering recovery or dropping out of the committee at an epoch change) + // may have moved the node to another status. We must only flip to `Active` if the node + // is still recovering metadata, so that we do not clobber such a transition. 
+ let node_status = self + .node + .storage + .node_status() + .expect("failed to read node status from db"); if node_status == NodeStatus::RecoverMetadata { self.node .set_node_status(NodeStatus::Active) From 59034d589e493dc6f68179960512f0c275e0146b Mon Sep 17 00:00:00 2001 From: Zhe Wu Date: Sat, 27 Jun 2026 22:53:51 -0700 Subject: [PATCH 3/3] refactor: extract restart_metadata_sync helper for clarity Address review nit on PR #3462: passing vec![] to start_sync_shards from restart_syncs read awkwardly since the argument is ignored in the metadata-recovery path. Wrap it in a named restart_metadata_sync helper whose doc spells out the RecoverMetadata precondition and the no-op fallback, so the name does not overpromise a guarantee the body does not enforce. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/walrus-service/src/node/shard_sync.rs | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/crates/walrus-service/src/node/shard_sync.rs b/crates/walrus-service/src/node/shard_sync.rs index be2c17deaf..a98d0b7056 100644 --- a/crates/walrus-service/src/node/shard_sync.rs +++ b/crates/walrus-service/src/node/shard_sync.rs @@ -93,6 +93,16 @@ impl ShardSyncHandler { Ok(()) } + /// Restarts blob metadata recovery, and the subsequent syncs of the shards the node owns. + /// + /// The caller must ensure the persisted node status is [`NodeStatus::RecoverMetadata`]; the + /// spawned task re-reads the status and only performs metadata recovery in that state, + /// otherwise it is a no-op (an empty shard list is passed because the task derives the real + /// shards from the owned shard storages itself). 
+ async fn restart_metadata_sync(&self) -> Result<(), SyncShardClientError> { + self.start_sync_shards(Vec::new()).await + } + async fn sync_shards_task(&self, shards: Vec) { let node_status = self .node @@ -311,7 +321,7 @@ impl ShardSyncHandler { if current_node_status == NodeStatus::RecoverMetadata { // The task observes the `RecoverMetadata` status and derives the shards to sync from // the existing shard storages. - self.start_sync_shards(vec![]).await?; + self.restart_metadata_sync().await?; } else { for shard_storage in self.node.storage.existing_shard_storages().await { let shard_status = shard_storage