diff --git a/crates/walrus-service/node_config_example.yaml b/crates/walrus-service/node_config_example.yaml index 79bb066122..c9dc54907a 100644 --- a/crates/walrus-service/node_config_example.yaml +++ b/crates/walrus-service/node_config_example.yaml @@ -228,6 +228,8 @@ consistency_check: enable_blob_info_invariants_check: false enable_sliver_data_existence_check: true sliver_data_existence_check_sample_rate_percentage: 100 +blob_info_snapshot: + enabled: false checkpoint_config: max_db_checkpoints: 3 db_checkpoint_interval: diff --git a/crates/walrus-service/src/node.rs b/crates/walrus-service/src/node.rs index 9386e3887d..0546058894 100644 --- a/crates/walrus-service/src/node.rs +++ b/crates/walrus-service/src/node.rs @@ -7,6 +7,7 @@ use std::{ collections::{BTreeMap, hash_map::Entry}, future::Future, num::{NonZero, NonZeroU16, NonZeroUsize}, + path::PathBuf, pin::Pin, sync::{ Arc, @@ -213,6 +214,7 @@ pub mod server; pub mod system_events; pub(crate) mod blob_event_processor; +pub(crate) mod blob_info_snapshot_writer; pub(crate) mod consistency_check; pub(crate) mod db_checkpoint; pub(crate) mod errors; @@ -691,6 +693,8 @@ pub struct StorageNodeInner { // Sender for updating the latest event epoch. latest_event_epoch_sender: watch::Sender>, consistency_check_config: StorageNodeConsistencyCheckConfig, + blob_info_snapshot_config: blob_info_snapshot_writer::BlobInfoSnapshotWriterConfig, + blob_info_snapshot_dir: PathBuf, checkpoint_manager: Option>, garbage_collection_config: GarbageCollectionConfig, // Server-side cap on the `sliver_count` a sync-shard request may ask for; `None` disables it. @@ -893,6 +897,10 @@ impl StorageNode { latest_event_epoch_sender, latest_event_epoch_watcher, consistency_check_config: config.consistency_check.clone(), + blob_info_snapshot_config: config.blob_info_snapshot.clone(), + blob_info_snapshot_dir: blob_info_snapshot_writer::snapshot_base_dir( + &config.storage_path, + ), checkpoint_manager, garbage_collection_config: config.garbage_collection, max_sliver_count_per_sync_request: config @@ -1004,6 +1012,10 @@ impl StorageNode { /// Run the walrus-node logic until cancelled using the provided cancellation token. pub async fn run(&self, cancel_token: CancellationToken) -> anyhow::Result<()> { + // If the node crashed before or during the epoch-boundary checkpoint cleanup, finish + // it now so that a stale checkpoint does not pin disk space for a full epoch. + blob_info_snapshot_writer::spawn_startup_cleanup(self.inner.clone()); + if let Err(error) = self .epoch_change_driver .schedule_relevant_calls_for_current_epoch() @@ -1945,9 +1957,13 @@ impl StorageNode { // phase 1's disk traffic on the same RocksDB instance. self.start_garbage_collection_task(event.epoch).await?; - // Capture the event index before the handle is moved into `execute_epoch_change` so - // we can later detect whether this event is being reprocessed. + // Determine whether this event is being reprocessed before the handle is moved into + // `execute_epoch_change`: the finisher task it spawns marks the event as complete in + // the background, so checking afterwards could misclassify normal processing as + // reprocessing and skip the checkpoint and consistency check below. let event_index = event_handle.index(); + let node_is_reprocessing_events = + self.inner.storage.get_latest_handled_event_index()? >= event_index; // During epoch change, we need to lock the read access to shard map until all the new // shards are created. @@ -1971,6 +1987,30 @@ impl StorageNode { self.epoch_change_driver .schedule_post_epoch_change_subsidies(); + // Create the blob info snapshot checkpoint after GC phase 1 has settled the blob info + // tables and before any further events are processed. Operators serialize and compare + // the checkpoints offline with `db-tool bench-blob-info-snapshot`. + // + // This runs before scheduling the consistency check so that the checkpoint (a memtable + // flush plus hard links, seconds) finishes before the check's long background scan + // starts competing for disk I/O. Both capture their state inline while event + // processing is blocked, so the ordering between them does not affect determinism. + if self.inner.blob_info_snapshot_config.enabled + && !node_is_reprocessing_events + && let Err(error) = blob_info_snapshot_writer::create_checkpoint_at_epoch_boundary( + self.inner.clone(), + event.epoch, + ) + .await + { + self.inner.metrics.blob_info_snapshot_error_total.inc(); + tracing::warn!( + ?error, + walrus.epoch = event.epoch, + "failed to create the blob info snapshot checkpoint at the epoch boundary" + ); + } + // Schedule the storage node consistency check after garbage collection has settled the // aggregate blob info table. The iterator's `is_certified` filter relies on counters // that GC decrements for newly-expired deletable and pooled blobs, so the digest @@ -1981,8 +2021,6 @@ impl StorageNode { // - consistency check is disabled // - node is reprocessing events (blob info table should not be affected by future // events) - let node_is_reprocessing_events = - self.inner.storage.get_latest_handled_event_index()? >= event_index; if self.inner.consistency_check_config.enable_consistency_check && !node_is_reprocessing_events && let Err(err) = consistency_check::schedule_background_consistency_check( diff --git a/crates/walrus-service/src/node/blob_info_snapshot_writer.rs b/crates/walrus-service/src/node/blob_info_snapshot_writer.rs new file mode 100644 index 0000000000..b84922ecdf --- /dev/null +++ b/crates/walrus-service/src/node/blob_info_snapshot_writer.rs @@ -0,0 +1,250 @@ +// Copyright (c) Walrus Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! Epoch-boundary database checkpoints for blob info snapshot verification. +//! +//! At the epoch boundary, directly after garbage-collection phase 1 and before any further +//! events are processed, the blob info tables are identical across all honest nodes. When +//! enabled, this module creates a RocksDB checkpoint of the database at exactly that point +//! and removes the previous epoch's checkpoint, so that at most one checkpoint exists at a +//! time. +//! +//! The node itself does not serialize anything: operators serialize and compare the +//! checkpoints offline by running +//! `walrus-node db-tool bench-blob-info-snapshot --db-path ` on each node and +//! comparing the reported snapshot digests for the same epoch. The checkpoint captures the +//! deterministic post-GC-phase-1 state, so the digests must be identical across nodes. +//! +//! This module deliberately does not reuse [`super::db_checkpoint::DbCheckpointManager`]: +//! that subsystem implements wall-clock-periodic backups with retry-on-failure semantics and +//! recommends a separate physical disk. All three properties are wrong here: a retried +//! checkpoint would capture a non-boundary state (events have resumed), a cross-filesystem +//! checkpoint degrades from hard links to a full copy, and the schedule must be the epoch +//! boundary, not an interval. Only the underlying RocksDB checkpoint primitive is shared. + +use std::{ + fs, + path::{Path, PathBuf}, + sync::Arc, + time::Instant, +}; + +use anyhow::{Context, Result}; +use serde::{Deserialize, Serialize}; +use walrus_core::Epoch; + +use super::StorageNodeInner; + +/// Configuration for the blob info snapshot checkpoint writer. +#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(default)] +pub struct BlobInfoSnapshotWriterConfig { + /// Whether to create a database checkpoint at each epoch boundary. + /// + /// The checkpoint directory lives under the storage path and must therefore be on the + /// same filesystem as the database, so creation uses hard links. Note that disabling + /// this flag leaves the last checkpoint on disk until it is removed manually. + pub enabled: bool, +} + +/// Returns the directory under which the writer keeps its checkpoints. +pub fn snapshot_base_dir(storage_path: &Path) -> PathBuf { + storage_path.join("blob_info_snapshots") +} + +fn checkpoint_dir_path(base_dir: &Path, epoch: Epoch) -> PathBuf { + base_dir.join(format!("checkpoint_epoch_{epoch}")) +} + +/// Parses the epoch out of a (possibly temporary) checkpoint directory name. +fn checkpoint_epoch(file_name: &str) -> Option { + file_name + .strip_suffix(".tmp") + .unwrap_or(file_name) + .strip_prefix("checkpoint_epoch_")? + .parse() + .ok() +} + +/// Creates the database checkpoint for `epoch` at the epoch boundary and removes all older +/// checkpoints. +/// +/// Must be called after GC phase 1 has completed for `epoch` and before any further events +/// are processed, so that the checkpoint captures the deterministic post-GC state. The +/// checkpoint creation (memtable flush plus hard-linking every SST file) blocks event +/// processing; its duration is reported through the +/// `blob_info_snapshot_checkpoint_duration_seconds` metric. +pub(super) async fn create_checkpoint_at_epoch_boundary( + node: Arc, + epoch: Epoch, +) -> Result<()> { + let base_dir = node.blob_info_snapshot_dir.clone(); + fs::create_dir_all(&base_dir)?; + remove_checkpoints_matching(&base_dir, |checkpoint_epoch| checkpoint_epoch != epoch); + + let final_path = checkpoint_dir_path(&base_dir, epoch); + if final_path.exists() { + // Already created, e.g., because the epoch change event is being reprocessed after a + // restart. + tracing::debug!( + walrus.epoch = epoch, + "blob info snapshot checkpoint already exists" + ); + return Ok(()); + } + let tmp_path = base_dir.join(format!("checkpoint_epoch_{epoch}.tmp")); + if tmp_path.exists() { + fs::remove_dir_all(&tmp_path)?; + } + + let start = Instant::now(); + let storage_node = node.clone(); + let checkpoint_tmp_path = tmp_path.clone(); + tokio::task::spawn_blocking(move || { + storage_node + .storage + .checkpoint_database(&checkpoint_tmp_path) + }) + .await + .context("checkpoint creation task panicked")? + .context("failed to create the blob info snapshot checkpoint")?; + fs::rename(&tmp_path, &final_path)?; + let elapsed = start.elapsed(); + + node.metrics + .blob_info_snapshot_checkpoint_duration_seconds + .set(elapsed.as_secs_f64()); + tracing::info!( + walrus.epoch = epoch, + ?elapsed, + path = %final_path.display(), + "created blob info snapshot checkpoint" + ); + Ok(()) +} + +/// Finishes the checkpoint cleanup at startup, in case the node crashed before or during the +/// epoch-boundary cleanup: removes temporary checkpoints and keeps only the latest one. +/// +/// Without this, a stale checkpoint would survive until the next epoch boundary and keep +/// hard links to SST files that the live database has long deleted, pinning their disk space +/// for up to a full epoch. +pub(super) fn spawn_startup_cleanup(node: Arc) { + if !node.blob_info_snapshot_config.enabled { + return; + } + tokio::spawn(async move { + let base_dir = node.blob_info_snapshot_dir.clone(); + if base_dir.exists() { + remove_stale_checkpoints(&base_dir); + } + }); +} + +/// Removes all temporary checkpoints and all checkpoints older than the latest one. +fn remove_stale_checkpoints(base_dir: &Path) { + let latest_epoch = checkpoint_entries(base_dir) + .filter(|(name, _)| !name.ends_with(".tmp")) + .filter_map(|(name, _)| checkpoint_epoch(&name)) + .max(); + for (name, path) in checkpoint_entries(base_dir) { + let is_stale = name.ends_with(".tmp") + || checkpoint_epoch(&name).is_some_and(|epoch| Some(epoch) < latest_epoch); + if is_stale && let Err(error) = fs::remove_dir_all(&path) { + tracing::warn!( + ?error, + path = %path.display(), + "failed to remove stale blob info snapshot checkpoint" + ); + } + } +} + +/// Removes all checkpoints (including temporary ones) whose epoch matches `should_remove`. +fn remove_checkpoints_matching(base_dir: &Path, should_remove: impl Fn(Epoch) -> bool) { + for (name, path) in checkpoint_entries(base_dir) { + if checkpoint_epoch(&name).is_some_and(&should_remove) + && let Err(error) = fs::remove_dir_all(&path) + { + tracing::warn!( + ?error, + path = %path.display(), + "failed to remove blob info snapshot checkpoint" + ); + } + } +} + +/// Returns the names and paths of all checkpoint directories under `base_dir`. +fn checkpoint_entries(base_dir: &Path) -> impl Iterator { + fs::read_dir(base_dir) + .into_iter() + .flatten() + .flatten() + .filter_map(|entry| { + let name = entry.file_name().to_str()?.to_string(); + checkpoint_epoch(&name)?; + Some((name, entry.path())) + }) +} + +#[cfg(test)] +mod tests { + use tempfile::tempdir; + + use super::*; + + #[test] + fn config_default_is_disabled() { + assert!(!BlobInfoSnapshotWriterConfig::default().enabled); + let parsed: BlobInfoSnapshotWriterConfig = + serde_yaml::from_str("enabled: true\n").expect("config should deserialize"); + assert!(parsed.enabled); + } + + #[test] + fn checkpoint_epoch_parses_directory_names() { + assert_eq!(checkpoint_epoch("checkpoint_epoch_7"), Some(7)); + assert_eq!(checkpoint_epoch("checkpoint_epoch_7.tmp"), Some(7)); + assert_eq!(checkpoint_epoch("unrelated"), None); + assert_eq!(checkpoint_epoch("checkpoint_epoch_x"), None); + } + + #[test] + fn boundary_cleanup_removes_other_epochs() -> Result<()> { + let dir = tempdir()?; + let base = dir.path(); + fs::create_dir(checkpoint_dir_path(base, 3))?; + fs::create_dir(checkpoint_dir_path(base, 4))?; + fs::create_dir(base.join("checkpoint_epoch_4.tmp"))?; + fs::write(base.join("unrelated.file"), b"keep me")?; + + remove_checkpoints_matching(base, |epoch| epoch != 4); + + assert!(!checkpoint_dir_path(base, 3).exists()); + assert!(checkpoint_dir_path(base, 4).exists()); + // The temporary directory for epoch 4 is also kept by this filter; it is replaced by + // the checkpoint creation itself and removed by the startup cleanup otherwise. + assert!(base.join("checkpoint_epoch_4.tmp").exists()); + assert!(base.join("unrelated.file").exists()); + Ok(()) + } + + #[test] + fn startup_cleanup_keeps_only_the_latest_checkpoint() -> Result<()> { + let dir = tempdir()?; + let base = dir.path(); + fs::create_dir(checkpoint_dir_path(base, 3))?; + fs::create_dir(checkpoint_dir_path(base, 5))?; + fs::create_dir(base.join("checkpoint_epoch_6.tmp"))?; + fs::write(base.join("unrelated.file"), b"keep me")?; + + remove_stale_checkpoints(base); + + assert!(!checkpoint_dir_path(base, 3).exists()); + assert!(checkpoint_dir_path(base, 5).exists()); + assert!(!base.join("checkpoint_epoch_6.tmp").exists()); + assert!(base.join("unrelated.file").exists()); + Ok(()) + } +} diff --git a/crates/walrus-service/src/node/config.rs b/crates/walrus-service/src/node/config.rs index 7a26fef2ff..326623ce7a 100644 --- a/crates/walrus-service/src/node/config.rs +++ b/crates/walrus-service/src/node/config.rs @@ -44,6 +44,7 @@ use walrus_sui::types::{ use walrus_utils::config::Config as _; use super::{ + blob_info_snapshot_writer::BlobInfoSnapshotWriterConfig, consistency_check::StorageNodeConsistencyCheckConfig, garbage_collector::GarbageCollectionConfig, storage::DatabaseConfig, @@ -317,6 +318,9 @@ pub struct StorageNodeConfig { /// Configuration for the consistency check. #[serde(default, skip_serializing_if = "defaults::is_default")] pub consistency_check: StorageNodeConsistencyCheckConfig, + /// Configuration for the blob info snapshot checkpoint writer. + #[serde(default, skip_serializing_if = "defaults::is_default")] + pub blob_info_snapshot: BlobInfoSnapshotWriterConfig, /// Configuration for the checkpointing task. #[serde(default, skip_serializing_if = "defaults::is_default")] pub checkpoint_config: DbCheckpointConfig, @@ -546,6 +550,7 @@ impl Default for StorageNodeConfig { balance_check: Default::default(), thread_pool: Default::default(), consistency_check: Default::default(), + blob_info_snapshot: Default::default(), checkpoint_config: Default::default(), admin_socket_path: None, node_recovery_config: Default::default(), diff --git a/crates/walrus-service/src/node/dbtool.rs b/crates/walrus-service/src/node/dbtool.rs index ea6475fc91..b67fb3180f 100644 --- a/crates/walrus-service/src/node/dbtool.rs +++ b/crates/walrus-service/src/node/dbtool.rs @@ -3,7 +3,14 @@ //! Tools for inspecting and maintaining the RocksDB database. -use std::{collections::BTreeMap, path::PathBuf, thread::sleep, time::Duration}; +use std::{ + collections::BTreeMap, + fs::File, + io::BufWriter, + path::PathBuf, + thread::sleep, + time::{Duration, Instant}, +}; use anyhow::{Context, Result, bail}; use bincode::Options; @@ -22,7 +29,7 @@ use rocksdb::{ use serde::{Deserialize, Serialize}; use serde_with::serde_as; use sui_types::base_types::ObjectID; -use typed_store::rocks::be_fix_int_ser; +use typed_store::{TypedStoreError, rocks::be_fix_int_ser}; use walrus_core::{ BlobId, Epoch, @@ -34,7 +41,7 @@ use super::DatabaseTableOptionsFactory; use crate::{ event::{ event_processor::db::constants::{self as event_processor_constants}, - events::{InitState, PositionedStreamEvent}, + events::{EventStreamCursor, InitState, PositionedStreamEvent}, }, node::{ DatabaseConfig, @@ -55,11 +62,14 @@ use crate::{ BlobInfo, CertifiedBlobInfoApi, PerObjectBlobInfo, + PerObjectPooledBlobInfo, + StoragePoolInfo, blob_info_cf_options, per_object_blob_info_cf_options, per_object_pooled_blob_info_cf_options, storage_pool_info_cf_options, }, + blob_info_snapshot::{SnapshotHeader, read_snapshot, write_snapshot}, constants::{ aggregate_blob_info_cf_name, event_cursor_cf_name, @@ -308,6 +318,32 @@ pub enum DbToolCommands { #[command(subcommand)] command: EventProcessorCommands, }, + + /// Benchmark blob info snapshot serialization against an existing database. + /// + /// Serializes the `per_object_blob_info`, `per_object_pooled_blob_info`, and + /// `storage_pool_info` column families into a snapshot file and reports its size, the + /// serialization and deserialization durations, the bulk-load duration into a scratch + /// database, and zstd compression ratios. The database is opened read-only; run this + /// against a stopped node's database, a RocksDB checkpoint, or a filesystem snapshot of + /// the database directory. Plain file copies of a live database directory are not safe + /// inputs, and a live database, while it usually opens, yields a view that is stale by + /// the unflushed WAL tail. + BenchBlobInfoSnapshot { + /// Path to the RocksDB database directory. + #[arg(long)] + db_path: PathBuf, + /// Path of the snapshot file written by the benchmark; defaults to + /// `blob_info_snapshot.bench` in the system temp directory. + #[arg(long)] + output: Option, + /// Zstd compression levels to measure. + #[arg(long, value_delimiter = ',', default_value = "1,3,9")] + zstd_levels: Vec, + /// Skip the bulk-load timing into a scratch database. + #[arg(long, default_value_t = false)] + skip_bulk_load: bool, + }, } /// Commands for reading event blob writer metadata. @@ -422,6 +458,12 @@ impl DbToolCommands { Self::EventProcessor { db_path, command } => match command { EventProcessorCommands::ReadInitState => read_event_processor_init_state(db_path), }, + Self::BenchBlobInfoSnapshot { + db_path, + output, + zstd_levels, + skip_bulk_load, + } => bench_blob_info_snapshot(db_path, output, zstd_levels, skip_bulk_load), } } } @@ -1549,6 +1591,270 @@ fn read_failed_to_attest_event_blobs(db_path: PathBuf) -> Result<()> { Ok(()) } +/// Returns a typed iterator over all entries of a blob-info-related column family, decoding +/// keys with the typed-store key encoding and values with BCS, matching how the storage node +/// itself reads these tables. +fn snapshot_source_iter<'db, V: serde::de::DeserializeOwned>( + db: &'db DB, + cf_name: &'static str, +) -> Result> + 'db> { + let cf = db + .cf_handle(cf_name) + .with_context(|| format!("column family {cf_name} should exist in the database"))?; + Ok(db + .iterator_cf(&cf, rocksdb::IteratorMode::Start) + .map(|item| { + let (key, value) = + item.map_err(|error| TypedStoreError::RocksDBError(error.to_string()))?; + let key_config = bincode::DefaultOptions::new() + .with_big_endian() + .with_fixint_encoding(); + let object_id: ObjectID = key_config + .deserialize(&key) + .map_err(|error| TypedStoreError::SerializationError(error.to_string()))?; + let value: V = bcs::from_bytes(&value) + .map_err(|error| TypedStoreError::SerializationError(error.to_string()))?; + Ok((object_id, value)) + })) +} + +fn bench_blob_info_snapshot( + db_path: PathBuf, + output: Option, + zstd_levels: Vec, + skip_bulk_load: bool, +) -> Result<()> { + let factory = DatabaseTableOptionsFactory::new(DatabaseConfig::default(), false); + let db = DB::open_cf_with_opts_for_read_only( + &RocksdbOptions::default(), + &db_path, + [ + ( + per_object_blob_info_cf_name(), + per_object_blob_info_cf_options(&factory), + ), + ( + per_object_pooled_blob_info_cf_name(), + per_object_pooled_blob_info_cf_options(&factory), + ), + ( + storage_pool_info_cf_name(), + storage_pool_info_cf_options(&factory), + ), + ], + false, + ) + .context("failed to open the database read-only; run against a stopped node or a DB copy")?; + + let output_path = + output.unwrap_or_else(|| std::env::temp_dir().join("blob_info_snapshot.bench")); + println!("Blob info snapshot benchmark"); + println!(" database: {}", db_path.display()); + println!(" snapshot file: {}", output_path.display()); + + // When the input is a node-created `checkpoint_epoch_` directory, embed its epoch in + // the header so that the digest only matches across snapshots taken at the same epoch + // boundary. The event cursor deliberately stays at its default: the cursor stored in the + // database is not deterministic at the checkpoint instant (event completion is marked by + // a background task), so embedding it would cause spurious digest mismatches. The real + // writer receives both values in-process from the epoch change event instead. + let checkpoint_epoch = db_path + .file_name() + .and_then(|name| name.to_str()) + .and_then(|name| name.strip_prefix("checkpoint_epoch_")) + .and_then(|epoch| epoch.parse::().ok()); + match checkpoint_epoch { + Some(epoch) => println!(" checkpoint epoch: {epoch} (embedded in the digest)"), + None => println!( + " checkpoint epoch: unknown (not a checkpoint_epoch_ directory); \ + using epoch 0 in the header" + ), + } + let header = SnapshotHeader::new( + checkpoint_epoch.unwrap_or(0), + EventStreamCursor::default(), + BlobId::ZERO, + ); + + let file = File::create(&output_path)?; + let mut writer = BufWriter::with_capacity(1 << 20, file); + let serialize_start = Instant::now(); + let stats = write_snapshot( + &mut writer, + &header, + snapshot_source_iter::(&db, per_object_blob_info_cf_name())?, + snapshot_source_iter::( + &db, + per_object_pooled_blob_info_cf_name(), + )?, + snapshot_source_iter::(&db, storage_pool_info_cf_name())?, + )?; + writer.into_inner()?.sync_all()?; + let serialize_elapsed = serialize_start.elapsed(); + drop(db); + + let total_entries = + stats.per_object_count + stats.per_object_pooled_count + stats.storage_pool_count; + println!(" entries:"); + println!( + " per_object_blob_info: {}", + stats.per_object_count + ); + println!( + " per_object_pooled_blob_info: {}", + stats.per_object_pooled_count + ); + println!( + " storage_pool_info: {}", + stats.storage_pool_count + ); + println!( + " snapshot digest (xxhash64): {:016x} <- compare across nodes for the same epoch", + stats.checksum + ); + println!( + " snapshot size: {} bytes ({:.1} MiB, {:.1} bytes/entry)", + stats.bytes_written, + mib(stats.bytes_written), + ratio(stats.bytes_written, total_entries), + ); + println!( + " serialize + write + sync: {:.2?} ({:.1} MiB/s, {:.0} entries/s)", + serialize_elapsed, + mib(stats.bytes_written) / serialize_elapsed.as_secs_f64(), + ratio(total_entries, 1) / serialize_elapsed.as_secs_f64(), + ); + + let bytes = std::fs::read(&output_path)?; + let deserialize_start = Instant::now(); + let contents = read_snapshot(&bytes)?; + let deserialize_elapsed = deserialize_start.elapsed(); + println!( + " read + deserialize + verify checksum: {:.2?} ({:.1} MiB/s, {:.0} entries/s)", + deserialize_elapsed, + mib(stats.bytes_written) / deserialize_elapsed.as_secs_f64(), + ratio(total_entries, 1) / deserialize_elapsed.as_secs_f64(), + ); + + if !skip_bulk_load { + let load_elapsed = bulk_load_into_scratch_db(&contents)?; + println!( + " bulk load into scratch db: {:.2?} ({:.0} entries/s)", + load_elapsed, + ratio(total_entries, 1) / load_elapsed.as_secs_f64(), + ); + } + + for level in zstd_levels { + let compress_start = Instant::now(); + let compressed = zstd::stream::encode_all(bytes.as_slice(), level)?; + let compress_elapsed = compress_start.elapsed(); + let decompress_start = Instant::now(); + let decompressed = zstd::stream::decode_all(compressed.as_slice())?; + let decompress_elapsed = decompress_start.elapsed(); + anyhow::ensure!(decompressed == bytes, "zstd roundtrip mismatch"); + println!( + " zstd level {level}: {} bytes ({:.2}x ratio), compress {:.2?}, decompress {:.2?}", + compressed.len(), + ratio(stats.bytes_written, u64::try_from(compressed.len())?), + compress_elapsed, + decompress_elapsed, + ); + } + + Ok(()) +} + +/// Loads the snapshot contents into a freshly created scratch database with the same column +/// families and key/value encodings as a real node, and returns the elapsed time. The scratch +/// database is deleted afterwards. +fn bulk_load_into_scratch_db( + contents: &crate::node::storage::blob_info_snapshot::SnapshotContents, +) -> Result { + const BATCH_SIZE: usize = 10_000; + + let scratch_path = + std::env::temp_dir().join(format!("blob_info_snapshot_load_{}", std::process::id())); + let factory = DatabaseTableOptionsFactory::new(DatabaseConfig::default(), false); + let mut db_options = RocksdbOptions::default(); + db_options.create_if_missing(true); + db_options.create_missing_column_families(true); + let db = DB::open_cf_with_opts( + &db_options, + &scratch_path, + [ + ( + per_object_blob_info_cf_name(), + per_object_blob_info_cf_options(&factory), + ), + ( + per_object_pooled_blob_info_cf_name(), + per_object_pooled_blob_info_cf_options(&factory), + ), + ( + storage_pool_info_cf_name(), + storage_pool_info_cf_options(&factory), + ), + ], + )?; + + fn load_section( + db: &DB, + cf_name: &'static str, + entries: &[(ObjectID, V)], + ) -> Result<()> { + let cf = db + .cf_handle(cf_name) + .with_context(|| format!("column family {cf_name} should exist in the scratch db"))?; + for chunk in entries.chunks(BATCH_SIZE) { + let mut batch = rocksdb::WriteBatch::default(); + for (object_id, value) in chunk { + batch.put_cf(&cf, be_fix_int_ser(object_id)?, bcs::to_bytes(value)?); + } + db.write(batch)?; + } + Ok(()) + } + + let start = Instant::now(); + load_section(&db, per_object_blob_info_cf_name(), &contents.per_object)?; + load_section( + &db, + per_object_pooled_blob_info_cf_name(), + &contents.per_object_pooled, + )?; + load_section(&db, storage_pool_info_cf_name(), &contents.storage_pools)?; + db.flush()?; + let elapsed = start.elapsed(); + + drop(db); + let _ = DB::destroy(&RocksdbOptions::default(), &scratch_path); + let _ = std::fs::remove_dir_all(&scratch_path); + Ok(elapsed) +} + +/// Converts a byte count to MiB as a float for reporting. +fn mib(bytes: u64) -> f64 { + const BYTES_PER_MIB: f64 = (1u64 << 20) as f64; + u64_to_f64(bytes) / BYTES_PER_MIB +} + +/// Returns `numerator / denominator` as a float for reporting; returns 0.0 for a zero +/// denominator. +fn ratio(numerator: u64, denominator: u64) -> f64 { + if denominator == 0 { + return 0.0; + } + u64_to_f64(numerator) / u64_to_f64(denominator) +} + +/// Converts a `u64` to `f64` for reporting purposes, where precision loss on huge values is +/// acceptable. +#[allow(clippy::cast_precision_loss)] +fn u64_to_f64(value: u64) -> f64 { + value as f64 +} + #[cfg(test)] mod tests { use clap::{CommandFactory, Parser}; @@ -1628,6 +1934,136 @@ mod tests { compact_db(db_dir.path().to_path_buf(), CompactDbMode::Full) } + /// Exercises the full operator verification workflow: a real node database is + /// checkpointed at the epoch boundary, and the benchmark serializes the blob info tables + /// from the checkpoint directory. + #[tokio::test] + async fn bench_runs_on_a_node_database_checkpoint() -> Result<()> { + use walrus_sui::{test_utils::EventForTesting as _, types::BlobRegistered}; + + let storage = crate::test_utils::empty_storage_with_shards(&[]).await; + let blob_id = walrus_core::test_utils::blob_id_from_u64(42); + storage + .as_ref() + .update_blob_info(0, &BlobRegistered::for_testing(blob_id).into())?; + + let checkpoint_dir = tempdir()?; + let checkpoint_path = checkpoint_dir.path().join("checkpoint_epoch_1"); + storage.as_ref().checkpoint_database(&checkpoint_path)?; + + let output_path = checkpoint_dir.path().join("snapshot.bench"); + bench_blob_info_snapshot(checkpoint_path, Some(output_path.clone()), vec![1], true)?; + + let contents = read_snapshot(&std::fs::read(&output_path)?)?; + assert_eq!(contents.per_object.len(), 1); + Ok(()) + } + + #[test] + fn bench_blob_info_snapshot_roundtrips_database_contents() -> Result<()> { + use walrus_core::test_utils::blob_id_from_u64; + use walrus_sui::test_utils::{FIXED_STORAGE_POOL_ID, fixed_event_id_for_testing}; + + let db_dir = tempdir()?; + let db_table_opts_factory = + DatabaseTableOptionsFactory::new(DatabaseConfig::default(), false); + let mut db_opts = RocksdbOptions::from(&db_table_opts_factory.global()); + db_opts.create_if_missing(true); + db_opts.create_missing_column_families(true); + + let db = DB::open_cf_descriptors( + &db_opts, + db_dir.path(), + vec![ + ColumnFamilyDescriptor::new( + per_object_blob_info_cf_name(), + per_object_blob_info_cf_options(&db_table_opts_factory), + ), + ColumnFamilyDescriptor::new( + per_object_pooled_blob_info_cf_name(), + per_object_pooled_blob_info_cf_options(&db_table_opts_factory), + ), + ColumnFamilyDescriptor::new( + storage_pool_info_cf_name(), + storage_pool_info_cf_options(&db_table_opts_factory), + ), + // An unrelated column family, mimicking a real node database with many more + // column families than the benchmark opens. + ColumnFamilyDescriptor::new("custom_cf", RocksdbOptions::default()), + ], + )?; + + let per_object_entries = vec![ + ( + ObjectID::from_single_byte(1), + PerObjectBlobInfo::new_for_testing( + blob_id_from_u64(1), + 1, + Some(2), + 10, + true, + fixed_event_id_for_testing(7), + false, + ), + ), + ( + ObjectID::from_single_byte(2), + PerObjectBlobInfo::new_for_testing( + blob_id_from_u64(2), + 3, + None, + 20, + false, + fixed_event_id_for_testing(8), + false, + ), + ), + ]; + let pooled_entries = vec![( + ObjectID::from_single_byte(3), + PerObjectPooledBlobInfo::new_for_testing( + blob_id_from_u64(3), + 4, + None, + FIXED_STORAGE_POOL_ID, + fixed_event_id_for_testing(9), + ), + )]; + let pool_entries = vec![(ObjectID::from_single_byte(4), StoragePoolInfo::new(1, 30))]; + + fn put_entries( + db: &DB, + cf_name: &str, + entries: &[(ObjectID, V)], + ) -> Result<()> { + let cf = db + .cf_handle(cf_name) + .expect("column family should exist in test db"); + for (object_id, value) in entries { + db.put_cf(&cf, be_fix_int_ser(object_id)?, bcs::to_bytes(value)?)?; + } + Ok(()) + } + put_entries(&db, per_object_blob_info_cf_name(), &per_object_entries)?; + put_entries(&db, per_object_pooled_blob_info_cf_name(), &pooled_entries)?; + put_entries(&db, storage_pool_info_cf_name(), &pool_entries)?; + drop(db); + + let output_path = db_dir.path().join("snapshot.bench"); + bench_blob_info_snapshot( + db_dir.path().to_path_buf(), + Some(output_path.clone()), + vec![1], + false, + )?; + + let contents = read_snapshot(&std::fs::read(&output_path)?)?; + assert_eq!(contents.per_object, per_object_entries); + assert_eq!(contents.per_object_pooled, pooled_entries); + assert_eq!(contents.storage_pools, pool_entries); + Ok(()) + } + #[test] fn compact_db_drain_exits_for_idle_storage_db() -> Result<()> { let db_dir = tempdir()?; diff --git a/crates/walrus-service/src/node/metrics.rs b/crates/walrus-service/src/node/metrics.rs index d4b76f3bc2..3725ae79be 100644 --- a/crates/walrus-service/src/node/metrics.rs +++ b/crates/walrus-service/src/node/metrics.rs @@ -194,6 +194,13 @@ walrus_utils::metrics::define_metric_set! { blob info table."] per_object_blob_info_consistency_check_error: IntCounter[], + #[help = "The duration of creating the epoch-boundary database checkpoint for the blob \ + info snapshot, in seconds."] + blob_info_snapshot_checkpoint_duration_seconds: Gauge[], + + #[help = "The number of errors while creating blob info snapshot checkpoints."] + blob_info_snapshot_error_total: IntCounter[], + #[help = "The number of certified per-object blobs scanned during the per-object blob info \ consistency check."] per_object_blob_info_consistency_check_certified_scanned: IntCounterVec["epoch"], diff --git a/crates/walrus-service/src/node/storage.rs b/crates/walrus-service/src/node/storage.rs index 616d4069d1..d142d6f3f3 100644 --- a/crates/walrus-service/src/node/storage.rs +++ b/crates/walrus-service/src/node/storage.rs @@ -66,6 +66,7 @@ use super::{ use crate::utils::{self, BatchProcessingResult}; pub(crate) mod blob_info; +pub(crate) mod blob_info_snapshot; pub(crate) mod constants; mod database_config; @@ -475,6 +476,12 @@ impl Storage { self.blob_info.clear() } + /// Creates a RocksDB checkpoint of the entire database at `path`; see + /// [`blob_info::BlobInfoTable::checkpoint_database`]. + pub(crate) fn checkpoint_database(&self, path: &Path) -> Result<(), TypedStoreError> { + self.blob_info.checkpoint_database(path) + } + /// Returns lock write access to the shards map, and returns the underlying shard map. pub(crate) async fn lock_shards(&self) -> StorageShardLock { let shards_guard = self.shards.clone().write_owned().await; diff --git a/crates/walrus-service/src/node/storage/blob_info.rs b/crates/walrus-service/src/node/storage/blob_info.rs index afc9f9f5fe..61734f001b 100644 --- a/crates/walrus-service/src/node/storage/blob_info.rs +++ b/crates/walrus-service/src/node/storage/blob_info.rs @@ -438,6 +438,13 @@ impl BlobInfoTable { ) } + /// Creates a RocksDB checkpoint of the entire database (all column families, not only the + /// blob info tables) at `path`. Uses hard links, so `path` must be on the same filesystem + /// as the database. + pub fn checkpoint_database(&self, path: &std::path::Path) -> Result<(), TypedStoreError> { + self.per_object_blob_info.checkpoint_db(path) + } + /// Returns an iterator over all entries in the aggregate blob info table within the given /// range. pub fn aggregate_blob_info_range_iter( diff --git a/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs b/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs index c1a21b9fa5..62bf990db3 100644 --- a/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs +++ b/crates/walrus-service/src/node/storage/blob_info/per_object_pooled_blob_info.rs @@ -155,6 +155,25 @@ impl From for PerObjectPooledBlobInfo { } } +impl PerObjectPooledBlobInfo { + #[cfg(test)] + pub(crate) fn new_for_testing( + blob_id: BlobId, + registered_epoch: Epoch, + certified_epoch: Option, + storage_pool_id: ObjectID, + event: EventID, + ) -> Self { + Self::V1(PerObjectPooledBlobInfoV1 { + blob_id, + registered_epoch, + certified_epoch, + storage_pool_id, + event, + }) + } +} + impl ToBytes for PerObjectPooledBlobInfo {} impl Mergeable for PerObjectPooledBlobInfo { diff --git a/crates/walrus-service/src/node/storage/blob_info_snapshot.rs b/crates/walrus-service/src/node/storage/blob_info_snapshot.rs new file mode 100644 index 0000000000..bdc60dcfd2 --- /dev/null +++ b/crates/walrus-service/src/node/storage/blob_info_snapshot.rs @@ -0,0 +1,624 @@ +// Copyright (c) Walrus Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! Serialization format for blob info snapshots. +//! +//! A blob info snapshot is a deterministic serialization of the blob info tables that are +//! identical across all honest nodes at the epoch boundary (after garbage-collection phase 1): +//! `per_object_blob_info`, `per_object_pooled_blob_info`, and `storage_pool_info`. The +//! `aggregate_blob_info` table is deliberately excluded: it is a materialized view that contains +//! node-local state (`is_metadata_stored`) and entries whose deletion timing depends on the +//! background GC phase 2, so it is not deterministic across nodes; it is reconstructed from the +//! per-object tables during recovery. +//! +//! The format is versioned and self-delimiting so it can be written and read in a single pass: +//! +//! ```text +//! +----------------------+ +//! | Magic (4 B, BE) | +//! +----------------------+ +//! | Version (4 B, BE) | +//! +----------------------+ +//! | Header len (4 B, BE) | +//! +----------------------+ +//! | Header (BCS bytes) | +//! +----------------------+ +//! | Section (tag = 1) | per_object_blob_info +//! +----------------------+ +//! | Section (tag = 2) | per_object_pooled_blob_info +//! +----------------------+ +//! | Section (tag = 3) | storage_pool_info +//! +----------------------+ +//! | Checksum (8 B, BE) | xxhash64 of all preceding bytes +//! +----------------------+ +//! +//! Section := tag (1 B) +//! { 0x01 | key len (4 B, BE) | key BCS | value len (4 B, BE) | value BCS }* +//! 0x00 | entry count (8 B, BE) +//! ``` +//! +//! Entries within a section are required to be in strictly increasing key order (the natural +//! RocksDB iteration order), which makes the serialization a pure function of the table contents. +//! Any change that affects the serialized bytes (entry types, section layout, future compression) +//! MUST bump [`SNAPSHOT_FORMAT_VERSION`]: the snapshot bytes are consensus-critical, since all +//! nodes must produce bit-identical snapshots for the same epoch. + +use std::{ + hash::Hasher as _, + io::{Cursor, Read, Write}, +}; + +use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt}; +use serde::{Deserialize, Serialize, de::DeserializeOwned}; +use sui_types::base_types::ObjectID; +use twox_hash::XxHash64; +use typed_store::TypedStoreError; +use walrus_core::{BlobId, Epoch}; + +use super::blob_info::{PerObjectBlobInfo, PerObjectPooledBlobInfo, StoragePoolInfo}; +use crate::event::events::EventStreamCursor; + +/// The magic bytes at the start of a blob info snapshot. +pub(crate) const SNAPSHOT_MAGIC: u32 = 0xB10B1F05; +/// The current format version of the blob info snapshot. +pub(crate) const SNAPSHOT_FORMAT_VERSION: u32 = 1; + +const SECTION_TAG_PER_OBJECT: u8 = 1; +const SECTION_TAG_PER_OBJECT_POOLED: u8 = 2; +const SECTION_TAG_STORAGE_POOL: u8 = 3; + +const ENTRY_MARKER: u8 = 0x01; +const SECTION_END_MARKER: u8 = 0x00; + +const CHECKSUM_SEED: u64 = 0; +const CHECKSUM_SIZE: usize = size_of::(); + +/// Errors occurring during blob info snapshot (de)serialization. +#[derive(Debug, thiserror::Error)] +pub(crate) enum SnapshotError { + /// An I/O error occurred while reading or writing the snapshot. + #[error("I/O error during snapshot (de)serialization")] + Io(#[from] std::io::Error), + /// Reading from the underlying database failed. + #[error("database error during snapshot serialization")] + Storage(#[from] TypedStoreError), + /// BCS (de)serialization of a header or entry failed. + #[error("BCS (de)serialization error")] + Encoding(#[from] bcs::Error), + /// The snapshot does not start with the expected magic bytes. + #[error("invalid snapshot magic: expected {SNAPSHOT_MAGIC:#010x}, found {0:#010x}")] + InvalidMagic(u32), + /// The snapshot has a format version that this binary cannot read. + #[error("unsupported snapshot format version {0}")] + UnsupportedVersion(u32), + /// The snapshot is structurally invalid. + #[error("corrupt snapshot: {0}")] + Corrupt(String), + /// The snapshot checksum does not match its contents. + #[error("snapshot checksum mismatch")] + ChecksumMismatch, + /// The entries of a section are not in strictly increasing key order. + #[error("keys are not in strictly increasing order in section with tag {0}")] + UnsortedKeys(u8), +} + +/// The header of a blob info snapshot. +/// +/// The header pins the exact event-stream position the snapshot corresponds to: the snapshot +/// contains the table state after applying all events up to and including `event_cursor`, with +/// the inline GC phase 1 for `epoch` applied. The chunk fields are reserved for splitting large +/// snapshots across multiple blobs; the current writer always produces a single chunk. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +pub(crate) struct SnapshotHeader { + /// The epoch whose boundary this snapshot was taken at. + pub epoch: Epoch, + /// The position of the last event included in the snapshot. + pub event_cursor: EventStreamCursor, + /// The blob ID of the previous epoch's snapshot, or [`BlobId::ZERO`] if unknown. + pub prev_snapshot_blob_id: BlobId, + /// The index of this chunk; always 0 until chunking is implemented. + pub chunk_index: u32, + /// The total number of chunks; always 1 until chunking is implemented. + pub chunk_count: u32, +} + +impl SnapshotHeader { + /// Creates a single-chunk snapshot header. + pub fn new( + epoch: Epoch, + event_cursor: EventStreamCursor, + prev_snapshot_blob_id: BlobId, + ) -> Self { + Self { + epoch, + event_cursor, + prev_snapshot_blob_id, + chunk_index: 0, + chunk_count: 1, + } + } +} + +/// Statistics about a written snapshot, for logging and metrics. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub(crate) struct SnapshotStats { + /// The total number of bytes written, including the checksum. + pub bytes_written: u64, + /// The number of entries serialized from `per_object_blob_info`. + pub per_object_count: u64, + /// The number of entries serialized from `per_object_pooled_blob_info`. + pub per_object_pooled_count: u64, + /// The number of entries serialized from `storage_pool_info`. + pub storage_pool_count: u64, + /// The xxhash64 checksum of the snapshot contents (also stored in the snapshot trailer). + /// + /// Since the serialization is deterministic, this is a fingerprint of the snapshotted + /// table contents: nodes with identical tables produce identical checksums. + pub checksum: u64, +} + +/// The fully deserialized contents of a blob info snapshot. +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct SnapshotContents { + /// The snapshot header. + pub header: SnapshotHeader, + /// The entries of the `per_object_blob_info` table. + pub per_object: Vec<(ObjectID, PerObjectBlobInfo)>, + /// The entries of the `per_object_pooled_blob_info` table. + pub per_object_pooled: Vec<(ObjectID, PerObjectPooledBlobInfo)>, + /// The entries of the `storage_pool_info` table. + pub storage_pools: Vec<(ObjectID, StoragePoolInfo)>, +} + +/// A writer wrapper that maintains a running xxhash64 and byte count of everything written. +struct HashingWriter { + inner: W, + hasher: XxHash64, + bytes_written: u64, +} + +impl HashingWriter { + fn new(inner: W) -> Self { + Self { + inner, + hasher: XxHash64::with_seed(CHECKSUM_SEED), + bytes_written: 0, + } + } +} + +impl Write for HashingWriter { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let written = self.inner.write(buf)?; + self.hasher.write(&buf[..written]); + self.bytes_written += u64::try_from(written).expect("usize fits in u64"); + Ok(written) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.inner.flush() + } +} + +/// Serializes a blob info snapshot to `writer`. +/// +/// The entry iterators must yield entries in strictly increasing key order, as produced by +/// RocksDB iteration; this is checked and [`SnapshotError::UnsortedKeys`] is returned otherwise. +pub(crate) fn write_snapshot( + writer: W, + header: &SnapshotHeader, + per_object: impl IntoIterator>, + per_object_pooled: impl IntoIterator< + Item = Result<(ObjectID, PerObjectPooledBlobInfo), TypedStoreError>, + >, + storage_pools: impl IntoIterator>, +) -> Result { + let mut writer = HashingWriter::new(writer); + + writer.write_u32::(SNAPSHOT_MAGIC)?; + writer.write_u32::(SNAPSHOT_FORMAT_VERSION)?; + let header_bytes = bcs::to_bytes(header)?; + writer.write_u32::(checked_len(header_bytes.len())?)?; + writer.write_all(&header_bytes)?; + + let per_object_count = write_section(&mut writer, SECTION_TAG_PER_OBJECT, per_object)?; + let per_object_pooled_count = write_section( + &mut writer, + SECTION_TAG_PER_OBJECT_POOLED, + per_object_pooled, + )?; + let storage_pool_count = write_section(&mut writer, SECTION_TAG_STORAGE_POOL, storage_pools)?; + + let checksum = writer.hasher.finish(); + writer.write_u64::(checksum)?; + writer.flush()?; + + Ok(SnapshotStats { + bytes_written: writer.bytes_written, + per_object_count, + per_object_pooled_count, + storage_pool_count, + checksum, + }) +} + +/// Deserializes a blob info snapshot from `bytes`, verifying its structure and checksum. +pub(crate) fn read_snapshot(bytes: &[u8]) -> Result { + verify_checksum(bytes)?; + let mut reader = Cursor::new(&bytes[..bytes.len() - CHECKSUM_SIZE]); + + let magic = reader.read_u32::()?; + if magic != SNAPSHOT_MAGIC { + return Err(SnapshotError::InvalidMagic(magic)); + } + let version = reader.read_u32::()?; + if version != SNAPSHOT_FORMAT_VERSION { + return Err(SnapshotError::UnsupportedVersion(version)); + } + + let header_len = reader.read_u32::()?; + let header_bytes = read_chunk(&mut reader, header_len)?; + let header: SnapshotHeader = bcs::from_bytes(&header_bytes)?; + + let per_object = read_section(&mut reader, SECTION_TAG_PER_OBJECT)?; + let per_object_pooled = read_section(&mut reader, SECTION_TAG_PER_OBJECT_POOLED)?; + let storage_pools = read_section(&mut reader, SECTION_TAG_STORAGE_POOL)?; + + let position = reader.position(); + let remaining = u64::try_from(reader.get_ref().len()).expect("usize fits in u64") - position; + if remaining != 0 { + return Err(SnapshotError::Corrupt(format!( + "{remaining} unexpected trailing bytes after the last section" + ))); + } + + Ok(SnapshotContents { + header, + per_object, + per_object_pooled, + storage_pools, + }) +} + +fn write_section( + writer: &mut W, + tag: u8, + entries: impl IntoIterator>, +) -> Result { + writer.write_u8(tag)?; + let mut count: u64 = 0; + let mut previous_key: Option = None; + + for entry in entries { + let (key, value) = entry?; + if previous_key.is_some_and(|previous| previous >= key) { + return Err(SnapshotError::UnsortedKeys(tag)); + } + previous_key = Some(key); + + writer.write_u8(ENTRY_MARKER)?; + let key_bytes = bcs::to_bytes(&key)?; + writer.write_u32::(checked_len(key_bytes.len())?)?; + writer.write_all(&key_bytes)?; + let value_bytes = bcs::to_bytes(&value)?; + writer.write_u32::(checked_len(value_bytes.len())?)?; + writer.write_all(&value_bytes)?; + count += 1; + } + + writer.write_u8(SECTION_END_MARKER)?; + writer.write_u64::(count)?; + Ok(count) +} + +fn read_section( + reader: &mut Cursor<&[u8]>, + expected_tag: u8, +) -> Result, SnapshotError> { + let tag = reader.read_u8()?; + if tag != expected_tag { + return Err(SnapshotError::Corrupt(format!( + "expected section tag {expected_tag}, found {tag}" + ))); + } + + let mut entries = Vec::new(); + loop { + let marker = reader.read_u8()?; + match marker { + SECTION_END_MARKER => break, + ENTRY_MARKER => { + let key_len = reader.read_u32::()?; + let key_bytes = read_chunk(reader, key_len)?; + let key: ObjectID = bcs::from_bytes(&key_bytes)?; + let value_len = reader.read_u32::()?; + let value_bytes = read_chunk(reader, value_len)?; + let value: V = bcs::from_bytes(&value_bytes)?; + entries.push((key, value)); + } + other => { + return Err(SnapshotError::Corrupt(format!( + "invalid entry marker {other} in section with tag {expected_tag}" + ))); + } + } + } + + let count = reader.read_u64::()?; + let actual_count = u64::try_from(entries.len()).expect("usize fits in u64"); + if count != actual_count { + return Err(SnapshotError::Corrupt(format!( + "section with tag {expected_tag} declares {count} entries but contains {actual_count}" + ))); + } + Ok(entries) +} + +fn verify_checksum(bytes: &[u8]) -> Result<(), SnapshotError> { + let Some(content_len) = bytes.len().checked_sub(CHECKSUM_SIZE) else { + return Err(SnapshotError::Corrupt( + "snapshot is too short to contain a checksum".to_string(), + )); + }; + let mut hasher = XxHash64::with_seed(CHECKSUM_SEED); + hasher.write(&bytes[..content_len]); + let expected = hasher.finish(); + + let mut checksum_bytes = [0u8; CHECKSUM_SIZE]; + checksum_bytes.copy_from_slice(&bytes[content_len..]); + let found = u64::from_be_bytes(checksum_bytes); + + if expected != found { + return Err(SnapshotError::ChecksumMismatch); + } + Ok(()) +} + +fn read_chunk(reader: &mut Cursor<&[u8]>, len: u32) -> Result, SnapshotError> { + let mut buffer = vec![0u8; usize::try_from(len).expect("u32 fits in usize")]; + reader.read_exact(&mut buffer)?; + Ok(buffer) +} + +fn checked_len(len: usize) -> Result { + u32::try_from(len) + .map_err(|_| SnapshotError::Corrupt(format!("entry of {len} bytes exceeds the u32 limit"))) +} + +#[cfg(test)] +mod tests { + use sui_types::event::EventID; + use walrus_core::test_utils::blob_id_from_u64; + use walrus_sui::test_utils::{FIXED_STORAGE_POOL_ID, fixed_event_id_for_testing}; + + use super::*; + + fn object_id(byte: u8) -> ObjectID { + ObjectID::from_single_byte(byte) + } + + fn test_header() -> SnapshotHeader { + SnapshotHeader::new( + 7, + EventStreamCursor::new(Some(fixed_event_id_for_testing(3)), 42), + BlobId::ZERO, + ) + } + + fn test_per_object_entries() -> Vec<(ObjectID, PerObjectBlobInfo)> { + vec![ + ( + object_id(1), + PerObjectBlobInfo::new_for_testing( + blob_id_from_u64(1), + 1, + Some(2), + 10, + true, + fixed_event_id_for_testing(7), + false, + ), + ), + ( + object_id(2), + PerObjectBlobInfo::new_for_testing( + blob_id_from_u64(2), + 3, + None, + 20, + false, + fixed_event_id_for_testing(8), + false, + ), + ), + ] + } + + fn test_pooled_entries() -> Vec<(ObjectID, PerObjectPooledBlobInfo)> { + vec![( + object_id(3), + PerObjectPooledBlobInfo::new_for_testing( + blob_id_from_u64(3), + 4, + None, + FIXED_STORAGE_POOL_ID, + fixed_event_id_for_testing(9), + ), + )] + } + + fn test_pool_entries() -> Vec<(ObjectID, StoragePoolInfo)> { + vec![(object_id(4), StoragePoolInfo::new(1, 30))] + } + + fn ok_iter( + entries: Vec<(ObjectID, V)>, + ) -> impl Iterator> { + entries.into_iter().map(Ok) + } + + fn write_test_snapshot() -> (Vec, SnapshotStats) { + let mut bytes = Vec::new(); + let stats = write_snapshot( + &mut bytes, + &test_header(), + ok_iter(test_per_object_entries()), + ok_iter(test_pooled_entries()), + ok_iter(test_pool_entries()), + ) + .expect("serialization should succeed"); + (bytes, stats) + } + + #[test] + fn roundtrip_empty_snapshot() { + let mut bytes = Vec::new(); + let stats = write_snapshot( + &mut bytes, + &test_header(), + ok_iter(vec![]), + ok_iter(vec![]), + ok_iter(vec![]), + ) + .expect("serialization should succeed"); + assert_eq!(stats.per_object_count, 0); + assert_eq!(stats.bytes_written, u64::try_from(bytes.len()).unwrap()); + + let contents = read_snapshot(&bytes).expect("deserialization should succeed"); + assert_eq!(contents.header, test_header()); + assert!(contents.per_object.is_empty()); + assert!(contents.per_object_pooled.is_empty()); + assert!(contents.storage_pools.is_empty()); + } + + #[test] + fn roundtrip_snapshot_with_entries() { + let (bytes, stats) = write_test_snapshot(); + assert_eq!(stats.per_object_count, 2); + assert_eq!(stats.per_object_pooled_count, 1); + assert_eq!(stats.storage_pool_count, 1); + assert_eq!(stats.bytes_written, u64::try_from(bytes.len()).unwrap()); + + let contents = read_snapshot(&bytes).expect("deserialization should succeed"); + assert_eq!(contents.header, test_header()); + assert_eq!(contents.per_object, test_per_object_entries()); + assert_eq!(contents.per_object_pooled, test_pooled_entries()); + assert_eq!(contents.storage_pools, test_pool_entries()); + } + + #[test] + fn serialization_is_deterministic() { + let (first, _) = write_test_snapshot(); + let (second, _) = write_test_snapshot(); + assert_eq!(first, second); + } + + #[test] + fn rejects_unsorted_keys() { + let mut entries = test_per_object_entries(); + entries.reverse(); + let result = write_snapshot( + &mut Vec::new(), + &test_header(), + ok_iter(entries), + ok_iter(vec![]), + ok_iter(vec![]), + ); + assert!(matches!( + result, + Err(SnapshotError::UnsortedKeys(SECTION_TAG_PER_OBJECT)) + )); + } + + #[test] + fn rejects_corrupted_bytes() { + let (mut bytes, _) = write_test_snapshot(); + let flip_position = bytes.len() / 2; + bytes[flip_position] ^= 0xFF; + assert!(matches!( + read_snapshot(&bytes), + Err(SnapshotError::ChecksumMismatch) + )); + } + + #[test] + fn rejects_truncated_snapshot() { + let (bytes, _) = write_test_snapshot(); + assert!(read_snapshot(&bytes[..bytes.len() - 1]).is_err()); + assert!(read_snapshot(&[]).is_err()); + } + + #[test] + fn rejects_invalid_magic_and_version() { + let (bytes, _) = write_test_snapshot(); + + let mut wrong_magic = bytes.clone(); + wrong_magic[0] ^= 0xFF; + fix_checksum(&mut wrong_magic); + assert!(matches!( + read_snapshot(&wrong_magic), + Err(SnapshotError::InvalidMagic(_)) + )); + + let mut wrong_version = bytes; + wrong_version[7] ^= 0xFF; + fix_checksum(&mut wrong_version); + assert!(matches!( + read_snapshot(&wrong_version), + Err(SnapshotError::UnsupportedVersion(_)) + )); + } + + /// Recomputes and overwrites the trailing checksum after a test mutated the snapshot bytes. + fn fix_checksum(bytes: &mut [u8]) { + let content_len = bytes.len() - CHECKSUM_SIZE; + let mut hasher = XxHash64::with_seed(CHECKSUM_SEED); + hasher.write(&bytes[..content_len]); + bytes[content_len..].copy_from_slice(&hasher.finish().to_be_bytes()); + } + + /// Golden test pinning the exact serialized bytes of format version 1. + /// + /// If this test fails, the serialized representation has changed: this is a breaking format + /// change that requires bumping [`SNAPSHOT_FORMAT_VERSION`], because snapshot bytes must be + /// bit-identical across all nodes of the network for the same table contents. + #[test] + fn golden_bytes_format_v1() { + let (bytes, _) = write_test_snapshot(); + let expected_hex = concat!( + "b10b1f05000000010000005e0700000001202a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a", + "2a2a2a2a2a2a2a2a03000000000000002a00000000000000000000000000000000000000000000000000", + "000000000000000000000000000000000000010000000101000000200000000000000000000000000000", + "000000000000000000000000000000000001000000590000000000000000000000000000000000000000", + "000000000000000000000000010100000001020000000a00000001202a2a2a2a2a2a2a2a2a2a2a2a2a2a", + "2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a070000000000000000010000002000000000000000000000", + "000000000000000000000000000000000000000000020000005500000000000000000000000000000000", + "000000000000000000000000000000000203000000001400000000202a2a2a2a2a2a2a2a2a2a2a2a2a2a", + "2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a080000000000000000000000000000000002020100000020", + "00000000000000000000000000000000000000000000000000000000000000030000006f000000000000", + "000000000000000000000000000000000000000000000000000003040000000000000000000000000000", + "00000000000000000000000000000000000000000063202a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a", + "2a2a2a2a2a2a2a2a2a2a2a2a2a0900000000000000000000000000000001030100000020000000000000", + "00000000000000000000000000000000000000000000000000040000000900010000001e000000000000", + "000000000001fdbf5eaf0d0fecff", + ); + assert_eq!( + hex::encode(&bytes), + expected_hex, + "snapshot format v1 bytes changed; this requires a format version bump" + ); + } + + /// The event ID used in the golden test must itself be stable. + #[test] + fn golden_test_inputs_are_fixed() { + let event = fixed_event_id_for_testing(3); + assert_eq!( + event, + EventID { + tx_digest: sui_types::digests::TransactionDigest::new([42; 32]), + event_seq: 3, + } + ); + } +} diff --git a/crates/walrus-service/src/test_utils.rs b/crates/walrus-service/src/test_utils.rs index 99c511762f..b614c03738 100644 --- a/crates/walrus-service/src/test_utils.rs +++ b/crates/walrus-service/src/test_utils.rs @@ -3289,6 +3289,7 @@ pub fn storage_node_config() -> WithTempDir { WithTempDir { inner: StorageNodeConfig { name: "node".to_string(), + blob_info_snapshot: Default::default(), protocol_key_pair: walrus_core::test_utils::protocol_key_pair().into(), next_protocol_key_pair: None, network_key_pair: walrus_core::test_utils::network_key_pair().into(), diff --git a/crates/walrus-service/src/testbed.rs b/crates/walrus-service/src/testbed.rs index 9c3c84fbdb..2020df3e5c 100644 --- a/crates/walrus-service/src/testbed.rs +++ b/crates/walrus-service/src/testbed.rs @@ -716,6 +716,7 @@ pub async fn create_storage_node_configs( name: node.name.clone(), storage_path, blocklist_path: None, + blob_info_snapshot: Default::default(), protocol_key_pair, next_protocol_key_pair: None, network_key_pair: node.network_keypair.into(),