diff --git a/benches/index_block.rs b/benches/index_block.rs index c2f1b6a35..490644ef9 100644 --- a/benches/index_block.rs +++ b/benches/index_block.rs @@ -116,6 +116,7 @@ fn build_table_for_point_read(restart_interval: u8) -> BenchTable { 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(8))), + Arc::new(lsm_tree::fs::StdFs), false, false, None, diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 0d8ed42d9..32c68eec1 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.94.0" +channel = "1.94.1" components = ["rustfmt", "clippy"] diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index ff1c05d7d..6826779d0 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -227,6 +227,7 @@ impl<'a> BlobIngestion<'a> { index.id, index.config.cache.clone(), index.config.descriptor_table.clone(), + self.table.level_fs.clone(), false, false, index.config.encryption.clone(), diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index ccb0489ce..53946d760 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -139,7 +139,6 @@ pub struct BlobTree { impl BlobTree { pub(crate) fn open(config: Config) -> crate::Result { use crate::file::{BLOBS_FOLDER, fsync_directory}; - use crate::fs::Fs; let index = crate::Tree::open(config)?; @@ -425,7 +424,7 @@ impl AbstractTree for BlobTree { self.index.table_id_counter.clone(), 64 * 1_024 * 1_024, 0, - level_fs, + level_fs.clone(), )? 
.set_comparator(self.index.config.comparator.clone()) .use_data_block_restart_interval(data_block_restart_interval) @@ -545,6 +544,7 @@ impl AbstractTree for BlobTree { self.index.id, self.index.config.cache.clone(), self.index.config.descriptor_table.clone(), + level_fs.clone(), pin_filter, pin_index, self.index.config.encryption.clone(), diff --git a/src/compaction/flavour.rs b/src/compaction/flavour.rs index 5d6960d6f..0b82237f7 100644 --- a/src/compaction/flavour.rs +++ b/src/compaction/flavour.rs @@ -371,6 +371,7 @@ impl StandardCompaction { fn consume_writer(self, opts: &Options, dst_lvl: usize) -> crate::Result> { let table_base_folder = self.table_writer.base_path.clone(); + let level_fs = self.table_writer.fs.clone(); let pin_filter = opts.config.filter_block_pinning_policy.get(dst_lvl); let pin_index = opts.config.index_block_pinning_policy.get(dst_lvl); @@ -386,6 +387,7 @@ impl StandardCompaction { opts.tree_id, opts.config.cache.clone(), opts.config.descriptor_table.clone(), + level_fs.clone(), pin_filter, pin_index, opts.config.encryption.clone(), diff --git a/src/config/mod.rs b/src/config/mod.rs index c3ecbd2d5..03ac58b85 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -216,22 +216,17 @@ impl KvSeparationOptions { } /// Tree configuration builder -/// -/// The generic parameter `F` selects the filesystem backend. -/// It defaults to [`StdFs`], so existing code that writes `Config` -/// without a type parameter continues to work unchanged. -pub struct Config { +pub struct Config { /// Folder path #[doc(hidden)] pub path: PathBuf, /// Filesystem backend /// - // All Config fields are `#[doc(hidden)] pub` by convention — callers use - // builder methods or `..Default::default()`, not struct literals directly. - // A `with_fs()` builder will be added when call-site refactoring lands. + /// Defaults to [`StdFs`]. Use [`Config::with_fs`] to plug in an + /// alternative backend such as [`MemFs`](crate::fs::MemFs). 
#[doc(hidden)] - pub fs: Arc, + pub fs: Arc, /// Per-level filesystem routing for tiered storage. /// @@ -447,6 +442,32 @@ impl Config { } } + /// Sets the filesystem backend. + /// + /// Defaults to [`StdFs`]. Use [`MemFs`](crate::fs::MemFs) for + /// in-memory trees (testing, ephemeral indexes). + /// + /// # Example + /// + /// ``` + /// use lsm_tree::{Config, SequenceNumberCounter}; + /// use lsm_tree::fs::MemFs; + /// + /// let tree = Config::new( + /// "/virtual/tree", + /// SequenceNumberCounter::default(), + /// SequenceNumberCounter::default(), + /// ) + /// .with_fs(MemFs::new()) + /// .open() + /// .unwrap(); + /// ``` + #[must_use] + pub fn with_fs(mut self, fs: F) -> Self { + self.fs = Arc::new(fs); + self + } + /// Opens a tree using the config. /// /// # Errors @@ -582,7 +603,7 @@ mod tests { } } -impl Config { +impl Config { /// Returns the tables folder path and [`Fs`] backend for the given level. /// /// If [`level_routes`](Self::level_routes) has an entry covering this diff --git a/src/file.rs b/src/file.rs index 81bb7f0ca..302581aab 100644 --- a/src/file.rs +++ b/src/file.rs @@ -47,40 +47,54 @@ pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Resul Ok(builder.freeze().into()) } -/// Atomically rewrites a file. +/// Atomically rewrites a file via the [`Fs`] trait. +/// +/// Writes `content` to a temporary file in the same directory, fsyncs it, +/// then renames over `path`. This ensures readers never see a partial write. pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Result<()> { + use crate::fs::FsOpenOptions; + use std::sync::atomic::{AtomicU64, Ordering}; + + static TEMP_SEQ: AtomicU64 = AtomicU64::new(0); + #[expect( clippy::expect_used, reason = "every file should have a parent directory" )] let folder = path.parent().expect("should have a parent"); - // NOTE: tempfile crate uses std::fs internally; migrating temp-file - // creation to Fs would require a custom implementation. 
- let mut temp_file = tempfile::NamedTempFile::new_in(folder)?; - temp_file.write_all(content)?; - temp_file.flush()?; - temp_file.as_file_mut().sync_all()?; - temp_file.persist(path)?; - - // Suppress unused-variable warning on Windows where the post-persist - // sync block is skipped (directory fsync is unsupported). - let _ = &fs; - - #[cfg(not(target_os = "windows"))] - { - use crate::fs::FsOpenOptions; - - let file = fs.open(path, &FsOpenOptions::new().read(true))?; + // PID + monotonic seq gives uniqueness within a process and across + // concurrent processes. A crash-then-PID-reuse collision is theoretically + // possible but vanishingly unlikely (requires exact PID reuse AND seq + // counter restart to same value). lsm-tree uses exclusive file locking + // so the same data directory is never written by two processes. + let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed); + let pid = std::process::id(); + let tmp_path = folder.join(format!(".tmp_{pid}_{seq}")); + + let result = (|| -> std::io::Result<()> { + let mut file = fs.open( + &tmp_path, + &FsOpenOptions::new().write(true).create_new(true), + )?; + file.write_all(content)?; + file.flush()?; FsFile::sync_all(&*file)?; + drop(file); + // std::fs::rename overwrites existing destinations on all platforms + // (Rust uses MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows). + fs.rename(&tmp_path, path)?; + Ok(()) + })(); - #[expect( - clippy::expect_used, - reason = "files should always have a parent directory" - )] - let folder = path.parent().expect("should have parent folder"); - fs.sync_directory(folder)?; + if result.is_err() { + // Best-effort cleanup of the temp file on any failure path. + // Safe to call even if fs.open() failed (file never created) — + // remove_file will return NotFound which we ignore. 
+ let _ = fs.remove_file(&tmp_path); } + result?; + fsync_directory(folder, fs)?; Ok(()) } diff --git a/src/file_accessor.rs b/src/file_accessor.rs index c617882b4..2d6316c3f 100644 --- a/src/file_accessor.rs +++ b/src/file_accessor.rs @@ -4,7 +4,8 @@ use crate::GlobalTableId; use crate::descriptor_table::DescriptorTable; -use crate::fs::FsFile; +use crate::fs::{Fs, FsFile, FsOpenOptions}; +use std::path::Path; use std::sync::Arc; /// Allows accessing a file (either cached or pinned) @@ -15,46 +16,87 @@ pub enum FileAccessor { /// This is used in case file descriptor cache is `None` (to skip cache lookups) File(Arc), - /// Access to file descriptor cache - DescriptorTable(Arc), + /// Access to file descriptor cache with [`Fs`]-based fallback for + /// cache misses. + DescriptorTable { + /// The FD cache. + table: Arc, + /// Filesystem backend for opening files on cache miss. + fs: Arc, + }, } impl FileAccessor { #[must_use] pub fn as_descriptor_table(&self) -> Option<&DescriptorTable> { match self { - Self::DescriptorTable(d) => Some(d), + Self::DescriptorTable { table, .. } => Some(table), Self::File(_) => None, } } - #[must_use] - pub fn access_for_table(&self, table_id: &GlobalTableId) -> Option> { + /// Returns a cached table FD or opens the file via [`Fs`] on cache miss. + /// + /// The returned `bool` indicates whether the file descriptor was already + /// cached (`true`) or freshly opened (`false`). 
+ pub fn get_or_open_table( + &self, + table_id: &GlobalTableId, + path: &Path, + ) -> std::io::Result<(Arc, bool)> { match self { - Self::File(fd) => Some(fd.clone()), - Self::DescriptorTable(descriptor_table) => descriptor_table.access_for_table(table_id), - } - } - - pub fn insert_for_table(&self, table_id: GlobalTableId, fd: Arc) { - if let Self::DescriptorTable(descriptor_table) = self { - descriptor_table.insert_for_table(table_id, fd); + Self::File(fd) => Ok((fd.clone(), true)), + Self::DescriptorTable { table, fs } => { + if let Some(fd) = table.access_for_table(table_id) { + return Ok((fd, true)); + } + let fd: Arc = + Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?); + table.insert_for_table(*table_id, fd.clone()); + Ok((fd, false)) + } } } - #[must_use] - pub fn access_for_blob_file(&self, table_id: &GlobalTableId) -> Option> { + /// Returns a cached blob file FD or opens it via [`Fs`] on cache miss. + /// + /// The returned `bool` indicates whether the file descriptor was already + /// cached (`true`) or freshly opened (`false`). + pub fn get_or_open_blob_file( + &self, + table_id: &GlobalTableId, + path: &Path, + ) -> std::io::Result<(Arc, bool)> { match self { - Self::File(fd) => Some(fd.clone()), - Self::DescriptorTable(descriptor_table) => { - descriptor_table.access_for_blob_file(table_id) + Self::File(fd) => Ok((fd.clone(), true)), + Self::DescriptorTable { table, fs } => { + if let Some(fd) = table.access_for_blob_file(table_id) { + return Ok((fd, true)); + } + let fd: Arc = + Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?); + table.insert_for_blob_file(*table_id, fd.clone()); + Ok((fd, false)) } } } + /// Pre-populates the blob file FD cache after creating a new blob file. pub fn insert_for_blob_file(&self, table_id: GlobalTableId, fd: Arc) { - if let Self::DescriptorTable(descriptor_table) = self { - descriptor_table.insert_for_blob_file(table_id, fd); + if let Self::DescriptorTable { table, .. 
} = self { + table.insert_for_blob_file(table_id, fd); + } + } + + pub fn remove_for_table(&self, table_id: &GlobalTableId) { + if let Self::DescriptorTable { table, .. } = self { + table.remove_for_table(table_id); + } + } + + pub fn remove_for_blob_file(&self, table_id: &GlobalTableId) { + if let Self::DescriptorTable { table, .. } = self { + table.remove_for_blob_file(table_id); } } } @@ -63,7 +105,7 @@ impl std::fmt::Debug for FileAccessor { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { Self::File(_) => write!(f, "FileAccessor::Pinned"), - Self::DescriptorTable(_) => { + Self::DescriptorTable { .. } => { write!(f, "FileAccessor::Cached") } } diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs new file mode 100644 index 000000000..b14219461 --- /dev/null +++ b/src/fs/mem_fs.rs @@ -0,0 +1,910 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! In-memory [`Fs`] implementation for testing and ephemeral trees. +//! +//! All file data lives in memory — there are no durability guarantees. +//! `sync_all`, `sync_data`, and `sync_directory` are deliberate no-ops. +//! +//! # Known limitations +//! +//! - **Tree reopen**: `Tree::open` checks for the `CURRENT` version file +//! via `Path::try_exists()` (bypasses `Fs`). New trees work correctly +//! (the check returns `false` → `create_new`), but reopening an +//! in-memory tree after drop is not supported. +//! - **Version GC**: Old version file cleanup in `SuperVersions::gc` uses +//! `std::fs` directly — stale version entries accumulate in memory +//! until the `MemFs` is dropped. Acceptable for testing / ephemeral use. +//! - **Compaction**: Some code paths in the compaction finalization still +//! bypass the `Fs` trait. Write + flush + point-read works; compaction +//! may fail with `ENOENT` on virtual paths. 
+ +use super::{Fs, FsDirEntry, FsFile, FsMetadata, FsOpenOptions}; +use std::collections::{HashMap, HashSet}; +use std::io::{self, Read, Seek, SeekFrom, Write}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, RwLock}; + +// --------------------------------------------------------------------------- +// MemFs +// --------------------------------------------------------------------------- + +/// In-memory [`Fs`] backend for testing and ephemeral in-memory trees. +/// +/// Backed by a `HashMap>>>` — no disk I/O is +/// performed. Clones share the same backing store, and individual file +/// contents are synchronized through a per-file [`Mutex`]. +/// +/// # Example +/// +/// ``` +/// use lsm_tree::fs::MemFs; +/// use std::sync::Arc; +/// +/// let fs = MemFs::new(); +/// let dyn_fs: Arc = Arc::new(fs); +/// ``` +#[derive(Clone, Debug)] +pub struct MemFs { + state: Arc>, +} + +#[derive(Debug, Default)] +struct State { + files: HashMap>>>, + dirs: HashSet, +} + +impl MemFs { + /// Creates a new, empty in-memory filesystem. + #[must_use] + pub fn new() -> Self { + Self { + state: Arc::new(RwLock::new(State::default())), + } + } +} + +impl Default for MemFs { + fn default() -> Self { + Self::new() + } +} + +// --------------------------------------------------------------------------- +// MemFile +// --------------------------------------------------------------------------- + +/// An open file handle backed by an in-memory buffer. +struct MemFile { + data: Arc>>, + cursor: u64, + readable: bool, + writable: bool, + is_append: bool, +} + +/// Copies bytes from `data[pos..]` into `buf`, returning byte count. 
+fn copy_from_data(buf: &mut [u8], data: &[u8], pos: usize) -> usize {
+    let available = data.get(pos..).unwrap_or_default();
+    let n = buf.len().min(available.len());
+    if let (Some(dst), Some(src)) = (buf.get_mut(..n), available.get(..n)) {
+        dst.copy_from_slice(src);
+    }
+    n
+}
+
+#[expect(
+    clippy::cast_possible_truncation,
+    reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX"
+)]
+impl Read for MemFile {
+    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
+        if !self.readable {
+            return Err(io::Error::other("file not opened for reading"));
+        }
+        let data = lock(&self.data)?;
+        let pos = self.cursor as usize;
+        let n = copy_from_data(buf, &data, pos);
+        drop(data);
+        self.cursor += n as u64;
+        Ok(n)
+    }
+}
+
+#[expect(
+    clippy::cast_possible_truncation,
+    reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX"
+)]
+impl Write for MemFile {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        if !self.writable {
+            return Err(io::Error::other("file not opened for writing"));
+        }
+        let mut data = lock(&self.data)?;
+
+        let pos = if self.is_append {
+            data.len()
+        } else {
+            self.cursor as usize
+        };
+
+        let end = pos + buf.len();
+        if end > data.len() {
+            data.resize(end, 0);
+        }
+        if let Some(dst) = data.get_mut(pos..end) {
+            dst.copy_from_slice(buf);
+        }
+        drop(data);
+        self.cursor = end as u64;
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        Ok(())
+    }
+}
+
+#[expect(
+    clippy::cast_possible_wrap,
+    clippy::cast_sign_loss,
+    reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX"
+)]
+impl Seek for MemFile {
+    fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
+        let len = {
+            let data = lock(&self.data)?;
+            data.len() as i64
+        };
+
+        let new_pos = match pos {
+            SeekFrom::Start(n) => i64::try_from(n).unwrap_or(i64::MAX),
+            SeekFrom::End(n) => len.saturating_add(n),
+            SeekFrom::Current(n) => (self.cursor as i64).saturating_add(n),
+        };
+
+        if new_pos < 0 {
+            return
Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                "seek to negative position",
+            ));
+        }
+
+        self.cursor = new_pos as u64;
+        Ok(self.cursor)
+    }
+}
+
+impl FsFile for MemFile {
+    fn sync_all(&self) -> io::Result<()> {
+        Ok(())
+    }
+
+    fn sync_data(&self) -> io::Result<()> {
+        Ok(())
+    }
+
+    fn metadata(&self) -> io::Result<FsMetadata> {
+        let data = lock(&self.data)?;
+        Ok(FsMetadata {
+            len: data.len() as u64,
+            is_dir: false,
+            is_file: true,
+        })
+    }
+
+    #[expect(
+        clippy::cast_possible_truncation,
+        reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX"
+    )]
+    fn set_len(&self, size: u64) -> io::Result<()> {
+        lock(&self.data)?.resize(size as usize, 0);
+        Ok(())
+    }
+
+    #[expect(
+        clippy::cast_possible_truncation,
+        reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX"
+    )]
+    fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<usize> {
+        let data = lock(&self.data)?;
+        Ok(copy_from_data(buf, &data, offset as usize))
+    }
+
+    /// No-op: in-memory files are not shared across processes. `MemFs` is a
+    /// test/ephemeral backend — cross-process exclusivity is not meaningful.
+    fn lock_exclusive(&self) -> io::Result<()> {
+        Ok(())
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Fs for MemFs
+// ---------------------------------------------------------------------------
+
+#[expect(
+    clippy::significant_drop_tightening,
+    reason = "RwLock guards are intentionally held for the duration of each method"
+)]
+impl Fs for MemFs {
+    fn open(&self, path: &Path, opts: &FsOpenOptions) -> io::Result<Box<dyn FsFile>> {
+        let mut state = write_state(&self.state)?;
+        let path = path.to_path_buf();
+
+        // Verify parent directory exists (mirrors std::fs behaviour).
+ if let Some(parent) = path.parent() + && !parent.as_os_str().is_empty() + && parent != Path::new("/") + && !state.dirs.contains(parent) + { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("parent directory does not exist: {}", parent.display()), + )); + } + + let exists = state.files.contains_key(&path); + let is_dir = state.dirs.contains(&path); + let wants_write = opts.write || opts.append; + + // Mirror std::fs::OpenOptions: truncate/create require write access. + if opts.truncate && !wants_write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "truncate requires write or append access", + )); + } + if (opts.create || opts.create_new) && !wants_write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "create/create_new requires write or append access", + )); + } + + // Reject creating a file at a path that is already a directory. + if is_dir && (opts.create || opts.create_new) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("path is a directory: {}", path.display()), + )); + } + + if opts.create_new { + if exists { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("file already exists: {}", path.display()), + )); + } + let data = Arc::new(Mutex::new(Vec::new())); + state.files.insert(path, Arc::clone(&data)); + return Ok(Box::new(MemFile { + data, + cursor: 0, + readable: opts.read, + writable: opts.write || opts.append, + is_append: opts.append, + })); + } + + if exists { + let data = state + .files + .get(&path) + .map(Arc::clone) + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "concurrent removal"))?; + + if opts.truncate { + lock(&data)?.clear(); + } + + let cursor = if opts.append { + lock(&data)?.len() as u64 + } else { + 0 + }; + + Ok(Box::new(MemFile { + data, + cursor, + readable: opts.read, + writable: opts.write || opts.append, + is_append: opts.append, + })) + } else if opts.create { + let data = Arc::new(Mutex::new(Vec::new())); + 
state.files.insert(path, Arc::clone(&data)); + Ok(Box::new(MemFile { + data, + cursor: 0, + readable: opts.read, + writable: opts.write || opts.append, + is_append: opts.append, + })) + } else { + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("file not found: {}", path.display()), + )) + } + } + + fn create_dir_all(&self, path: &Path) -> io::Result<()> { + let mut state = write_state(&self.state)?; + + // Collect all components first, then validate, then insert. + // This avoids partial insertion if an ancestor is a regular file. + let mut to_create = Vec::new(); + let mut current = path.to_path_buf(); + loop { + if state.files.contains_key(¤t) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("ancestor is a file: {}", current.display()), + )); + } + to_create.push(current.clone()); + if !current.pop() || current.as_os_str().is_empty() { + break; + } + } + + for dir in to_create { + state.dirs.insert(dir); + } + Ok(()) + } + + fn read_dir(&self, path: &Path) -> io::Result> { + let state = read_state(&self.state)?; + + if !state.dirs.contains(path) { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("directory not found: {}", path.display()), + )); + } + + let mut entries = Vec::new(); + + for file_path in state.files.keys() { + if file_path.parent() == Some(path) + && let Some(name) = file_path.file_name() + { + // Match StdFs contract: reject non-UTF-8 names with InvalidData. 
+ let file_name = name.to_str().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "non-UTF-8 filename in directory {}: {}", + path.display(), + name.display() + ), + ) + })?; + entries.push(FsDirEntry { + path: file_path.clone(), + file_name: file_name.to_owned(), + is_dir: false, + }); + } + } + + for dir_path in &state.dirs { + if dir_path.parent() == Some(path) + && dir_path != path + && let Some(name) = dir_path.file_name() + { + let file_name = name.to_str().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "non-UTF-8 filename in directory {}: {}", + path.display(), + name.display() + ), + ) + })?; + entries.push(FsDirEntry { + path: dir_path.clone(), + file_name: file_name.to_owned(), + is_dir: true, + }); + } + } + + Ok(entries) + } + + fn remove_file(&self, path: &Path) -> io::Result<()> { + let mut state = write_state(&self.state)?; + if state.files.remove(path).is_none() { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("file not found: {}", path.display()), + )); + } + Ok(()) + } + + fn remove_dir_all(&self, path: &Path) -> io::Result<()> { + let mut state = write_state(&self.state)?; + + // Reject files — std::fs::remove_dir_all errors on non-directories. + if state.files.contains_key(path) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("path is not a directory: {}", path.display()), + )); + } + + if !state.dirs.contains(path) { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("path not found: {}", path.display()), + )); + } + + state.files.retain(|p, _| !p.starts_with(path)); + state.dirs.retain(|p| !p.starts_with(path)); + Ok(()) + } + + fn rename(&self, from: &Path, to: &Path) -> io::Result<()> { + let mut state = write_state(&self.state)?; + + // Validate destination parent exists (mirrors std::fs behaviour). 
+ if let Some(parent) = to.parent() + && !parent.as_os_str().is_empty() + && parent != Path::new("/") + && !state.dirs.contains(parent) + { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("destination parent not found: {}", parent.display()), + )); + } + + // Reject renaming onto an existing directory. Otherwise `to` would end + // up present in both `files` and `dirs`, corrupting MemFs state. + if state.dirs.contains(to) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("destination is a directory: {}", to.display()), + )); + } + + // Directory renames are not implemented in MemFs because they require + // updating descendant paths in both `dirs` and `files`. + if state.dirs.contains(from) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("path is a directory: {}", from.display()), + )); + } + + if let Some(data) = state.files.remove(from) { + state.files.insert(to.to_path_buf(), data); + Ok(()) + } else { + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("file not found: {}", from.display()), + )) + } + } + + fn metadata(&self, path: &Path) -> io::Result { + let state = read_state(&self.state)?; + + if let Some(data) = state.files.get(path) { + let d = lock(data)?; + Ok(FsMetadata { + len: d.len() as u64, + is_dir: false, + is_file: true, + }) + } else if state.dirs.contains(path) { + Ok(FsMetadata { + len: 0, + is_dir: true, + is_file: false, + }) + } else { + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("path not found: {}", path.display()), + )) + } + } + + fn sync_directory(&self, _path: &Path) -> io::Result<()> { + Ok(()) + } + + fn exists(&self, path: &Path) -> io::Result { + let state = read_state(&self.state)?; + Ok(state.files.contains_key(path) || state.dirs.contains(path)) + } +} + +// --------------------------------------------------------------------------- +// Lock helpers — convert PoisonError to io::Error +// 
---------------------------------------------------------------------------
+
+fn lock<T>(m: &Mutex<T>) -> io::Result<std::sync::MutexGuard<'_, T>> {
+    m.lock().map_err(|_| io::Error::other("mutex poisoned"))
+}
+
+fn read_state(rw: &RwLock<State>) -> io::Result<std::sync::RwLockReadGuard<'_, State>> {
+    rw.read().map_err(|_| io::Error::other("rwlock poisoned"))
+}
+
+fn write_state(rw: &RwLock<State>) -> io::Result<std::sync::RwLockWriteGuard<'_, State>> {
+    rw.write().map_err(|_| io::Error::other("rwlock poisoned"))
+}
+
+// ---------------------------------------------------------------------------
+// Tests
+// ---------------------------------------------------------------------------
+
+#[cfg(test)]
+#[expect(
+    clippy::unwrap_used,
+    clippy::indexing_slicing,
+    clippy::unnecessary_wraps,
+    reason = "test code"
+)]
+mod tests {
+    use super::*;
+    use std::io::{Read, Write};
+    use std::sync::Arc;
+    use test_log::test;
+
+    #[test]
+    fn create_read_write() -> io::Result<()> {
+        let fs = MemFs::new();
+        fs.create_dir_all(Path::new("/data"))?;
+
+        let path = Path::new("/data/test.txt");
+        let opts = FsOpenOptions::new().write(true).create(true);
+        let mut file = fs.open(path, &opts)?;
+        file.write_all(b"hello world")?;
+        drop(file);
+
+        let opts = FsOpenOptions::new().read(true);
+        let mut file = fs.open(path, &opts)?;
+        let mut buf = String::new();
+        file.read_to_string(&mut buf)?;
+        assert_eq!(buf, "hello world");
+
+        Ok(())
+    }
+
+    #[test]
+    fn directory_operations() -> io::Result<()> {
+        let fs = MemFs::new();
+        let nested = PathBuf::from("/a/b/c");
+        fs.create_dir_all(&nested)?;
+        assert!(fs.exists(&nested)?);
+        assert!(fs.exists(Path::new("/a/b"))?);
+
+        let file_path = nested.join("data.bin");
+        let opts = FsOpenOptions::new().write(true).create_new(true);
+        let mut file = fs.open(&file_path, &opts)?;
+        file.write_all(b"data")?;
+        drop(file);
+
+        let entries = fs.read_dir(&nested)?;
+        assert_eq!(entries.len(), 1);
+        assert_eq!(entries[0].file_name, "data.bin");
+        assert!(!entries[0].is_dir);
+
+        let meta = fs.metadata(&file_path)?;
+        assert!(meta.is_file);
+        assert!(!meta.is_dir);
+ assert_eq!(meta.len, 4); + + fs.remove_file(&file_path)?; + assert!(!fs.exists(&file_path)?); + + fs.remove_dir_all(Path::new("/a"))?; + assert!(!fs.exists(Path::new("/a"))?); + assert!(!fs.exists(&nested)?); + + Ok(()) + } + + #[test] + fn rename_file() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let src = Path::new("/dir/src.txt"); + let dst = Path::new("/dir/dst.txt"); + + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(src, &opts)?; + file.write_all(b"content")?; + drop(file); + + fs.rename(src, dst)?; + assert!(!fs.exists(src)?); + assert!(fs.exists(dst)?); + + Ok(()) + } + + #[test] + fn sync_directory_is_noop() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + fs.sync_directory(Path::new("/dir"))?; + Ok(()) + } + + #[test] + fn file_metadata_and_set_len() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/meta.bin"); + let opts = FsOpenOptions::new().write(true).create(true).read(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"12345")?; + + let meta = file.metadata()?; + assert!(meta.is_file); + assert_eq!(meta.len, 5); + + file.set_len(3)?; + let meta = file.metadata()?; + assert_eq!(meta.len, 3); + + Ok(()) + } + + #[test] + fn read_at_positional() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/pread.bin"); + let opts = FsOpenOptions::new().write(true).create(true).read(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + + let mut buf = [0u8; 5]; + let n = file.read_at(&mut buf, 6)?; + assert_eq!(n, 5); + assert_eq!(&buf, b"world"); + + let n = file.read_at(&mut buf, 0)?; + assert_eq!(n, 5); + assert_eq!(&buf, b"hello"); + + // Past EOF + let n = file.read_at(&mut buf, 100)?; + assert_eq!(n, 0); + + Ok(()) + } + + #[test] + fn lock_exclusive_is_noop() -> 
io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/lock"); + let opts = FsOpenOptions::new().write(true).create(true); + let file = fs.open(path, &opts)?; + file.lock_exclusive()?; + Ok(()) + } + + #[test] + fn open_create_new_fails_on_existing() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/file"); + let opts = FsOpenOptions::new().write(true).create_new(true); + fs.open(path, &opts)?; + + let err = fs.open(path, &opts).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::AlreadyExists); + Ok(()) + } + + #[test] + fn open_nonexistent_without_create_fails() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/missing"); + let opts = FsOpenOptions::new().read(true); + let err = fs.open(path, &opts).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn open_fails_when_parent_missing() -> io::Result<()> { + let fs = MemFs::new(); + let path = Path::new("/no/such/dir/file"); + let opts = FsOpenOptions::new().write(true).create(true); + let err = fs.open(path, &opts).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn truncate_on_open() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/trunc.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + drop(file); + + let opts = FsOpenOptions::new().write(true).truncate(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hi")?; + drop(file); + + let meta = fs.metadata(path)?; + assert_eq!(meta.len, 2); + Ok(()) + } + + #[test] + fn append_mode() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/append.txt"); + 
let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello")?; + drop(file); + + let opts = FsOpenOptions::new().write(true).append(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b" world")?; + drop(file); + + let opts = FsOpenOptions::new().read(true); + let mut file = fs.open(path, &opts)?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "hello world"); + Ok(()) + } + + #[test] + fn seek_and_overwrite() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/seek.bin"); + let opts = FsOpenOptions::new().write(true).create(true).read(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + + file.seek(SeekFrom::Start(6))?; + file.write_all(b"rust!")?; + + file.seek(SeekFrom::Start(0))?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "hello rust!"); + + Ok(()) + } + + #[test] + fn object_safety() -> io::Result<()> { + let fs: Arc = Arc::new(MemFs::new()); + let bogus = Path::new("/nonexistent"); + assert!(!fs.exists(bogus)?); + Ok(()) + } + + #[test] + fn metadata_directory() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/mydir"))?; + let meta = fs.metadata(Path::new("/mydir"))?; + assert!(meta.is_dir); + assert!(!meta.is_file); + Ok(()) + } + + #[test] + fn read_dir_with_subdirectory() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/root/subdir"))?; + + let file_path = Path::new("/root/file.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(file_path, &opts)?; + + let mut entries = fs.read_dir(Path::new("/root"))?; + entries.sort_by(|a, b| a.file_name.cmp(&b.file_name)); + assert_eq!(entries.len(), 2); + assert_eq!(entries[0].file_name, "file.txt"); + assert!(!entries[0].is_dir); + assert_eq!(entries[1].file_name, "subdir"); + 
assert!(entries[1].is_dir); + Ok(()) + } + + #[test] + fn remove_file_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.remove_file(Path::new("/missing")).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn rename_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs + .rename(Path::new("/missing"), Path::new("/dst")) + .err() + .unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn read_dir_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.read_dir(Path::new("/missing")).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn metadata_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.metadata(Path::new("/missing")).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn sync_data_is_noop() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let path = Path::new("/dir/file"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"data")?; + file.sync_data()?; + Ok(()) + } + + #[test] + fn clones_share_state() -> io::Result<()> { + let fs1 = MemFs::new(); + let fs2 = fs1.clone(); + + fs1.create_dir_all(Path::new("/shared"))?; + let path = Path::new("/shared/file.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs1.open(path, &opts)?; + file.write_all(b"shared data")?; + drop(file); + + assert!(fs2.exists(path)?); + let meta = fs2.metadata(path)?; + assert_eq!(meta.len, 11); + Ok(()) + } +} diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 12a4cba5b..2d5ce26d5 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -23,11 +23,13 @@ //! - **macOS / BSD**: no batched I/O API exists (`dispatch_io` and `kqueue` //! 
do not help for storage I/O patterns); [`StdFs`] is the correct choice +mod mem_fs; mod std_fs; #[cfg(all(target_os = "linux", feature = "io-uring"))] mod io_uring_fs; +pub use mem_fs::MemFs; pub use std_fs::StdFs; #[cfg(all(target_os = "linux", feature = "io-uring"))] @@ -274,6 +276,10 @@ pub trait Fs: Send + Sync + 'static { /// Renames a file or directory from `from` to `to`. /// + /// If `to` already exists as a regular file, it is atomically replaced. + /// This is required by [`rewrite_atomic`](crate::file::rewrite_atomic) + /// for crash-safe version pointer updates. + /// /// # Errors /// /// Returns an I/O error if the rename fails. diff --git a/src/table/inner.rs b/src/table/inner.rs index 96074a916..4d9a63e01 100644 --- a/src/table/inner.rs +++ b/src/table/inner.rs @@ -12,6 +12,7 @@ use crate::{ comparator::SharedComparator, encryption::EncryptionProvider, file_accessor::FileAccessor, + fs::Fs, range_tombstone::RangeTombstone, table::{IndexBlock, filter::block::FilterBlock}, tree::inner::TreeId, @@ -29,6 +30,9 @@ pub struct Inner { #[doc(hidden)] pub(crate) file_accessor: FileAccessor, + /// Filesystem backend for file operations (open, remove, etc.). 
+ pub(crate) fs: Arc, + /// Parsed metadata #[doc(hidden)] pub metadata: ParsedMeta, @@ -97,7 +101,7 @@ impl Drop for Inner { if self.is_deleted.load(std::sync::atomic::Ordering::Acquire) { log::trace!("Cleanup deleted table {global_id:?} at {:?}", self.path); - if let Err(e) = std::fs::remove_file(&*self.path) { + if let Err(e) = self.fs.remove_file(&self.path) { log::warn!( "Failed to cleanup deleted table {global_id:?} at {:?}: {e:?}", self.path, diff --git a/src/table/mod.rs b/src/table/mod.rs index b19379b26..843248278 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -40,7 +40,7 @@ use crate::{ comparator::SharedComparator, descriptor_table::DescriptorTable, file_accessor::FileAccessor, - fs::FsFile, + fs::{Fs, FsFile, FsOpenOptions}, range_tombstone::RangeTombstone, table::{ block::{BlockType, ParsedItem}, @@ -119,23 +119,14 @@ impl Table { Ok(if let Some(handle) = &self.regions.linked_blob_files { let table_id = self.global_id(); - let (fd, fd_cache_miss) = - if let Some(fd) = self.file_accessor.access_for_table(&table_id) { - (fd, false) - } else { - let fd: Arc = Arc::new(std::fs::File::open(&*self.path)?); - (fd, true) - }; + let (fd, _cache_hit) = self + .file_accessor + .get_or_open_table(&table_id, &self.path)?; // Read the exact region using pread-style helper let buf = crate::file::read_exact(fd.as_ref(), *handle.offset(), handle.size() as usize)?; - // If we opened the file here, cache the FD for future accesses - if fd_cache_miss { - self.file_accessor.insert_for_table(table_id, fd); - } - // Parse the buffer let mut reader = &buf[..]; let len = reader.read_u32::()?; @@ -489,6 +480,7 @@ impl Table { tree_id: TreeId, cache: Arc, descriptor_table: Option>, + fs: Arc, pin_filter: bool, pin_index: bool, encryption: Option>, @@ -501,7 +493,7 @@ impl Table { use std::sync::atomic::AtomicBool; log::debug!("Recovering table from file {}", file_path.display()); - let mut file = std::fs::File::open(&file_path)?; + let mut file = fs.open(&file_path, 
&FsOpenOptions::new().read(true))?; let file_path = Arc::new(file_path); #[cfg(feature = "metrics")] @@ -514,7 +506,7 @@ impl Table { log::trace!("Reading meta block, with meta_ptr={:?}", regions.metadata); let metadata = - ParsedMeta::load_with_handle(&file, ®ions.metadata, encryption.as_deref())?; + ParsedMeta::load_with_handle(&*file, ®ions.metadata, encryption.as_deref())?; // Fail-fast: if this table was written with dictionary compression, // verify the caller provided the matching dictionary. Without this @@ -531,10 +523,13 @@ impl Table { } } - let file_handle: Arc = Arc::new(file); + let file_handle: Arc = Arc::from(file); let file_accessor = if let Some(dt) = descriptor_table { - FileAccessor::DescriptorTable(dt) + FileAccessor::DescriptorTable { + table: dt, + fs: fs.clone(), + } } else { FileAccessor::File(file_handle.clone()) }; @@ -705,6 +700,7 @@ impl Table { cache, file_accessor, + fs, block_index: Arc::new(block_index), diff --git a/src/table/multi_writer.rs b/src/table/multi_writer.rs index fd029ea91..6b8e61028 100644 --- a/src/table/multi_writer.rs +++ b/src/table/multi_writer.rs @@ -15,7 +15,7 @@ use std::{path::PathBuf, sync::Arc}; /// /// This results in a sorted "run" of tables pub struct MultiWriter { - fs: Arc, + pub(crate) fs: Arc, pub(crate) base_path: PathBuf, @@ -647,6 +647,7 @@ mod tests { 0, cache.clone(), None, + Arc::new(StdFs), false, false, None, @@ -737,6 +738,7 @@ mod tests { 0, cache.clone(), None, + Arc::new(StdFs), false, false, None, diff --git a/src/table/tests.rs b/src/table/tests.rs index f77321631..fe4467343 100644 --- a/src/table/tests.rs +++ b/src/table/tests.rs @@ -82,6 +82,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -99,7 +100,7 @@ fn test_with_table_impl( assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) 
+ FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -116,6 +117,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, false, None, @@ -133,7 +135,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -150,6 +152,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, true, None, @@ -167,7 +170,7 @@ fn test_with_table_impl( assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -184,6 +187,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -201,7 +205,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -218,6 +222,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), None, + Arc::new(StdFs), true, true, None, @@ -275,6 +280,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -291,7 +297,7 @@ fn test_with_table_impl( assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. 
} )); f(table)?; @@ -308,6 +314,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, false, None, @@ -324,7 +331,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -341,6 +348,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, true, None, @@ -358,7 +366,7 @@ fn test_with_table_impl( // assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -375,6 +383,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -392,7 +401,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. 
} )); f(table)?; @@ -409,6 +418,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), None, + Arc::new(StdFs), true, true, None, @@ -1420,6 +1430,7 @@ fn table_read_fuzz_1() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -1498,6 +1509,7 @@ fn table_partitioned_index() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -1612,6 +1624,7 @@ fn table_global_seqno() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -1823,6 +1836,7 @@ fn load_block_range_tombstone_metrics() -> crate::Result<()> { // with how filter and index recovery reads are handled. Arc::new(Cache::with_capacity_bytes(10_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -1930,6 +1944,7 @@ fn load_block_cache_hit_rejects_wrong_block_type() -> crate::Result<()> { 0, Arc::new(Cache::with_capacity_bytes(10_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -2253,6 +2268,7 @@ fn two_level_index_scan_skips_empty_child_partition() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, false, None, diff --git a/src/table/util.rs b/src/table/util.rs index cd9f480d3..2cf0a02ac 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -5,9 +5,9 @@ use super::{Block, BlockHandle, GlobalTableId}; use crate::{ Cache, CompressionType, KeyRange, Table, encryption::EncryptionProvider, - file_accessor::FileAccessor, fs::FsFile, table::block::BlockType, version::run::Ranged, + file_accessor::FileAccessor, table::block::BlockType, version::run::Ranged, }; -use std::{path::Path, 
sync::Arc}; +use std::path::Path; #[cfg(feature = "metrics")] use crate::metrics::Metrics; @@ -83,23 +83,17 @@ pub fn load_block( return Ok(block); } - let (fd, fd_cache_miss) = if let Some(cached_fd) = file_accessor.access_for_table(&table_id) { - #[cfg(feature = "metrics")] - metrics.table_file_opened_cached.fetch_add(1, Relaxed); + #[cfg(feature = "metrics")] + let (fd, cache_hit) = file_accessor.get_or_open_table(&table_id, path)?; + #[cfg(not(feature = "metrics"))] + let (fd, _) = file_accessor.get_or_open_table(&table_id, path)?; - (cached_fd, false) + #[cfg(feature = "metrics")] + if cache_hit { + metrics.table_file_opened_cached.fetch_add(1, Relaxed); } else { - let file = std::fs::File::open(path)?; - - #[cfg(feature = "metrics")] metrics.table_file_opened_uncached.fetch_add(1, Relaxed); - - // The if-branch returns Arc from the descriptor - // table, so the else-branch needs an explicit type annotation - // to trigger unsizing coercion. - let fd: Arc = Arc::new(file); - (fd, true) - }; + } let block = Block::from_file( fd.as_ref(), @@ -149,11 +143,6 @@ pub fn load_block( } } - // Cache FD - if fd_cache_miss { - file_accessor.insert_for_table(table_id, fd); - } - cache.insert_block(table_id, handle.offset(), block.clone()); Ok(block) diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 467aaf81b..3405d0d4e 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -4,11 +4,12 @@ use super::Tree; use crate::{ - BlobIndirection, SeqNo, UserKey, UserValue, config::FilterPolicyEntry, + BlobIndirection, SeqNo, UserKey, UserValue, config::FilterPolicyEntry, fs::Fs, table::multi_writer::MultiWriter, }; use std::cmp::Ordering; use std::path::PathBuf; +use std::sync::Arc; pub const INITIAL_CANONICAL_LEVEL: usize = 1; @@ -20,6 +21,8 @@ pub const INITIAL_CANONICAL_LEVEL: usize = 1; /// using the same table writer configuration that is used for flush and compaction. 
pub struct Ingestion<'a> { pub(crate) folder: PathBuf, + /// Level-routed filesystem backend for the target level. + pub(crate) level_fs: Arc, tree: &'a Tree, pub(crate) writer: MultiWriter, seqno: SeqNo, @@ -54,7 +57,7 @@ impl<'a> Ingestion<'a> { tree.table_id_counter.clone(), 64 * 1_024 * 1_024, 6, - level_fs, + level_fs.clone(), )? .set_comparator(tree.config.comparator.clone()) .use_bloom_policy({ @@ -116,6 +119,7 @@ impl<'a> Ingestion<'a> { Ok(Self { folder, + level_fs, tree, writer, seqno: 0, @@ -311,6 +315,7 @@ impl<'a> Ingestion<'a> { self.tree.id, self.tree.config.cache.clone(), self.tree.config.descriptor_table.clone(), + self.level_fs.clone(), false, false, self.tree.config.encryption.clone(), diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 997bea00d..f532399cd 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -392,7 +392,7 @@ impl AbstractTree for Tree { self.table_id_counter.clone(), 64 * 1_024 * 1_024, 0, - level_fs, + level_fs.clone(), )? .set_comparator(self.config.comparator.clone()) .use_data_block_restart_interval(data_block_restart_interval) @@ -453,6 +453,7 @@ impl AbstractTree for Tree { self.id, self.config.cache.clone(), self.config.descriptor_table.clone(), + level_fs.clone(), pin_filter, pin_index, self.config.encryption.clone(), @@ -1444,7 +1445,6 @@ impl Tree { /// Creates a new LSM-tree in a directory. 
fn create_new(config: Config) -> crate::Result { use crate::file::fsync_directory; - use crate::fs::Fs; let path = config.path.clone(); log::trace!("Creating LSM-tree at {}", path.display()); @@ -1488,7 +1488,7 @@ impl Tree { config: &Config, #[cfg(feature = "metrics")] metrics: &Arc, ) -> crate::Result { - use crate::{TableId, file::fsync_directory, fs::Fs}; + use crate::{TableId, file::fsync_directory}; let tree_path = tree_path.as_ref(); @@ -1598,6 +1598,7 @@ impl Tree { tree_id, config.cache.clone(), config.descriptor_table.clone(), + folder_fs.clone(), pin_filter, pin_index, config.encryption.clone(), @@ -1680,6 +1681,7 @@ impl Tree { &recovery.blob_file_ids, tree_id, config.descriptor_table.as_ref(), + &config.fs, )?; let version = Version::from_recovery(recovery, &tables, &blob_files)?; diff --git a/src/vlog/accessor.rs b/src/vlog/accessor.rs index 7757ebdd7..50a22c34b 100644 --- a/src/vlog/accessor.rs +++ b/src/vlog/accessor.rs @@ -4,11 +4,10 @@ use crate::{ Cache, GlobalTableId, TreeId, UserValue, - fs::FsFile, version::BlobFileList, vlog::{ValueHandle, blob_file::reader::Reader}, }; -use std::{path::Path, sync::Arc}; +use std::path::Path; pub struct Accessor<'a>(&'a BlobFileList); @@ -35,23 +34,13 @@ impl<'a> Accessor<'a> { let bf_id = GlobalTableId::from((tree_id, blob_file.id())); - let (file, fd_cache_miss) = - if let Some(cached_fd) = blob_file.file_accessor().access_for_blob_file(&bf_id) { - (cached_fd, false) - } else { - let file: Arc = Arc::new(std::fs::File::open( - base_path.join(vhandle.blob_file_id.to_string()), - )?); - (file, true) - }; + let (file, _) = blob_file + .file_accessor() + .get_or_open_blob_file(&bf_id, &base_path.join(vhandle.blob_file_id.to_string()))?; let value = Reader::new(blob_file, file.as_ref()).get(key, vhandle)?; cache.insert_blob(tree_id, vhandle, value.clone()); - if fd_cache_miss { - blob_file.file_accessor().insert_for_blob_file(bf_id, file); - } - Ok(Some(value)) } } diff --git a/src/vlog/blob_file/multi_writer.rs 
b/src/vlog/blob_file/multi_writer.rs index 4af702f76..14fae2475 100644 --- a/src/vlog/blob_file/multi_writer.rs +++ b/src/vlog/blob_file/multi_writer.rs @@ -116,7 +116,7 @@ impl MultiWriter { old_writer, self.passthrough_compression, self.descriptor_table.clone(), - &*self.fs, + &self.fs, )?; self.results.extend(blob_file); @@ -127,7 +127,7 @@ impl MultiWriter { writer: Writer, passthrough_compression: CompressionType, descriptor_table: Option>, - fs: &dyn Fs, + fs: &Arc, ) -> crate::Result> { if writer.item_count > 0 { let blob_file_id = writer.blob_file_id; @@ -143,13 +143,16 @@ impl MultiWriter { let (metadata, checksum) = writer.finish()?; - // NOTE: Read-back uses std::fs::File because FileAccessor/DescriptorTable - // expect Arc. Migrating the read path to Fs is a separate scope. - let file: Arc = Arc::new(std::fs::File::open(&path)?); - let file_accessor = descriptor_table.map_or_else( - || FileAccessor::File(file.clone()), - FileAccessor::DescriptorTable, - ); + let file: Arc = + Arc::from(fs.open(&path, &crate::fs::FsOpenOptions::new().read(true))?); + let file_accessor = if let Some(dt) = descriptor_table { + FileAccessor::DescriptorTable { + table: dt, + fs: fs.clone(), + } + } else { + FileAccessor::File(file.clone()) + }; file_accessor.insert_for_blob_file((tree_id, blob_file_id).into(), file); let blob_file = BlobFile(Arc::new(BlobFileInner { @@ -258,7 +261,7 @@ impl MultiWriter { self.active_writer, self.passthrough_compression, self.descriptor_table.clone(), - &*self.fs, + &self.fs, )?; self.results.extend(blob_file); Ok(self.results) diff --git a/src/vlog/mod.rs b/src/vlog/mod.rs index 383ce0e51..da638250d 100644 --- a/src/vlog/mod.rs +++ b/src/vlog/mod.rs @@ -16,6 +16,7 @@ pub use { use crate::{ Checksum, DescriptorTable, TreeId, file_accessor::FileAccessor, + fs::Fs, vlog::blob_file::{Inner as BlobFileInner, Metadata}, }; use std::{ @@ -28,10 +29,19 @@ pub fn recover_blob_files( ids: &[(BlobFileId, Checksum)], tree_id: TreeId, descriptor_table: 
Option<&Arc>, + fs: &Arc, ) -> crate::Result<(Vec, Vec)> { - if !folder.try_exists()? { - return Ok((vec![], vec![])); - } + // Recover directly from read_dir; treat NotFound as empty (avoids TOCTOU + // with a separate exists() check). This is correct even when `ids` is + // non-empty: the blobs folder may not exist for standard (non-blob) trees, + // and callers handle missing blob files via orphan detection. + let entries = match fs.read_dir(folder) { + Ok(entries) => entries, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return Ok((vec![], vec![])); + } + Err(e) => return Err(e.into()), + }; let cnt = ids.len(); @@ -46,9 +56,8 @@ pub fn recover_blob_files( let mut blob_files = Vec::with_capacity(ids.len()); let mut orphaned_blob_files = vec![]; - for (idx, dirent) in std::fs::read_dir(folder)?.enumerate() { - let dirent = dirent?; - let file_name = dirent.file_name(); + for (idx, dirent) in entries.into_iter().enumerate() { + let file_name = &dirent.file_name; // https://en.wikipedia.org/wiki/.DS_Store if file_name == ".DS_Store" { @@ -56,22 +65,22 @@ pub fn recover_blob_files( } // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats - if file_name.to_string_lossy().starts_with("._") { + if file_name.starts_with("._") { continue; } - let blob_file_name = file_name.to_str().ok_or_else(|| { - log::error!("invalid table file name {}", file_name.display()); - crate::Error::Unrecoverable - })?; + // Skip directories before parsing — non-numeric directory names would + // fail the parse and abort recovery. 
+ if dirent.is_dir { + continue; + } - let blob_file_id = blob_file_name.parse::().map_err(|e| { - log::error!("invalid table file name {blob_file_name:?}: {e:?}"); + let blob_file_id = file_name.parse::().map_err(|e| { + log::error!("invalid blob file name {file_name:?}: {e:?}"); crate::Error::Unrecoverable })?; - let blob_file_path = dirent.path(); - assert!(!blob_file_path.is_dir()); + let blob_file_path = &dirent.path; if let Some(&(_, checksum)) = ids.iter().find(|(id, _)| id == &blob_file_id) { log::trace!( @@ -79,10 +88,10 @@ pub fn recover_blob_files( blob_file_path.display(), ); - let file = std::fs::File::open(&blob_file_path)?; + let mut file = fs.open(blob_file_path, &crate::fs::FsOpenOptions::new().read(true))?; let meta = { - let reader = sfa::Reader::new(&blob_file_path)?; + let reader = sfa::Reader::from_reader(&mut file)?; let toc = reader.toc(); let metadata_section = toc.section(b"meta") @@ -94,20 +103,27 @@ pub fn recover_blob_files( let metadata_len = usize::try_from(metadata_section.len()) .map_err(|_| crate::Error::Unrecoverable)?; let metadata_slice = - crate::file::read_exact(&file, metadata_section.pos(), metadata_len)?; + crate::file::read_exact(&*file, metadata_section.pos(), metadata_len)?; Metadata::from_slice(&metadata_slice)? }; let file_accessor = if let Some(dt) = descriptor_table.cloned() { - FileAccessor::DescriptorTable(dt) + // Pre-populate the FD cache with the handle we already opened + // so the first read doesn't need to reopen. 
+ let global_id = (tree_id, blob_file_id).into(); + dt.insert_for_blob_file(global_id, Arc::from(file)); + FileAccessor::DescriptorTable { + table: dt, + fs: fs.clone(), + } } else { - FileAccessor::File(Arc::new(file)) + FileAccessor::File(Arc::from(file)) }; blob_files.push(BlobFile(Arc::new(BlobFileInner { id: blob_file_id, - path: blob_file_path, + path: blob_file_path.clone(), meta, is_deleted: AtomicBool::new(false), checksum, @@ -136,15 +152,39 @@ pub fn recover_blob_files( pub type BlobFileId = u64; #[cfg(test)] +#[expect(clippy::unwrap_used, reason = "test code")] mod tests { use super::*; use test_log::test; #[test] - fn vlog_recovery_missing_blob_file() { - assert!(matches!( - recover_blob_files(Path::new("."), &[(0, Checksum::from_raw(0))], 0, None), - Err(crate::Error::Unrecoverable), - )); + fn vlog_recovery_missing_blob_file_returns_unrecoverable() { + // Manifest says blob id=0 exists, but the blobs folder is empty. + // Recovery should fail with Unrecoverable because blob_files.len() < ids.len(). + let dir = tempfile::tempdir().unwrap(); + let result = recover_blob_files( + dir.path(), + &[(0, Checksum::from_raw(0))], + 0, + None, + &(Arc::new(crate::fs::StdFs) as Arc), + ); + assert!(matches!(result, Err(crate::Error::Unrecoverable))); + } + + #[test] + fn vlog_recovery_nonexistent_folder_returns_empty() { + let dir = tempfile::tempdir().unwrap(); + let missing = dir.path().join("no_such_dir"); + let (blob_files, orphans) = recover_blob_files( + &missing, + &[(0, Checksum::from_raw(0))], + 0, + None, + &(Arc::new(crate::fs::StdFs) as Arc), + ) + .unwrap(); + assert!(blob_files.is_empty()); + assert!(orphans.is_empty()); } } diff --git a/tests/mem_fs_tree.rs b/tests/mem_fs_tree.rs new file mode 100644 index 000000000..dca08e7b0 --- /dev/null +++ b/tests/mem_fs_tree.rs @@ -0,0 +1,164 @@ +use lsm_tree::fs::MemFs; +// Guard trait import required for IterGuardImpl::into_inner() method dispatch. 
+use lsm_tree::{AbstractTree, Config, Guard, SeqNo, SequenceNumberCounter}; +use test_log::test; + +#[test] +fn open_tree_with_memfs() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/tree", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + assert!(tree.is_empty(SeqNo::MAX, None)?); + + tree.insert("key1", "value1", 0); + tree.insert("key2", "value2", 1); + tree.insert("key3", "value3", 2); + + assert_eq!(tree.len(SeqNo::MAX, None)?, 3); + + let val = tree.get("key2", SeqNo::MAX)?.expect("key2 should exist"); + assert_eq!(&*val, b"value2"); + + Ok(()) +} + +#[test] +fn memfs_tree_flush_and_read() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/flush", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + for i in 0u64..100 { + tree.insert(format!("key_{i:05}"), format!("val_{i}"), i); + } + + // Flush memtable to SST (in-memory via MemFs) + tree.flush_active_memtable(0)?; + + // Reads should still work after flush (from SST) + let val = tree + .get("key_00050", SeqNo::MAX)? 
+ .expect("key should exist after flush"); + assert_eq!(&*val, b"val_50"); + + assert_eq!(tree.len(SeqNo::MAX, None)?, 100); + + Ok(()) +} + +#[test] +fn memfs_tree_delete_and_range() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/range", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + tree.insert("a", "1", 0); + tree.insert("b", "2", 1); + tree.insert("c", "3", 2); + tree.remove("b", 3); + + let items: Vec<_> = tree + .iter(SeqNo::MAX, None) + .map(|guard| { + let (k, v) = guard.into_inner().unwrap(); + ( + String::from_utf8(k.to_vec()).unwrap(), + String::from_utf8(v.to_vec()).unwrap(), + ) + }) + .collect(); + + assert_eq!( + items, + vec![("a".into(), "1".into()), ("c".into(), "3".into())] + ); + + Ok(()) +} + +#[test] +fn memfs_tree_multiple_flushes() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/multi_flush", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + // First batch + for i in 0u64..50 { + tree.insert(format!("key_{i:05}"), format!("batch1_{i}"), i); + } + tree.flush_active_memtable(0)?; + + // Second batch — some overwrites, some new + for i in 25u64..75 { + tree.insert(format!("key_{i:05}"), format!("batch2_{i}"), 50 + i); + } + tree.flush_active_memtable(0)?; + + // Verify latest values + let val = tree + .get("key_00030", SeqNo::MAX)? + .expect("overwritten key should exist"); + assert_eq!(&*val, b"batch2_30"); + + let val = tree + .get("key_00010", SeqNo::MAX)? 
+ .expect("original key should exist"); + assert_eq!(&*val, b"batch1_10"); + + assert_eq!(tree.len(SeqNo::MAX, None)?, 75); + + Ok(()) +} + +#[test] +fn memfs_shared_across_trees() -> lsm_tree::Result<()> { + let fs = MemFs::new(); + + let tree1 = Config::new( + "/virtual/tree1", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(fs.clone()) + .open()?; + + let tree2 = Config::new( + "/virtual/tree2", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(fs) + .open()?; + + tree1.insert("from_tree1", "hello", 0); + tree2.insert("from_tree2", "world", 0); + + assert!(tree1.get("from_tree1", SeqNo::MAX)?.is_some()); + assert!(tree1.get("from_tree2", SeqNo::MAX)?.is_none()); + assert!(tree2.get("from_tree2", SeqNo::MAX)?.is_some()); + assert!(tree2.get("from_tree1", SeqNo::MAX)?.is_none()); + + Ok(()) +} + +// NOTE: Compaction is not yet fully supported with MemFs. +// There are remaining `std::fs` bypass points in the compaction +// finalization path that produce ENOENT when running fully in-memory. +// Tracked as a known limitation — see mem_fs.rs module docs.