From d051b117533ad2891ce8ff2b64ed816f5232a67a Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Fri, 3 Apr 2026 19:55:21 +0300 Subject: [PATCH 01/26] feat(fs): add MemFs in-memory filesystem and pipe Fs through all I/O paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement MemFs — a HashMap-backed in-memory Fs implementation for testing and ephemeral trees. Wire Arc<dyn Fs> through the entire I/O stack so that all file operations go through the pluggable Fs trait. MemFs implementation: - MemFile with Read/Write/Seek/read_at backed by Arc<Mutex<Vec<u8>>> - Full Fs trait: open, create_dir_all, read_dir, remove_file, remove_dir_all, rename, metadata, sync_directory, exists - Validates parent dirs, rejects dir-file conflicts, mirrors std::fs semantics (InvalidInput for truncate/create without write access) - 20 unit tests + 5 integration tests (open tree, flush, read, range) Config changes: - De-generify Config<F> to Config with Arc<dyn Fs> - Add Config::with_fs() builder for alternative backends Fs piping (all file opens now go through Fs): - rewrite_atomic: Fs-native temp write with PID+seq naming and best-effort cleanup on all failure paths (replaces tempfile crate) - Table::recover accepts Arc<dyn Fs>, opens files through Fs - FileAccessor::get_or_open_table returns (fd, cache_hit) for metrics - Wire correct level-routed Fs to all 6 Table::recover call sites - vlog recovery: Fs-based read_dir, exists, sfa::Reader::from_reader - Blob recovery: pre-populate DescriptorTable FD cache, skip dirs before parse, eliminate TOCTOU with direct read_dir - Document Fs::rename atomic-replace semantics Closes #187 Closes #188 --- benches/index_block.rs | 1 + src/blob_tree/ingest.rs | 1 + src/blob_tree/mod.rs | 4 +- src/compaction/flavour.rs | 2 + src/config/mod.rs | 41 +- src/file.rs | 62 +- src/file_accessor.rs | 86 ++- src/fs/mem_fs.rs | 910 +++++++++++++++++++++++++++++ src/fs/mod.rs | 6 + src/table/inner.rs | 6 +- src/table/mod.rs | 30 +- 
src/table/multi_writer.rs | 4 +- src/table/tests.rs | 32 +- src/table/util.rs | 31 +- src/tree/ingest.rs | 9 +- src/tree/mod.rs | 8 +- src/vlog/accessor.rs | 19 +- src/vlog/blob_file/multi_writer.rs | 23 +- src/vlog/mod.rs | 92 ++- tests/mem_fs_tree.rs | 164 ++++++ 20 files changed, 1369 insertions(+), 162 deletions(-) create mode 100644 src/fs/mem_fs.rs create mode 100644 tests/mem_fs_tree.rs diff --git a/benches/index_block.rs b/benches/index_block.rs index c2f1b6a35..490644ef9 100644 --- a/benches/index_block.rs +++ b/benches/index_block.rs @@ -116,6 +116,7 @@ fn build_table_for_point_read(restart_interval: u8) -> BenchTable { 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(8))), + Arc::new(lsm_tree::fs::StdFs), false, false, None, diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index ff1c05d7d..6826779d0 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -227,6 +227,7 @@ impl<'a> BlobIngestion<'a> { index.id, index.config.cache.clone(), index.config.descriptor_table.clone(), + self.table.level_fs.clone(), false, false, index.config.encryption.clone(), diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index ccb0489ce..53946d760 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -139,7 +139,6 @@ pub struct BlobTree { impl BlobTree { pub(crate) fn open(config: Config) -> crate::Result { use crate::file::{BLOBS_FOLDER, fsync_directory}; - use crate::fs::Fs; let index = crate::Tree::open(config)?; @@ -425,7 +424,7 @@ impl AbstractTree for BlobTree { self.index.table_id_counter.clone(), 64 * 1_024 * 1_024, 0, - level_fs, + level_fs.clone(), )? 
.set_comparator(self.index.config.comparator.clone()) .use_data_block_restart_interval(data_block_restart_interval) @@ -545,6 +544,7 @@ impl AbstractTree for BlobTree { self.index.id, self.index.config.cache.clone(), self.index.config.descriptor_table.clone(), + level_fs.clone(), pin_filter, pin_index, self.index.config.encryption.clone(), diff --git a/src/compaction/flavour.rs b/src/compaction/flavour.rs index 5d6960d6f..0b82237f7 100644 --- a/src/compaction/flavour.rs +++ b/src/compaction/flavour.rs @@ -371,6 +371,7 @@ impl StandardCompaction { fn consume_writer(self, opts: &Options, dst_lvl: usize) -> crate::Result> { let table_base_folder = self.table_writer.base_path.clone(); + let level_fs = self.table_writer.fs.clone(); let pin_filter = opts.config.filter_block_pinning_policy.get(dst_lvl); let pin_index = opts.config.index_block_pinning_policy.get(dst_lvl); @@ -386,6 +387,7 @@ impl StandardCompaction { opts.tree_id, opts.config.cache.clone(), opts.config.descriptor_table.clone(), + level_fs.clone(), pin_filter, pin_index, opts.config.encryption.clone(), diff --git a/src/config/mod.rs b/src/config/mod.rs index c3ecbd2d5..03ac58b85 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -216,22 +216,17 @@ impl KvSeparationOptions { } /// Tree configuration builder -/// -/// The generic parameter `F` selects the filesystem backend. -/// It defaults to [`StdFs`], so existing code that writes `Config` -/// without a type parameter continues to work unchanged. -pub struct Config { +pub struct Config { /// Folder path #[doc(hidden)] pub path: PathBuf, /// Filesystem backend /// - // All Config fields are `#[doc(hidden)] pub` by convention — callers use - // builder methods or `..Default::default()`, not struct literals directly. - // A `with_fs()` builder will be added when call-site refactoring lands. + /// Defaults to [`StdFs`]. Use [`Config::with_fs`] to plug in an + /// alternative backend such as [`MemFs`](crate::fs::MemFs). 
#[doc(hidden)] - pub fs: Arc, + pub fs: Arc, /// Per-level filesystem routing for tiered storage. /// @@ -447,6 +442,32 @@ impl Config { } } + /// Sets the filesystem backend. + /// + /// Defaults to [`StdFs`]. Use [`MemFs`](crate::fs::MemFs) for + /// in-memory trees (testing, ephemeral indexes). + /// + /// # Example + /// + /// ``` + /// use lsm_tree::{Config, SequenceNumberCounter}; + /// use lsm_tree::fs::MemFs; + /// + /// let tree = Config::new( + /// "/virtual/tree", + /// SequenceNumberCounter::default(), + /// SequenceNumberCounter::default(), + /// ) + /// .with_fs(MemFs::new()) + /// .open() + /// .unwrap(); + /// ``` + #[must_use] + pub fn with_fs(mut self, fs: F) -> Self { + self.fs = Arc::new(fs); + self + } + /// Opens a tree using the config. /// /// # Errors @@ -582,7 +603,7 @@ mod tests { } } -impl Config { +impl Config { /// Returns the tables folder path and [`Fs`] backend for the given level. /// /// If [`level_routes`](Self::level_routes) has an entry covering this diff --git a/src/file.rs b/src/file.rs index 81bb7f0ca..302581aab 100644 --- a/src/file.rs +++ b/src/file.rs @@ -47,40 +47,54 @@ pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Resul Ok(builder.freeze().into()) } -/// Atomically rewrites a file. +/// Atomically rewrites a file via the [`Fs`] trait. +/// +/// Writes `content` to a temporary file in the same directory, fsyncs it, +/// then renames over `path`. This ensures readers never see a partial write. pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Result<()> { + use crate::fs::FsOpenOptions; + use std::sync::atomic::{AtomicU64, Ordering}; + + static TEMP_SEQ: AtomicU64 = AtomicU64::new(0); + #[expect( clippy::expect_used, reason = "every file should have a parent directory" )] let folder = path.parent().expect("should have a parent"); - // NOTE: tempfile crate uses std::fs internally; migrating temp-file - // creation to Fs would require a custom implementation. 
- let mut temp_file = tempfile::NamedTempFile::new_in(folder)?; - temp_file.write_all(content)?; - temp_file.flush()?; - temp_file.as_file_mut().sync_all()?; - temp_file.persist(path)?; - - // Suppress unused-variable warning on Windows where the post-persist - // sync block is skipped (directory fsync is unsupported). - let _ = &fs; - - #[cfg(not(target_os = "windows"))] - { - use crate::fs::FsOpenOptions; - - let file = fs.open(path, &FsOpenOptions::new().read(true))?; + // PID + monotonic seq gives uniqueness within a process and across + // concurrent processes. A crash-then-PID-reuse collision is theoretically + // possible but vanishingly unlikely (requires exact PID reuse AND seq + // counter restart to same value). lsm-tree uses exclusive file locking + // so the same data directory is never written by two processes. + let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed); + let pid = std::process::id(); + let tmp_path = folder.join(format!(".tmp_{pid}_{seq}")); + + let result = (|| -> std::io::Result<()> { + let mut file = fs.open( + &tmp_path, + &FsOpenOptions::new().write(true).create_new(true), + )?; + file.write_all(content)?; + file.flush()?; FsFile::sync_all(&*file)?; + drop(file); + // std::fs::rename overwrites existing destinations on all platforms + // (Rust uses MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows). + fs.rename(&tmp_path, path)?; + Ok(()) + })(); - #[expect( - clippy::expect_used, - reason = "files should always have a parent directory" - )] - let folder = path.parent().expect("should have parent folder"); - fs.sync_directory(folder)?; + if result.is_err() { + // Best-effort cleanup of the temp file on any failure path. + // Safe to call even if fs.open() failed (file never created) — + // remove_file will return NotFound which we ignore. 
+ let _ = fs.remove_file(&tmp_path); } + result?; + fsync_directory(folder, fs)?; Ok(()) } diff --git a/src/file_accessor.rs b/src/file_accessor.rs index c617882b4..2d6316c3f 100644 --- a/src/file_accessor.rs +++ b/src/file_accessor.rs @@ -4,7 +4,8 @@ use crate::GlobalTableId; use crate::descriptor_table::DescriptorTable; -use crate::fs::FsFile; +use crate::fs::{Fs, FsFile, FsOpenOptions}; +use std::path::Path; use std::sync::Arc; /// Allows accessing a file (either cached or pinned) @@ -15,46 +16,87 @@ pub enum FileAccessor { /// This is used in case file descriptor cache is `None` (to skip cache lookups) File(Arc), - /// Access to file descriptor cache - DescriptorTable(Arc), + /// Access to file descriptor cache with [`Fs`]-based fallback for + /// cache misses. + DescriptorTable { + /// The FD cache. + table: Arc, + /// Filesystem backend for opening files on cache miss. + fs: Arc, + }, } impl FileAccessor { #[must_use] pub fn as_descriptor_table(&self) -> Option<&DescriptorTable> { match self { - Self::DescriptorTable(d) => Some(d), + Self::DescriptorTable { table, .. } => Some(table), Self::File(_) => None, } } - #[must_use] - pub fn access_for_table(&self, table_id: &GlobalTableId) -> Option> { + /// Returns a cached table FD or opens the file via [`Fs`] on cache miss. + /// + /// The returned `bool` indicates whether the file descriptor was already + /// cached (`true`) or freshly opened (`false`). 
+ pub fn get_or_open_table( + &self, + table_id: &GlobalTableId, + path: &Path, + ) -> std::io::Result<(Arc, bool)> { match self { - Self::File(fd) => Some(fd.clone()), - Self::DescriptorTable(descriptor_table) => descriptor_table.access_for_table(table_id), - } - } - - pub fn insert_for_table(&self, table_id: GlobalTableId, fd: Arc) { - if let Self::DescriptorTable(descriptor_table) = self { - descriptor_table.insert_for_table(table_id, fd); + Self::File(fd) => Ok((fd.clone(), true)), + Self::DescriptorTable { table, fs } => { + if let Some(fd) = table.access_for_table(table_id) { + return Ok((fd, true)); + } + let fd: Arc = + Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?); + table.insert_for_table(*table_id, fd.clone()); + Ok((fd, false)) + } } } - #[must_use] - pub fn access_for_blob_file(&self, table_id: &GlobalTableId) -> Option> { + /// Returns a cached blob file FD or opens it via [`Fs`] on cache miss. + /// + /// The returned `bool` indicates whether the file descriptor was already + /// cached (`true`) or freshly opened (`false`). + pub fn get_or_open_blob_file( + &self, + table_id: &GlobalTableId, + path: &Path, + ) -> std::io::Result<(Arc, bool)> { match self { - Self::File(fd) => Some(fd.clone()), - Self::DescriptorTable(descriptor_table) => { - descriptor_table.access_for_blob_file(table_id) + Self::File(fd) => Ok((fd.clone(), true)), + Self::DescriptorTable { table, fs } => { + if let Some(fd) = table.access_for_blob_file(table_id) { + return Ok((fd, true)); + } + let fd: Arc = + Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?); + table.insert_for_blob_file(*table_id, fd.clone()); + Ok((fd, false)) } } } + /// Pre-populates the blob file FD cache after creating a new blob file. pub fn insert_for_blob_file(&self, table_id: GlobalTableId, fd: Arc) { - if let Self::DescriptorTable(descriptor_table) = self { - descriptor_table.insert_for_blob_file(table_id, fd); + if let Self::DescriptorTable { table, .. 
} = self { + table.insert_for_blob_file(table_id, fd); + } + } + + pub fn remove_for_table(&self, table_id: &GlobalTableId) { + if let Self::DescriptorTable { table, .. } = self { + table.remove_for_table(table_id); + } + } + + pub fn remove_for_blob_file(&self, table_id: &GlobalTableId) { + if let Self::DescriptorTable { table, .. } = self { + table.remove_for_blob_file(table_id); } } } @@ -63,7 +105,7 @@ impl std::fmt::Debug for FileAccessor { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { Self::File(_) => write!(f, "FileAccessor::Pinned"), - Self::DescriptorTable(_) => { + Self::DescriptorTable { .. } => { write!(f, "FileAccessor::Cached") } } diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs new file mode 100644 index 000000000..b14219461 --- /dev/null +++ b/src/fs/mem_fs.rs @@ -0,0 +1,910 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! In-memory [`Fs`] implementation for testing and ephemeral trees. +//! +//! All file data lives in memory — there are no durability guarantees. +//! `sync_all`, `sync_data`, and `sync_directory` are deliberate no-ops. +//! +//! # Known limitations +//! +//! - **Tree reopen**: `Tree::open` checks for the `CURRENT` version file +//! via `Path::try_exists()` (bypasses `Fs`). New trees work correctly +//! (the check returns `false` → `create_new`), but reopening an +//! in-memory tree after drop is not supported. +//! - **Version GC**: Old version file cleanup in `SuperVersions::gc` uses +//! `std::fs` directly — stale version entries accumulate in memory +//! until the `MemFs` is dropped. Acceptable for testing / ephemeral use. +//! - **Compaction**: Some code paths in the compaction finalization still +//! bypass the `Fs` trait. Write + flush + point-read works; compaction +//! may fail with `ENOENT` on virtual paths. 
+ +use super::{Fs, FsDirEntry, FsFile, FsMetadata, FsOpenOptions}; +use std::collections::{HashMap, HashSet}; +use std::io::{self, Read, Seek, SeekFrom, Write}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, RwLock}; + +// --------------------------------------------------------------------------- +// MemFs +// --------------------------------------------------------------------------- + +/// In-memory [`Fs`] backend for testing and ephemeral in-memory trees. +/// +/// Backed by a `HashMap>>>` — no disk I/O is +/// performed. Clones share the same backing store, and individual file +/// contents are synchronized through a per-file [`Mutex`]. +/// +/// # Example +/// +/// ``` +/// use lsm_tree::fs::MemFs; +/// use std::sync::Arc; +/// +/// let fs = MemFs::new(); +/// let dyn_fs: Arc = Arc::new(fs); +/// ``` +#[derive(Clone, Debug)] +pub struct MemFs { + state: Arc>, +} + +#[derive(Debug, Default)] +struct State { + files: HashMap>>>, + dirs: HashSet, +} + +impl MemFs { + /// Creates a new, empty in-memory filesystem. + #[must_use] + pub fn new() -> Self { + Self { + state: Arc::new(RwLock::new(State::default())), + } + } +} + +impl Default for MemFs { + fn default() -> Self { + Self::new() + } +} + +// --------------------------------------------------------------------------- +// MemFile +// --------------------------------------------------------------------------- + +/// An open file handle backed by an in-memory buffer. +struct MemFile { + data: Arc>>, + cursor: u64, + readable: bool, + writable: bool, + is_append: bool, +} + +/// Copies bytes from `data[pos..]` into `buf`, returning byte count. 
+fn copy_from_data(buf: &mut [u8], data: &[u8], pos: usize) -> usize { + let available = data.get(pos..).unwrap_or_default(); + let n = buf.len().min(available.len()); + if let (Some(dst), Some(src)) = (buf.get_mut(..n), available.get(..n)) { + dst.copy_from_slice(src); + } + n +} + +#[expect( + clippy::cast_possible_truncation, + reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" +)] +impl Read for MemFile { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if !self.readable { + return Err(io::Error::other("file not opened for reading")); + } + let data = lock(&self.data)?; + let pos = self.cursor as usize; + let n = copy_from_data(buf, &data, pos); + drop(data); + self.cursor += n as u64; + Ok(n) + } +} + +#[expect( + clippy::cast_possible_truncation, + reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" +)] +impl Write for MemFile { + fn write(&mut self, buf: &[u8]) -> io::Result { + if !self.writable { + return Err(io::Error::other("file not opened for writing")); + } + let mut data = lock(&self.data)?; + + let pos = if self.is_append { + data.len() + } else { + self.cursor as usize + }; + + let end = pos + buf.len(); + if end > data.len() { + data.resize(end, 0); + } + if let Some(dst) = data.get_mut(pos..end) { + dst.copy_from_slice(buf); + } + drop(data); + self.cursor = end as u64; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +#[expect( + clippy::cast_possible_wrap, + clippy::cast_sign_loss, + reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" +)] +impl Seek for MemFile { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + let len = { + let data = lock(&self.data)?; + data.len() as i64 + }; + + let new_pos = match pos { + SeekFrom::Start(n) => i64::try_from(n).unwrap_or(i64::MAX), + SeekFrom::End(n) => len.saturating_add(n), + SeekFrom::Current(n) => (self.cursor as i64).saturating_add(n), + }; + + if new_pos < 0 { + return 
Err(io::Error::new( + io::ErrorKind::InvalidInput, + "seek to negative position", + )); + } + + self.cursor = new_pos as u64; + Ok(self.cursor) + } +} + +impl FsFile for MemFile { + fn sync_all(&self) -> io::Result<()> { + Ok(()) + } + + fn sync_data(&self) -> io::Result<()> { + Ok(()) + } + + fn metadata(&self) -> io::Result { + let data = lock(&self.data)?; + Ok(FsMetadata { + len: data.len() as u64, + is_dir: false, + is_file: true, + }) + } + + #[expect( + clippy::cast_possible_truncation, + reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" + )] + fn set_len(&self, size: u64) -> io::Result<()> { + lock(&self.data)?.resize(size as usize, 0); + Ok(()) + } + + #[expect( + clippy::cast_possible_truncation, + reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" + )] + fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result { + let data = lock(&self.data)?; + Ok(copy_from_data(buf, &data, offset as usize)) + } + + /// No-op: in-memory files are not shared across processes. `MemFs` is a + /// test/ephemeral backend — cross-process exclusivity is not meaningful. + fn lock_exclusive(&self) -> io::Result<()> { + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// Fs for MemFs +// --------------------------------------------------------------------------- + +#[expect( + clippy::significant_drop_tightening, + reason = "RwLock guards are intentionally held for the duration of each method" +)] +impl Fs for MemFs { + fn open(&self, path: &Path, opts: &FsOpenOptions) -> io::Result> { + let mut state = write_state(&self.state)?; + let path = path.to_path_buf(); + + // Verify parent directory exists (mirrors std::fs behaviour). 
+ if let Some(parent) = path.parent() + && !parent.as_os_str().is_empty() + && parent != Path::new("/") + && !state.dirs.contains(parent) + { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("parent directory does not exist: {}", parent.display()), + )); + } + + let exists = state.files.contains_key(&path); + let is_dir = state.dirs.contains(&path); + let wants_write = opts.write || opts.append; + + // Mirror std::fs::OpenOptions: truncate/create require write access. + if opts.truncate && !wants_write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "truncate requires write or append access", + )); + } + if (opts.create || opts.create_new) && !wants_write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "create/create_new requires write or append access", + )); + } + + // Reject creating a file at a path that is already a directory. + if is_dir && (opts.create || opts.create_new) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("path is a directory: {}", path.display()), + )); + } + + if opts.create_new { + if exists { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("file already exists: {}", path.display()), + )); + } + let data = Arc::new(Mutex::new(Vec::new())); + state.files.insert(path, Arc::clone(&data)); + return Ok(Box::new(MemFile { + data, + cursor: 0, + readable: opts.read, + writable: opts.write || opts.append, + is_append: opts.append, + })); + } + + if exists { + let data = state + .files + .get(&path) + .map(Arc::clone) + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "concurrent removal"))?; + + if opts.truncate { + lock(&data)?.clear(); + } + + let cursor = if opts.append { + lock(&data)?.len() as u64 + } else { + 0 + }; + + Ok(Box::new(MemFile { + data, + cursor, + readable: opts.read, + writable: opts.write || opts.append, + is_append: opts.append, + })) + } else if opts.create { + let data = Arc::new(Mutex::new(Vec::new())); + 
state.files.insert(path, Arc::clone(&data)); + Ok(Box::new(MemFile { + data, + cursor: 0, + readable: opts.read, + writable: opts.write || opts.append, + is_append: opts.append, + })) + } else { + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("file not found: {}", path.display()), + )) + } + } + + fn create_dir_all(&self, path: &Path) -> io::Result<()> { + let mut state = write_state(&self.state)?; + + // Collect all components first, then validate, then insert. + // This avoids partial insertion if an ancestor is a regular file. + let mut to_create = Vec::new(); + let mut current = path.to_path_buf(); + loop { + if state.files.contains_key(¤t) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("ancestor is a file: {}", current.display()), + )); + } + to_create.push(current.clone()); + if !current.pop() || current.as_os_str().is_empty() { + break; + } + } + + for dir in to_create { + state.dirs.insert(dir); + } + Ok(()) + } + + fn read_dir(&self, path: &Path) -> io::Result> { + let state = read_state(&self.state)?; + + if !state.dirs.contains(path) { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("directory not found: {}", path.display()), + )); + } + + let mut entries = Vec::new(); + + for file_path in state.files.keys() { + if file_path.parent() == Some(path) + && let Some(name) = file_path.file_name() + { + // Match StdFs contract: reject non-UTF-8 names with InvalidData. 
+ let file_name = name.to_str().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "non-UTF-8 filename in directory {}: {}", + path.display(), + name.display() + ), + ) + })?; + entries.push(FsDirEntry { + path: file_path.clone(), + file_name: file_name.to_owned(), + is_dir: false, + }); + } + } + + for dir_path in &state.dirs { + if dir_path.parent() == Some(path) + && dir_path != path + && let Some(name) = dir_path.file_name() + { + let file_name = name.to_str().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "non-UTF-8 filename in directory {}: {}", + path.display(), + name.display() + ), + ) + })?; + entries.push(FsDirEntry { + path: dir_path.clone(), + file_name: file_name.to_owned(), + is_dir: true, + }); + } + } + + Ok(entries) + } + + fn remove_file(&self, path: &Path) -> io::Result<()> { + let mut state = write_state(&self.state)?; + if state.files.remove(path).is_none() { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("file not found: {}", path.display()), + )); + } + Ok(()) + } + + fn remove_dir_all(&self, path: &Path) -> io::Result<()> { + let mut state = write_state(&self.state)?; + + // Reject files — std::fs::remove_dir_all errors on non-directories. + if state.files.contains_key(path) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("path is not a directory: {}", path.display()), + )); + } + + if !state.dirs.contains(path) { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("path not found: {}", path.display()), + )); + } + + state.files.retain(|p, _| !p.starts_with(path)); + state.dirs.retain(|p| !p.starts_with(path)); + Ok(()) + } + + fn rename(&self, from: &Path, to: &Path) -> io::Result<()> { + let mut state = write_state(&self.state)?; + + // Validate destination parent exists (mirrors std::fs behaviour). 
+ if let Some(parent) = to.parent() + && !parent.as_os_str().is_empty() + && parent != Path::new("/") + && !state.dirs.contains(parent) + { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("destination parent not found: {}", parent.display()), + )); + } + + // Reject renaming onto an existing directory. Otherwise `to` would end + // up present in both `files` and `dirs`, corrupting MemFs state. + if state.dirs.contains(to) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("destination is a directory: {}", to.display()), + )); + } + + // Directory renames are not implemented in MemFs because they require + // updating descendant paths in both `dirs` and `files`. + if state.dirs.contains(from) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("path is a directory: {}", from.display()), + )); + } + + if let Some(data) = state.files.remove(from) { + state.files.insert(to.to_path_buf(), data); + Ok(()) + } else { + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("file not found: {}", from.display()), + )) + } + } + + fn metadata(&self, path: &Path) -> io::Result { + let state = read_state(&self.state)?; + + if let Some(data) = state.files.get(path) { + let d = lock(data)?; + Ok(FsMetadata { + len: d.len() as u64, + is_dir: false, + is_file: true, + }) + } else if state.dirs.contains(path) { + Ok(FsMetadata { + len: 0, + is_dir: true, + is_file: false, + }) + } else { + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("path not found: {}", path.display()), + )) + } + } + + fn sync_directory(&self, _path: &Path) -> io::Result<()> { + Ok(()) + } + + fn exists(&self, path: &Path) -> io::Result { + let state = read_state(&self.state)?; + Ok(state.files.contains_key(path) || state.dirs.contains(path)) + } +} + +// --------------------------------------------------------------------------- +// Lock helpers — convert PoisonError to io::Error +// 
--------------------------------------------------------------------------- + +fn lock(m: &Mutex) -> io::Result> { + m.lock().map_err(|_| io::Error::other("mutex poisoned")) +} + +fn read_state(rw: &RwLock) -> io::Result> { + rw.read().map_err(|_| io::Error::other("rwlock poisoned")) +} + +fn write_state(rw: &RwLock) -> io::Result> { + rw.write().map_err(|_| io::Error::other("rwlock poisoned")) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +#[expect( + clippy::unwrap_used, + clippy::indexing_slicing, + clippy::unnecessary_wraps, + reason = "test code" +)] +mod tests { + use super::*; + use std::io::{Read, Write}; + use std::sync::Arc; + use test_log::test; + + #[test] + fn create_read_write() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/data"))?; + + let path = Path::new("/data/test.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + drop(file); + + let opts = FsOpenOptions::new().read(true); + let mut file = fs.open(path, &opts)?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "hello world"); + + Ok(()) + } + + #[test] + fn directory_operations() -> io::Result<()> { + let fs = MemFs::new(); + let nested = PathBuf::from("/a/b/c"); + fs.create_dir_all(&nested)?; + assert!(fs.exists(&nested)?); + assert!(fs.exists(Path::new("/a/b"))?); + + let file_path = nested.join("data.bin"); + let opts = FsOpenOptions::new().write(true).create_new(true); + let mut file = fs.open(&file_path, &opts)?; + file.write_all(b"data")?; + drop(file); + + let entries = fs.read_dir(&nested)?; + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].file_name, "data.bin"); + assert!(!entries[0].is_dir); + + let meta = fs.metadata(&file_path)?; + assert!(meta.is_file); + assert!(!meta.is_dir); 
+ assert_eq!(meta.len, 4); + + fs.remove_file(&file_path)?; + assert!(!fs.exists(&file_path)?); + + fs.remove_dir_all(Path::new("/a"))?; + assert!(!fs.exists(Path::new("/a"))?); + assert!(!fs.exists(&nested)?); + + Ok(()) + } + + #[test] + fn rename_file() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let src = Path::new("/dir/src.txt"); + let dst = Path::new("/dir/dst.txt"); + + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(src, &opts)?; + file.write_all(b"content")?; + drop(file); + + fs.rename(src, dst)?; + assert!(!fs.exists(src)?); + assert!(fs.exists(dst)?); + + Ok(()) + } + + #[test] + fn sync_directory_is_noop() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + fs.sync_directory(Path::new("/dir"))?; + Ok(()) + } + + #[test] + fn file_metadata_and_set_len() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/meta.bin"); + let opts = FsOpenOptions::new().write(true).create(true).read(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"12345")?; + + let meta = file.metadata()?; + assert!(meta.is_file); + assert_eq!(meta.len, 5); + + file.set_len(3)?; + let meta = file.metadata()?; + assert_eq!(meta.len, 3); + + Ok(()) + } + + #[test] + fn read_at_positional() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/pread.bin"); + let opts = FsOpenOptions::new().write(true).create(true).read(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + + let mut buf = [0u8; 5]; + let n = file.read_at(&mut buf, 6)?; + assert_eq!(n, 5); + assert_eq!(&buf, b"world"); + + let n = file.read_at(&mut buf, 0)?; + assert_eq!(n, 5); + assert_eq!(&buf, b"hello"); + + // Past EOF + let n = file.read_at(&mut buf, 100)?; + assert_eq!(n, 0); + + Ok(()) + } + + #[test] + fn lock_exclusive_is_noop() -> 
io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/lock"); + let opts = FsOpenOptions::new().write(true).create(true); + let file = fs.open(path, &opts)?; + file.lock_exclusive()?; + Ok(()) + } + + #[test] + fn open_create_new_fails_on_existing() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/file"); + let opts = FsOpenOptions::new().write(true).create_new(true); + fs.open(path, &opts)?; + + let err = fs.open(path, &opts).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::AlreadyExists); + Ok(()) + } + + #[test] + fn open_nonexistent_without_create_fails() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/missing"); + let opts = FsOpenOptions::new().read(true); + let err = fs.open(path, &opts).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn open_fails_when_parent_missing() -> io::Result<()> { + let fs = MemFs::new(); + let path = Path::new("/no/such/dir/file"); + let opts = FsOpenOptions::new().write(true).create(true); + let err = fs.open(path, &opts).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn truncate_on_open() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/trunc.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + drop(file); + + let opts = FsOpenOptions::new().write(true).truncate(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hi")?; + drop(file); + + let meta = fs.metadata(path)?; + assert_eq!(meta.len, 2); + Ok(()) + } + + #[test] + fn append_mode() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/append.txt"); + 
let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello")?; + drop(file); + + let opts = FsOpenOptions::new().write(true).append(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b" world")?; + drop(file); + + let opts = FsOpenOptions::new().read(true); + let mut file = fs.open(path, &opts)?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "hello world"); + Ok(()) + } + + #[test] + fn seek_and_overwrite() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/seek.bin"); + let opts = FsOpenOptions::new().write(true).create(true).read(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + + file.seek(SeekFrom::Start(6))?; + file.write_all(b"rust!")?; + + file.seek(SeekFrom::Start(0))?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "hello rust!"); + + Ok(()) + } + + #[test] + fn object_safety() -> io::Result<()> { + let fs: Arc = Arc::new(MemFs::new()); + let bogus = Path::new("/nonexistent"); + assert!(!fs.exists(bogus)?); + Ok(()) + } + + #[test] + fn metadata_directory() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/mydir"))?; + let meta = fs.metadata(Path::new("/mydir"))?; + assert!(meta.is_dir); + assert!(!meta.is_file); + Ok(()) + } + + #[test] + fn read_dir_with_subdirectory() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/root/subdir"))?; + + let file_path = Path::new("/root/file.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(file_path, &opts)?; + + let mut entries = fs.read_dir(Path::new("/root"))?; + entries.sort_by(|a, b| a.file_name.cmp(&b.file_name)); + assert_eq!(entries.len(), 2); + assert_eq!(entries[0].file_name, "file.txt"); + assert!(!entries[0].is_dir); + assert_eq!(entries[1].file_name, "subdir"); + 
assert!(entries[1].is_dir); + Ok(()) + } + + #[test] + fn remove_file_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.remove_file(Path::new("/missing")).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn rename_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs + .rename(Path::new("/missing"), Path::new("/dst")) + .err() + .unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn read_dir_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.read_dir(Path::new("/missing")).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn metadata_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.metadata(Path::new("/missing")).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn sync_data_is_noop() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let path = Path::new("/dir/file"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"data")?; + file.sync_data()?; + Ok(()) + } + + #[test] + fn clones_share_state() -> io::Result<()> { + let fs1 = MemFs::new(); + let fs2 = fs1.clone(); + + fs1.create_dir_all(Path::new("/shared"))?; + let path = Path::new("/shared/file.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs1.open(path, &opts)?; + file.write_all(b"shared data")?; + drop(file); + + assert!(fs2.exists(path)?); + let meta = fs2.metadata(path)?; + assert_eq!(meta.len, 11); + Ok(()) + } +} diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 12a4cba5b..2d5ce26d5 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -23,11 +23,13 @@ //! - **macOS / BSD**: no batched I/O API exists (`dispatch_io` and `kqueue` //! 
do not help for storage I/O patterns); [`StdFs`] is the correct choice +mod mem_fs; mod std_fs; #[cfg(all(target_os = "linux", feature = "io-uring"))] mod io_uring_fs; +pub use mem_fs::MemFs; pub use std_fs::StdFs; #[cfg(all(target_os = "linux", feature = "io-uring"))] @@ -274,6 +276,10 @@ pub trait Fs: Send + Sync + 'static { /// Renames a file or directory from `from` to `to`. /// + /// If `to` already exists as a regular file, it is atomically replaced. + /// This is required by [`rewrite_atomic`](crate::file::rewrite_atomic) + /// for crash-safe version pointer updates. + /// /// # Errors /// /// Returns an I/O error if the rename fails. diff --git a/src/table/inner.rs b/src/table/inner.rs index 96074a916..4d9a63e01 100644 --- a/src/table/inner.rs +++ b/src/table/inner.rs @@ -12,6 +12,7 @@ use crate::{ comparator::SharedComparator, encryption::EncryptionProvider, file_accessor::FileAccessor, + fs::Fs, range_tombstone::RangeTombstone, table::{IndexBlock, filter::block::FilterBlock}, tree::inner::TreeId, @@ -29,6 +30,9 @@ pub struct Inner { #[doc(hidden)] pub(crate) file_accessor: FileAccessor, + /// Filesystem backend for file operations (open, remove, etc.). 
+ pub(crate) fs: Arc, + /// Parsed metadata #[doc(hidden)] pub metadata: ParsedMeta, @@ -97,7 +101,7 @@ impl Drop for Inner { if self.is_deleted.load(std::sync::atomic::Ordering::Acquire) { log::trace!("Cleanup deleted table {global_id:?} at {:?}", self.path); - if let Err(e) = std::fs::remove_file(&*self.path) { + if let Err(e) = self.fs.remove_file(&self.path) { log::warn!( "Failed to cleanup deleted table {global_id:?} at {:?}: {e:?}", self.path, diff --git a/src/table/mod.rs b/src/table/mod.rs index b19379b26..843248278 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -40,7 +40,7 @@ use crate::{ comparator::SharedComparator, descriptor_table::DescriptorTable, file_accessor::FileAccessor, - fs::FsFile, + fs::{Fs, FsFile, FsOpenOptions}, range_tombstone::RangeTombstone, table::{ block::{BlockType, ParsedItem}, @@ -119,23 +119,14 @@ impl Table { Ok(if let Some(handle) = &self.regions.linked_blob_files { let table_id = self.global_id(); - let (fd, fd_cache_miss) = - if let Some(fd) = self.file_accessor.access_for_table(&table_id) { - (fd, false) - } else { - let fd: Arc = Arc::new(std::fs::File::open(&*self.path)?); - (fd, true) - }; + let (fd, _cache_hit) = self + .file_accessor + .get_or_open_table(&table_id, &self.path)?; // Read the exact region using pread-style helper let buf = crate::file::read_exact(fd.as_ref(), *handle.offset(), handle.size() as usize)?; - // If we opened the file here, cache the FD for future accesses - if fd_cache_miss { - self.file_accessor.insert_for_table(table_id, fd); - } - // Parse the buffer let mut reader = &buf[..]; let len = reader.read_u32::()?; @@ -489,6 +480,7 @@ impl Table { tree_id: TreeId, cache: Arc, descriptor_table: Option>, + fs: Arc, pin_filter: bool, pin_index: bool, encryption: Option>, @@ -501,7 +493,7 @@ impl Table { use std::sync::atomic::AtomicBool; log::debug!("Recovering table from file {}", file_path.display()); - let mut file = std::fs::File::open(&file_path)?; + let mut file = fs.open(&file_path, 
&FsOpenOptions::new().read(true))?; let file_path = Arc::new(file_path); #[cfg(feature = "metrics")] @@ -514,7 +506,7 @@ impl Table { log::trace!("Reading meta block, with meta_ptr={:?}", regions.metadata); let metadata = - ParsedMeta::load_with_handle(&file, ®ions.metadata, encryption.as_deref())?; + ParsedMeta::load_with_handle(&*file, ®ions.metadata, encryption.as_deref())?; // Fail-fast: if this table was written with dictionary compression, // verify the caller provided the matching dictionary. Without this @@ -531,10 +523,13 @@ impl Table { } } - let file_handle: Arc = Arc::new(file); + let file_handle: Arc = Arc::from(file); let file_accessor = if let Some(dt) = descriptor_table { - FileAccessor::DescriptorTable(dt) + FileAccessor::DescriptorTable { + table: dt, + fs: fs.clone(), + } } else { FileAccessor::File(file_handle.clone()) }; @@ -705,6 +700,7 @@ impl Table { cache, file_accessor, + fs, block_index: Arc::new(block_index), diff --git a/src/table/multi_writer.rs b/src/table/multi_writer.rs index fd029ea91..6b8e61028 100644 --- a/src/table/multi_writer.rs +++ b/src/table/multi_writer.rs @@ -15,7 +15,7 @@ use std::{path::PathBuf, sync::Arc}; /// /// This results in a sorted "run" of tables pub struct MultiWriter { - fs: Arc, + pub(crate) fs: Arc, pub(crate) base_path: PathBuf, @@ -647,6 +647,7 @@ mod tests { 0, cache.clone(), None, + Arc::new(StdFs), false, false, None, @@ -737,6 +738,7 @@ mod tests { 0, cache.clone(), None, + Arc::new(StdFs), false, false, None, diff --git a/src/table/tests.rs b/src/table/tests.rs index f77321631..fe4467343 100644 --- a/src/table/tests.rs +++ b/src/table/tests.rs @@ -82,6 +82,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -99,7 +100,7 @@ fn test_with_table_impl( assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) 
+ FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -116,6 +117,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, false, None, @@ -133,7 +135,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -150,6 +152,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, true, None, @@ -167,7 +170,7 @@ fn test_with_table_impl( assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -184,6 +187,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -201,7 +205,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -218,6 +222,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), None, + Arc::new(StdFs), true, true, None, @@ -275,6 +280,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -291,7 +297,7 @@ fn test_with_table_impl( assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. 
} )); f(table)?; @@ -308,6 +314,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, false, None, @@ -324,7 +331,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -341,6 +348,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, true, None, @@ -358,7 +366,7 @@ fn test_with_table_impl( // assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -375,6 +383,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -392,7 +401,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. 
} )); f(table)?; @@ -409,6 +418,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), None, + Arc::new(StdFs), true, true, None, @@ -1420,6 +1430,7 @@ fn table_read_fuzz_1() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -1498,6 +1509,7 @@ fn table_partitioned_index() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -1612,6 +1624,7 @@ fn table_global_seqno() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -1823,6 +1836,7 @@ fn load_block_range_tombstone_metrics() -> crate::Result<()> { // with how filter and index recovery reads are handled. Arc::new(Cache::with_capacity_bytes(10_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -1930,6 +1944,7 @@ fn load_block_cache_hit_rejects_wrong_block_type() -> crate::Result<()> { 0, Arc::new(Cache::with_capacity_bytes(10_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -2253,6 +2268,7 @@ fn two_level_index_scan_skips_empty_child_partition() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, false, None, diff --git a/src/table/util.rs b/src/table/util.rs index cd9f480d3..2cf0a02ac 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -5,9 +5,9 @@ use super::{Block, BlockHandle, GlobalTableId}; use crate::{ Cache, CompressionType, KeyRange, Table, encryption::EncryptionProvider, - file_accessor::FileAccessor, fs::FsFile, table::block::BlockType, version::run::Ranged, + file_accessor::FileAccessor, table::block::BlockType, version::run::Ranged, }; -use std::{path::Path, 
sync::Arc}; +use std::path::Path; #[cfg(feature = "metrics")] use crate::metrics::Metrics; @@ -83,23 +83,17 @@ pub fn load_block( return Ok(block); } - let (fd, fd_cache_miss) = if let Some(cached_fd) = file_accessor.access_for_table(&table_id) { - #[cfg(feature = "metrics")] - metrics.table_file_opened_cached.fetch_add(1, Relaxed); + #[cfg(feature = "metrics")] + let (fd, cache_hit) = file_accessor.get_or_open_table(&table_id, path)?; + #[cfg(not(feature = "metrics"))] + let (fd, _) = file_accessor.get_or_open_table(&table_id, path)?; - (cached_fd, false) + #[cfg(feature = "metrics")] + if cache_hit { + metrics.table_file_opened_cached.fetch_add(1, Relaxed); } else { - let file = std::fs::File::open(path)?; - - #[cfg(feature = "metrics")] metrics.table_file_opened_uncached.fetch_add(1, Relaxed); - - // The if-branch returns Arc from the descriptor - // table, so the else-branch needs an explicit type annotation - // to trigger unsizing coercion. - let fd: Arc = Arc::new(file); - (fd, true) - }; + } let block = Block::from_file( fd.as_ref(), @@ -149,11 +143,6 @@ pub fn load_block( } } - // Cache FD - if fd_cache_miss { - file_accessor.insert_for_table(table_id, fd); - } - cache.insert_block(table_id, handle.offset(), block.clone()); Ok(block) diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 467aaf81b..3405d0d4e 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -4,11 +4,12 @@ use super::Tree; use crate::{ - BlobIndirection, SeqNo, UserKey, UserValue, config::FilterPolicyEntry, + BlobIndirection, SeqNo, UserKey, UserValue, config::FilterPolicyEntry, fs::Fs, table::multi_writer::MultiWriter, }; use std::cmp::Ordering; use std::path::PathBuf; +use std::sync::Arc; pub const INITIAL_CANONICAL_LEVEL: usize = 1; @@ -20,6 +21,8 @@ pub const INITIAL_CANONICAL_LEVEL: usize = 1; /// using the same table writer configuration that is used for flush and compaction. 
pub struct Ingestion<'a> { pub(crate) folder: PathBuf, + /// Level-routed filesystem backend for the target level. + pub(crate) level_fs: Arc, tree: &'a Tree, pub(crate) writer: MultiWriter, seqno: SeqNo, @@ -54,7 +57,7 @@ impl<'a> Ingestion<'a> { tree.table_id_counter.clone(), 64 * 1_024 * 1_024, 6, - level_fs, + level_fs.clone(), )? .set_comparator(tree.config.comparator.clone()) .use_bloom_policy({ @@ -116,6 +119,7 @@ impl<'a> Ingestion<'a> { Ok(Self { folder, + level_fs, tree, writer, seqno: 0, @@ -311,6 +315,7 @@ impl<'a> Ingestion<'a> { self.tree.id, self.tree.config.cache.clone(), self.tree.config.descriptor_table.clone(), + self.level_fs.clone(), false, false, self.tree.config.encryption.clone(), diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 997bea00d..f532399cd 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -392,7 +392,7 @@ impl AbstractTree for Tree { self.table_id_counter.clone(), 64 * 1_024 * 1_024, 0, - level_fs, + level_fs.clone(), )? .set_comparator(self.config.comparator.clone()) .use_data_block_restart_interval(data_block_restart_interval) @@ -453,6 +453,7 @@ impl AbstractTree for Tree { self.id, self.config.cache.clone(), self.config.descriptor_table.clone(), + level_fs.clone(), pin_filter, pin_index, self.config.encryption.clone(), @@ -1444,7 +1445,6 @@ impl Tree { /// Creates a new LSM-tree in a directory. 
fn create_new(config: Config) -> crate::Result { use crate::file::fsync_directory; - use crate::fs::Fs; let path = config.path.clone(); log::trace!("Creating LSM-tree at {}", path.display()); @@ -1488,7 +1488,7 @@ impl Tree { config: &Config, #[cfg(feature = "metrics")] metrics: &Arc, ) -> crate::Result { - use crate::{TableId, file::fsync_directory, fs::Fs}; + use crate::{TableId, file::fsync_directory}; let tree_path = tree_path.as_ref(); @@ -1598,6 +1598,7 @@ impl Tree { tree_id, config.cache.clone(), config.descriptor_table.clone(), + folder_fs.clone(), pin_filter, pin_index, config.encryption.clone(), @@ -1680,6 +1681,7 @@ impl Tree { &recovery.blob_file_ids, tree_id, config.descriptor_table.as_ref(), + &config.fs, )?; let version = Version::from_recovery(recovery, &tables, &blob_files)?; diff --git a/src/vlog/accessor.rs b/src/vlog/accessor.rs index 7757ebdd7..50a22c34b 100644 --- a/src/vlog/accessor.rs +++ b/src/vlog/accessor.rs @@ -4,11 +4,10 @@ use crate::{ Cache, GlobalTableId, TreeId, UserValue, - fs::FsFile, version::BlobFileList, vlog::{ValueHandle, blob_file::reader::Reader}, }; -use std::{path::Path, sync::Arc}; +use std::path::Path; pub struct Accessor<'a>(&'a BlobFileList); @@ -35,23 +34,13 @@ impl<'a> Accessor<'a> { let bf_id = GlobalTableId::from((tree_id, blob_file.id())); - let (file, fd_cache_miss) = - if let Some(cached_fd) = blob_file.file_accessor().access_for_blob_file(&bf_id) { - (cached_fd, false) - } else { - let file: Arc = Arc::new(std::fs::File::open( - base_path.join(vhandle.blob_file_id.to_string()), - )?); - (file, true) - }; + let (file, _) = blob_file + .file_accessor() + .get_or_open_blob_file(&bf_id, &base_path.join(vhandle.blob_file_id.to_string()))?; let value = Reader::new(blob_file, file.as_ref()).get(key, vhandle)?; cache.insert_blob(tree_id, vhandle, value.clone()); - if fd_cache_miss { - blob_file.file_accessor().insert_for_blob_file(bf_id, file); - } - Ok(Some(value)) } } diff --git a/src/vlog/blob_file/multi_writer.rs 
b/src/vlog/blob_file/multi_writer.rs index 4af702f76..14fae2475 100644 --- a/src/vlog/blob_file/multi_writer.rs +++ b/src/vlog/blob_file/multi_writer.rs @@ -116,7 +116,7 @@ impl MultiWriter { old_writer, self.passthrough_compression, self.descriptor_table.clone(), - &*self.fs, + &self.fs, )?; self.results.extend(blob_file); @@ -127,7 +127,7 @@ impl MultiWriter { writer: Writer, passthrough_compression: CompressionType, descriptor_table: Option>, - fs: &dyn Fs, + fs: &Arc, ) -> crate::Result> { if writer.item_count > 0 { let blob_file_id = writer.blob_file_id; @@ -143,13 +143,16 @@ impl MultiWriter { let (metadata, checksum) = writer.finish()?; - // NOTE: Read-back uses std::fs::File because FileAccessor/DescriptorTable - // expect Arc. Migrating the read path to Fs is a separate scope. - let file: Arc = Arc::new(std::fs::File::open(&path)?); - let file_accessor = descriptor_table.map_or_else( - || FileAccessor::File(file.clone()), - FileAccessor::DescriptorTable, - ); + let file: Arc = + Arc::from(fs.open(&path, &crate::fs::FsOpenOptions::new().read(true))?); + let file_accessor = if let Some(dt) = descriptor_table { + FileAccessor::DescriptorTable { + table: dt, + fs: fs.clone(), + } + } else { + FileAccessor::File(file.clone()) + }; file_accessor.insert_for_blob_file((tree_id, blob_file_id).into(), file); let blob_file = BlobFile(Arc::new(BlobFileInner { @@ -258,7 +261,7 @@ impl MultiWriter { self.active_writer, self.passthrough_compression, self.descriptor_table.clone(), - &*self.fs, + &self.fs, )?; self.results.extend(blob_file); Ok(self.results) diff --git a/src/vlog/mod.rs b/src/vlog/mod.rs index 383ce0e51..da638250d 100644 --- a/src/vlog/mod.rs +++ b/src/vlog/mod.rs @@ -16,6 +16,7 @@ pub use { use crate::{ Checksum, DescriptorTable, TreeId, file_accessor::FileAccessor, + fs::Fs, vlog::blob_file::{Inner as BlobFileInner, Metadata}, }; use std::{ @@ -28,10 +29,19 @@ pub fn recover_blob_files( ids: &[(BlobFileId, Checksum)], tree_id: TreeId, descriptor_table: 
Option<&Arc>, + fs: &Arc, ) -> crate::Result<(Vec, Vec)> { - if !folder.try_exists()? { - return Ok((vec![], vec![])); - } + // Recover directly from read_dir; treat NotFound as empty (avoids TOCTOU + // with a separate exists() check). This is correct even when `ids` is + // non-empty: the blobs folder may not exist for standard (non-blob) trees, + // and callers handle missing blob files via orphan detection. + let entries = match fs.read_dir(folder) { + Ok(entries) => entries, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + return Ok((vec![], vec![])); + } + Err(e) => return Err(e.into()), + }; let cnt = ids.len(); @@ -46,9 +56,8 @@ pub fn recover_blob_files( let mut blob_files = Vec::with_capacity(ids.len()); let mut orphaned_blob_files = vec![]; - for (idx, dirent) in std::fs::read_dir(folder)?.enumerate() { - let dirent = dirent?; - let file_name = dirent.file_name(); + for (idx, dirent) in entries.into_iter().enumerate() { + let file_name = &dirent.file_name; // https://en.wikipedia.org/wiki/.DS_Store if file_name == ".DS_Store" { @@ -56,22 +65,22 @@ pub fn recover_blob_files( } // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats - if file_name.to_string_lossy().starts_with("._") { + if file_name.starts_with("._") { continue; } - let blob_file_name = file_name.to_str().ok_or_else(|| { - log::error!("invalid table file name {}", file_name.display()); - crate::Error::Unrecoverable - })?; + // Skip directories before parsing — non-numeric directory names would + // fail the parse and abort recovery. 
+ if dirent.is_dir { + continue; + } - let blob_file_id = blob_file_name.parse::().map_err(|e| { - log::error!("invalid table file name {blob_file_name:?}: {e:?}"); + let blob_file_id = file_name.parse::().map_err(|e| { + log::error!("invalid blob file name {file_name:?}: {e:?}"); crate::Error::Unrecoverable })?; - let blob_file_path = dirent.path(); - assert!(!blob_file_path.is_dir()); + let blob_file_path = &dirent.path; if let Some(&(_, checksum)) = ids.iter().find(|(id, _)| id == &blob_file_id) { log::trace!( @@ -79,10 +88,10 @@ pub fn recover_blob_files( blob_file_path.display(), ); - let file = std::fs::File::open(&blob_file_path)?; + let mut file = fs.open(blob_file_path, &crate::fs::FsOpenOptions::new().read(true))?; let meta = { - let reader = sfa::Reader::new(&blob_file_path)?; + let reader = sfa::Reader::from_reader(&mut file)?; let toc = reader.toc(); let metadata_section = toc.section(b"meta") @@ -94,20 +103,27 @@ pub fn recover_blob_files( let metadata_len = usize::try_from(metadata_section.len()) .map_err(|_| crate::Error::Unrecoverable)?; let metadata_slice = - crate::file::read_exact(&file, metadata_section.pos(), metadata_len)?; + crate::file::read_exact(&*file, metadata_section.pos(), metadata_len)?; Metadata::from_slice(&metadata_slice)? }; let file_accessor = if let Some(dt) = descriptor_table.cloned() { - FileAccessor::DescriptorTable(dt) + // Pre-populate the FD cache with the handle we already opened + // so the first read doesn't need to reopen. 
+ let global_id = (tree_id, blob_file_id).into(); + dt.insert_for_blob_file(global_id, Arc::from(file)); + FileAccessor::DescriptorTable { + table: dt, + fs: fs.clone(), + } } else { - FileAccessor::File(Arc::new(file)) + FileAccessor::File(Arc::from(file)) }; blob_files.push(BlobFile(Arc::new(BlobFileInner { id: blob_file_id, - path: blob_file_path, + path: blob_file_path.clone(), meta, is_deleted: AtomicBool::new(false), checksum, @@ -136,15 +152,39 @@ pub fn recover_blob_files( pub type BlobFileId = u64; #[cfg(test)] +#[expect(clippy::unwrap_used, reason = "test code")] mod tests { use super::*; use test_log::test; #[test] - fn vlog_recovery_missing_blob_file() { - assert!(matches!( - recover_blob_files(Path::new("."), &[(0, Checksum::from_raw(0))], 0, None), - Err(crate::Error::Unrecoverable), - )); + fn vlog_recovery_missing_blob_file_returns_unrecoverable() { + // Manifest says blob id=0 exists, but the blobs folder is empty. + // Recovery should fail with Unrecoverable because blob_files.len() < ids.len(). + let dir = tempfile::tempdir().unwrap(); + let result = recover_blob_files( + dir.path(), + &[(0, Checksum::from_raw(0))], + 0, + None, + &(Arc::new(crate::fs::StdFs) as Arc), + ); + assert!(matches!(result, Err(crate::Error::Unrecoverable))); + } + + #[test] + fn vlog_recovery_nonexistent_folder_returns_empty() { + let dir = tempfile::tempdir().unwrap(); + let missing = dir.path().join("no_such_dir"); + let (blob_files, orphans) = recover_blob_files( + &missing, + &[(0, Checksum::from_raw(0))], + 0, + None, + &(Arc::new(crate::fs::StdFs) as Arc), + ) + .unwrap(); + assert!(blob_files.is_empty()); + assert!(orphans.is_empty()); } } diff --git a/tests/mem_fs_tree.rs b/tests/mem_fs_tree.rs new file mode 100644 index 000000000..dca08e7b0 --- /dev/null +++ b/tests/mem_fs_tree.rs @@ -0,0 +1,164 @@ +use lsm_tree::fs::MemFs; +// Guard trait import required for IterGuardImpl::into_inner() method dispatch. 
+use lsm_tree::{AbstractTree, Config, Guard, SeqNo, SequenceNumberCounter}; +use test_log::test; + +#[test] +fn open_tree_with_memfs() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/tree", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + assert!(tree.is_empty(SeqNo::MAX, None)?); + + tree.insert("key1", "value1", 0); + tree.insert("key2", "value2", 1); + tree.insert("key3", "value3", 2); + + assert_eq!(tree.len(SeqNo::MAX, None)?, 3); + + let val = tree.get("key2", SeqNo::MAX)?.expect("key2 should exist"); + assert_eq!(&*val, b"value2"); + + Ok(()) +} + +#[test] +fn memfs_tree_flush_and_read() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/flush", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + for i in 0u64..100 { + tree.insert(format!("key_{i:05}"), format!("val_{i}"), i); + } + + // Flush memtable to SST (in-memory via MemFs) + tree.flush_active_memtable(0)?; + + // Reads should still work after flush (from SST) + let val = tree + .get("key_00050", SeqNo::MAX)? 
+ .expect("key should exist after flush"); + assert_eq!(&*val, b"val_50"); + + assert_eq!(tree.len(SeqNo::MAX, None)?, 100); + + Ok(()) +} + +#[test] +fn memfs_tree_delete_and_range() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/range", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + tree.insert("a", "1", 0); + tree.insert("b", "2", 1); + tree.insert("c", "3", 2); + tree.remove("b", 3); + + let items: Vec<_> = tree + .iter(SeqNo::MAX, None) + .map(|guard| { + let (k, v) = guard.into_inner().unwrap(); + ( + String::from_utf8(k.to_vec()).unwrap(), + String::from_utf8(v.to_vec()).unwrap(), + ) + }) + .collect(); + + assert_eq!( + items, + vec![("a".into(), "1".into()), ("c".into(), "3".into())] + ); + + Ok(()) +} + +#[test] +fn memfs_tree_multiple_flushes() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/multi_flush", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + // First batch + for i in 0u64..50 { + tree.insert(format!("key_{i:05}"), format!("batch1_{i}"), i); + } + tree.flush_active_memtable(0)?; + + // Second batch — some overwrites, some new + for i in 25u64..75 { + tree.insert(format!("key_{i:05}"), format!("batch2_{i}"), 50 + i); + } + tree.flush_active_memtable(0)?; + + // Verify latest values + let val = tree + .get("key_00030", SeqNo::MAX)? + .expect("overwritten key should exist"); + assert_eq!(&*val, b"batch2_30"); + + let val = tree + .get("key_00010", SeqNo::MAX)? 
+ .expect("original key should exist"); + assert_eq!(&*val, b"batch1_10"); + + assert_eq!(tree.len(SeqNo::MAX, None)?, 75); + + Ok(()) +} + +#[test] +fn memfs_shared_across_trees() -> lsm_tree::Result<()> { + let fs = MemFs::new(); + + let tree1 = Config::new( + "/virtual/tree1", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(fs.clone()) + .open()?; + + let tree2 = Config::new( + "/virtual/tree2", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(fs) + .open()?; + + tree1.insert("from_tree1", "hello", 0); + tree2.insert("from_tree2", "world", 0); + + assert!(tree1.get("from_tree1", SeqNo::MAX)?.is_some()); + assert!(tree1.get("from_tree2", SeqNo::MAX)?.is_none()); + assert!(tree2.get("from_tree2", SeqNo::MAX)?.is_some()); + assert!(tree2.get("from_tree1", SeqNo::MAX)?.is_none()); + + Ok(()) +} + +// NOTE: Compaction is not yet fully supported with MemFs. +// There are remaining `std::fs` bypass points in the compaction +// finalization path that produce ENOENT when running fully in-memory. +// Tracked as a known limitation — see mem_fs.rs module docs. 
From 303361eda5f02e7c038b1c58d2b9905cc2b90663 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Fri, 3 Apr 2026 19:55:32 +0300 Subject: [PATCH 02/26] build: bump rust-toolchain.toml from 1.94.0 to 1.94.1 --- rust-toolchain.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 0d8ed42d9..32c68eec1 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.94.0" +channel = "1.94.1" components = ["rustfmt", "clippy"] From 52f701a632c9b9e02ddbfa0c290549ceaa1db657 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Fri, 3 Apr 2026 20:42:01 +0300 Subject: [PATCH 03/26] fix(fs): retry temp file on AlreadyExists, validate open flags, correct cache metrics - rewrite_atomic retries with next seq on AlreadyExists (handles crash leftovers) - MemFs::open rejects no-flag opens with InvalidInput (matches std::fs) - FileAccessor: pinned FDs not counted as descriptor-cache hits - Add rename-replace atomicity test - Track Tree::open Fs migration in #209 with inline code comment --- src/file.rs | 50 +++++++++++++++++++++----------------------- src/file_accessor.rs | 6 ++++-- src/fs/mem_fs.rs | 39 ++++++++++++++++++++++++++++++++++ src/tree/mod.rs | 5 +++++ 4 files changed, 72 insertions(+), 28 deletions(-) diff --git a/src/file.rs b/src/file.rs index 302581aab..8ddb75be3 100644 --- a/src/file.rs +++ b/src/file.rs @@ -3,8 +3,8 @@ // (found in the LICENSE-* files in the repository) use crate::{ - Slice, fs::{Fs, FsFile}, + Slice, }; use std::{io::Write, path::Path}; @@ -63,37 +63,35 @@ pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Resu )] let folder = path.parent().expect("should have a parent"); - // PID + monotonic seq gives uniqueness within a process and across - // concurrent processes. 
A crash-then-PID-reuse collision is theoretically - // possible but vanishingly unlikely (requires exact PID reuse AND seq - // counter restart to same value). lsm-tree uses exclusive file locking - // so the same data directory is never written by two processes. - let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed); let pid = std::process::id(); - let tmp_path = folder.join(format!(".tmp_{pid}_{seq}")); - let result = (|| -> std::io::Result<()> { - let mut file = fs.open( - &tmp_path, + // Retry with incrementing seq on AlreadyExists — handles leftover temp + // files from a previous crash (PID can be reused, especially in containers). + let tmp_path = loop { + let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed); + let candidate = folder.join(format!(".tmp_{pid}_{seq}")); + match fs.open( + &candidate, &FsOpenOptions::new().write(true).create_new(true), - )?; - file.write_all(content)?; - file.flush()?; - FsFile::sync_all(&*file)?; - drop(file); - // std::fs::rename overwrites existing destinations on all platforms - // (Rust uses MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows). - fs.rename(&tmp_path, path)?; - Ok(()) - })(); + ) { + Ok(mut file) => { + file.write_all(content)?; + file.flush()?; + FsFile::sync_all(&*file)?; + break candidate; + } + // Leftover temp file from a previous crash — retry with next seq. + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {} + Err(e) => return Err(e), + } + }; - if result.is_err() { - // Best-effort cleanup of the temp file on any failure path. - // Safe to call even if fs.open() failed (file never created) — - // remove_file will return NotFound which we ignore. + // std::fs::rename overwrites existing destinations on all platforms + // (Rust uses MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows). 
+ if let Err(e) = fs.rename(&tmp_path, path) { let _ = fs.remove_file(&tmp_path); + return Err(e); } - result?; fsync_directory(folder, fs)?; Ok(()) diff --git a/src/file_accessor.rs b/src/file_accessor.rs index 2d6316c3f..547364f57 100644 --- a/src/file_accessor.rs +++ b/src/file_accessor.rs @@ -45,7 +45,9 @@ impl FileAccessor { path: &Path, ) -> std::io::Result<(Arc, bool)> { match self { - Self::File(fd) => Ok((fd.clone(), true)), + // Pinned FD — not a descriptor-table cache event; report as miss + // so metrics reflect only actual cache traffic. + Self::File(fd) => Ok((fd.clone(), false)), Self::DescriptorTable { table, fs } => { if let Some(fd) = table.access_for_table(table_id) { return Ok((fd, true)); @@ -68,7 +70,7 @@ impl FileAccessor { path: &Path, ) -> std::io::Result<(Arc, bool)> { match self { - Self::File(fd) => Ok((fd.clone(), true)), + Self::File(fd) => Ok((fd.clone(), false)), Self::DescriptorTable { table, fs } => { if let Some(fd) = table.access_for_blob_file(table_id) { return Ok((fd, true)); diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index b14219461..6c9f4d11a 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -249,6 +249,14 @@ impl Fs for MemFs { let is_dir = state.dirs.contains(&path); let wants_write = opts.write || opts.append; + // Mirror std::fs::OpenOptions: at least one access mode is required. + if !opts.read && !wants_write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "open requires at least read, write, or append access", + )); + } + // Mirror std::fs::OpenOptions: truncate/create require write access. 
if opts.truncate && !wants_write { return Err(io::Error::new( @@ -637,6 +645,37 @@ mod tests { Ok(()) } + #[test] + fn rename_atomically_replaces_existing_destination() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let src = Path::new("/dir/new.txt"); + let dst = Path::new("/dir/existing.txt"); + + // Create destination with old content + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(dst, &opts)?; + file.write_all(b"old")?; + drop(file); + + // Create source with new content + let mut file = fs.open(src, &opts)?; + file.write_all(b"new")?; + drop(file); + + // Rename should atomically replace destination + fs.rename(src, dst)?; + assert!(!fs.exists(src)?); + + let mut file = fs.open(dst, &FsOpenOptions::new().read(true))?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "new"); + + Ok(()) + } + #[test] fn sync_directory_is_noop() -> io::Result<()> { let fs = MemFs::new(); diff --git a/src/tree/mod.rs b/src/tree/mod.rs index f532399cd..7fa6a80b4 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -1201,6 +1201,11 @@ impl Tree { pub(crate) fn open(config: Config) -> crate::Result { log::debug!("Opening LSM-tree at {}", config.path.display()); + // NOTE: try_exists() and recover() below use std::fs directly, bypassing + // the pluggable Fs trait. This means MemFs (and other non-StdFs backends) + // cannot reopen a tree after drop — only new tree creation works. + // Tracked in: #209 + // Check for old version if config.path.join("version").try_exists()? 
{ log::error!( From 8d6033eac536c0a5ceb23749f824a3e64431d44a Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Fri, 3 Apr 2026 20:47:03 +0300 Subject: [PATCH 04/26] style(file): fix import ordering for rustfmt 1.94.1 --- src/file.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/file.rs b/src/file.rs index 8ddb75be3..993914fe1 100644 --- a/src/file.rs +++ b/src/file.rs @@ -3,8 +3,8 @@ // (found in the LICENSE-* files in the repository) use crate::{ - fs::{Fs, FsFile}, Slice, + fs::{Fs, FsFile}, }; use std::{io::Write, path::Path}; From 3520e08a82de703c755b2120dea34ee1c269912f Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Fri, 3 Apr 2026 21:06:54 +0300 Subject: [PATCH 05/26] fix(fs): return Option from FileAccessor for precise cache metrics - FileAccessor::get_or_open returns Option: None for pinned FDs (no cache event), Some(true) for hit, Some(false) for miss - Metrics in load_block only fire on descriptor-table cache events - rewrite_atomic cleans up temp file on write/flush/sync failure - Add Config::with_shared_fs(Arc) for sharing backends --- src/config/mod.rs | 10 ++++++++++ src/file.rs | 12 +++++++++--- src/file_accessor.rs | 30 ++++++++++++++---------------- src/table/mod.rs | 2 +- src/table/util.rs | 19 +++++++++++-------- 5 files changed, 45 insertions(+), 28 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 03ac58b85..dd4638511 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -468,6 +468,16 @@ impl Config { self } + /// Sets the filesystem backend from an existing shared handle. + /// + /// Useful when multiple configs should reuse the same backend + /// instance, including trait objects and backends that are not `Clone`. + #[must_use] + pub fn with_shared_fs(mut self, fs: Arc) -> Self { + self.fs = fs; + self + } + /// Opens a tree using the config. 
/// /// # Errors diff --git a/src/file.rs b/src/file.rs index 993914fe1..246744518 100644 --- a/src/file.rs +++ b/src/file.rs @@ -75,9 +75,15 @@ pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Resu &FsOpenOptions::new().write(true).create_new(true), ) { Ok(mut file) => { - file.write_all(content)?; - file.flush()?; - FsFile::sync_all(&*file)?; + let write_result = file + .write_all(content) + .and_then(|()| file.flush()) + .and_then(|()| FsFile::sync_all(&*file)); + if let Err(e) = write_result { + drop(file); + let _ = fs.remove_file(&candidate); + return Err(e); + } break candidate; } // Leftover temp file from a previous crash — retry with next seq. diff --git a/src/file_accessor.rs b/src/file_accessor.rs index 547364f57..1b8a2c959 100644 --- a/src/file_accessor.rs +++ b/src/file_accessor.rs @@ -35,50 +35,48 @@ impl FileAccessor { } } - /// Returns a cached table FD or opens the file via [`Fs`] on cache miss. + /// Returns a table FD, opening via [`Fs`] on descriptor-table cache miss. /// - /// The returned `bool` indicates whether the file descriptor was already - /// cached (`true`) or freshly opened (`false`). + /// Returns `(fd, None)` for pinned FDs (no cache involved), + /// `(fd, Some(true))` for descriptor-table cache hit, + /// `(fd, Some(false))` for cache miss (freshly opened and cached). pub fn get_or_open_table( &self, table_id: &GlobalTableId, path: &Path, - ) -> std::io::Result<(Arc, bool)> { + ) -> std::io::Result<(Arc, Option)> { match self { - // Pinned FD — not a descriptor-table cache event; report as miss - // so metrics reflect only actual cache traffic. 
- Self::File(fd) => Ok((fd.clone(), false)), + Self::File(fd) => Ok((fd.clone(), None)), Self::DescriptorTable { table, fs } => { if let Some(fd) = table.access_for_table(table_id) { - return Ok((fd, true)); + return Ok((fd, Some(true))); } let fd: Arc = Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?); table.insert_for_table(*table_id, fd.clone()); - Ok((fd, false)) + Ok((fd, Some(false))) } } } /// Returns a cached blob file FD or opens it via [`Fs`] on cache miss. - /// - /// The returned `bool` indicates whether the file descriptor was already - /// cached (`true`) or freshly opened (`false`). + /// Returns a blob file FD. See [`get_or_open_table`](Self::get_or_open_table) for + /// semantics of the `Option` cache-hit indicator. pub fn get_or_open_blob_file( &self, table_id: &GlobalTableId, path: &Path, - ) -> std::io::Result<(Arc, bool)> { + ) -> std::io::Result<(Arc, Option)> { match self { - Self::File(fd) => Ok((fd.clone(), false)), + Self::File(fd) => Ok((fd.clone(), None)), Self::DescriptorTable { table, fs } => { if let Some(fd) = table.access_for_blob_file(table_id) { - return Ok((fd, true)); + return Ok((fd, Some(true))); } let fd: Arc = Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?); table.insert_for_blob_file(*table_id, fd.clone()); - Ok((fd, false)) + Ok((fd, Some(false))) } } } diff --git a/src/table/mod.rs b/src/table/mod.rs index 843248278..72c906d8d 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -119,7 +119,7 @@ impl Table { Ok(if let Some(handle) = &self.regions.linked_blob_files { let table_id = self.global_id(); - let (fd, _cache_hit) = self + let (fd, _) = self .file_accessor .get_or_open_table(&table_id, &self.path)?; diff --git a/src/table/util.rs b/src/table/util.rs index 2cf0a02ac..ca7fda41a 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -83,18 +83,21 @@ pub fn load_block( return Ok(block); } - #[cfg(feature = "metrics")] - let (fd, cache_hit) = file_accessor.get_or_open_table(&table_id, 
path)?; - #[cfg(not(feature = "metrics"))] - let (fd, _) = file_accessor.get_or_open_table(&table_id, path)?; + let (fd, cache_event) = file_accessor.get_or_open_table(&table_id, path)?; + // Only track descriptor-table cache metrics; pinned FDs (None) are not cache events. #[cfg(feature = "metrics")] - if cache_hit { - metrics.table_file_opened_cached.fetch_add(1, Relaxed); - } else { - metrics.table_file_opened_uncached.fetch_add(1, Relaxed); + if let Some(hit) = cache_event { + if hit { + metrics.table_file_opened_cached.fetch_add(1, Relaxed); + } else { + metrics.table_file_opened_uncached.fetch_add(1, Relaxed); + } } + #[cfg(not(feature = "metrics"))] + let _ = cache_event; + let block = Block::from_file( fd.as_ref(), *handle, From afeccbdfca3b32e59e122a31814bc211428efe1f Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Fri, 3 Apr 2026 21:33:22 +0300 Subject: [PATCH 06/26] fix(test): use byte comparisons in iter test, exercise with_shared_fs --- tests/mem_fs_tree.rs | 30 ++++++++++++++---------------- 1 file changed, 14 insertions(+), 16 deletions(-) diff --git a/tests/mem_fs_tree.rs b/tests/mem_fs_tree.rs index dca08e7b0..5896ce081 100644 --- a/tests/mem_fs_tree.rs +++ b/tests/mem_fs_tree.rs @@ -72,19 +72,14 @@ fn memfs_tree_delete_and_range() -> lsm_tree::Result<()> { let items: Vec<_> = tree .iter(SeqNo::MAX, None) - .map(|guard| { - let (k, v) = guard.into_inner().unwrap(); - ( - String::from_utf8(k.to_vec()).unwrap(), - String::from_utf8(v.to_vec()).unwrap(), - ) - }) - .collect(); - - assert_eq!( - items, - vec![("a".into(), "1".into()), ("c".into(), "3".into())] - ); + .map(|guard| guard.into_inner()) + .collect::>>()?; + + assert_eq!(items.len(), 2); + assert_eq!(&*items[0].0, b"a"); + assert_eq!(&*items[0].1, b"1"); + assert_eq!(&*items[1].0, b"c"); + assert_eq!(&*items[1].1, b"3"); Ok(()) } @@ -129,14 +124,17 @@ fn memfs_tree_multiple_flushes() -> lsm_tree::Result<()> { #[test] fn memfs_shared_across_trees() -> lsm_tree::Result<()> { - let fs = 
MemFs::new(); + use std::sync::Arc; + + // Exercise Config::with_shared_fs(Arc) for shared backend reuse. + let fs: Arc = Arc::new(MemFs::new()); let tree1 = Config::new( "/virtual/tree1", SequenceNumberCounter::default(), SequenceNumberCounter::default(), ) - .with_fs(fs.clone()) + .with_shared_fs(Arc::clone(&fs)) .open()?; let tree2 = Config::new( @@ -144,7 +142,7 @@ fn memfs_shared_across_trees() -> lsm_tree::Result<()> { SequenceNumberCounter::default(), SequenceNumberCounter::default(), ) - .with_fs(fs) + .with_shared_fs(fs) .open()?; tree1.insert("from_tree1", "hello", 0); From 53d24f0a15d7b044456f27c3666ac3cabb33659f Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Fri, 3 Apr 2026 22:02:43 +0300 Subject: [PATCH 07/26] fix(fs): MemFs::open returns error when opening a directory path --- src/fs/mem_fs.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index 6c9f4d11a..86674a638 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -257,6 +257,14 @@ impl Fs for MemFs { )); } + // Opening a directory path without create flags is an error (mirrors EISDIR). + if is_dir && !opts.create && !opts.create_new { + return Err(io::Error::other(format!( + "path is a directory: {}", + path.display() + ))); + } + // Mirror std::fs::OpenOptions: truncate/create require write access. 
if opts.truncate && !wants_write { return Err(io::Error::new( From e8ce4f2e5c51328b5a8fe91cd26b5e1f614afb17 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Fri, 3 Apr 2026 22:45:23 +0300 Subject: [PATCH 08/26] fix(fs): seed root dir, correct Seek arithmetic, validate sync_directory MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - MemFs::new seeds "/" in dirs so exists/read_dir are consistent - Seek uses i128 arithmetic — no lossy i64 clamping, proper overflow errors - Append mode: cursor starts at 0 (writes still go to EOF), matches std::fs - sync_directory validates path is an existing directory --- src/fs/mem_fs.rs | 79 ++++++++++++++++++++++++++++++------------------ 1 file changed, 50 insertions(+), 29 deletions(-) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index 86674a638..23d609dd6 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -60,8 +60,11 @@ impl MemFs { /// Creates a new, empty in-memory filesystem. #[must_use] pub fn new() -> Self { + let mut state = State::default(); + // Seed the root directory so exists("/") and read_dir("/") work. 
+ state.dirs.insert(PathBuf::from("/")); Self { - state: Arc::new(RwLock::new(State::default())), + state: Arc::new(RwLock::new(state)), } } } @@ -147,32 +150,43 @@ impl Write for MemFile { } } -#[expect( - clippy::cast_possible_wrap, - clippy::cast_sign_loss, - reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" -)] impl Seek for MemFile { fn seek(&mut self, pos: SeekFrom) -> io::Result { - let len = { - let data = lock(&self.data)?; - data.len() as i64 - }; - - let new_pos = match pos { - SeekFrom::Start(n) => i64::try_from(n).unwrap_or(i64::MAX), - SeekFrom::End(n) => len.saturating_add(n), - SeekFrom::Current(n) => (self.cursor as i64).saturating_add(n), + let new_pos: u64 = match pos { + SeekFrom::Start(n) => n, + SeekFrom::End(n) => { + let len = { + let data = lock(&self.data)?; + u64::try_from(data.len()).map_err(|_| { + io::Error::other("in-memory file length does not fit in u64") + })? + }; + let result = i128::from(len) + i128::from(n); + if result < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "seek to negative position", + )); + } + u64::try_from(result).map_err(|_| { + io::Error::new(io::ErrorKind::InvalidInput, "seek position overflow") + })? + } + SeekFrom::Current(n) => { + let result = i128::from(self.cursor) + i128::from(n); + if result < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "seek to negative position", + )); + } + u64::try_from(result).map_err(|_| { + io::Error::new(io::ErrorKind::InvalidInput, "seek position overflow") + })? 
+ } }; - if new_pos < 0 { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "seek to negative position", - )); - } - - self.cursor = new_pos as u64; + self.cursor = new_pos; Ok(self.cursor) } } @@ -316,11 +330,10 @@ impl Fs for MemFs { lock(&data)?.clear(); } - let cursor = if opts.append { - lock(&data)?.len() as u64 - } else { - 0 - }; + // Cursor starts at 0 even in append mode — append only affects + // where writes land (Write::write checks is_append), not the + // read cursor. This matches std::fs::File behaviour. + let cursor = 0; Ok(Box::new(MemFile { data, @@ -536,7 +549,15 @@ impl Fs for MemFs { } } - fn sync_directory(&self, _path: &Path) -> io::Result<()> { + fn sync_directory(&self, path: &Path) -> io::Result<()> { + // Durability is a no-op, but validate the path is an existing directory. + let state = read_state(&self.state)?; + if !state.dirs.contains(path) { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("sync_directory: not a directory: {}", path.display()), + )); + } Ok(()) } From f5b1030c6c1de3432bdeae4459d478b151ab3f85 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Fri, 3 Apr 2026 23:14:17 +0300 Subject: [PATCH 09/26] fix(fs): MemFile::set_len requires write access, read_at requires read access --- src/fs/mem_fs.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index 23d609dd6..1699b076d 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -214,6 +214,9 @@ impl FsFile for MemFile { reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" )] fn set_len(&self, size: u64) -> io::Result<()> { + if !self.writable { + return Err(io::Error::other("set_len requires write access")); + } lock(&self.data)?.resize(size as usize, 0); Ok(()) } @@ -223,6 +226,9 @@ impl FsFile for MemFile { reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" )] fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result { + if !self.readable { + 
return Err(io::Error::other("read_at requires read access")); + } let data = lock(&self.data)?; Ok(copy_from_data(buf, &data, offset as usize)) } From 0f599da47993d2743d322cb8fcfea80f7730f871 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 00:19:08 +0300 Subject: [PATCH 10/26] fix(fs): protect root dir invariant, distinguish file-vs-dir in read_dir errors - remove_dir_all re-seeds "/" after removal to preserve root invariant - read_dir returns "not a directory" for file paths (not generic NotFound) - Fs::rename doc narrowed to files only (lsm-tree never renames dirs) - FileAccessor Debug label aligned with variant name --- src/file_accessor.rs | 2 +- src/fs/mem_fs.rs | 10 ++++++++++ src/fs/mod.rs | 5 ++++- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/src/file_accessor.rs b/src/file_accessor.rs index 1b8a2c959..f9867df25 100644 --- a/src/file_accessor.rs +++ b/src/file_accessor.rs @@ -106,7 +106,7 @@ impl std::fmt::Debug for FileAccessor { match self { Self::File(_) => write!(f, "FileAccessor::Pinned"), Self::DescriptorTable { .. } => { - write!(f, "FileAccessor::Cached") + write!(f, "FileAccessor::DescriptorTable") } } } diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index 1699b076d..201d584b3 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -396,6 +396,13 @@ impl Fs for MemFs { let state = read_state(&self.state)?; if !state.dirs.contains(path) { + // Distinguish "path is a file" from "path does not exist". + if state.files.contains_key(path) { + return Err(io::Error::other(format!( + "not a directory: {}", + path.display() + ))); + } return Err(io::Error::new( io::ErrorKind::NotFound, format!("directory not found: {}", path.display()), @@ -484,6 +491,9 @@ impl Fs for MemFs { state.files.retain(|p, _| !p.starts_with(path)); state.dirs.retain(|p| !p.starts_with(path)); + + // Re-seed root so exists("/") and read_dir("/") remain valid. 
+ state.dirs.insert(PathBuf::from("/")); Ok(()) } diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 2d5ce26d5..6f84c53b1 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -274,12 +274,15 @@ pub trait Fs: Send + Sync + 'static { /// Returns an I/O error if the directory cannot be removed. fn remove_dir_all(&self, path: &Path) -> io::Result<()>; - /// Renames a file or directory from `from` to `to`. + /// Renames a file from `from` to `to`. /// /// If `to` already exists as a regular file, it is atomically replaced. /// This is required by [`rewrite_atomic`](crate::file::rewrite_atomic) /// for crash-safe version pointer updates. /// + /// lsm-tree only renames files (table files, version pointers), never + /// directories. [`MemFs`] rejects directory renames with `InvalidInput`. + /// /// # Errors /// /// Returns an I/O error if the rename fails. From 74352eee9ea1bf0c2ab11fe53d7819c40be36cf9 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 09:13:49 +0300 Subject: [PATCH 11/26] fix(fs): guard write position overflow, distinguish file-as-parent errors --- src/fs/mem_fs.rs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index 201d584b3..ea98d1374 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -133,7 +133,9 @@ impl Write for MemFile { self.cursor as usize }; - let end = pos + buf.len(); + let end = pos.checked_add(buf.len()).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "write position overflow") + })?; if end > data.len() { data.resize(end, 0); } @@ -249,6 +251,7 @@ impl FsFile for MemFile { reason = "RwLock guards are intentionally held for the duration of each method" )] impl Fs for MemFs { + #[expect(clippy::too_many_lines, reason = "open() validates many flag combinations")] fn open(&self, path: &Path, opts: &FsOpenOptions) -> io::Result> { let mut state = write_state(&self.state)?; let path = path.to_path_buf(); @@ -259,6 +262,13 @@ impl Fs for 
MemFs { && parent != Path::new("/") && !state.dirs.contains(parent) { + // Distinguish "parent is a file" from "parent doesn't exist". + if state.files.contains_key(parent) { + return Err(io::Error::other(format!( + "parent is not a directory: {}", + parent.display() + ))); + } return Err(io::Error::new( io::ErrorKind::NotFound, format!("parent directory does not exist: {}", parent.display()), @@ -506,6 +516,12 @@ impl Fs for MemFs { && parent != Path::new("/") && !state.dirs.contains(parent) { + if state.files.contains_key(parent) { + return Err(io::Error::other(format!( + "destination parent is not a directory: {}", + parent.display() + ))); + } return Err(io::Error::new( io::ErrorKind::NotFound, format!("destination parent not found: {}", parent.display()), From f9c2a21bfcfd738a75b0fbf2d19768d2f9b72eb2 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 09:20:32 +0300 Subject: [PATCH 12/26] style(fs): multi-line expect attribute for CI rustfmt --- src/fs/mem_fs.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index ea98d1374..c99d06ff1 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -251,7 +251,10 @@ impl FsFile for MemFile { reason = "RwLock guards are intentionally held for the duration of each method" )] impl Fs for MemFs { - #[expect(clippy::too_many_lines, reason = "open() validates many flag combinations")] + #[expect( + clippy::too_many_lines, + reason = "open() validates many flag combinations" + )] fn open(&self, path: &Path, opts: &FsOpenOptions) -> io::Result> { let mut state = write_state(&self.state)?; let path = path.to_path_buf(); From 94519823540238e2878813209791128f9fd0867e Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 09:44:50 +0300 Subject: [PATCH 13/26] fix(fs): validate open flags before parent lookup, remove_file rejects dirs --- src/fs/mem_fs.rs | 53 ++++++++++++++++++++++++++---------------------- 1 file changed, 29 insertions(+), 
24 deletions(-) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index c99d06ff1..1136ab5a4 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -258,6 +258,29 @@ impl Fs for MemFs { fn open(&self, path: &Path, opts: &FsOpenOptions) -> io::Result> { let mut state = write_state(&self.state)?; let path = path.to_path_buf(); + let wants_write = opts.write || opts.append; + + // Validate flag combinations first (path-independent), before any + // filesystem lookups. This ensures consistent InvalidInput errors + // regardless of whether the parent directory exists. + if !opts.read && !wants_write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "open requires at least read, write, or append access", + )); + } + if opts.truncate && !wants_write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "truncate requires write or append access", + )); + } + if (opts.create || opts.create_new) && !wants_write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "create/create_new requires write or append access", + )); + } // Verify parent directory exists (mirrors std::fs behaviour). if let Some(parent) = path.parent() @@ -265,7 +288,6 @@ impl Fs for MemFs { && parent != Path::new("/") && !state.dirs.contains(parent) { - // Distinguish "parent is a file" from "parent doesn't exist". if state.files.contains_key(parent) { return Err(io::Error::other(format!( "parent is not a directory: {}", @@ -280,15 +302,6 @@ impl Fs for MemFs { let exists = state.files.contains_key(&path); let is_dir = state.dirs.contains(&path); - let wants_write = opts.write || opts.append; - - // Mirror std::fs::OpenOptions: at least one access mode is required. - if !opts.read && !wants_write { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "open requires at least read, write, or append access", - )); - } // Opening a directory path without create flags is an error (mirrors EISDIR). 
if is_dir && !opts.create && !opts.create_new { @@ -298,20 +311,6 @@ impl Fs for MemFs { ))); } - // Mirror std::fs::OpenOptions: truncate/create require write access. - if opts.truncate && !wants_write { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "truncate requires write or append access", - )); - } - if (opts.create || opts.create_new) && !wants_write { - return Err(io::Error::new( - io::ErrorKind::InvalidInput, - "create/create_new requires write or append access", - )); - } - // Reject creating a file at a path that is already a directory. if is_dir && (opts.create || opts.create_new) { return Err(io::Error::new( @@ -475,6 +474,12 @@ impl Fs for MemFs { fn remove_file(&self, path: &Path) -> io::Result<()> { let mut state = write_state(&self.state)?; + if state.dirs.contains(path) { + return Err(io::Error::other(format!( + "cannot remove_file on directory: {}", + path.display() + ))); + } if state.files.remove(path).is_none() { return Err(io::Error::new( io::ErrorKind::NotFound, From e0b897377330dce22947a7aae3fbfa5dd995e041 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 10:26:33 +0300 Subject: [PATCH 14/26] fix(fs): improve create_dir_all error message for file-path conflicts --- src/fs/mem_fs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index 1136ab5a4..ec6d54997 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -389,7 +389,7 @@ impl Fs for MemFs { if state.files.contains_key(¤t) { return Err(io::Error::new( io::ErrorKind::AlreadyExists, - format!("ancestor is a file: {}", current.display()), + format!("path conflicts with existing file: {}", current.display()), )); } to_create.push(current.clone()); From 7e0fe3ca04118c9fe67311e27527cd8e6e5ded2f Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 10:42:12 +0300 Subject: [PATCH 15/26] fix(fs): delegate fsync_directory to Fs backend on all platforms --- src/config/mod.rs | 5 ++++- 
src/file.rs | 12 +++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index dd4638511..09bc55225 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -643,7 +643,10 @@ impl Config { for route in routes { let folder = route.path.join(TABLES_FOLDER); // Dedup by path: scanning the same directory twice would cause - // already-recovered tables to be classified as orphans and deleted. + // already-recovered tables to be classified as orphans and + // deleted. Routing the same path through different Fs backends + // is a configuration error (level_routes validation in + // Config::level_routes rejects overlapping ranges). if !folders.iter().any(|(p, _)| *p == folder) { folders.push((folder, route.fs.clone())); } diff --git a/src/file.rs b/src/file.rs index 246744518..3ed6c0b15 100644 --- a/src/file.rs +++ b/src/file.rs @@ -103,17 +103,15 @@ pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Resu Ok(()) } -#[cfg(not(target_os = "windows"))] +/// Delegates directory sync to the backend. +/// +/// On Windows, `StdFs::sync_directory` already returns `Ok(())` (directory +/// fsync is unsupported), but non-`StdFs` backends (e.g., `MemFs`) may use +/// this call for path validation. Always delegate rather than short-circuiting. 
pub fn fsync_directory(path: &Path, fs: &dyn Fs) -> std::io::Result<()> { fs.sync_directory(path) } -#[cfg(target_os = "windows")] -pub fn fsync_directory(_path: &Path, _fs: &dyn Fs) -> std::io::Result<()> { - // Cannot fsync directory on Windows - Ok(()) -} - #[cfg(test)] #[allow( clippy::unwrap_used, From 8c33e29e68e47217921705b0f0c225127ce93515 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 10:47:24 +0300 Subject: [PATCH 16/26] test(fs): add read+append cursor regression test for MemFs --- src/fs/mem_fs.rs | 36 ++++++++++++++++++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index ec6d54997..8bd016a7c 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -893,6 +893,42 @@ mod tests { Ok(()) } + #[test] + fn read_append_cursor_starts_at_zero() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/rw_append.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"existing")?; + drop(file); + + // Open with read + append — cursor should start at 0 for reads, + // but writes go to EOF. + let opts = FsOpenOptions::new().read(true).append(true); + let mut file = fs.open(path, &opts)?; + + // Read should return existing content from offset 0. + let mut buf = [0u8; 8]; + let n = file.read(&mut buf)?; + assert_eq!(n, 8); + assert_eq!(&buf, b"existing"); + + // Write appends to EOF. + file.write_all(b"+new")?; + drop(file); + + // Verify full content. 
+ let opts = FsOpenOptions::new().read(true); + let mut file = fs.open(path, &opts)?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "existing+new"); + + Ok(()) + } + #[test] fn seek_and_overwrite() -> io::Result<()> { let fs = MemFs::new(); From adad93e9bac53769806fb52efa9846586c2722bc Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 14:18:18 +0300 Subject: [PATCH 17/26] fix(fs): reject truncate+append combo, drop unwrap from Config::with_fs doctest --- src/config/mod.rs | 6 ++++-- src/fs/mem_fs.rs | 10 ++++++++-- 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 09bc55225..3c495e175 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -450,6 +450,7 @@ impl Config { /// # Example /// /// ``` + /// # fn main() -> lsm_tree::Result<()> { /// use lsm_tree::{Config, SequenceNumberCounter}; /// use lsm_tree::fs::MemFs; /// @@ -459,8 +460,9 @@ impl Config { /// SequenceNumberCounter::default(), /// ) /// .with_fs(MemFs::new()) - /// .open() - /// .unwrap(); + /// .open()?; + /// # Ok(()) + /// # } /// ``` #[must_use] pub fn with_fs(mut self, fs: F) -> Self { diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index 8bd016a7c..e575e5583 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -269,10 +269,16 @@ impl Fs for MemFs { "open requires at least read, write, or append access", )); } - if opts.truncate && !wants_write { + if opts.truncate && opts.append { return Err(io::Error::new( io::ErrorKind::InvalidInput, - "truncate requires write or append access", + "truncate and append cannot be used together", + )); + } + if opts.truncate && !opts.write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "truncate requires write access", )); } if (opts.create || opts.create_new) && !wants_write { From 76f3b6763a767b33400ccd36bbe2cbf65f9821c7 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 19:17:59 +0300 Subject: [PATCH 
18/26] test(fs): wrong-type error paths and StdFs rename-replace - MemFs: 8 tests for type-mismatch errors (read_dir on file, remove_file on dir, sync_directory on file, open with file parent, rename dir, rename onto dir, rename with file dest parent, remove_dir_all on file) - MemFs: sync_directory distinguishes file-exists from not-found - StdFs: rename atomically replaces existing destination file --- src/file.rs | 30 ++++++++++++ src/fs/mem_fs.rs | 117 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 146 insertions(+), 1 deletion(-) diff --git a/src/file.rs b/src/file.rs index 3ed6c0b15..e4905e8ed 100644 --- a/src/file.rs +++ b/src/file.rs @@ -160,4 +160,34 @@ mod tests { Ok(()) } + + /// Verifies that `StdFs::rename` atomically replaces an existing + /// destination file — the contract required by `rewrite_atomic`. + #[test] + fn std_fs_rename_replaces_existing_file() -> crate::Result<()> { + use crate::fs::{Fs, FsOpenOptions}; + + let dir = tempfile::tempdir()?; + let src = dir.path().join("src.txt"); + let dst = dir.path().join("dst.txt"); + + // Create both files via Fs trait. + let opts = FsOpenOptions::new().write(true).create(true); + let mut f = StdFs.open(&src, &opts)?; + f.write_all(b"new")?; + drop(f); + + let mut f = StdFs.open(&dst, &opts)?; + f.write_all(b"old")?; + drop(f); + + StdFs.rename(&src, &dst)?; + + // dst now has src content, src is gone. + let content = std::fs::read_to_string(&dst)?; + assert_eq!("new", content); + assert!(!src.exists()); + + Ok(()) + } } diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index e575e5583..dc00523b5 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -599,9 +599,15 @@ impl Fs for MemFs { // Durability is a no-op, but validate the path is an existing directory. 
let state = read_state(&self.state)?; if !state.dirs.contains(path) { + if state.files.contains_key(path) { + return Err(io::Error::other(format!( + "sync_directory: not a directory: {}", + path.display() + ))); + } return Err(io::Error::new( io::ErrorKind::NotFound, - format!("sync_directory: not a directory: {}", path.display()), + format!("sync_directory: path not found: {}", path.display()), )); } Ok(()) @@ -1057,4 +1063,113 @@ mod tests { assert_eq!(meta.len, 11); Ok(()) } + + // ── Wrong-type error-path tests ───────────────────────────────────── + + #[test] + fn read_dir_on_file_returns_not_a_directory() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + + let err = fs.read_dir(Path::new("/dir/file")).unwrap_err(); + // Must NOT be NotFound — the path exists but is a file. + assert_ne!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn remove_file_on_dir_returns_error() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/somedir"))?; + + let err = fs.remove_file(Path::new("/somedir")).unwrap_err(); + assert_ne!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn sync_directory_on_file_returns_not_a_directory() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + + let err = fs.sync_directory(Path::new("/dir/file")).unwrap_err(); + assert_ne!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn open_with_parent_as_file_returns_error() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + + // Try to create a file whose "parent" is actually a file. 
+ let err = fs + .open(Path::new("/dir/file/child"), &opts) + .err() + .expect("open should fail"); + assert_ne!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn rename_directory_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/src_dir"))?; + fs.create_dir_all(Path::new("/dst_parent"))?; + + let err = fs + .rename(Path::new("/src_dir"), Path::new("/dst_parent/moved")) + .unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } + + #[test] + fn rename_onto_directory_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + fs.create_dir_all(Path::new("/dir/dst_dir"))?; + + let err = fs + .rename(Path::new("/dir/file"), Path::new("/dir/dst_dir")) + .unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } + + #[test] + fn rename_with_file_as_dest_parent_returns_error() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/src"), &opts)?; + fs.open(Path::new("/dir/blocker"), &opts)?; + + // /dir/blocker is a file, not a directory — cannot be parent of dst. 
+ let err = fs + .rename(Path::new("/dir/src"), Path::new("/dir/blocker/child")) + .unwrap_err(); + assert_ne!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn remove_dir_all_on_file_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + + let err = fs.remove_dir_all(Path::new("/dir/file")).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } } From cf465695e2aef8ff7df24293fcc33bae0574c173 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 19:20:47 +0300 Subject: [PATCH 19/26] fix(fs): use map(|_| ()) for FsFile open error unwrap in test Box lacks Debug, so unwrap_err() is unavailable. Map Ok to () first to satisfy the Debug bound. --- src/fs/mem_fs.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index dc00523b5..64e5eb844 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -1111,8 +1111,8 @@ mod tests { // Try to create a file whose "parent" is actually a file. 
let err = fs .open(Path::new("/dir/file/child"), &opts) - .err() - .expect("open should fail"); + .map(|_| ()) + .unwrap_err(); assert_ne!(err.kind(), io::ErrorKind::NotFound); Ok(()) } From fee8a8906de48b5ce5ebe191d06f0eecb6082674 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 19:43:28 +0300 Subject: [PATCH 20/26] refactor(fs): extract ensure_parent_dir helper, empty write no-op - MemFile::write returns Ok(0) immediately for empty buffers without locking data or mutating cursor/file size - Extract parent-directory validation into ensure_parent_dir() shared by open() and rename(), removing duplicated check logic --- src/fs/mem_fs.rs | 68 +++++++++++++++++++++--------------------------- 1 file changed, 30 insertions(+), 38 deletions(-) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index 64e5eb844..fa8e25380 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -125,6 +125,9 @@ impl Write for MemFile { if !self.writable { return Err(io::Error::other("file not opened for writing")); } + if buf.is_empty() { + return Ok(0); + } let mut data = lock(&self.data)?; let pos = if self.is_append { @@ -242,6 +245,31 @@ impl FsFile for MemFile { } } +/// Validates that the parent directory of `path` exists and is a directory. +/// +/// Returns `Ok(())` when the parent is root, empty, or an existing directory. +/// Returns `Err(Other)` when the parent is a file, or `Err(NotFound)` when +/// it does not exist at all. 
+fn ensure_parent_dir(path: &Path, state: &State) -> io::Result<()> { + if let Some(parent) = path.parent() + && !parent.as_os_str().is_empty() + && parent != Path::new("/") + && !state.dirs.contains(parent) + { + if state.files.contains_key(parent) { + return Err(io::Error::other(format!( + "parent is not a directory: {}", + parent.display() + ))); + } + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("parent directory does not exist: {}", parent.display()), + )); + } + Ok(()) +} + // --------------------------------------------------------------------------- // Fs for MemFs // --------------------------------------------------------------------------- @@ -251,10 +279,6 @@ impl FsFile for MemFile { reason = "RwLock guards are intentionally held for the duration of each method" )] impl Fs for MemFs { - #[expect( - clippy::too_many_lines, - reason = "open() validates many flag combinations" - )] fn open(&self, path: &Path, opts: &FsOpenOptions) -> io::Result> { let mut state = write_state(&self.state)?; let path = path.to_path_buf(); @@ -288,23 +312,7 @@ impl Fs for MemFs { )); } - // Verify parent directory exists (mirrors std::fs behaviour). - if let Some(parent) = path.parent() - && !parent.as_os_str().is_empty() - && parent != Path::new("/") - && !state.dirs.contains(parent) - { - if state.files.contains_key(parent) { - return Err(io::Error::other(format!( - "parent is not a directory: {}", - parent.display() - ))); - } - return Err(io::Error::new( - io::ErrorKind::NotFound, - format!("parent directory does not exist: {}", parent.display()), - )); - } + ensure_parent_dir(&path, &state)?; let exists = state.files.contains_key(&path); let is_dir = state.dirs.contains(&path); @@ -524,23 +532,7 @@ impl Fs for MemFs { fn rename(&self, from: &Path, to: &Path) -> io::Result<()> { let mut state = write_state(&self.state)?; - // Validate destination parent exists (mirrors std::fs behaviour). 
- if let Some(parent) = to.parent() - && !parent.as_os_str().is_empty() - && parent != Path::new("/") - && !state.dirs.contains(parent) - { - if state.files.contains_key(parent) { - return Err(io::Error::other(format!( - "destination parent is not a directory: {}", - parent.display() - ))); - } - return Err(io::Error::new( - io::ErrorKind::NotFound, - format!("destination parent not found: {}", parent.display()), - )); - } + ensure_parent_dir(to, &state)?; // Reject renaming onto an existing directory. Otherwise `to` would end // up present in both `files` and `dirs`, corrupting MemFs state. From 62e1fcf20ede37d2531702976c47716115605f07 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 22:17:31 +0300 Subject: [PATCH 21/26] fix(fs): reject empty paths, permission guard tests, doc clarity - MemFs: add ensure_non_empty_path() guard on open/create_dir_all/rename to prevent entries keyed by empty PathBuf escaping /-rooted namespace - MemFs: add set_len/read_at permission-denied tests (read-only file rejects set_len, write-only file rejects read_at) - MemFs: add empty-path rejection tests for open/create_dir_all/rename - Config: clarify fs field and setters as "default backend for levels without an explicit route" - FileAccessor: de-duplicate blob file doc comment --- src/config/mod.rs | 6 ++-- src/file_accessor.rs | 5 +-- src/fs/mem_fs.rs | 77 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 5 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 3c495e175..86ac5fb9c 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -221,7 +221,7 @@ pub struct Config { #[doc(hidden)] pub path: PathBuf, - /// Filesystem backend + /// Default filesystem backend for levels without an explicit route. /// /// Defaults to [`StdFs`]. Use [`Config::with_fs`] to plug in an /// alternative backend such as [`MemFs`](crate::fs::MemFs). @@ -442,7 +442,7 @@ impl Config { } } - /// Sets the filesystem backend. 
+ /// Sets the default filesystem backend used for levels without an explicit route. /// /// Defaults to [`StdFs`]. Use [`MemFs`](crate::fs::MemFs) for /// in-memory trees (testing, ephemeral indexes). @@ -470,7 +470,7 @@ impl Config { self } - /// Sets the filesystem backend from an existing shared handle. + /// Sets the default filesystem backend from an existing shared handle. /// /// Useful when multiple configs should reuse the same backend /// instance, including trait objects and backends that are not `Clone`. diff --git a/src/file_accessor.rs b/src/file_accessor.rs index f9867df25..76bf3a07c 100644 --- a/src/file_accessor.rs +++ b/src/file_accessor.rs @@ -59,8 +59,9 @@ impl FileAccessor { } } - /// Returns a cached blob file FD or opens it via [`Fs`] on cache miss. - /// Returns a blob file FD. See [`get_or_open_table`](Self::get_or_open_table) for + /// Returns a blob file FD, opening via [`Fs`] on descriptor-table cache miss. + /// + /// See [`get_or_open_table`](Self::get_or_open_table) for /// semantics of the `Option` cache-hit indicator. pub fn get_or_open_blob_file( &self, diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index fa8e25380..e2d91ccdd 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -245,6 +245,14 @@ impl FsFile for MemFile { } } +/// Rejects empty paths before they can create entries in the `/`-rooted namespace. +fn ensure_non_empty_path(path: &Path) -> io::Result<()> { + if path.as_os_str().is_empty() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty path")); + } + Ok(()) +} + /// Validates that the parent directory of `path` exists and is a directory. /// /// Returns `Ok(())` when the parent is root, empty, or an existing directory. 
@@ -280,6 +288,7 @@ fn ensure_parent_dir(path: &Path, state: &State) -> io::Result<()> { )] impl Fs for MemFs { fn open(&self, path: &Path, opts: &FsOpenOptions) -> io::Result> { + ensure_non_empty_path(path)?; let mut state = write_state(&self.state)?; let path = path.to_path_buf(); let wants_write = opts.write || opts.append; @@ -393,6 +402,7 @@ impl Fs for MemFs { } fn create_dir_all(&self, path: &Path) -> io::Result<()> { + ensure_non_empty_path(path)?; let mut state = write_state(&self.state)?; // Collect all components first, then validate, then insert. @@ -530,6 +540,8 @@ impl Fs for MemFs { } fn rename(&self, from: &Path, to: &Path) -> io::Result<()> { + ensure_non_empty_path(from)?; + ensure_non_empty_path(to)?; let mut state = write_state(&self.state)?; ensure_parent_dir(to, &state)?; @@ -1164,4 +1176,69 @@ mod tests { assert_eq!(err.kind(), io::ErrorKind::InvalidInput); Ok(()) } + + #[test] + fn set_len_without_write_access_returns_error() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/file.bin"); + let mut file = fs.open(path, &FsOpenOptions::new().write(true).create(true))?; + file.write_all(b"data")?; + drop(file); + + let file = fs.open(path, &FsOpenOptions::new().read(true))?; + assert!(file.set_len(1).is_err()); + Ok(()) + } + + #[test] + fn read_at_without_read_access_returns_error() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/file.bin"); + let mut file = fs.open(path, &FsOpenOptions::new().write(true).create(true))?; + file.write_all(b"data")?; + + let mut buf = [0u8; 1]; + assert!(file.read_at(&mut buf, 0).is_err()); + Ok(()) + } + + #[test] + fn open_empty_path_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs + .open(Path::new(""), &FsOpenOptions::new().read(true)) + .map(|_| ()) + .unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } + + 
#[test] + fn create_dir_all_empty_path_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.create_dir_all(Path::new("")).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } + + #[test] + fn rename_empty_path_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + + let err = fs.rename(Path::new(""), Path::new("/dir/dst")).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + + let err = fs + .rename(Path::new("/dir/file"), Path::new("")) + .unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } } From c20521eb363418c30fe379a2eef7c455714e96b2 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 22:33:00 +0300 Subject: [PATCH 22/26] docs(fs): document no-op behavior of FileAccessor remove methods remove_for_table and remove_for_blob_file are intentional no-ops for the pinned File variant (no descriptor cache to evict from). --- src/file_accessor.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/file_accessor.rs b/src/file_accessor.rs index 76bf3a07c..b475b66e1 100644 --- a/src/file_accessor.rs +++ b/src/file_accessor.rs @@ -89,12 +89,18 @@ impl FileAccessor { } } + /// Removes a table FD from the descriptor cache. + /// + /// No-op for pinned `Self::File` variant (no cache to evict from). pub fn remove_for_table(&self, table_id: &GlobalTableId) { if let Self::DescriptorTable { table, .. } = self { table.remove_for_table(table_id); } } + /// Removes a blob file FD from the descriptor cache. + /// + /// No-op for pinned `Self::File` variant (no cache to evict from). pub fn remove_for_blob_file(&self, table_id: &GlobalTableId) { if let Self::DescriptorTable { table, .. 
} = self { table.remove_for_blob_file(table_id); From 1cc66b4229315266953a6b5e32564c23f82cd797 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 22:49:14 +0300 Subject: [PATCH 23/26] fix(vlog): fail-fast on missing blobs folder when manifest references blob files recover_blob_files now returns Unrecoverable when the blobs folder does not exist but ids is non-empty (manifest references blob files). Only treat NotFound as empty when ids is empty (standard non-blob trees). --- src/vlog/mod.rs | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/src/vlog/mod.rs b/src/vlog/mod.rs index da638250d..85a0a7e30 100644 --- a/src/vlog/mod.rs +++ b/src/vlog/mod.rs @@ -31,14 +31,17 @@ pub fn recover_blob_files( descriptor_table: Option<&Arc>, fs: &Arc, ) -> crate::Result<(Vec, Vec)> { - // Recover directly from read_dir; treat NotFound as empty (avoids TOCTOU - // with a separate exists() check). This is correct even when `ids` is - // non-empty: the blobs folder may not exist for standard (non-blob) trees, - // and callers handle missing blob files via orphan detection. + // Recover directly from read_dir; treat NotFound as empty only for + // standard (non-blob) trees where no blob folder is expected. + // If the manifest references blob files (ids non-empty) but the folder + // is missing, that is unrecoverable corruption — fail fast. 
let entries = match fs.read_dir(folder) { Ok(entries) => entries, Err(e) if e.kind() == std::io::ErrorKind::NotFound => { - return Ok((vec![], vec![])); + if ids.is_empty() { + return Ok((vec![], vec![])); + } + return Err(crate::Error::Unrecoverable); } Err(e) => return Err(e.into()), }; @@ -173,12 +176,12 @@ mod tests { } #[test] - fn vlog_recovery_nonexistent_folder_returns_empty() { + fn vlog_recovery_nonexistent_folder_no_ids_returns_empty() { let dir = tempfile::tempdir().unwrap(); let missing = dir.path().join("no_such_dir"); let (blob_files, orphans) = recover_blob_files( &missing, - &[(0, Checksum::from_raw(0))], + &[], 0, None, &(Arc::new(crate::fs::StdFs) as Arc), @@ -187,4 +190,18 @@ mod tests { assert!(blob_files.is_empty()); assert!(orphans.is_empty()); } + + #[test] + fn vlog_recovery_nonexistent_folder_with_ids_returns_unrecoverable() { + let dir = tempfile::tempdir().unwrap(); + let missing = dir.path().join("no_such_dir"); + let result = recover_blob_files( + &missing, + &[(0, Checksum::from_raw(0))], + 0, + None, + &(Arc::new(crate::fs::StdFs) as Arc), + ); + assert!(matches!(result, Err(crate::Error::Unrecoverable))); + } } From 5662b67c93c83f6d275bdfcdc15f82ed2f087a6f Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 23:34:03 +0300 Subject: [PATCH 24/26] fix(table,vlog): evict FD before remove_file, defer blob cache inserts - Inner::Drop: evict descriptor-table entry before fs.remove_file so Windows doesn't fail on open-file removal - recover_blob_files: defer insert_for_blob_file until all blobs parse successfully, preventing partial FD leaks on recovery failure --- src/table/inner.rs | 10 ++++++---- src/vlog/mod.rs | 16 ++++++++++++---- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/src/table/inner.rs b/src/table/inner.rs index 4d9a63e01..9fcf0d9fb 100644 --- a/src/table/inner.rs +++ b/src/table/inner.rs @@ -101,16 +101,18 @@ impl Drop for Inner { if 
self.is_deleted.load(std::sync::atomic::Ordering::Acquire) { log::trace!("Cleanup deleted table {global_id:?} at {:?}", self.path); + // Evict cached FD before removing the file — on Windows, + // remove_file fails while a handle is open. + self.file_accessor.as_descriptor_table().inspect(|d| { + d.remove_for_table(&global_id); + }); + if let Err(e) = self.fs.remove_file(&self.path) { log::warn!( "Failed to cleanup deleted table {global_id:?} at {:?}: {e:?}", self.path, ); } - - self.file_accessor.as_descriptor_table().inspect(|d| { - d.remove_for_table(&global_id); - }); } } } diff --git a/src/vlog/mod.rs b/src/vlog/mod.rs index 85a0a7e30..1e4d018f4 100644 --- a/src/vlog/mod.rs +++ b/src/vlog/mod.rs @@ -58,6 +58,10 @@ pub fn recover_blob_files( let mut blob_files = Vec::with_capacity(ids.len()); let mut orphaned_blob_files = vec![]; + // Deferred cache inserts — only committed after all blobs parse + // successfully, so a partial recovery doesn't leak FDs in the + // descriptor table. + let mut pending_cache_inserts = Vec::new(); for (idx, dirent) in entries.into_iter().enumerate() { let file_name = &dirent.file_name; @@ -111,17 +115,16 @@ pub fn recover_blob_files( Metadata::from_slice(&metadata_slice)? }; + let file: Arc = Arc::from(file); let file_accessor = if let Some(dt) = descriptor_table.cloned() { - // Pre-populate the FD cache with the handle we already opened - // so the first read doesn't need to reopen. let global_id = (tree_id, blob_file_id).into(); - dt.insert_for_blob_file(global_id, Arc::from(file)); + pending_cache_inserts.push((dt.clone(), global_id, file.clone())); FileAccessor::DescriptorTable { table: dt, fs: fs.clone(), } } else { - FileAccessor::File(Arc::from(file)) + FileAccessor::File(file) }; blob_files.push(BlobFile(Arc::new(BlobFileInner { @@ -146,6 +149,11 @@ pub fn recover_blob_files( return Err(crate::Error::Unrecoverable); } + // All blobs parsed successfully — commit FDs to the descriptor cache. 
+ for (dt, global_id, file) in pending_cache_inserts { + dt.insert_for_blob_file(global_id, file); + } + log::debug!("Successfully recovered {} blob files", blob_files.len()); Ok((blob_files, orphaned_blob_files)) From e911497147449acda91d0e1b26b3cf0046e0a8c3 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sat, 4 Apr 2026 23:53:02 +0300 Subject: [PATCH 25/26] fix(fs): replace as-usize casts with checked usize::try_from MemFile::read, write, set_len, and read_at used unchecked `as usize` casts on u64 cursor/offset/size values. On 32-bit targets these silently truncate, corrupting file state. Now use usize::try_from with InvalidInput errors. Removed all cast_possible_truncation expects. --- src/fs/mem_fs.rs | 46 ++++++++++++++++++++++++++-------------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs index e2d91ccdd..03c30a1cb 100644 --- a/src/fs/mem_fs.rs +++ b/src/fs/mem_fs.rs @@ -98,17 +98,18 @@ fn copy_from_data(buf: &mut [u8], data: &[u8], pos: usize) -> usize { n } -#[expect( - clippy::cast_possible_truncation, - reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" -)] impl Read for MemFile { fn read(&mut self, buf: &mut [u8]) -> io::Result { if !self.readable { return Err(io::Error::other("file not opened for reading")); } let data = lock(&self.data)?; - let pos = self.cursor as usize; + let pos = usize::try_from(self.cursor).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "cursor exceeds addressable memory", + ) + })?; let n = copy_from_data(buf, &data, pos); drop(data); self.cursor += n as u64; @@ -116,10 +117,6 @@ impl Read for MemFile { } } -#[expect( - clippy::cast_possible_truncation, - reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" -)] impl Write for MemFile { fn write(&mut self, buf: &[u8]) -> io::Result { if !self.writable { @@ -133,7 +130,12 @@ impl Write for MemFile { let pos = if self.is_append { data.len() } else { - 
self.cursor as usize + usize::try_from(self.cursor).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "write position exceeds addressable memory", + ) + })? }; let end = pos.checked_add(buf.len()).ok_or_else(|| { @@ -214,28 +216,32 @@ impl FsFile for MemFile { }) } - #[expect( - clippy::cast_possible_truncation, - reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" - )] fn set_len(&self, size: u64) -> io::Result<()> { if !self.writable { return Err(io::Error::other("set_len requires write access")); } - lock(&self.data)?.resize(size as usize, 0); + let new_len = usize::try_from(size).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "set_len size exceeds usize::MAX", + ) + })?; + lock(&self.data)?.resize(new_len, 0); Ok(()) } - #[expect( - clippy::cast_possible_truncation, - reason = "MemFs is a test/ephemeral backend — files never exceed usize::MAX" - )] fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result { if !self.readable { return Err(io::Error::other("read_at requires read access")); } + let offset = usize::try_from(offset).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "read_at offset exceeds usize::MAX", + ) + })?; let data = lock(&self.data)?; - Ok(copy_from_data(buf, &data, offset as usize)) + Ok(copy_from_data(buf, &data, offset)) } /// No-op: in-memory files are not shared across processes. `MemFs` is a From 093765c85d032559098fdd39710d6c2602cba4d4 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Sun, 5 Apr 2026 00:10:06 +0300 Subject: [PATCH 26/26] fix(table): close pinned file handle before remove_file in Drop Inner::Drop now moves the FileAccessor out via std::mem::replace (with a Closed sentinel) and drops it before calling fs.remove_file. This ensures both pinned File handles and descriptor-table cached FDs are released before deletion, fixing remove_file failures on Windows where open handles prevent file removal. 
--- src/file_accessor.rs | 10 +++++++++- src/table/inner.rs | 12 +++++++++--- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/src/file_accessor.rs b/src/file_accessor.rs index b475b66e1..318c37901 100644 --- a/src/file_accessor.rs +++ b/src/file_accessor.rs @@ -24,6 +24,11 @@ pub enum FileAccessor { /// Filesystem backend for opening files on cache miss. fs: Arc, }, + + /// Sentinel used during [`Drop`] to move ownership of the file handle + /// before deleting the underlying file. Not constructed outside `Drop`. + #[doc(hidden)] + Closed, } impl FileAccessor { @@ -31,7 +36,7 @@ impl FileAccessor { pub fn as_descriptor_table(&self) -> Option<&DescriptorTable> { match self { Self::DescriptorTable { table, .. } => Some(table), - Self::File(_) => None, + Self::File(_) | Self::Closed => None, } } @@ -56,6 +61,7 @@ impl FileAccessor { table.insert_for_table(*table_id, fd.clone()); Ok((fd, Some(false))) } + Self::Closed => Err(std::io::Error::other("file accessor closed")), } } @@ -79,6 +85,7 @@ impl FileAccessor { table.insert_for_blob_file(*table_id, fd.clone()); Ok((fd, Some(false))) } + Self::Closed => Err(std::io::Error::other("file accessor closed")), } } @@ -115,6 +122,7 @@ impl std::fmt::Debug for FileAccessor { Self::DescriptorTable { .. } => { write!(f, "FileAccessor::DescriptorTable") } + Self::Closed => write!(f, "FileAccessor::Closed"), } } } diff --git a/src/table/inner.rs b/src/table/inner.rs index 9fcf0d9fb..15748e318 100644 --- a/src/table/inner.rs +++ b/src/table/inner.rs @@ -101,12 +101,18 @@ impl Drop for Inner { if self.is_deleted.load(std::sync::atomic::Ordering::Acquire) { log::trace!("Cleanup deleted table {global_id:?} at {:?}", self.path); - // Evict cached FD before removing the file — on Windows, - // remove_file fails while a handle is open. - self.file_accessor.as_descriptor_table().inspect(|d| { + // Move the accessor out so any pinned file handle is closed + // before attempting deletion on Windows. 
+ let file_accessor = std::mem::replace(&mut self.file_accessor, FileAccessor::Closed); + + // Evict cached FD from the descriptor table. + file_accessor.as_descriptor_table().inspect(|d| { d.remove_for_table(&global_id); }); + // Drop the accessor (and its Arc) before remove. + drop(file_accessor); + if let Err(e) = self.fs.remove_file(&self.path) { log::warn!( "Failed to cleanup deleted table {global_id:?} at {:?}: {e:?}",