Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 83 additions & 2 deletions crates/walrus-service/src/node/consistency_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ use super::{
blob_sync::BlobSyncHandler,
storage::blob_info::{
BlobInfoIterator,
PerObjectBlobInfo,
PerObjectBlobInfoIterator,
PerObjectPooledBlobInfo,
PerObjectPooledBlobInfoIterator,
},
};
Expand Down Expand Up @@ -478,14 +480,21 @@ fn certified_blob_consistency_check(
/// Compose the digest of the object list returned by the iterator.
///
/// Works for any per-object table keyed by `ObjectID` (the regular and the pooled per-object blob
/// info tables). `scan_counter` keeps track of the number of objects scanned.
fn compose_blob_object_list_digest<B, T, I>(
/// info tables). When `extra_hash_input` is `Some`, the bytes it returns for each value are folded
/// into the digest in addition to the key; this lets callers include value fields (such as the
/// storage pool ID for pooled blobs) that must also be consistent across nodes. Callers that only
/// need the key to be consistent pass `None`. `scan_counter` keeps track of the number of objects
/// scanned.
fn compose_blob_object_list_digest<B, T, E, F, I>(
blob_info_iter: I,
epoch: Epoch,
scan_counter: &IntCounterVec,
extra_hash_input: Option<F>,
) -> Result<u64, TypedStoreError>
where
B: AsRef<[u8]>,
E: AsRef<[u8]>,
F: Fn(&T) -> E,
I: Iterator<Item = Result<(B, T), TypedStoreError>>,
{
let epoch_bucket = get_epoch_bucket(epoch);
Expand All @@ -497,6 +506,9 @@ where
match item {
Ok(blob_info) => {
hasher.write(blob_info.0.as_ref());
if let Some(extra_hash_input) = &extra_hash_input {
hasher.write(extra_hash_input(&blob_info.1).as_ref());
}
walrus_utils::with_label!(scan_counter, epoch_bucket).inc();
}
Err(error) => {
Expand Down Expand Up @@ -527,6 +539,11 @@ fn compose_certified_object_blob_list_digest(
&node
.metrics
.per_object_blob_info_consistency_check_certified_scanned,
// The regular per-object table only needs the object key to be consistent across nodes,
// so no extra per-value bytes are folded into the digest. This keeps the digest identical
// to before this function was generalized, preserving cross-node compatibility with nodes
// running the pre-generalization code.
None::<fn(&PerObjectBlobInfo) -> [u8; 0]>,
) {
Ok(value) => value,
Err(error) => {
Expand Down Expand Up @@ -581,6 +598,9 @@ fn compose_certified_pooled_object_blob_list_digest(
&node
.metrics
.per_object_pooled_blob_info_consistency_check_certified_scanned,
// Pooled blobs additionally fold the storage pool ID into the digest so that a node
// disagreeing on which pool a blob object belongs to is detected as an inconsistency.
Some(|info: &PerObjectPooledBlobInfo| info.storage_pool_id()),
) {
Ok(value) => value,
Err(error) => {
Expand Down Expand Up @@ -617,3 +637,64 @@ fn compose_certified_pooled_object_blob_list_digest(
}
);
}

#[cfg(test)]
mod tests {
use prometheus::{IntCounterVec, Opts};
use sui_types::base_types::ObjectID;
use typed_store::TypedStoreError;
use walrus_core::Epoch;

use super::compose_blob_object_list_digest;
use crate::node::storage::blob_info::PerObjectPooledBlobInfo;

fn test_scan_counter() -> IntCounterVec {
IntCounterVec::new(
Opts::new("test_scan_counter", "scan counter for tests"),
&["epoch"],
)
.expect("failed to create test scan counter")
}

/// Computes the pooled per-object digest the same way the consistency check does, folding in
/// the storage pool ID of each entry.
fn pooled_digest(entries: Vec<(ObjectID, PerObjectPooledBlobInfo)>, epoch: Epoch) -> u64 {
let counter = test_scan_counter();
compose_blob_object_list_digest(
entries.into_iter().map(Ok::<_, TypedStoreError>),
epoch,
&counter,
Some(|info: &PerObjectPooledBlobInfo| info.storage_pool_id()),
)
.expect("digest computation should succeed")
}

/// Two entries with the same object key but different storage pool IDs must produce different
/// digests, so that a node disagreeing on a blob object's pool is detected as a cross-node
/// inconsistency. Reusing the same pool ID reproduces the same digest.
#[test]
fn pooled_digest_folds_in_storage_pool_id() {
let object_id = ObjectID::new([7; 32]);
let blob_id = walrus_core::test_utils::blob_id_from_u64(1);
let epoch: Epoch = 5;
let entry = |pool_id| {
vec![(
object_id,
PerObjectPooledBlobInfo::new_for_test(blob_id, 2, pool_id),
)]
};

let digest_pool_a = pooled_digest(entry(ObjectID::new([1; 32])), epoch);
let digest_pool_b = pooled_digest(entry(ObjectID::new([2; 32])), epoch);

assert_ne!(
digest_pool_a, digest_pool_b,
"digest must differ when an entry's storage pool ID differs"
);
assert_eq!(
digest_pool_a,
pooled_digest(entry(ObjectID::new([1; 32])), epoch),
"digest must match for identical object key and storage pool ID"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,31 @@ impl From<PerObjectPooledBlobInfoV1> for PerObjectPooledBlobInfo {
}
}

impl PerObjectPooledBlobInfo {
/// Returns the ID of the storage pool this blob object belongs to.
pub(crate) fn storage_pool_id(&self) -> ObjectID {
match self {
Self::V1(value) => value.storage_pool_id,
}
}

/// Constructs a certified per-object pooled blob info for testing.
#[cfg(test)]
pub(crate) fn new_for_test(
blob_id: BlobId,
certified_epoch: Epoch,
storage_pool_id: ObjectID,
) -> Self {
Self::V1(PerObjectPooledBlobInfoV1 {
blob_id,
registered_epoch: certified_epoch,
certified_epoch: Some(certified_epoch),
storage_pool_id,
event: walrus_sui::test_utils::event_id_for_testing(),
})
}
}

impl CertifiedBlobInfoApi for PerObjectPooledBlobInfoV1 {
fn is_certified(&self, current_epoch: Epoch) -> bool {
// A pooled blob object is removed from the table on deletion (the pool manages its
Expand Down
Loading