diff --git a/crates/walrus-service/src/node/consistency_check.rs b/crates/walrus-service/src/node/consistency_check.rs index e8648b21f7..16816129d4 100644 --- a/crates/walrus-service/src/node/consistency_check.rs +++ b/crates/walrus-service/src/node/consistency_check.rs @@ -25,7 +25,11 @@ use super::{ NodeStatus, StorageNodeInner, blob_sync::BlobSyncHandler, - storage::blob_info::{BlobInfoIterator, PerObjectBlobInfoIterator}, + storage::blob_info::{ + BlobInfoIterator, + PerObjectBlobInfoIterator, + PerObjectPooledBlobInfoIterator, + }, }; /// Configuration for the consistency check. @@ -135,6 +139,12 @@ pub(super) async fn schedule_background_consistency_check( .storage .certified_per_object_blob_info_iter_before_epoch(epoch); + // Create a per-object pooled blob info iterator that takes the current blob info table as + // the snapshot. + let per_object_pooled_blob_info_iterator = node + .storage + .certified_per_object_pooled_blob_info_iter_before_epoch(epoch); + // Unblock event processing. let _ = tx.send(()); @@ -160,6 +170,12 @@ pub(super) async fn schedule_background_consistency_check( epoch, ); + compose_certified_pooled_object_blob_list_digest( + node.clone(), + per_object_pooled_blob_info_iterator, + epoch, + ); + if node .consistency_check_config .enable_blob_info_invariants_check @@ -459,13 +475,19 @@ fn certified_blob_consistency_check( }; } -/// Compose the digest of the blob list returned by the iterator. -/// `scan_counter` keeps track of the number of blobs scanned. -fn compose_blob_object_list_digest( - blob_info_iter: PerObjectBlobInfoIterator, +/// Compose the digest of the object list returned by the iterator. +/// +/// Works for any per-object table keyed by `ObjectID` (the regular and the pooled per-object blob +/// info tables). `scan_counter` keeps track of the number of objects scanned. +fn compose_blob_object_list_digest( + blob_info_iter: I, epoch: Epoch, scan_counter: &IntCounterVec, -) -> Result { +) -> Result +where + B: AsRef<[u8]>, + I: Iterator>, +{ let epoch_bucket = get_epoch_bucket(epoch); // xxhash is not a cryptographic hash function, but it is fast, has good collision @@ -541,3 +563,57 @@ fn compose_certified_object_blob_list_digest( } ); } + +/// Compose the digest of the certified pooled object blob list. +fn compose_certified_pooled_object_blob_list_digest( + node: Arc, + per_object_pooled_blob_info_iter: PerObjectPooledBlobInfoIterator, + epoch: Epoch, +) { + let _scope = monitored_scope::monitored_scope( + "EpochChange::background_certified_pooled_blob_object_info_consistency_check", + ); + let epoch_bucket = get_epoch_bucket(epoch); + + let blob_object_list_digest = match compose_blob_object_list_digest( + per_object_pooled_blob_info_iter, + epoch, + &node + .metrics + .per_object_pooled_blob_info_consistency_check_certified_scanned, + ) { + Ok(value) => value, + Err(error) => { + tracing::warn!(?error, "error when processing per object pooled blob info"); + node.metrics + .per_object_pooled_blob_info_consistency_check_error + .inc(); + return; + } + }; + + tracing::info!( + epoch, + certified_blob_hash = blob_object_list_digest, + "background per-object pooled blob info consistency check finished" + ); + #[allow(clippy::cast_possible_wrap)] // wrapping is fine here + walrus_utils::with_label!( + node.metrics.per_object_pooled_blob_info_consistency_check, + epoch_bucket + ) + .set(blob_object_list_digest as i64); + + // No-op out side of simtest. + sui_macros::fail_point_arg!( + "storage_node_certified_pooled_blob_object_digest", + |digest_map: Arc>>>| { + digest_map + .lock() + .expect("failed to lock the digest map") + .entry(epoch) + .or_insert_with(|| HashMap::new()) + .insert(node.node_capability, blob_object_list_digest); + } + ); +} diff --git a/crates/walrus-service/src/node/metrics.rs b/crates/walrus-service/src/node/metrics.rs index d4b76f3bc2..84e03290fe 100644 --- a/crates/walrus-service/src/node/metrics.rs +++ b/crates/walrus-service/src/node/metrics.rs @@ -198,6 +198,18 @@ walrus_utils::metrics::define_metric_set! { consistency check."] per_object_blob_info_consistency_check_certified_scanned: IntCounterVec["epoch"], + #[help = "The hash of the list of certified per-object pooled blobs at the beginning of \ + the epoch. Note that the label is epoch % EPOCH_BUCKET_COUNT (see consistency_check.rs)."] + per_object_pooled_blob_info_consistency_check: IntGaugeVec["epoch"], + + #[help = "The number of errors occurred when checking the consistency of the per-object \ + pooled blob info table."] + per_object_pooled_blob_info_consistency_check_error: IntCounter[], + + #[help = "The number of certified per-object pooled blobs scanned during the per-object \ + pooled blob info consistency check."] + per_object_pooled_blob_info_consistency_check_certified_scanned: IntCounterVec["epoch"], + #[help = "The ratio of fully stored blobs during the blob info consistency check."] node_blob_data_fully_stored_ratio: GaugeVec["epoch"], diff --git a/crates/walrus-service/src/node/storage.rs b/crates/walrus-service/src/node/storage.rs index 616d4069d1..2fe60d0953 100644 --- a/crates/walrus-service/src/node/storage.rs +++ b/crates/walrus-service/src/node/storage.rs @@ -43,6 +43,7 @@ use self::{ PerObjectBlobInfo, PerObjectBlobInfoIterator, PerObjectPooledBlobInfo, + PerObjectPooledBlobInfoIterator, }, constants::{ garbage_collector_last_completed_epoch_key, @@ -1307,6 +1308,19 @@ impl Storage { .certified_per_object_blob_info_iter_before_epoch(epoch, std::ops::Bound::Unbounded) } + /// Returns an iterator over the certified per-object pooled blob info before the specified + /// epoch. + pub(crate) fn certified_per_object_pooled_blob_info_iter_before_epoch( + &self, + epoch: Epoch, + ) -> PerObjectPooledBlobInfoIterator<'_> { + self.blob_info + .certified_per_object_pooled_blob_info_iter_before_epoch( + epoch, + std::ops::Bound::Unbounded, + ) + } + /// Checks internal invariants of the blob info table. pub(crate) fn blob_info_invariants_check(&self) { if let Err(error) = self.blob_info.check_invariants() { diff --git a/crates/walrus-service/src/node/storage/blob_info.rs b/crates/walrus-service/src/node/storage/blob_info.rs index afc9f9f5fe..ae5da94503 100644 --- a/crates/walrus-service/src/node/storage/blob_info.rs +++ b/crates/walrus-service/src/node/storage/blob_info.rs @@ -75,6 +75,12 @@ pub type PerObjectBlobInfoIterator<'a> = BlobInfoIter< dyn Iterator> + Send + 'a, >; +pub type PerObjectPooledBlobInfoIterator<'a> = BlobInfoIter< + ObjectID, + PerObjectPooledBlobInfo, + dyn Iterator> + Send + 'a, +>; + #[derive(Debug, Clone)] pub(super) struct BlobInfoTable { aggregate_blob_info: DBMap, @@ -517,6 +523,24 @@ impl BlobInfoTable { ) } + /// Returns an iterator over all pooled blob objects that were certified before the specified + /// epoch in the per-object pooled blob info table starting with the `starting_object_id` bound. + #[tracing::instrument(skip_all)] + pub fn certified_per_object_pooled_blob_info_iter_before_epoch( + &self, + before_epoch: Epoch, + starting_object_id_bound: Bound, + ) -> PerObjectPooledBlobInfoIterator<'_> { + BlobInfoIter::new( + Box::new( + self.per_object_pooled_blob_info + .safe_range_iter((starting_object_id_bound, Unbounded)) + .expect("per_object_pooled_blob_info cf must always exist in storage node"), + ), + before_epoch, + ) + } + /// Returns the blob info for `blob_id`. pub fn get(&self, blob_id: &BlobId) -> Result, TypedStoreError> { self.aggregate_blob_info.get(blob_id) diff --git a/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs b/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs index c1a21b9fa5..0c6c30b952 100644 --- a/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs +++ b/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs @@ -11,6 +11,7 @@ use walrus_core::{BlobId, Epoch}; use super::{ BlobStatusChangeType, + CertifiedBlobInfoApi, Mergeable, PooledBlobChangeInfo, PooledChangeTypeAndInfo, @@ -155,6 +156,34 @@ impl From for PerObjectPooledBlobInfo { } } +impl CertifiedBlobInfoApi for PerObjectPooledBlobInfoV1 { + fn is_certified(&self, current_epoch: Epoch) -> bool { + // A pooled blob object is removed from the table on deletion (the pool manages its + // lifetime), so an entry that is still present and has a certified epoch at or before + // `current_epoch` is certified. + self.certified_epoch + .is_some_and(|epoch| epoch <= current_epoch) + } + + fn initial_certified_epoch(&self) -> Option { + self.certified_epoch + } +} + +impl CertifiedBlobInfoApi for PerObjectPooledBlobInfo { + fn is_certified(&self, current_epoch: Epoch) -> bool { + match self { + Self::V1(value) => value.is_certified(current_epoch), + } + } + + fn initial_certified_epoch(&self) -> Option { + match self { + Self::V1(value) => value.initial_certified_epoch(), + } + } +} + impl ToBytes for PerObjectPooledBlobInfo {} impl Mergeable for PerObjectPooledBlobInfo { diff --git a/crates/walrus-simtest/src/test_utils.rs b/crates/walrus-simtest/src/test_utils.rs index 4fbfa7d376..643bda9be8 100644 --- a/crates/walrus-simtest/src/test_utils.rs +++ b/crates/walrus-simtest/src/test_utils.rs @@ -358,6 +358,8 @@ pub mod simtest_utils { certified_blob_digest_map: Arc>>>, // Per epoch, the per object blob digest of all nodes. per_object_blob_digest_map: Arc>>>, + // Per epoch, the per object pooled blob digest of all nodes. + per_object_pooled_blob_digest_map: Arc>>>, // Per epoch, the existence check of all nodes. blob_existence_check_map: Arc>>>, checked: Arc, @@ -372,6 +374,8 @@ pub mod simtest_utils { let certified_blob_digest_map_clone = certified_blob_digest_map.clone(); let per_object_blob_digest_map = Arc::new(Mutex::new(HashMap::new())); let per_object_blob_digest_map_clone = per_object_blob_digest_map.clone(); + let per_object_pooled_blob_digest_map = Arc::new(Mutex::new(HashMap::new())); + let per_object_pooled_blob_digest_map_clone = per_object_pooled_blob_digest_map.clone(); let blob_existence_check_map = Arc::new(Mutex::new(HashMap::new())); let blob_existence_check_map_clone = blob_existence_check_map.clone(); @@ -396,6 +400,13 @@ pub mod simtest_utils { }, ); + sui_macros::register_fail_point_arg( + "storage_node_certified_pooled_blob_object_digest", + move || -> Option>>>> { + Some(per_object_pooled_blob_digest_map_clone.clone()) + }, + ); + sui_macros::register_fail_point_arg( "storage_node_certified_blob_existence_check", move || -> Option>>>> { @@ -407,6 +418,7 @@ pub mod simtest_utils { event_source_map, certified_blob_digest_map, per_object_blob_digest_map, + per_object_pooled_blob_digest_map, blob_existence_check_map, checked: Arc::new(AtomicBool::new(false)), } @@ -443,6 +455,7 @@ pub mod simtest_utils { self.check_certified_blob_digest(min_epoch); self.check_per_object_blob_digest(min_epoch); + self.check_per_object_pooled_blob_digest(min_epoch); self.check_blob_existence(min_epoch); } @@ -480,6 +493,25 @@ pub mod simtest_utils { } } + /// Ensures that for all epochs, all nodes have the same per object pooled blob digest. + #[tracing::instrument(skip(self))] + fn check_per_object_pooled_blob_digest(&self, min_epoch: Epoch) { + tracing::info!( + "checking per object pooled blob digest consistency starting with epoch \ + {min_epoch}" + ); + let digest_map = self.per_object_pooled_blob_digest_map.lock().unwrap(); + for (epoch, node_digest_map) in digest_map.iter().sorted_by_key(|(epoch, _)| *epoch) { + if *epoch < min_epoch { + tracing::info!( + "skipping epoch {epoch} because it is before the minimum epoch {min_epoch}" + ); + continue; + } + Self::check_digests_are_equal(*epoch, node_digest_map); + } + } + fn check_digests_are_equal(epoch: Epoch, node_digest_map: &HashMap) { let mut epoch_digest = None; for (node_id, digest) in node_digest_map.iter() { @@ -532,6 +564,7 @@ pub mod simtest_utils { sui_macros::clear_fail_point("storage_node_event_index_source"); sui_macros::clear_fail_point("storage_node_certified_blob_digest"); sui_macros::clear_fail_point("storage_node_certified_blob_object_digest"); + sui_macros::clear_fail_point("storage_node_certified_pooled_blob_object_digest"); sui_macros::clear_fail_point("storage_node_certified_blob_existence_check"); } }