diff --git a/crates/walrus-service/src/node/consistency_check.rs b/crates/walrus-service/src/node/consistency_check.rs index 16816129d4..54cccb4d3b 100644 --- a/crates/walrus-service/src/node/consistency_check.rs +++ b/crates/walrus-service/src/node/consistency_check.rs @@ -27,7 +27,9 @@ use super::{ blob_sync::BlobSyncHandler, storage::blob_info::{ BlobInfoIterator, + PerObjectBlobInfo, PerObjectBlobInfoIterator, + PerObjectPooledBlobInfo, PerObjectPooledBlobInfoIterator, }, }; @@ -478,14 +480,21 @@ fn certified_blob_consistency_check( /// Compose the digest of the object list returned by the iterator. /// /// Works for any per-object table keyed by `ObjectID` (the regular and the pooled per-object blob -/// info tables). `scan_counter` keeps track of the number of objects scanned. -fn compose_blob_object_list_digest( +/// info tables). When `extra_hash_input` is `Some`, the bytes it returns for each value are hashed +/// right after the key of that entry; this lets callers include value fields (such as the storage +/// pool ID for pooled blobs) that must also be consistent across nodes. Callers that only need the +/// key to be consistent pass `None`. `scan_counter` is incremented once for each object read +/// successfully. 
+fn compose_blob_object_list_digest( blob_info_iter: I, epoch: Epoch, scan_counter: &IntCounterVec, + extra_hash_input: Option, ) -> Result where B: AsRef<[u8]>, + E: AsRef<[u8]>, + F: Fn(&T) -> E, I: Iterator>, { let epoch_bucket = get_epoch_bucket(epoch); @@ -497,6 +506,9 @@ where match item { Ok(blob_info) => { hasher.write(blob_info.0.as_ref()); + if let Some(extra_hash_input) = &extra_hash_input { + hasher.write(extra_hash_input(&blob_info.1).as_ref()); + } walrus_utils::with_label!(scan_counter, epoch_bucket).inc(); } Err(error) => { @@ -527,6 +539,11 @@ fn compose_certified_object_blob_list_digest( &node .metrics .per_object_blob_info_consistency_check_certified_scanned, + // The regular per-object table only needs the object key to be consistent across nodes, + // so no extra per-value bytes are folded into the digest. This keeps the digest identical + // to before this function was generalized, preserving cross-node compatibility with nodes + // running the pre-generalization code. + None:: [u8; 0]>, ) { Ok(value) => value, Err(error) => { @@ -581,6 +598,9 @@ fn compose_certified_pooled_object_blob_list_digest( &node .metrics .per_object_pooled_blob_info_consistency_check_certified_scanned, + // Pooled blobs additionally fold the storage pool ID into the digest so that a node + // disagreeing on which pool a blob object belongs to is detected as an inconsistency. 
+ Some(|info: &PerObjectPooledBlobInfo| info.storage_pool_id()), ) { Ok(value) => value, Err(error) => { @@ -617,3 +637,64 @@ fn compose_certified_pooled_object_blob_list_digest( } ); } + +#[cfg(test)] +mod tests { + use prometheus::{IntCounterVec, Opts}; + use sui_types::base_types::ObjectID; + use typed_store::TypedStoreError; + use walrus_core::Epoch; + + use super::compose_blob_object_list_digest; + use crate::node::storage::blob_info::PerObjectPooledBlobInfo; + + fn test_scan_counter() -> IntCounterVec { + IntCounterVec::new( + Opts::new("test_scan_counter", "scan counter for tests"), + &["epoch"], + ) + .expect("failed to create test scan counter") + } + + /// Computes the pooled per-object digest the same way the consistency check does, folding in + /// the storage pool ID of each entry. + fn pooled_digest(entries: Vec<(ObjectID, PerObjectPooledBlobInfo)>, epoch: Epoch) -> u64 { + let counter = test_scan_counter(); + compose_blob_object_list_digest( + entries.into_iter().map(Ok::<_, TypedStoreError>), + epoch, + &counter, + Some(|info: &PerObjectPooledBlobInfo| info.storage_pool_id()), + ) + .expect("digest computation should succeed") + } + + /// Two entries with the same object key but different storage pool IDs must produce different + /// digests, so that a node disagreeing on a blob object's pool is detected as a cross-node + /// inconsistency. Reusing the same pool ID reproduces the same digest. 
+ #[test] + fn pooled_digest_folds_in_storage_pool_id() { + let object_id = ObjectID::new([7; 32]); + let blob_id = walrus_core::test_utils::blob_id_from_u64(1); + let epoch: Epoch = 5; + let entry = |pool_id| { + vec![( + object_id, + PerObjectPooledBlobInfo::new_for_test(blob_id, 2, pool_id), + )] + }; + + let digest_pool_a = pooled_digest(entry(ObjectID::new([1; 32])), epoch); + let digest_pool_b = pooled_digest(entry(ObjectID::new([2; 32])), epoch); + + assert_ne!( + digest_pool_a, digest_pool_b, + "digest must differ when an entry's storage pool ID differs" + ); + assert_eq!( + digest_pool_a, + pooled_digest(entry(ObjectID::new([1; 32])), epoch), + "digest must match for identical object key and storage pool ID" + ); + } +} diff --git a/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs b/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs index 0c6c30b952..f8531a960a 100644 --- a/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs +++ b/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs @@ -156,6 +156,31 @@ impl From for PerObjectPooledBlobInfo { } } +impl PerObjectPooledBlobInfo { + /// Returns the ID of the storage pool this blob object belongs to. + pub(crate) fn storage_pool_id(&self) -> ObjectID { + match self { + Self::V1(value) => value.storage_pool_id, + } + } + + /// Builds a test-only pooled blob info that is registered and certified in `certified_epoch`. 
+ #[cfg(test)] + pub(crate) fn new_for_test( + blob_id: BlobId, + certified_epoch: Epoch, + storage_pool_id: ObjectID, + ) -> Self { + Self::V1(PerObjectPooledBlobInfoV1 { + blob_id, + registered_epoch: certified_epoch, + certified_epoch: Some(certified_epoch), + storage_pool_id, + event: walrus_sui::test_utils::event_id_for_testing(), + }) + } +} + impl CertifiedBlobInfoApi for PerObjectPooledBlobInfoV1 { fn is_certified(&self, current_epoch: Epoch) -> bool { // A pooled blob object is removed from the table on deletion (the pool manages its