Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 82 additions & 6 deletions crates/walrus-service/src/node/consistency_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@ use super::{
NodeStatus,
StorageNodeInner,
blob_sync::BlobSyncHandler,
storage::blob_info::{BlobInfoIterator, PerObjectBlobInfoIterator},
storage::blob_info::{
BlobInfoIterator,
PerObjectBlobInfoIterator,
PerObjectPooledBlobInfoIterator,
},
};

/// Configuration for the consistency check.
Expand Down Expand Up @@ -135,6 +139,12 @@ pub(super) async fn schedule_background_consistency_check(
.storage
.certified_per_object_blob_info_iter_before_epoch(epoch);

// Create a per-object pooled blob info iterator that takes the current blob info table as
// the snapshot.
let per_object_pooled_blob_info_iterator = node
.storage
.certified_per_object_pooled_blob_info_iter_before_epoch(epoch);

// Unblock event processing.
let _ = tx.send(());

Expand All @@ -160,6 +170,12 @@ pub(super) async fn schedule_background_consistency_check(
epoch,
);

compose_certified_pooled_object_blob_list_digest(
node.clone(),
per_object_pooled_blob_info_iterator,
epoch,
);

if node
.consistency_check_config
.enable_blob_info_invariants_check
Expand Down Expand Up @@ -459,13 +475,19 @@ fn certified_blob_consistency_check(
};
}

/// Compose the digest of the blob list returned by the iterator.
/// `scan_counter` keeps track of the number of blobs scanned.
fn compose_blob_object_list_digest(
blob_info_iter: PerObjectBlobInfoIterator,
/// Compose the digest of the object list returned by the iterator.
///
/// Works for any per-object table keyed by `ObjectID` (the regular and the pooled per-object blob
/// info tables). `scan_counter` keeps track of the number of objects scanned.
fn compose_blob_object_list_digest<B, T, I>(
blob_info_iter: I,
epoch: Epoch,
scan_counter: &IntCounterVec,
) -> Result<u64, TypedStoreError> {
) -> Result<u64, TypedStoreError>
where
B: AsRef<[u8]>,
I: Iterator<Item = Result<(B, T), TypedStoreError>>,
{
let epoch_bucket = get_epoch_bucket(epoch);

// xxhash is not a cryptographic hash function, but it is fast, has good collision
Expand Down Expand Up @@ -541,3 +563,57 @@ fn compose_certified_object_blob_list_digest(
}
);
}

/// Compose the digest of the certified pooled object blob list.
fn compose_certified_pooled_object_blob_list_digest(
node: Arc<StorageNodeInner>,
per_object_pooled_blob_info_iter: PerObjectPooledBlobInfoIterator,
epoch: Epoch,
) {
let _scope = monitored_scope::monitored_scope(
"EpochChange::background_certified_pooled_blob_object_info_consistency_check",
);
let epoch_bucket = get_epoch_bucket(epoch);

let blob_object_list_digest = match compose_blob_object_list_digest(
per_object_pooled_blob_info_iter,
Comment thread
halfprice marked this conversation as resolved.
epoch,
&node
.metrics
.per_object_pooled_blob_info_consistency_check_certified_scanned,
) {
Ok(value) => value,
Err(error) => {
tracing::warn!(?error, "error when processing per object pooled blob info");
node.metrics
.per_object_pooled_blob_info_consistency_check_error
.inc();
return;
}
};

tracing::info!(
epoch,
certified_blob_hash = blob_object_list_digest,
"background per-object pooled blob info consistency check finished"
);
#[allow(clippy::cast_possible_wrap)] // wrapping is fine here
walrus_utils::with_label!(
node.metrics.per_object_pooled_blob_info_consistency_check,
epoch_bucket
)
.set(blob_object_list_digest as i64);

// No-op out side of simtest.
Comment thread
halfprice marked this conversation as resolved.
sui_macros::fail_point_arg!(
"storage_node_certified_pooled_blob_object_digest",
|digest_map: Arc<Mutex<HashMap<Epoch, HashMap<ObjectID, u64>>>>| {
digest_map
.lock()
.expect("failed to lock the digest map")
.entry(epoch)
.or_insert_with(|| HashMap::new())
.insert(node.node_capability, blob_object_list_digest);
}
);
}
12 changes: 12 additions & 0 deletions crates/walrus-service/src/node/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ walrus_utils::metrics::define_metric_set! {
consistency check."]
per_object_blob_info_consistency_check_certified_scanned: IntCounterVec["epoch"],

#[help = "The hash of the list of certified per-object pooled blobs at the beginning of \
the epoch. Note that the label is epoch % EPOCH_BUCKET_COUNT (see consistency_check.rs)."]
per_object_pooled_blob_info_consistency_check: IntGaugeVec["epoch"],

#[help = "The number of errors occurred when checking the consistency of the per-object \
pooled blob info table."]
per_object_pooled_blob_info_consistency_check_error: IntCounter[],

#[help = "The number of certified per-object pooled blobs scanned during the per-object \
pooled blob info consistency check."]
per_object_pooled_blob_info_consistency_check_certified_scanned: IntCounterVec["epoch"],

#[help = "The ratio of fully stored blobs during the blob info consistency check."]
node_blob_data_fully_stored_ratio: GaugeVec["epoch"],

Expand Down
14 changes: 14 additions & 0 deletions crates/walrus-service/src/node/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ use self::{
PerObjectBlobInfo,
PerObjectBlobInfoIterator,
PerObjectPooledBlobInfo,
PerObjectPooledBlobInfoIterator,
},
constants::{
garbage_collector_last_completed_epoch_key,
Expand Down Expand Up @@ -1307,6 +1308,19 @@ impl Storage {
.certified_per_object_blob_info_iter_before_epoch(epoch, std::ops::Bound::Unbounded)
}

/// Returns an iterator over the certified per-object pooled blob info before the specified
/// epoch.
pub(crate) fn certified_per_object_pooled_blob_info_iter_before_epoch(
&self,
epoch: Epoch,
) -> PerObjectPooledBlobInfoIterator<'_> {
self.blob_info
.certified_per_object_pooled_blob_info_iter_before_epoch(
epoch,
std::ops::Bound::Unbounded,
)
}

/// Checks internal invariants of the blob info table.
pub(crate) fn blob_info_invariants_check(&self) {
if let Err(error) = self.blob_info.check_invariants() {
Expand Down
24 changes: 24 additions & 0 deletions crates/walrus-service/src/node/storage/blob_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ pub type PerObjectBlobInfoIterator<'a> = BlobInfoIter<
dyn Iterator<Item = Result<(ObjectID, PerObjectBlobInfo), TypedStoreError>> + Send + 'a,
>;

pub type PerObjectPooledBlobInfoIterator<'a> = BlobInfoIter<
ObjectID,
PerObjectPooledBlobInfo,
dyn Iterator<Item = Result<(ObjectID, PerObjectPooledBlobInfo), TypedStoreError>> + Send + 'a,
>;

#[derive(Debug, Clone)]
pub(super) struct BlobInfoTable {
aggregate_blob_info: DBMap<BlobId, BlobInfo>,
Expand Down Expand Up @@ -517,6 +523,24 @@ impl BlobInfoTable {
)
}

/// Returns an iterator over all pooled blob objects that were certified before the specified
/// epoch in the per-object pooled blob info table starting with the `starting_object_id` bound.
#[tracing::instrument(skip_all)]
pub fn certified_per_object_pooled_blob_info_iter_before_epoch(
&self,
before_epoch: Epoch,
starting_object_id_bound: Bound<ObjectID>,
) -> PerObjectPooledBlobInfoIterator<'_> {
BlobInfoIter::new(
Box::new(
self.per_object_pooled_blob_info
.safe_range_iter((starting_object_id_bound, Unbounded))
.expect("per_object_pooled_blob_info cf must always exist in storage node"),
),
before_epoch,
)
}

/// Returns the blob info for `blob_id`.
pub fn get(&self, blob_id: &BlobId) -> Result<Option<BlobInfo>, TypedStoreError> {
self.aggregate_blob_info.get(blob_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use walrus_core::{BlobId, Epoch};

use super::{
BlobStatusChangeType,
CertifiedBlobInfoApi,
Mergeable,
PooledBlobChangeInfo,
PooledChangeTypeAndInfo,
Expand Down Expand Up @@ -155,6 +156,34 @@ impl From<PerObjectPooledBlobInfoV1> for PerObjectPooledBlobInfo {
}
}

impl CertifiedBlobInfoApi for PerObjectPooledBlobInfoV1 {
fn is_certified(&self, current_epoch: Epoch) -> bool {
// A pooled blob object is removed from the table on deletion (the pool manages its
// lifetime), so an entry that is still present and has a certified epoch at or before
// `current_epoch` is certified.
self.certified_epoch
.is_some_and(|epoch| epoch <= current_epoch)
}

fn initial_certified_epoch(&self) -> Option<Epoch> {
self.certified_epoch
}
}

impl CertifiedBlobInfoApi for PerObjectPooledBlobInfo {
fn is_certified(&self, current_epoch: Epoch) -> bool {
match self {
Self::V1(value) => value.is_certified(current_epoch),
}
}

fn initial_certified_epoch(&self) -> Option<Epoch> {
match self {
Self::V1(value) => value.initial_certified_epoch(),
}
}
}

impl ToBytes for PerObjectPooledBlobInfo {}

impl Mergeable for PerObjectPooledBlobInfo {
Expand Down
33 changes: 33 additions & 0 deletions crates/walrus-simtest/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,8 @@ pub mod simtest_utils {
certified_blob_digest_map: Arc<Mutex<HashMap<Epoch, HashMap<ObjectID, u64>>>>,
// Per epoch, the per object blob digest of all nodes.
per_object_blob_digest_map: Arc<Mutex<HashMap<Epoch, HashMap<ObjectID, u64>>>>,
// Per epoch, the per object pooled blob digest of all nodes.
per_object_pooled_blob_digest_map: Arc<Mutex<HashMap<Epoch, HashMap<ObjectID, u64>>>>,
// Per epoch, the existence check of all nodes.
blob_existence_check_map: Arc<Mutex<HashMap<Epoch, HashMap<ObjectID, f64>>>>,
checked: Arc<AtomicBool>,
Expand All @@ -372,6 +374,8 @@ pub mod simtest_utils {
let certified_blob_digest_map_clone = certified_blob_digest_map.clone();
let per_object_blob_digest_map = Arc::new(Mutex::new(HashMap::new()));
let per_object_blob_digest_map_clone = per_object_blob_digest_map.clone();
let per_object_pooled_blob_digest_map = Arc::new(Mutex::new(HashMap::new()));
let per_object_pooled_blob_digest_map_clone = per_object_pooled_blob_digest_map.clone();
let blob_existence_check_map = Arc::new(Mutex::new(HashMap::new()));
let blob_existence_check_map_clone = blob_existence_check_map.clone();

Expand All @@ -396,6 +400,13 @@ pub mod simtest_utils {
},
);

sui_macros::register_fail_point_arg(
"storage_node_certified_pooled_blob_object_digest",
move || -> Option<Arc<Mutex<HashMap<Epoch, HashMap<ObjectID, u64>>>>> {
Some(per_object_pooled_blob_digest_map_clone.clone())
},
);

sui_macros::register_fail_point_arg(
"storage_node_certified_blob_existence_check",
move || -> Option<Arc<Mutex<HashMap<Epoch, HashMap<ObjectID, f64>>>>> {
Expand All @@ -407,6 +418,7 @@ pub mod simtest_utils {
event_source_map,
certified_blob_digest_map,
per_object_blob_digest_map,
per_object_pooled_blob_digest_map,
blob_existence_check_map,
checked: Arc::new(AtomicBool::new(false)),
}
Expand Down Expand Up @@ -443,6 +455,7 @@ pub mod simtest_utils {

self.check_certified_blob_digest(min_epoch);
self.check_per_object_blob_digest(min_epoch);
self.check_per_object_pooled_blob_digest(min_epoch);
self.check_blob_existence(min_epoch);
}

Expand Down Expand Up @@ -480,6 +493,25 @@ pub mod simtest_utils {
}
}

/// Ensures that for all epochs, all nodes have the same per object pooled blob digest.
#[tracing::instrument(skip(self))]
fn check_per_object_pooled_blob_digest(&self, min_epoch: Epoch) {
tracing::info!(
"checking per object pooled blob digest consistency starting with epoch \
{min_epoch}"
);
let digest_map = self.per_object_pooled_blob_digest_map.lock().unwrap();
for (epoch, node_digest_map) in digest_map.iter().sorted_by_key(|(epoch, _)| *epoch) {
if *epoch < min_epoch {
tracing::info!(
"skipping epoch {epoch} because it is before the minimum epoch {min_epoch}"
);
continue;
}
Self::check_digests_are_equal(*epoch, node_digest_map);
}
}

fn check_digests_are_equal(epoch: Epoch, node_digest_map: &HashMap<ObjectID, u64>) {
let mut epoch_digest = None;
for (node_id, digest) in node_digest_map.iter() {
Expand Down Expand Up @@ -532,6 +564,7 @@ pub mod simtest_utils {
sui_macros::clear_fail_point("storage_node_event_index_source");
sui_macros::clear_fail_point("storage_node_certified_blob_digest");
sui_macros::clear_fail_point("storage_node_certified_blob_object_digest");
sui_macros::clear_fail_point("storage_node_certified_pooled_blob_object_digest");
sui_macros::clear_fail_point("storage_node_certified_blob_existence_check");
}
}
Expand Down
Loading