diff --git a/src/lib.rs b/src/lib.rs
index d429d71c5..a88ccfc3d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -144,6 +144,10 @@ pub mod util;
 
 mod value;
 mod value_type;
+
+/// Integrity verification for SST and blob files.
+pub mod verify;
+
 mod version;
 mod vlog;
 
diff --git a/src/verify.rs b/src/verify.rs
new file mode 100644
index 000000000..3901caec0
--- /dev/null
+++ b/src/verify.rs
@@ -0,0 +1,221 @@
+// Copyright (c) 2024-present, fjall-rs
+// This source code is licensed under both the Apache 2.0 and MIT License
+// (found in the LICENSE-* files in the repository)
+
+use crate::{checksum::Checksum, table::TableId};
+use std::path::PathBuf;
+
+/// Describes a single integrity error found during verification.
+#[derive(Debug)]
+#[non_exhaustive]
+pub enum IntegrityError {
+    /// Full-file checksum mismatch for an SST table.
+    SstFileCorrupted {
+        /// Table ID
+        table_id: TableId,
+        /// Path to the corrupted file
+        path: PathBuf,
+        /// Checksum stored in the manifest
+        expected: Checksum,
+        /// Checksum computed from disk
+        got: Checksum,
+    },
+
+    /// Full-file checksum mismatch for a blob file.
+    BlobFileCorrupted {
+        /// Blob file ID
+        blob_file_id: u64,
+        /// Path to the corrupted file
+        path: PathBuf,
+        /// Checksum stored in the manifest
+        expected: Checksum,
+        /// Checksum computed from disk
+        got: Checksum,
+    },
+
+    /// I/O error while reading a file during verification.
+    IoError {
+        /// Path to the file that could not be read
+        path: PathBuf,
+        /// The underlying I/O error
+        error: std::io::Error,
+    },
+}
+
+impl std::fmt::Display for IntegrityError {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::SstFileCorrupted {
+                table_id,
+                path,
+                expected,
+                got,
+            } => write!(
+                f,
+                "SST table {table_id} corrupted at {}: expected {expected}, got {got}",
+                path.display()
+            ),
+            Self::BlobFileCorrupted {
+                blob_file_id,
+                path,
+                expected,
+                got,
+            } => write!(
+                f,
+                "blob file {blob_file_id} corrupted at {}: expected {expected}, got {got}",
+                path.display()
+            ),
+            Self::IoError { path, error } => {
+                write!(f, "I/O error reading {}: {}", path.display(), error)
+            }
+        }
+    }
+}
+
+impl std::error::Error for IntegrityError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        match self {
+            Self::IoError { error, .. } => Some(error),
+            Self::SstFileCorrupted { .. } | Self::BlobFileCorrupted { .. } => None,
+        }
+    }
+}
+
+/// Result of an integrity verification scan.
+///
+/// The `sst_files_checked` and `blob_files_checked` counters reflect
+/// the number of files *attempted* — including those that produced I/O
+/// errors. This lets callers reconcile the total against the manifest
+/// even when some files were unreadable.
+#[derive(Debug)]
+#[non_exhaustive]
+pub struct IntegrityReport {
+    /// Number of SST table files checked (includes I/O errors).
+    pub sst_files_checked: usize,
+
+    /// Number of blob files checked (includes I/O errors).
+    pub blob_files_checked: usize,
+
+    /// Integrity errors found during verification.
+    pub errors: Vec<IntegrityError>,
+}
+
+impl IntegrityReport {
+    /// Returns `true` if no errors were found.
+    #[must_use]
+    pub fn is_ok(&self) -> bool {
+        self.errors.is_empty()
+    }
+
+    /// Total number of files checked (SST + blob).
+    #[must_use]
+    pub fn files_checked(&self) -> usize {
+        self.sst_files_checked + self.blob_files_checked
+    }
+}
+
+/// Computes a streaming XXH3 128-bit checksum for a file without loading it entirely into memory.
+///
+/// Uses `BufReader` so the caller does not need to manage a read buffer.
+///
+/// # Errors
+///
+/// Returns the underlying I/O error if the file cannot be opened or read.
+/// Reads that fail with `ErrorKind::Interrupted` are retried instead of reported.
+fn stream_checksum(path: &std::path::Path) -> std::io::Result<Checksum> {
+    use std::io::BufRead;
+
+    let mut reader = std::io::BufReader::with_capacity(64 * 1024, std::fs::File::open(path)?);
+    let mut hasher = xxhash_rust::xxh3::Xxh3Default::new();
+
+    loop {
+        let chunk = match reader.fill_buf() {
+            Ok(chunk) => chunk,
+            // An interrupted read is transient and says nothing about the file: retry it.
+            Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
+            Err(e) => return Err(e),
+        };
+        if chunk.is_empty() {
+            break;
+        }
+        hasher.update(chunk);
+        let n = chunk.len();
+        reader.consume(n);
+    }
+
+    Ok(Checksum::from_raw(hasher.digest128()))
+}
+
+/// Verifies full-file checksums for all SST and blob files in the given tree.
+///
+/// Each file's content is read from disk and hashed with XXHash-3 128-bit,
+/// then compared against the checksum stored in the version manifest.
+///
+/// This detects silent bit-rot, partial writes, and other on-disk corruption.
+///
+/// Per-file errors (e.g., unreadable files, checksum mismatches) are collected
+/// into [`IntegrityReport::errors`] — the scan always runs to completion.
+#[must_use]
+pub fn verify_integrity(tree: &impl crate::AbstractTree) -> IntegrityReport {
+    let version = tree.current_version();
+
+    let mut report = IntegrityReport {
+        sst_files_checked: 0,
+        blob_files_checked: 0,
+        errors: Vec::new(),
+    };
+
+    // Verify all SST table files
+    for table in version.iter_tables() {
+        let path = &*table.path;
+        let expected = table.checksum();
+
+        match stream_checksum(path) {
+            Ok(got) if got != expected => {
+                report.errors.push(IntegrityError::SstFileCorrupted {
+                    table_id: table.id(),
+                    path: path.to_path_buf(),
+                    expected,
+                    got,
+                });
+            }
+            Ok(_) => {}
+            Err(e) => {
+                report.errors.push(IntegrityError::IoError {
+                    path: path.to_path_buf(),
+                    error: e,
+                });
+            }
+        }
+
+        report.sst_files_checked += 1;
+    }
+
+    // Verify all blob files
+    for blob_file in version.blob_files.iter() {
+        let path = blob_file.path();
+        let expected = blob_file.checksum();
+
+        match stream_checksum(path) {
+            Ok(got) if got != expected => {
+                report.errors.push(IntegrityError::BlobFileCorrupted {
+                    blob_file_id: blob_file.id(),
+                    path: path.to_path_buf(),
+                    expected,
+                    got,
+                });
+            }
+            Ok(_) => {}
+            Err(e) => {
+                report.errors.push(IntegrityError::IoError {
+                    path: path.to_path_buf(),
+                    error: e,
+                });
+            }
+        }
+
+        report.blob_files_checked += 1;
+    }
+
+    report
+}
diff --git a/tests/verify_integrity.rs b/tests/verify_integrity.rs
new file mode 100644
index 000000000..a54a0c33b
--- /dev/null
+++ b/tests/verify_integrity.rs
@@ -0,0 +1,381 @@
+use lsm_tree::{
+    // AbstractTree must be in scope for enum_dispatch method resolution on AnyTree
+    get_tmp_folder,
+    verify,
+    AbstractTree,
+    Config,
+    KvSeparationOptions,
+    SequenceNumberCounter,
+};
+use test_log::test;
+
+#[test]
+fn verify_integrity_clean_tree() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()?;
+
+    for key in ('a'..='z').map(|c| c.to_string()) {
+        tree.insert(key, b"value", 0);
+    }
+    tree.flush_active_memtable(0)?;
+
+    let report = verify::verify_integrity(&tree);
+
+    assert!(report.is_ok(), "clean tree should have no errors");
+    assert_eq!(1, report.sst_files_checked);
+    assert_eq!(0, report.blob_files_checked);
+
+    Ok(())
+}
+
+#[test]
+fn verify_integrity_detect_sst_corruption() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()?;
+
+    for key in ('a'..='z').map(|c| c.to_string()) {
+        tree.insert(key, b"value", 0);
+    }
+    tree.flush_active_memtable(0)?;
+
+    // Corrupt a byte in the SST file
+    let version = tree.current_version();
+    let table = version.iter_tables().next().unwrap();
+    {
+        use std::io::{Seek, Write};
+        let mut f = std::fs::OpenOptions::new().write(true).open(&*table.path)?;
+        f.seek(std::io::SeekFrom::Start(100))?;
+        f.write_all(b"CORRUPT")?;
+        f.sync_all()?;
+    }
+
+    let report = verify::verify_integrity(&tree);
+
+    assert!(!report.is_ok(), "corrupted tree should have errors");
+    assert_eq!(1, report.sst_files_checked);
+    assert_eq!(1, report.errors.len());
+
+    // Verify error type
+    match &report.errors[0] {
+        verify::IntegrityError::SstFileCorrupted { table_id, .. } => {
+            assert_eq!(*table_id, table.id());
+        }
+        other => panic!("expected SstFileCorrupted, got: {other}"),
+    }
+
+    Ok(())
+}
+
+#[test]
+fn verify_integrity_blob_tree_clean() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
+    .open()?;
+
+    for key in ('a'..='z').map(|c| c.to_string()) {
+        tree.insert(key, b"value", 0);
+    }
+    tree.flush_active_memtable(0)?;
+
+    let report = verify::verify_integrity(&tree);
+
+    assert!(report.is_ok(), "clean blob tree should have no errors");
+    assert!(report.sst_files_checked > 0);
+    assert!(report.blob_files_checked > 0);
+
+    Ok(())
+}
+
+#[test]
+fn verify_integrity_detect_blob_corruption() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
+    .open()?;
+
+    for key in ('a'..='z').map(|c| c.to_string()) {
+        tree.insert(key, b"value", 0);
+    }
+    tree.flush_active_memtable(0)?;
+
+    // Corrupt a byte in the blob file
+    let version = tree.current_version();
+    let blob_file = version.blob_files.iter().next().unwrap();
+    {
+        use std::io::{Seek, Write};
+        let mut f = std::fs::OpenOptions::new()
+            .write(true)
+            .open(blob_file.path())?;
+        f.seek(std::io::SeekFrom::Start(100))?;
+        f.write_all(b"CORRUPT")?;
+        f.sync_all()?;
+    }
+
+    let report = verify::verify_integrity(&tree);
+
+    assert!(!report.is_ok(), "corrupted blob tree should have errors");
+    assert_eq!(1, report.errors.len());
+
+    match &report.errors[0] {
+        verify::IntegrityError::BlobFileCorrupted { blob_file_id, .. } => {
+            assert_eq!(*blob_file_id, blob_file.id());
+        }
+        other => panic!("expected BlobFileCorrupted, got: {other}"),
+    }
+
+    Ok(())
+}
+
+#[test]
+fn verify_integrity_multiple_tables() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()?;
+
+    // Create multiple SST files
+    for batch in 0..3 {
+        for i in 0..10 {
+            let key = format!("batch{batch}_key{i:04}");
+            tree.insert(key, b"value", 0);
+        }
+        tree.flush_active_memtable(0)?;
+    }
+
+    let report = verify::verify_integrity(&tree);
+
+    assert!(report.is_ok());
+    assert_eq!(3, report.sst_files_checked);
+    assert_eq!(3, report.files_checked());
+
+    Ok(())
+}
+
+#[test]
+fn verify_integrity_missing_sst_file() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()?;
+
+    for key in ('a'..='z').map(|c| c.to_string()) {
+        tree.insert(key, b"value", 0);
+    }
+    tree.flush_active_memtable(0)?;
+
+    // Delete the SST file to trigger an IoError
+    let version = tree.current_version();
+    let table = version.iter_tables().next().unwrap();
+    std::fs::remove_file(&*table.path)?;
+
+    let report = verify::verify_integrity(&tree);
+
+    assert!(!report.is_ok(), "missing file should produce an error");
+    // Unreadable files still count as checked, so totals reconcile with the manifest
+    assert_eq!(1, report.sst_files_checked);
+    assert_eq!(1, report.errors.len());
+
+    match &report.errors[0] {
+        verify::IntegrityError::IoError { path, .. } => {
+            assert_eq!(path, table.path.as_ref());
+        }
+        other => panic!("expected IoError, got: {other}"),
+    }
+
+    Ok(())
+}
+
+#[test]
+fn verify_integrity_missing_blob_file() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
+    .open()?;
+
+    for key in ('a'..='z').map(|c| c.to_string()) {
+        tree.insert(key, b"value", 0);
+    }
+    tree.flush_active_memtable(0)?;
+
+    // Delete the blob file to trigger an IoError on blob path
+    let version = tree.current_version();
+    let blob_file = version.blob_files.iter().next().unwrap();
+    let blob_path = blob_file.path().to_path_buf();
+    std::fs::remove_file(&blob_path)?;
+
+    let report = verify::verify_integrity(&tree);
+
+    assert!(!report.is_ok(), "missing blob file should produce an error");
+    assert_eq!(1, report.errors.len());
+
+    match &report.errors[0] {
+        verify::IntegrityError::IoError { path, .. } => {
+            assert_eq!(path, &blob_path);
+        }
+        other => panic!("expected IoError, got: {other}"),
+    }
+
+    Ok(())
+}
+
+#[test]
+fn verify_integrity_display_and_error_trait() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    // -- SstFileCorrupted Display --
+    let tree = Config::new(
+        &folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()?;
+
+    for key in ('a'..='z').map(|c| c.to_string()) {
+        tree.insert(key, b"value", 0);
+    }
+    tree.flush_active_memtable(0)?;
+
+    // Corrupt SST
+    let version = tree.current_version();
+    let table = version.iter_tables().next().unwrap();
+    {
+        use std::io::{Seek, Write};
+        let mut f = std::fs::OpenOptions::new().write(true).open(&*table.path)?;
+        f.seek(std::io::SeekFrom::Start(50))?;
+        f.write_all(b"XX")?;
+        f.sync_all()?;
+    }
+
+    let report = verify::verify_integrity(&tree);
+    assert!(!report.errors.is_empty());
+
+    let msg = report.errors[0].to_string();
+    assert!(msg.contains("SST table"), "SstFileCorrupted Display: {msg}");
+    assert!(
+        msg.contains("corrupted at"),
+        "SstFileCorrupted Display: {msg}"
+    );
+
+    // Error::source for non-IoError should be None
+    assert!(
+        std::error::Error::source(&report.errors[0]).is_none(),
+        "SstFileCorrupted should have no source"
+    );
+
+    drop(tree);
+
+    // -- BlobFileCorrupted Display --
+    let folder2 = get_tmp_folder();
+    let tree2 = Config::new(
+        &folder2,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
+    .open()?;
+
+    for key in ('a'..='z').map(|c| c.to_string()) {
+        tree2.insert(key, b"value", 0);
+    }
+    tree2.flush_active_memtable(0)?;
+
+    // Corrupt blob
+    let version2 = tree2.current_version();
+    let blob_file = version2.blob_files.iter().next().unwrap();
+    {
+        use std::io::{Seek, Write};
+        let mut f = std::fs::OpenOptions::new()
+            .write(true)
+            .open(blob_file.path())?;
+        f.seek(std::io::SeekFrom::Start(50))?;
+        f.write_all(b"XX")?;
+        f.sync_all()?;
+    }
+
+    let report2 = verify::verify_integrity(&tree2);
+    let blob_err = report2
+        .errors
+        .iter()
+        .find(|e| matches!(e, verify::IntegrityError::BlobFileCorrupted { .. }));
+    assert!(blob_err.is_some(), "should have BlobFileCorrupted error");
+
+    let msg = blob_err.unwrap().to_string();
+    assert!(
+        msg.contains("blob file"),
+        "BlobFileCorrupted Display: {msg}"
+    );
+    assert!(
+        msg.contains("corrupted at"),
+        "BlobFileCorrupted Display: {msg}"
+    );
+
+    drop(tree2);
+
+    // -- IoError Display + Error::source --
+    let folder3 = get_tmp_folder();
+    let tree3 = Config::new(
+        &folder3,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()?;
+
+    for key in ('a'..='z').map(|c| c.to_string()) {
+        tree3.insert(key, b"value", 0);
+    }
+    tree3.flush_active_memtable(0)?;
+
+    let version3 = tree3.current_version();
+    let table3 = version3.iter_tables().next().unwrap();
+    std::fs::remove_file(&*table3.path)?;
+
+    let report3 = verify::verify_integrity(&tree3);
+    assert!(!report3.errors.is_empty());
+
+    let msg = report3.errors[0].to_string();
+    assert!(msg.contains("I/O error reading"), "IoError Display: {msg}");
+
+    // Error::source for IoError should return the underlying io::Error
+    assert!(
+        std::error::Error::source(&report3.errors[0]).is_some(),
+        "IoError should have a source"
+    );
+
+    Ok(())
+}