diff --git a/benches/index_block.rs b/benches/index_block.rs index c2f1b6a35..490644ef9 100644 --- a/benches/index_block.rs +++ b/benches/index_block.rs @@ -116,6 +116,7 @@ fn build_table_for_point_read(restart_interval: u8) -> BenchTable { 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(8))), + Arc::new(lsm_tree::fs::StdFs), false, false, None, diff --git a/rust-toolchain.toml b/rust-toolchain.toml index 0d8ed42d9..32c68eec1 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] -channel = "1.94.0" +channel = "1.94.1" components = ["rustfmt", "clippy"] diff --git a/src/blob_tree/ingest.rs b/src/blob_tree/ingest.rs index ff1c05d7d..6826779d0 100644 --- a/src/blob_tree/ingest.rs +++ b/src/blob_tree/ingest.rs @@ -227,6 +227,7 @@ impl<'a> BlobIngestion<'a> { index.id, index.config.cache.clone(), index.config.descriptor_table.clone(), + self.table.level_fs.clone(), false, false, index.config.encryption.clone(), diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index ccb0489ce..53946d760 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -139,7 +139,6 @@ pub struct BlobTree { impl BlobTree { pub(crate) fn open(config: Config) -> crate::Result { use crate::file::{BLOBS_FOLDER, fsync_directory}; - use crate::fs::Fs; let index = crate::Tree::open(config)?; @@ -425,7 +424,7 @@ impl AbstractTree for BlobTree { self.index.table_id_counter.clone(), 64 * 1_024 * 1_024, 0, - level_fs, + level_fs.clone(), )? 
.set_comparator(self.index.config.comparator.clone()) .use_data_block_restart_interval(data_block_restart_interval) @@ -545,6 +544,7 @@ impl AbstractTree for BlobTree { self.index.id, self.index.config.cache.clone(), self.index.config.descriptor_table.clone(), + level_fs.clone(), pin_filter, pin_index, self.index.config.encryption.clone(), diff --git a/src/compaction/flavour.rs b/src/compaction/flavour.rs index 5d6960d6f..0b82237f7 100644 --- a/src/compaction/flavour.rs +++ b/src/compaction/flavour.rs @@ -371,6 +371,7 @@ impl StandardCompaction { fn consume_writer(self, opts: &Options, dst_lvl: usize) -> crate::Result> { let table_base_folder = self.table_writer.base_path.clone(); + let level_fs = self.table_writer.fs.clone(); let pin_filter = opts.config.filter_block_pinning_policy.get(dst_lvl); let pin_index = opts.config.index_block_pinning_policy.get(dst_lvl); @@ -386,6 +387,7 @@ impl StandardCompaction { opts.tree_id, opts.config.cache.clone(), opts.config.descriptor_table.clone(), + level_fs.clone(), pin_filter, pin_index, opts.config.encryption.clone(), diff --git a/src/config/mod.rs b/src/config/mod.rs index c3ecbd2d5..86ac5fb9c 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -216,22 +216,17 @@ impl KvSeparationOptions { } /// Tree configuration builder -/// -/// The generic parameter `F` selects the filesystem backend. -/// It defaults to [`StdFs`], so existing code that writes `Config` -/// without a type parameter continues to work unchanged. -pub struct Config { +pub struct Config { /// Folder path #[doc(hidden)] pub path: PathBuf, - /// Filesystem backend + /// Default filesystem backend for levels without an explicit route. /// - // All Config fields are `#[doc(hidden)] pub` by convention — callers use - // builder methods or `..Default::default()`, not struct literals directly. - // A `with_fs()` builder will be added when call-site refactoring lands. + /// Defaults to [`StdFs`]. 
Use [`Config::with_fs`] to plug in an + /// alternative backend such as [`MemFs`](crate::fs::MemFs). #[doc(hidden)] - pub fs: Arc, + pub fs: Arc, /// Per-level filesystem routing for tiered storage. /// @@ -447,6 +442,44 @@ impl Config { } } + /// Sets the default filesystem backend used for levels without an explicit route. + /// + /// Defaults to [`StdFs`]. Use [`MemFs`](crate::fs::MemFs) for + /// in-memory trees (testing, ephemeral indexes). + /// + /// # Example + /// + /// ``` + /// # fn main() -> lsm_tree::Result<()> { + /// use lsm_tree::{Config, SequenceNumberCounter}; + /// use lsm_tree::fs::MemFs; + /// + /// let tree = Config::new( + /// "/virtual/tree", + /// SequenceNumberCounter::default(), + /// SequenceNumberCounter::default(), + /// ) + /// .with_fs(MemFs::new()) + /// .open()?; + /// # Ok(()) + /// # } + /// ``` + #[must_use] + pub fn with_fs(mut self, fs: F) -> Self { + self.fs = Arc::new(fs); + self + } + + /// Sets the default filesystem backend from an existing shared handle. + /// + /// Useful when multiple configs should reuse the same backend + /// instance, including trait objects and backends that are not `Clone`. + #[must_use] + pub fn with_shared_fs(mut self, fs: Arc) -> Self { + self.fs = fs; + self + } + /// Opens a tree using the config. /// /// # Errors @@ -582,7 +615,7 @@ mod tests { } } -impl Config { +impl Config { /// Returns the tables folder path and [`Fs`] backend for the given level. /// /// If [`level_routes`](Self::level_routes) has an entry covering this @@ -612,7 +645,10 @@ impl Config { for route in routes { let folder = route.path.join(TABLES_FOLDER); // Dedup by path: scanning the same directory twice would cause - // already-recovered tables to be classified as orphans and deleted. + // already-recovered tables to be classified as orphans and + // deleted. 
Routing the same path through different Fs backends + // is a configuration error (level_routes validation in + // Config::level_routes rejects overlapping ranges). if !folders.iter().any(|(p, _)| *p == folder) { folders.push((folder, route.fs.clone())); } diff --git a/src/file.rs b/src/file.rs index 81bb7f0ca..e4905e8ed 100644 --- a/src/file.rs +++ b/src/file.rs @@ -47,55 +47,71 @@ pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Resul Ok(builder.freeze().into()) } -/// Atomically rewrites a file. +/// Atomically rewrites a file via the [`Fs`] trait. +/// +/// Writes `content` to a temporary file in the same directory, fsyncs it, +/// then renames over `path`. This ensures readers never see a partial write. pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Result<()> { + use crate::fs::FsOpenOptions; + use std::sync::atomic::{AtomicU64, Ordering}; + + static TEMP_SEQ: AtomicU64 = AtomicU64::new(0); + #[expect( clippy::expect_used, reason = "every file should have a parent directory" )] let folder = path.parent().expect("should have a parent"); - // NOTE: tempfile crate uses std::fs internally; migrating temp-file - // creation to Fs would require a custom implementation. - let mut temp_file = tempfile::NamedTempFile::new_in(folder)?; - temp_file.write_all(content)?; - temp_file.flush()?; - temp_file.as_file_mut().sync_all()?; - temp_file.persist(path)?; - - // Suppress unused-variable warning on Windows where the post-persist - // sync block is skipped (directory fsync is unsupported). 
- let _ = &fs; - - #[cfg(not(target_os = "windows"))] - { - use crate::fs::FsOpenOptions; - - let file = fs.open(path, &FsOpenOptions::new().read(true))?; - FsFile::sync_all(&*file)?; - - #[expect( - clippy::expect_used, - reason = "files should always have a parent directory" - )] - let folder = path.parent().expect("should have parent folder"); - fs.sync_directory(folder)?; + let pid = std::process::id(); + + // Retry with incrementing seq on AlreadyExists — handles leftover temp + // files from a previous crash (PID can be reused, especially in containers). + let tmp_path = loop { + let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed); + let candidate = folder.join(format!(".tmp_{pid}_{seq}")); + match fs.open( + &candidate, + &FsOpenOptions::new().write(true).create_new(true), + ) { + Ok(mut file) => { + let write_result = file + .write_all(content) + .and_then(|()| file.flush()) + .and_then(|()| FsFile::sync_all(&*file)); + if let Err(e) = write_result { + drop(file); + let _ = fs.remove_file(&candidate); + return Err(e); + } + break candidate; + } + // Leftover temp file from a previous crash — retry with next seq. + Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {} + Err(e) => return Err(e), + } + }; + + // std::fs::rename overwrites existing destinations on all platforms + // (Rust uses MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows). + if let Err(e) = fs.rename(&tmp_path, path) { + let _ = fs.remove_file(&tmp_path); + return Err(e); } + fsync_directory(folder, fs)?; Ok(()) } -#[cfg(not(target_os = "windows"))] +/// Delegates directory sync to the backend. +/// +/// On Windows, `StdFs::sync_directory` already returns `Ok(())` (directory +/// fsync is unsupported), but non-`StdFs` backends (e.g., `MemFs`) may use +/// this call for path validation. Always delegate rather than short-circuiting. 
pub fn fsync_directory(path: &Path, fs: &dyn Fs) -> std::io::Result<()> { fs.sync_directory(path) } -#[cfg(target_os = "windows")] -pub fn fsync_directory(_path: &Path, _fs: &dyn Fs) -> std::io::Result<()> { - // Cannot fsync directory on Windows - Ok(()) -} - #[cfg(test)] #[allow( clippy::unwrap_used, @@ -144,4 +160,34 @@ mod tests { Ok(()) } + + /// Verifies that `StdFs::rename` atomically replaces an existing + /// destination file — the contract required by `rewrite_atomic`. + #[test] + fn std_fs_rename_replaces_existing_file() -> crate::Result<()> { + use crate::fs::{Fs, FsOpenOptions}; + + let dir = tempfile::tempdir()?; + let src = dir.path().join("src.txt"); + let dst = dir.path().join("dst.txt"); + + // Create both files via Fs trait. + let opts = FsOpenOptions::new().write(true).create(true); + let mut f = StdFs.open(&src, &opts)?; + f.write_all(b"new")?; + drop(f); + + let mut f = StdFs.open(&dst, &opts)?; + f.write_all(b"old")?; + drop(f); + + StdFs.rename(&src, &dst)?; + + // dst now has src content, src is gone. + let content = std::fs::read_to_string(&dst)?; + assert_eq!("new", content); + assert!(!src.exists()); + + Ok(()) + } } diff --git a/src/file_accessor.rs b/src/file_accessor.rs index c617882b4..318c37901 100644 --- a/src/file_accessor.rs +++ b/src/file_accessor.rs @@ -4,7 +4,8 @@ use crate::GlobalTableId; use crate::descriptor_table::DescriptorTable; -use crate::fs::FsFile; +use crate::fs::{Fs, FsFile, FsOpenOptions}; +use std::path::Path; use std::sync::Arc; /// Allows accessing a file (either cached or pinned) @@ -15,46 +16,101 @@ pub enum FileAccessor { /// This is used in case file descriptor cache is `None` (to skip cache lookups) File(Arc), - /// Access to file descriptor cache - DescriptorTable(Arc), + /// Access to file descriptor cache with [`Fs`]-based fallback for + /// cache misses. + DescriptorTable { + /// The FD cache. + table: Arc, + /// Filesystem backend for opening files on cache miss. 
+ fs: Arc, + }, + + /// Sentinel used during [`Drop`] to move ownership of the file handle + /// before deleting the underlying file. Not constructed outside `Drop`. + #[doc(hidden)] + Closed, } impl FileAccessor { #[must_use] pub fn as_descriptor_table(&self) -> Option<&DescriptorTable> { match self { - Self::DescriptorTable(d) => Some(d), - Self::File(_) => None, + Self::DescriptorTable { table, .. } => Some(table), + Self::File(_) | Self::Closed => None, } } - #[must_use] - pub fn access_for_table(&self, table_id: &GlobalTableId) -> Option> { + /// Returns a table FD, opening via [`Fs`] on descriptor-table cache miss. + /// + /// Returns `(fd, None)` for pinned FDs (no cache involved), + /// `(fd, Some(true))` for descriptor-table cache hit, + /// `(fd, Some(false))` for cache miss (freshly opened and cached). + pub fn get_or_open_table( + &self, + table_id: &GlobalTableId, + path: &Path, + ) -> std::io::Result<(Arc, Option)> { match self { - Self::File(fd) => Some(fd.clone()), - Self::DescriptorTable(descriptor_table) => descriptor_table.access_for_table(table_id), - } - } - - pub fn insert_for_table(&self, table_id: GlobalTableId, fd: Arc) { - if let Self::DescriptorTable(descriptor_table) = self { - descriptor_table.insert_for_table(table_id, fd); + Self::File(fd) => Ok((fd.clone(), None)), + Self::DescriptorTable { table, fs } => { + if let Some(fd) = table.access_for_table(table_id) { + return Ok((fd, Some(true))); + } + let fd: Arc = + Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?); + table.insert_for_table(*table_id, fd.clone()); + Ok((fd, Some(false))) + } + Self::Closed => Err(std::io::Error::other("file accessor closed")), } } - #[must_use] - pub fn access_for_blob_file(&self, table_id: &GlobalTableId) -> Option> { + /// Returns a blob file FD, opening via [`Fs`] on descriptor-table cache miss. + /// + /// See [`get_or_open_table`](Self::get_or_open_table) for + /// semantics of the `Option` cache-hit indicator. 
+ pub fn get_or_open_blob_file( + &self, + table_id: &GlobalTableId, + path: &Path, + ) -> std::io::Result<(Arc, Option)> { match self { - Self::File(fd) => Some(fd.clone()), - Self::DescriptorTable(descriptor_table) => { - descriptor_table.access_for_blob_file(table_id) + Self::File(fd) => Ok((fd.clone(), None)), + Self::DescriptorTable { table, fs } => { + if let Some(fd) = table.access_for_blob_file(table_id) { + return Ok((fd, Some(true))); + } + let fd: Arc = + Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?); + table.insert_for_blob_file(*table_id, fd.clone()); + Ok((fd, Some(false))) } + Self::Closed => Err(std::io::Error::other("file accessor closed")), } } + /// Pre-populates the blob file FD cache after creating a new blob file. pub fn insert_for_blob_file(&self, table_id: GlobalTableId, fd: Arc) { - if let Self::DescriptorTable(descriptor_table) = self { - descriptor_table.insert_for_blob_file(table_id, fd); + if let Self::DescriptorTable { table, .. } = self { + table.insert_for_blob_file(table_id, fd); + } + } + + /// Removes a table FD from the descriptor cache. + /// + /// No-op for pinned `Self::File` variant (no cache to evict from). + pub fn remove_for_table(&self, table_id: &GlobalTableId) { + if let Self::DescriptorTable { table, .. } = self { + table.remove_for_table(table_id); + } + } + + /// Removes a blob file FD from the descriptor cache. + /// + /// No-op for pinned `Self::File` variant (no cache to evict from). + pub fn remove_for_blob_file(&self, table_id: &GlobalTableId) { + if let Self::DescriptorTable { table, .. } = self { + table.remove_for_blob_file(table_id); } } } @@ -63,9 +119,10 @@ impl std::fmt::Debug for FileAccessor { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { match self { Self::File(_) => write!(f, "FileAccessor::Pinned"), - Self::DescriptorTable(_) => { - write!(f, "FileAccessor::Cached") + Self::DescriptorTable { .. 
} => { + write!(f, "FileAccessor::DescriptorTable") } + Self::Closed => write!(f, "FileAccessor::Closed"), } } } diff --git a/src/fs/mem_fs.rs b/src/fs/mem_fs.rs new file mode 100644 index 000000000..03c30a1cb --- /dev/null +++ b/src/fs/mem_fs.rs @@ -0,0 +1,1250 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! In-memory [`Fs`] implementation for testing and ephemeral trees. +//! +//! All file data lives in memory — there are no durability guarantees. +//! `sync_all`, `sync_data`, and `sync_directory` are deliberate no-ops. +//! +//! # Known limitations +//! +//! - **Tree reopen**: `Tree::open` checks for the `CURRENT` version file +//! via `Path::try_exists()` (bypasses `Fs`). New trees work correctly +//! (the check returns `false` → `create_new`), but reopening an +//! in-memory tree after drop is not supported. +//! - **Version GC**: Old version file cleanup in `SuperVersions::gc` uses +//! `std::fs` directly — stale version entries accumulate in memory +//! until the `MemFs` is dropped. Acceptable for testing / ephemeral use. +//! - **Compaction**: Some code paths in the compaction finalization still +//! bypass the `Fs` trait. Write + flush + point-read works; compaction +//! may fail with `ENOENT` on virtual paths. + +use super::{Fs, FsDirEntry, FsFile, FsMetadata, FsOpenOptions}; +use std::collections::{HashMap, HashSet}; +use std::io::{self, Read, Seek, SeekFrom, Write}; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, RwLock}; + +// --------------------------------------------------------------------------- +// MemFs +// --------------------------------------------------------------------------- + +/// In-memory [`Fs`] backend for testing and ephemeral in-memory trees. +/// +/// Backed by a `HashMap>>>` — no disk I/O is +/// performed. 
Clones share the same backing store, and individual file +/// contents are synchronized through a per-file [`Mutex`]. +/// +/// # Example +/// +/// ``` +/// use lsm_tree::fs::MemFs; +/// use std::sync::Arc; +/// +/// let fs = MemFs::new(); +/// let dyn_fs: Arc = Arc::new(fs); +/// ``` +#[derive(Clone, Debug)] +pub struct MemFs { + state: Arc>, +} + +#[derive(Debug, Default)] +struct State { + files: HashMap>>>, + dirs: HashSet, +} + +impl MemFs { + /// Creates a new, empty in-memory filesystem. + #[must_use] + pub fn new() -> Self { + let mut state = State::default(); + // Seed the root directory so exists("/") and read_dir("/") work. + state.dirs.insert(PathBuf::from("/")); + Self { + state: Arc::new(RwLock::new(state)), + } + } +} + +impl Default for MemFs { + fn default() -> Self { + Self::new() + } +} + +// --------------------------------------------------------------------------- +// MemFile +// --------------------------------------------------------------------------- + +/// An open file handle backed by an in-memory buffer. +struct MemFile { + data: Arc>>, + cursor: u64, + readable: bool, + writable: bool, + is_append: bool, +} + +/// Copies bytes from `data[pos..]` into `buf`, returning byte count. 
+fn copy_from_data(buf: &mut [u8], data: &[u8], pos: usize) -> usize { + let available = data.get(pos..).unwrap_or_default(); + let n = buf.len().min(available.len()); + if let (Some(dst), Some(src)) = (buf.get_mut(..n), available.get(..n)) { + dst.copy_from_slice(src); + } + n +} + +impl Read for MemFile { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + if !self.readable { + return Err(io::Error::other("file not opened for reading")); + } + let data = lock(&self.data)?; + let pos = usize::try_from(self.cursor).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "cursor exceeds addressable memory", + ) + })?; + let n = copy_from_data(buf, &data, pos); + drop(data); + self.cursor += n as u64; + Ok(n) + } +} + +impl Write for MemFile { + fn write(&mut self, buf: &[u8]) -> io::Result { + if !self.writable { + return Err(io::Error::other("file not opened for writing")); + } + if buf.is_empty() { + return Ok(0); + } + let mut data = lock(&self.data)?; + + let pos = if self.is_append { + data.len() + } else { + usize::try_from(self.cursor).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "write position exceeds addressable memory", + ) + })? + }; + + let end = pos.checked_add(buf.len()).ok_or_else(|| { + io::Error::new(io::ErrorKind::InvalidInput, "write position overflow") + })?; + if end > data.len() { + data.resize(end, 0); + } + if let Some(dst) = data.get_mut(pos..end) { + dst.copy_from_slice(buf); + } + drop(data); + self.cursor = end as u64; + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl Seek for MemFile { + fn seek(&mut self, pos: SeekFrom) -> io::Result { + let new_pos: u64 = match pos { + SeekFrom::Start(n) => n, + SeekFrom::End(n) => { + let len = { + let data = lock(&self.data)?; + u64::try_from(data.len()).map_err(|_| { + io::Error::other("in-memory file length does not fit in u64") + })? 
+ }; + let result = i128::from(len) + i128::from(n); + if result < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "seek to negative position", + )); + } + u64::try_from(result).map_err(|_| { + io::Error::new(io::ErrorKind::InvalidInput, "seek position overflow") + })? + } + SeekFrom::Current(n) => { + let result = i128::from(self.cursor) + i128::from(n); + if result < 0 { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "seek to negative position", + )); + } + u64::try_from(result).map_err(|_| { + io::Error::new(io::ErrorKind::InvalidInput, "seek position overflow") + })? + } + }; + + self.cursor = new_pos; + Ok(self.cursor) + } +} + +impl FsFile for MemFile { + fn sync_all(&self) -> io::Result<()> { + Ok(()) + } + + fn sync_data(&self) -> io::Result<()> { + Ok(()) + } + + fn metadata(&self) -> io::Result { + let data = lock(&self.data)?; + Ok(FsMetadata { + len: data.len() as u64, + is_dir: false, + is_file: true, + }) + } + + fn set_len(&self, size: u64) -> io::Result<()> { + if !self.writable { + return Err(io::Error::other("set_len requires write access")); + } + let new_len = usize::try_from(size).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "set_len size exceeds usize::MAX", + ) + })?; + lock(&self.data)?.resize(new_len, 0); + Ok(()) + } + + fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result { + if !self.readable { + return Err(io::Error::other("read_at requires read access")); + } + let offset = usize::try_from(offset).map_err(|_| { + io::Error::new( + io::ErrorKind::InvalidInput, + "read_at offset exceeds usize::MAX", + ) + })?; + let data = lock(&self.data)?; + Ok(copy_from_data(buf, &data, offset)) + } + + /// No-op: in-memory files are not shared across processes. `MemFs` is a + /// test/ephemeral backend — cross-process exclusivity is not meaningful. 
+ fn lock_exclusive(&self) -> io::Result<()> { + Ok(()) + } +} + +/// Rejects empty paths before they can create entries in the `/`-rooted namespace. +fn ensure_non_empty_path(path: &Path) -> io::Result<()> { + if path.as_os_str().is_empty() { + return Err(io::Error::new(io::ErrorKind::InvalidInput, "empty path")); + } + Ok(()) +} + +/// Validates that the parent directory of `path` exists and is a directory. +/// +/// Returns `Ok(())` when the parent is root, empty, or an existing directory. +/// Returns `Err(Other)` when the parent is a file, or `Err(NotFound)` when +/// it does not exist at all. +fn ensure_parent_dir(path: &Path, state: &State) -> io::Result<()> { + if let Some(parent) = path.parent() + && !parent.as_os_str().is_empty() + && parent != Path::new("/") + && !state.dirs.contains(parent) + { + if state.files.contains_key(parent) { + return Err(io::Error::other(format!( + "parent is not a directory: {}", + parent.display() + ))); + } + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("parent directory does not exist: {}", parent.display()), + )); + } + Ok(()) +} + +// --------------------------------------------------------------------------- +// Fs for MemFs +// --------------------------------------------------------------------------- + +#[expect( + clippy::significant_drop_tightening, + reason = "RwLock guards are intentionally held for the duration of each method" +)] +impl Fs for MemFs { + fn open(&self, path: &Path, opts: &FsOpenOptions) -> io::Result> { + ensure_non_empty_path(path)?; + let mut state = write_state(&self.state)?; + let path = path.to_path_buf(); + let wants_write = opts.write || opts.append; + + // Validate flag combinations first (path-independent), before any + // filesystem lookups. This ensures consistent InvalidInput errors + // regardless of whether the parent directory exists. 
+ if !opts.read && !wants_write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "open requires at least read, write, or append access", + )); + } + if opts.truncate && opts.append { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "truncate and append cannot be used together", + )); + } + if opts.truncate && !opts.write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "truncate requires write access", + )); + } + if (opts.create || opts.create_new) && !wants_write { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "create/create_new requires write or append access", + )); + } + + ensure_parent_dir(&path, &state)?; + + let exists = state.files.contains_key(&path); + let is_dir = state.dirs.contains(&path); + + // Opening a directory path without create flags is an error (mirrors EISDIR). + if is_dir && !opts.create && !opts.create_new { + return Err(io::Error::other(format!( + "path is a directory: {}", + path.display() + ))); + } + + // Reject creating a file at a path that is already a directory. 
+ if is_dir && (opts.create || opts.create_new) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("path is a directory: {}", path.display()), + )); + } + + if opts.create_new { + if exists { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("file already exists: {}", path.display()), + )); + } + let data = Arc::new(Mutex::new(Vec::new())); + state.files.insert(path, Arc::clone(&data)); + return Ok(Box::new(MemFile { + data, + cursor: 0, + readable: opts.read, + writable: opts.write || opts.append, + is_append: opts.append, + })); + } + + if exists { + let data = state + .files + .get(&path) + .map(Arc::clone) + .ok_or_else(|| io::Error::new(io::ErrorKind::NotFound, "concurrent removal"))?; + + if opts.truncate { + lock(&data)?.clear(); + } + + // Cursor starts at 0 even in append mode — append only affects + // where writes land (Write::write checks is_append), not the + // read cursor. This matches std::fs::File behaviour. + let cursor = 0; + + Ok(Box::new(MemFile { + data, + cursor, + readable: opts.read, + writable: opts.write || opts.append, + is_append: opts.append, + })) + } else if opts.create { + let data = Arc::new(Mutex::new(Vec::new())); + state.files.insert(path, Arc::clone(&data)); + Ok(Box::new(MemFile { + data, + cursor: 0, + readable: opts.read, + writable: opts.write || opts.append, + is_append: opts.append, + })) + } else { + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("file not found: {}", path.display()), + )) + } + } + + fn create_dir_all(&self, path: &Path) -> io::Result<()> { + ensure_non_empty_path(path)?; + let mut state = write_state(&self.state)?; + + // Collect all components first, then validate, then insert. + // This avoids partial insertion if an ancestor is a regular file. 
+ let mut to_create = Vec::new(); + let mut current = path.to_path_buf(); + loop { + if state.files.contains_key(¤t) { + return Err(io::Error::new( + io::ErrorKind::AlreadyExists, + format!("path conflicts with existing file: {}", current.display()), + )); + } + to_create.push(current.clone()); + if !current.pop() || current.as_os_str().is_empty() { + break; + } + } + + for dir in to_create { + state.dirs.insert(dir); + } + Ok(()) + } + + fn read_dir(&self, path: &Path) -> io::Result> { + let state = read_state(&self.state)?; + + if !state.dirs.contains(path) { + // Distinguish "path is a file" from "path does not exist". + if state.files.contains_key(path) { + return Err(io::Error::other(format!( + "not a directory: {}", + path.display() + ))); + } + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("directory not found: {}", path.display()), + )); + } + + let mut entries = Vec::new(); + + for file_path in state.files.keys() { + if file_path.parent() == Some(path) + && let Some(name) = file_path.file_name() + { + // Match StdFs contract: reject non-UTF-8 names with InvalidData. 
+ let file_name = name.to_str().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "non-UTF-8 filename in directory {}: {}", + path.display(), + name.display() + ), + ) + })?; + entries.push(FsDirEntry { + path: file_path.clone(), + file_name: file_name.to_owned(), + is_dir: false, + }); + } + } + + for dir_path in &state.dirs { + if dir_path.parent() == Some(path) + && dir_path != path + && let Some(name) = dir_path.file_name() + { + let file_name = name.to_str().ok_or_else(|| { + io::Error::new( + io::ErrorKind::InvalidData, + format!( + "non-UTF-8 filename in directory {}: {}", + path.display(), + name.display() + ), + ) + })?; + entries.push(FsDirEntry { + path: dir_path.clone(), + file_name: file_name.to_owned(), + is_dir: true, + }); + } + } + + Ok(entries) + } + + fn remove_file(&self, path: &Path) -> io::Result<()> { + let mut state = write_state(&self.state)?; + if state.dirs.contains(path) { + return Err(io::Error::other(format!( + "cannot remove_file on directory: {}", + path.display() + ))); + } + if state.files.remove(path).is_none() { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("file not found: {}", path.display()), + )); + } + Ok(()) + } + + fn remove_dir_all(&self, path: &Path) -> io::Result<()> { + let mut state = write_state(&self.state)?; + + // Reject files — std::fs::remove_dir_all errors on non-directories. + if state.files.contains_key(path) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("path is not a directory: {}", path.display()), + )); + } + + if !state.dirs.contains(path) { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("path not found: {}", path.display()), + )); + } + + state.files.retain(|p, _| !p.starts_with(path)); + state.dirs.retain(|p| !p.starts_with(path)); + + // Re-seed root so exists("/") and read_dir("/") remain valid. 
+ state.dirs.insert(PathBuf::from("/")); + Ok(()) + } + + fn rename(&self, from: &Path, to: &Path) -> io::Result<()> { + ensure_non_empty_path(from)?; + ensure_non_empty_path(to)?; + let mut state = write_state(&self.state)?; + + ensure_parent_dir(to, &state)?; + + // Reject renaming onto an existing directory. Otherwise `to` would end + // up present in both `files` and `dirs`, corrupting MemFs state. + if state.dirs.contains(to) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("destination is a directory: {}", to.display()), + )); + } + + // Directory renames are not implemented in MemFs because they require + // updating descendant paths in both `dirs` and `files`. + if state.dirs.contains(from) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("path is a directory: {}", from.display()), + )); + } + + if let Some(data) = state.files.remove(from) { + state.files.insert(to.to_path_buf(), data); + Ok(()) + } else { + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("file not found: {}", from.display()), + )) + } + } + + fn metadata(&self, path: &Path) -> io::Result { + let state = read_state(&self.state)?; + + if let Some(data) = state.files.get(path) { + let d = lock(data)?; + Ok(FsMetadata { + len: d.len() as u64, + is_dir: false, + is_file: true, + }) + } else if state.dirs.contains(path) { + Ok(FsMetadata { + len: 0, + is_dir: true, + is_file: false, + }) + } else { + Err(io::Error::new( + io::ErrorKind::NotFound, + format!("path not found: {}", path.display()), + )) + } + } + + fn sync_directory(&self, path: &Path) -> io::Result<()> { + // Durability is a no-op, but validate the path is an existing directory. 
+ let state = read_state(&self.state)?; + if !state.dirs.contains(path) { + if state.files.contains_key(path) { + return Err(io::Error::other(format!( + "sync_directory: not a directory: {}", + path.display() + ))); + } + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("sync_directory: path not found: {}", path.display()), + )); + } + Ok(()) + } + + fn exists(&self, path: &Path) -> io::Result { + let state = read_state(&self.state)?; + Ok(state.files.contains_key(path) || state.dirs.contains(path)) + } +} + +// --------------------------------------------------------------------------- +// Lock helpers — convert PoisonError to io::Error +// --------------------------------------------------------------------------- + +fn lock(m: &Mutex) -> io::Result> { + m.lock().map_err(|_| io::Error::other("mutex poisoned")) +} + +fn read_state(rw: &RwLock) -> io::Result> { + rw.read().map_err(|_| io::Error::other("rwlock poisoned")) +} + +fn write_state(rw: &RwLock) -> io::Result> { + rw.write().map_err(|_| io::Error::other("rwlock poisoned")) +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +#[cfg(test)] +#[expect( + clippy::unwrap_used, + clippy::indexing_slicing, + clippy::unnecessary_wraps, + reason = "test code" +)] +mod tests { + use super::*; + use std::io::{Read, Write}; + use std::sync::Arc; + use test_log::test; + + #[test] + fn create_read_write() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/data"))?; + + let path = Path::new("/data/test.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + drop(file); + + let opts = FsOpenOptions::new().read(true); + let mut file = fs.open(path, &opts)?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "hello world"); + + Ok(()) + } + + 
#[test] + fn directory_operations() -> io::Result<()> { + let fs = MemFs::new(); + let nested = PathBuf::from("/a/b/c"); + fs.create_dir_all(&nested)?; + assert!(fs.exists(&nested)?); + assert!(fs.exists(Path::new("/a/b"))?); + + let file_path = nested.join("data.bin"); + let opts = FsOpenOptions::new().write(true).create_new(true); + let mut file = fs.open(&file_path, &opts)?; + file.write_all(b"data")?; + drop(file); + + let entries = fs.read_dir(&nested)?; + assert_eq!(entries.len(), 1); + assert_eq!(entries[0].file_name, "data.bin"); + assert!(!entries[0].is_dir); + + let meta = fs.metadata(&file_path)?; + assert!(meta.is_file); + assert!(!meta.is_dir); + assert_eq!(meta.len, 4); + + fs.remove_file(&file_path)?; + assert!(!fs.exists(&file_path)?); + + fs.remove_dir_all(Path::new("/a"))?; + assert!(!fs.exists(Path::new("/a"))?); + assert!(!fs.exists(&nested)?); + + Ok(()) + } + + #[test] + fn rename_file() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let src = Path::new("/dir/src.txt"); + let dst = Path::new("/dir/dst.txt"); + + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(src, &opts)?; + file.write_all(b"content")?; + drop(file); + + fs.rename(src, dst)?; + assert!(!fs.exists(src)?); + assert!(fs.exists(dst)?); + + Ok(()) + } + + #[test] + fn rename_atomically_replaces_existing_destination() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let src = Path::new("/dir/new.txt"); + let dst = Path::new("/dir/existing.txt"); + + // Create destination with old content + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(dst, &opts)?; + file.write_all(b"old")?; + drop(file); + + // Create source with new content + let mut file = fs.open(src, &opts)?; + file.write_all(b"new")?; + drop(file); + + // Rename should atomically replace destination + fs.rename(src, dst)?; + assert!(!fs.exists(src)?); + + let mut file = 
fs.open(dst, &FsOpenOptions::new().read(true))?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "new"); + + Ok(()) + } + + #[test] + fn sync_directory_is_noop() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + fs.sync_directory(Path::new("/dir"))?; + Ok(()) + } + + #[test] + fn file_metadata_and_set_len() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/meta.bin"); + let opts = FsOpenOptions::new().write(true).create(true).read(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"12345")?; + + let meta = file.metadata()?; + assert!(meta.is_file); + assert_eq!(meta.len, 5); + + file.set_len(3)?; + let meta = file.metadata()?; + assert_eq!(meta.len, 3); + + Ok(()) + } + + #[test] + fn read_at_positional() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/pread.bin"); + let opts = FsOpenOptions::new().write(true).create(true).read(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + + let mut buf = [0u8; 5]; + let n = file.read_at(&mut buf, 6)?; + assert_eq!(n, 5); + assert_eq!(&buf, b"world"); + + let n = file.read_at(&mut buf, 0)?; + assert_eq!(n, 5); + assert_eq!(&buf, b"hello"); + + // Past EOF + let n = file.read_at(&mut buf, 100)?; + assert_eq!(n, 0); + + Ok(()) + } + + #[test] + fn lock_exclusive_is_noop() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/lock"); + let opts = FsOpenOptions::new().write(true).create(true); + let file = fs.open(path, &opts)?; + file.lock_exclusive()?; + Ok(()) + } + + #[test] + fn open_create_new_fails_on_existing() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/file"); + let opts = FsOpenOptions::new().write(true).create_new(true); + 
fs.open(path, &opts)?; + + let err = fs.open(path, &opts).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::AlreadyExists); + Ok(()) + } + + #[test] + fn open_nonexistent_without_create_fails() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/missing"); + let opts = FsOpenOptions::new().read(true); + let err = fs.open(path, &opts).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn open_fails_when_parent_missing() -> io::Result<()> { + let fs = MemFs::new(); + let path = Path::new("/no/such/dir/file"); + let opts = FsOpenOptions::new().write(true).create(true); + let err = fs.open(path, &opts).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn truncate_on_open() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/trunc.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + drop(file); + + let opts = FsOpenOptions::new().write(true).truncate(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hi")?; + drop(file); + + let meta = fs.metadata(path)?; + assert_eq!(meta.len, 2); + Ok(()) + } + + #[test] + fn append_mode() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/append.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello")?; + drop(file); + + let opts = FsOpenOptions::new().write(true).append(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b" world")?; + drop(file); + + let opts = FsOpenOptions::new().read(true); + let mut file = fs.open(path, &opts)?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "hello world"); + Ok(()) + } + + #[test] + 
fn read_append_cursor_starts_at_zero() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/rw_append.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"existing")?; + drop(file); + + // Open with read + append — cursor should start at 0 for reads, + // but writes go to EOF. + let opts = FsOpenOptions::new().read(true).append(true); + let mut file = fs.open(path, &opts)?; + + // Read should return existing content from offset 0. + let mut buf = [0u8; 8]; + let n = file.read(&mut buf)?; + assert_eq!(n, 8); + assert_eq!(&buf, b"existing"); + + // Write appends to EOF. + file.write_all(b"+new")?; + drop(file); + + // Verify full content. + let opts = FsOpenOptions::new().read(true); + let mut file = fs.open(path, &opts)?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "existing+new"); + + Ok(()) + } + + #[test] + fn seek_and_overwrite() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/seek.bin"); + let opts = FsOpenOptions::new().write(true).create(true).read(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"hello world")?; + + file.seek(SeekFrom::Start(6))?; + file.write_all(b"rust!")?; + + file.seek(SeekFrom::Start(0))?; + let mut buf = String::new(); + file.read_to_string(&mut buf)?; + assert_eq!(buf, "hello rust!"); + + Ok(()) + } + + #[test] + fn object_safety() -> io::Result<()> { + let fs: Arc = Arc::new(MemFs::new()); + let bogus = Path::new("/nonexistent"); + assert!(!fs.exists(bogus)?); + Ok(()) + } + + #[test] + fn metadata_directory() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/mydir"))?; + let meta = fs.metadata(Path::new("/mydir"))?; + assert!(meta.is_dir); + assert!(!meta.is_file); + Ok(()) + } + + #[test] + fn read_dir_with_subdirectory() -> io::Result<()> { + let 
fs = MemFs::new(); + fs.create_dir_all(Path::new("/root/subdir"))?; + + let file_path = Path::new("/root/file.txt"); + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(file_path, &opts)?; + + let mut entries = fs.read_dir(Path::new("/root"))?; + entries.sort_by(|a, b| a.file_name.cmp(&b.file_name)); + assert_eq!(entries.len(), 2); + assert_eq!(entries[0].file_name, "file.txt"); + assert!(!entries[0].is_dir); + assert_eq!(entries[1].file_name, "subdir"); + assert!(entries[1].is_dir); + Ok(()) + } + + #[test] + fn remove_file_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.remove_file(Path::new("/missing")).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn rename_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs + .rename(Path::new("/missing"), Path::new("/dst")) + .err() + .unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn read_dir_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.read_dir(Path::new("/missing")).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn metadata_nonexistent_fails() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.metadata(Path::new("/missing")).err().unwrap(); + assert_eq!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn sync_data_is_noop() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let path = Path::new("/dir/file"); + let opts = FsOpenOptions::new().write(true).create(true); + let mut file = fs.open(path, &opts)?; + file.write_all(b"data")?; + file.sync_data()?; + Ok(()) + } + + #[test] + fn clones_share_state() -> io::Result<()> { + let fs1 = MemFs::new(); + let fs2 = fs1.clone(); + + fs1.create_dir_all(Path::new("/shared"))?; + let path = Path::new("/shared/file.txt"); + let opts = 
FsOpenOptions::new().write(true).create(true); + let mut file = fs1.open(path, &opts)?; + file.write_all(b"shared data")?; + drop(file); + + assert!(fs2.exists(path)?); + let meta = fs2.metadata(path)?; + assert_eq!(meta.len, 11); + Ok(()) + } + + // ── Wrong-type error-path tests ───────────────────────────────────── + + #[test] + fn read_dir_on_file_returns_not_a_directory() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + + let err = fs.read_dir(Path::new("/dir/file")).unwrap_err(); + // Must NOT be NotFound — the path exists but is a file. + assert_ne!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn remove_file_on_dir_returns_error() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/somedir"))?; + + let err = fs.remove_file(Path::new("/somedir")).unwrap_err(); + assert_ne!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn sync_directory_on_file_returns_not_a_directory() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + + let err = fs.sync_directory(Path::new("/dir/file")).unwrap_err(); + assert_ne!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn open_with_parent_as_file_returns_error() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + + // Try to create a file whose "parent" is actually a file. 
+ let err = fs + .open(Path::new("/dir/file/child"), &opts) + .map(|_| ()) + .unwrap_err(); + assert_ne!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn rename_directory_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/src_dir"))?; + fs.create_dir_all(Path::new("/dst_parent"))?; + + let err = fs + .rename(Path::new("/src_dir"), Path::new("/dst_parent/moved")) + .unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } + + #[test] + fn rename_onto_directory_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + fs.create_dir_all(Path::new("/dir/dst_dir"))?; + + let err = fs + .rename(Path::new("/dir/file"), Path::new("/dir/dst_dir")) + .unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } + + #[test] + fn rename_with_file_as_dest_parent_returns_error() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/src"), &opts)?; + fs.open(Path::new("/dir/blocker"), &opts)?; + + // /dir/blocker is a file, not a directory — cannot be parent of dst. 
+ let err = fs + .rename(Path::new("/dir/src"), Path::new("/dir/blocker/child")) + .unwrap_err(); + assert_ne!(err.kind(), io::ErrorKind::NotFound); + Ok(()) + } + + #[test] + fn remove_dir_all_on_file_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + + let err = fs.remove_dir_all(Path::new("/dir/file")).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } + + #[test] + fn set_len_without_write_access_returns_error() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/file.bin"); + let mut file = fs.open(path, &FsOpenOptions::new().write(true).create(true))?; + file.write_all(b"data")?; + drop(file); + + let file = fs.open(path, &FsOpenOptions::new().read(true))?; + assert!(file.set_len(1).is_err()); + Ok(()) + } + + #[test] + fn read_at_without_read_access_returns_error() -> io::Result<()> { + let fs = MemFs::new(); + fs.create_dir_all(Path::new("/dir"))?; + + let path = Path::new("/dir/file.bin"); + let mut file = fs.open(path, &FsOpenOptions::new().write(true).create(true))?; + file.write_all(b"data")?; + + let mut buf = [0u8; 1]; + assert!(file.read_at(&mut buf, 0).is_err()); + Ok(()) + } + + #[test] + fn open_empty_path_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs + .open(Path::new(""), &FsOpenOptions::new().read(true)) + .map(|_| ()) + .unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } + + #[test] + fn create_dir_all_empty_path_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + let err = fs.create_dir_all(Path::new("")).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } + + #[test] + fn rename_empty_path_returns_invalid_input() -> io::Result<()> { + let fs = MemFs::new(); + 
fs.create_dir_all(Path::new("/dir"))?; + let opts = FsOpenOptions::new().write(true).create(true); + fs.open(Path::new("/dir/file"), &opts)?; + + let err = fs.rename(Path::new(""), Path::new("/dir/dst")).unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + + let err = fs + .rename(Path::new("/dir/file"), Path::new("")) + .unwrap_err(); + assert_eq!(err.kind(), io::ErrorKind::InvalidInput); + Ok(()) + } +} diff --git a/src/fs/mod.rs b/src/fs/mod.rs index 12a4cba5b..6f84c53b1 100644 --- a/src/fs/mod.rs +++ b/src/fs/mod.rs @@ -23,11 +23,13 @@ //! - **macOS / BSD**: no batched I/O API exists (`dispatch_io` and `kqueue` //! do not help for storage I/O patterns); [`StdFs`] is the correct choice +mod mem_fs; mod std_fs; #[cfg(all(target_os = "linux", feature = "io-uring"))] mod io_uring_fs; +pub use mem_fs::MemFs; pub use std_fs::StdFs; #[cfg(all(target_os = "linux", feature = "io-uring"))] @@ -272,7 +274,14 @@ pub trait Fs: Send + Sync + 'static { /// Returns an I/O error if the directory cannot be removed. fn remove_dir_all(&self, path: &Path) -> io::Result<()>; - /// Renames a file or directory from `from` to `to`. + /// Renames a file from `from` to `to`. + /// + /// If `to` already exists as a regular file, it is atomically replaced. + /// This is required by [`rewrite_atomic`](crate::file::rewrite_atomic) + /// for crash-safe version pointer updates. + /// + /// lsm-tree only renames files (table files, version pointers), never + /// directories. [`MemFs`] rejects directory renames with `InvalidInput`. 
/// /// # Errors /// diff --git a/src/table/inner.rs b/src/table/inner.rs index 96074a916..15748e318 100644 --- a/src/table/inner.rs +++ b/src/table/inner.rs @@ -12,6 +12,7 @@ use crate::{ comparator::SharedComparator, encryption::EncryptionProvider, file_accessor::FileAccessor, + fs::Fs, range_tombstone::RangeTombstone, table::{IndexBlock, filter::block::FilterBlock}, tree::inner::TreeId, @@ -29,6 +30,9 @@ pub struct Inner { #[doc(hidden)] pub(crate) file_accessor: FileAccessor, + /// Filesystem backend for file operations (open, remove, etc.). + pub(crate) fs: Arc, + /// Parsed metadata #[doc(hidden)] pub metadata: ParsedMeta, @@ -97,16 +101,24 @@ impl Drop for Inner { if self.is_deleted.load(std::sync::atomic::Ordering::Acquire) { log::trace!("Cleanup deleted table {global_id:?} at {:?}", self.path); - if let Err(e) = std::fs::remove_file(&*self.path) { + // Move the accessor out so any pinned file handle is closed + // before attempting deletion on Windows. + let file_accessor = std::mem::replace(&mut self.file_accessor, FileAccessor::Closed); + + // Evict cached FD from the descriptor table. + file_accessor.as_descriptor_table().inspect(|d| { + d.remove_for_table(&global_id); + }); + + // Drop the accessor (and its Arc) before remove. 
+ drop(file_accessor); + + if let Err(e) = self.fs.remove_file(&self.path) { log::warn!( "Failed to cleanup deleted table {global_id:?} at {:?}: {e:?}", self.path, ); } - - self.file_accessor.as_descriptor_table().inspect(|d| { - d.remove_for_table(&global_id); - }); } } } diff --git a/src/table/mod.rs b/src/table/mod.rs index b19379b26..72c906d8d 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -40,7 +40,7 @@ use crate::{ comparator::SharedComparator, descriptor_table::DescriptorTable, file_accessor::FileAccessor, - fs::FsFile, + fs::{Fs, FsFile, FsOpenOptions}, range_tombstone::RangeTombstone, table::{ block::{BlockType, ParsedItem}, @@ -119,23 +119,14 @@ impl Table { Ok(if let Some(handle) = &self.regions.linked_blob_files { let table_id = self.global_id(); - let (fd, fd_cache_miss) = - if let Some(fd) = self.file_accessor.access_for_table(&table_id) { - (fd, false) - } else { - let fd: Arc = Arc::new(std::fs::File::open(&*self.path)?); - (fd, true) - }; + let (fd, _) = self + .file_accessor + .get_or_open_table(&table_id, &self.path)?; // Read the exact region using pread-style helper let buf = crate::file::read_exact(fd.as_ref(), *handle.offset(), handle.size() as usize)?; - // If we opened the file here, cache the FD for future accesses - if fd_cache_miss { - self.file_accessor.insert_for_table(table_id, fd); - } - // Parse the buffer let mut reader = &buf[..]; let len = reader.read_u32::()?; @@ -489,6 +480,7 @@ impl Table { tree_id: TreeId, cache: Arc, descriptor_table: Option>, + fs: Arc, pin_filter: bool, pin_index: bool, encryption: Option>, @@ -501,7 +493,7 @@ impl Table { use std::sync::atomic::AtomicBool; log::debug!("Recovering table from file {}", file_path.display()); - let mut file = std::fs::File::open(&file_path)?; + let mut file = fs.open(&file_path, &FsOpenOptions::new().read(true))?; let file_path = Arc::new(file_path); #[cfg(feature = "metrics")] @@ -514,7 +506,7 @@ impl Table { log::trace!("Reading meta block, with meta_ptr={:?}", 
regions.metadata); let metadata = - ParsedMeta::load_with_handle(&file, ®ions.metadata, encryption.as_deref())?; + ParsedMeta::load_with_handle(&*file, ®ions.metadata, encryption.as_deref())?; // Fail-fast: if this table was written with dictionary compression, // verify the caller provided the matching dictionary. Without this @@ -531,10 +523,13 @@ impl Table { } } - let file_handle: Arc = Arc::new(file); + let file_handle: Arc = Arc::from(file); let file_accessor = if let Some(dt) = descriptor_table { - FileAccessor::DescriptorTable(dt) + FileAccessor::DescriptorTable { + table: dt, + fs: fs.clone(), + } } else { FileAccessor::File(file_handle.clone()) }; @@ -705,6 +700,7 @@ impl Table { cache, file_accessor, + fs, block_index: Arc::new(block_index), diff --git a/src/table/multi_writer.rs b/src/table/multi_writer.rs index fd029ea91..6b8e61028 100644 --- a/src/table/multi_writer.rs +++ b/src/table/multi_writer.rs @@ -15,7 +15,7 @@ use std::{path::PathBuf, sync::Arc}; /// /// This results in a sorted "run" of tables pub struct MultiWriter { - fs: Arc, + pub(crate) fs: Arc, pub(crate) base_path: PathBuf, @@ -647,6 +647,7 @@ mod tests { 0, cache.clone(), None, + Arc::new(StdFs), false, false, None, @@ -737,6 +738,7 @@ mod tests { 0, cache.clone(), None, + Arc::new(StdFs), false, false, None, diff --git a/src/table/tests.rs b/src/table/tests.rs index f77321631..fe4467343 100644 --- a/src/table/tests.rs +++ b/src/table/tests.rs @@ -82,6 +82,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -99,7 +100,7 @@ fn test_with_table_impl( assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. 
} )); f(table)?; @@ -116,6 +117,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, false, None, @@ -133,7 +135,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -150,6 +152,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, true, None, @@ -167,7 +170,7 @@ fn test_with_table_impl( assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -184,6 +187,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -201,7 +205,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -218,6 +222,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), None, + Arc::new(StdFs), true, true, None, @@ -275,6 +280,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -291,7 +297,7 @@ fn test_with_table_impl( assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. 
} )); f(table)?; @@ -308,6 +314,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, false, None, @@ -324,7 +331,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -341,6 +348,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, true, None, @@ -358,7 +366,7 @@ fn test_with_table_impl( // assert_eq!(0, table.pinned_filter_size(), "should not pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. } )); f(table)?; @@ -375,6 +383,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -392,7 +401,7 @@ fn test_with_table_impl( // assert!(table.pinned_filter_size() > 0, "should pin filter"); assert!(matches!( table.file_accessor, - FileAccessor::DescriptorTable(..) + FileAccessor::DescriptorTable { .. 
} )); f(table)?; @@ -409,6 +418,7 @@ fn test_with_table_impl( 0, Arc::new(Cache::with_capacity_bytes(1_000_000)), None, + Arc::new(StdFs), true, true, None, @@ -1420,6 +1430,7 @@ fn table_read_fuzz_1() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -1498,6 +1509,7 @@ fn table_partitioned_index() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -1612,6 +1624,7 @@ fn table_global_seqno() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, true, None, @@ -1823,6 +1836,7 @@ fn load_block_range_tombstone_metrics() -> crate::Result<()> { // with how filter and index recovery reads are handled. Arc::new(Cache::with_capacity_bytes(10_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -1930,6 +1944,7 @@ fn load_block_cache_hit_rejects_wrong_block_type() -> crate::Result<()> { 0, Arc::new(Cache::with_capacity_bytes(10_000_000)), Some(Arc::new(DescriptorTable::new(10))), + Arc::new(StdFs), false, false, None, @@ -2253,6 +2268,7 @@ fn two_level_index_scan_skips_empty_child_partition() -> crate::Result<()> { 0, Arc::new(crate::Cache::with_capacity_bytes(0)), Some(Arc::new(crate::DescriptorTable::new(10))), + Arc::new(StdFs), true, false, None, diff --git a/src/table/util.rs b/src/table/util.rs index cd9f480d3..ca7fda41a 100644 --- a/src/table/util.rs +++ b/src/table/util.rs @@ -5,9 +5,9 @@ use super::{Block, BlockHandle, GlobalTableId}; use crate::{ Cache, CompressionType, KeyRange, Table, encryption::EncryptionProvider, - file_accessor::FileAccessor, fs::FsFile, table::block::BlockType, version::run::Ranged, + file_accessor::FileAccessor, table::block::BlockType, version::run::Ranged, }; -use std::{path::Path, 
sync::Arc}; +use std::path::Path; #[cfg(feature = "metrics")] use crate::metrics::Metrics; @@ -83,23 +83,20 @@ pub fn load_block( return Ok(block); } - let (fd, fd_cache_miss) = if let Some(cached_fd) = file_accessor.access_for_table(&table_id) { - #[cfg(feature = "metrics")] - metrics.table_file_opened_cached.fetch_add(1, Relaxed); - - (cached_fd, false) - } else { - let file = std::fs::File::open(path)?; + let (fd, cache_event) = file_accessor.get_or_open_table(&table_id, path)?; - #[cfg(feature = "metrics")] - metrics.table_file_opened_uncached.fetch_add(1, Relaxed); + // Only track descriptor-table cache metrics; pinned FDs (None) are not cache events. + #[cfg(feature = "metrics")] + if let Some(hit) = cache_event { + if hit { + metrics.table_file_opened_cached.fetch_add(1, Relaxed); + } else { + metrics.table_file_opened_uncached.fetch_add(1, Relaxed); + } + } - // The if-branch returns Arc from the descriptor - // table, so the else-branch needs an explicit type annotation - // to trigger unsizing coercion. 
- let fd: Arc = Arc::new(file); - (fd, true) - }; + #[cfg(not(feature = "metrics"))] + let _ = cache_event; let block = Block::from_file( fd.as_ref(), @@ -149,11 +146,6 @@ pub fn load_block( } } - // Cache FD - if fd_cache_miss { - file_accessor.insert_for_table(table_id, fd); - } - cache.insert_block(table_id, handle.offset(), block.clone()); Ok(block) diff --git a/src/tree/ingest.rs b/src/tree/ingest.rs index 467aaf81b..3405d0d4e 100644 --- a/src/tree/ingest.rs +++ b/src/tree/ingest.rs @@ -4,11 +4,12 @@ use super::Tree; use crate::{ - BlobIndirection, SeqNo, UserKey, UserValue, config::FilterPolicyEntry, + BlobIndirection, SeqNo, UserKey, UserValue, config::FilterPolicyEntry, fs::Fs, table::multi_writer::MultiWriter, }; use std::cmp::Ordering; use std::path::PathBuf; +use std::sync::Arc; pub const INITIAL_CANONICAL_LEVEL: usize = 1; @@ -20,6 +21,8 @@ pub const INITIAL_CANONICAL_LEVEL: usize = 1; /// using the same table writer configuration that is used for flush and compaction. pub struct Ingestion<'a> { pub(crate) folder: PathBuf, + /// Level-routed filesystem backend for the target level. + pub(crate) level_fs: Arc, tree: &'a Tree, pub(crate) writer: MultiWriter, seqno: SeqNo, @@ -54,7 +57,7 @@ impl<'a> Ingestion<'a> { tree.table_id_counter.clone(), 64 * 1_024 * 1_024, 6, - level_fs, + level_fs.clone(), )? .set_comparator(tree.config.comparator.clone()) .use_bloom_policy({ @@ -116,6 +119,7 @@ impl<'a> Ingestion<'a> { Ok(Self { folder, + level_fs, tree, writer, seqno: 0, @@ -311,6 +315,7 @@ impl<'a> Ingestion<'a> { self.tree.id, self.tree.config.cache.clone(), self.tree.config.descriptor_table.clone(), + self.level_fs.clone(), false, false, self.tree.config.encryption.clone(), diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 997bea00d..7fa6a80b4 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -392,7 +392,7 @@ impl AbstractTree for Tree { self.table_id_counter.clone(), 64 * 1_024 * 1_024, 0, - level_fs, + level_fs.clone(), )? 
.set_comparator(self.config.comparator.clone()) .use_data_block_restart_interval(data_block_restart_interval) @@ -453,6 +453,7 @@ impl AbstractTree for Tree { self.id, self.config.cache.clone(), self.config.descriptor_table.clone(), + level_fs.clone(), pin_filter, pin_index, self.config.encryption.clone(), @@ -1200,6 +1201,11 @@ impl Tree { pub(crate) fn open(config: Config) -> crate::Result { log::debug!("Opening LSM-tree at {}", config.path.display()); + // NOTE: try_exists() and recover() below use std::fs directly, bypassing + // the pluggable Fs trait. This means MemFs (and other non-StdFs backends) + // cannot reopen a tree after drop — only new tree creation works. + // Tracked in: #209 + // Check for old version if config.path.join("version").try_exists()? { log::error!( @@ -1444,7 +1450,6 @@ impl Tree { /// Creates a new LSM-tree in a directory. fn create_new(config: Config) -> crate::Result { use crate::file::fsync_directory; - use crate::fs::Fs; let path = config.path.clone(); log::trace!("Creating LSM-tree at {}", path.display()); @@ -1488,7 +1493,7 @@ impl Tree { config: &Config, #[cfg(feature = "metrics")] metrics: &Arc, ) -> crate::Result { - use crate::{TableId, file::fsync_directory, fs::Fs}; + use crate::{TableId, file::fsync_directory}; let tree_path = tree_path.as_ref(); @@ -1598,6 +1603,7 @@ impl Tree { tree_id, config.cache.clone(), config.descriptor_table.clone(), + folder_fs.clone(), pin_filter, pin_index, config.encryption.clone(), @@ -1680,6 +1686,7 @@ impl Tree { &recovery.blob_file_ids, tree_id, config.descriptor_table.as_ref(), + &config.fs, )?; let version = Version::from_recovery(recovery, &tables, &blob_files)?; diff --git a/src/vlog/accessor.rs b/src/vlog/accessor.rs index 7757ebdd7..50a22c34b 100644 --- a/src/vlog/accessor.rs +++ b/src/vlog/accessor.rs @@ -4,11 +4,10 @@ use crate::{ Cache, GlobalTableId, TreeId, UserValue, - fs::FsFile, version::BlobFileList, vlog::{ValueHandle, blob_file::reader::Reader}, }; -use std::{path::Path, 
sync::Arc}; +use std::path::Path; pub struct Accessor<'a>(&'a BlobFileList); @@ -35,23 +34,13 @@ impl<'a> Accessor<'a> { let bf_id = GlobalTableId::from((tree_id, blob_file.id())); - let (file, fd_cache_miss) = - if let Some(cached_fd) = blob_file.file_accessor().access_for_blob_file(&bf_id) { - (cached_fd, false) - } else { - let file: Arc = Arc::new(std::fs::File::open( - base_path.join(vhandle.blob_file_id.to_string()), - )?); - (file, true) - }; + let (file, _) = blob_file + .file_accessor() + .get_or_open_blob_file(&bf_id, &base_path.join(vhandle.blob_file_id.to_string()))?; let value = Reader::new(blob_file, file.as_ref()).get(key, vhandle)?; cache.insert_blob(tree_id, vhandle, value.clone()); - if fd_cache_miss { - blob_file.file_accessor().insert_for_blob_file(bf_id, file); - } - Ok(Some(value)) } } diff --git a/src/vlog/blob_file/multi_writer.rs b/src/vlog/blob_file/multi_writer.rs index 4af702f76..14fae2475 100644 --- a/src/vlog/blob_file/multi_writer.rs +++ b/src/vlog/blob_file/multi_writer.rs @@ -116,7 +116,7 @@ impl MultiWriter { old_writer, self.passthrough_compression, self.descriptor_table.clone(), - &*self.fs, + &self.fs, )?; self.results.extend(blob_file); @@ -127,7 +127,7 @@ impl MultiWriter { writer: Writer, passthrough_compression: CompressionType, descriptor_table: Option>, - fs: &dyn Fs, + fs: &Arc, ) -> crate::Result> { if writer.item_count > 0 { let blob_file_id = writer.blob_file_id; @@ -143,13 +143,16 @@ impl MultiWriter { let (metadata, checksum) = writer.finish()?; - // NOTE: Read-back uses std::fs::File because FileAccessor/DescriptorTable - // expect Arc. Migrating the read path to Fs is a separate scope. 
- let file: Arc = Arc::new(std::fs::File::open(&path)?); - let file_accessor = descriptor_table.map_or_else( - || FileAccessor::File(file.clone()), - FileAccessor::DescriptorTable, - ); + let file: Arc = + Arc::from(fs.open(&path, &crate::fs::FsOpenOptions::new().read(true))?); + let file_accessor = if let Some(dt) = descriptor_table { + FileAccessor::DescriptorTable { + table: dt, + fs: fs.clone(), + } + } else { + FileAccessor::File(file.clone()) + }; file_accessor.insert_for_blob_file((tree_id, blob_file_id).into(), file); let blob_file = BlobFile(Arc::new(BlobFileInner { @@ -258,7 +261,7 @@ impl MultiWriter { self.active_writer, self.passthrough_compression, self.descriptor_table.clone(), - &*self.fs, + &self.fs, )?; self.results.extend(blob_file); Ok(self.results) diff --git a/src/vlog/mod.rs b/src/vlog/mod.rs index 383ce0e51..1e4d018f4 100644 --- a/src/vlog/mod.rs +++ b/src/vlog/mod.rs @@ -16,6 +16,7 @@ pub use { use crate::{ Checksum, DescriptorTable, TreeId, file_accessor::FileAccessor, + fs::Fs, vlog::blob_file::{Inner as BlobFileInner, Metadata}, }; use std::{ @@ -28,10 +29,22 @@ pub fn recover_blob_files( ids: &[(BlobFileId, Checksum)], tree_id: TreeId, descriptor_table: Option<&Arc>, + fs: &Arc, ) -> crate::Result<(Vec, Vec)> { - if !folder.try_exists()? { - return Ok((vec![], vec![])); - } + // Recover directly from read_dir; treat NotFound as empty only for + // standard (non-blob) trees where no blob folder is expected. + // If the manifest references blob files (ids non-empty) but the folder + // is missing, that is unrecoverable corruption — fail fast. 
+ let entries = match fs.read_dir(folder) { + Ok(entries) => entries, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + if ids.is_empty() { + return Ok((vec![], vec![])); + } + return Err(crate::Error::Unrecoverable); + } + Err(e) => return Err(e.into()), + }; let cnt = ids.len(); @@ -45,10 +58,13 @@ pub fn recover_blob_files( let mut blob_files = Vec::with_capacity(ids.len()); let mut orphaned_blob_files = vec![]; + // Deferred cache inserts — only committed after all blobs parse + // successfully, so a partial recovery doesn't leak FDs in the + // descriptor table. + let mut pending_cache_inserts = Vec::new(); - for (idx, dirent) in std::fs::read_dir(folder)?.enumerate() { - let dirent = dirent?; - let file_name = dirent.file_name(); + for (idx, dirent) in entries.into_iter().enumerate() { + let file_name = &dirent.file_name; // https://en.wikipedia.org/wiki/.DS_Store if file_name == ".DS_Store" { @@ -56,22 +72,22 @@ pub fn recover_blob_files( } // https://en.wikipedia.org/wiki/AppleSingle_and_AppleDouble_formats - if file_name.to_string_lossy().starts_with("._") { + if file_name.starts_with("._") { continue; } - let blob_file_name = file_name.to_str().ok_or_else(|| { - log::error!("invalid table file name {}", file_name.display()); - crate::Error::Unrecoverable - })?; + // Skip directories before parsing — non-numeric directory names would + // fail the parse and abort recovery. 
+ if dirent.is_dir { + continue; + } - let blob_file_id = blob_file_name.parse::().map_err(|e| { - log::error!("invalid table file name {blob_file_name:?}: {e:?}"); + let blob_file_id = file_name.parse::().map_err(|e| { + log::error!("invalid blob file name {file_name:?}: {e:?}"); crate::Error::Unrecoverable })?; - let blob_file_path = dirent.path(); - assert!(!blob_file_path.is_dir()); + let blob_file_path = &dirent.path; if let Some(&(_, checksum)) = ids.iter().find(|(id, _)| id == &blob_file_id) { log::trace!( @@ -79,10 +95,10 @@ pub fn recover_blob_files( blob_file_path.display(), ); - let file = std::fs::File::open(&blob_file_path)?; + let mut file = fs.open(blob_file_path, &crate::fs::FsOpenOptions::new().read(true))?; let meta = { - let reader = sfa::Reader::new(&blob_file_path)?; + let reader = sfa::Reader::from_reader(&mut file)?; let toc = reader.toc(); let metadata_section = toc.section(b"meta") @@ -94,20 +110,26 @@ pub fn recover_blob_files( let metadata_len = usize::try_from(metadata_section.len()) .map_err(|_| crate::Error::Unrecoverable)?; let metadata_slice = - crate::file::read_exact(&file, metadata_section.pos(), metadata_len)?; + crate::file::read_exact(&*file, metadata_section.pos(), metadata_len)?; Metadata::from_slice(&metadata_slice)? 
}; + let file: Arc = Arc::from(file); let file_accessor = if let Some(dt) = descriptor_table.cloned() { - FileAccessor::DescriptorTable(dt) + let global_id = (tree_id, blob_file_id).into(); + pending_cache_inserts.push((dt.clone(), global_id, file.clone())); + FileAccessor::DescriptorTable { + table: dt, + fs: fs.clone(), + } } else { - FileAccessor::File(Arc::new(file)) + FileAccessor::File(file) }; blob_files.push(BlobFile(Arc::new(BlobFileInner { id: blob_file_id, - path: blob_file_path, + path: blob_file_path.clone(), meta, is_deleted: AtomicBool::new(false), checksum, @@ -127,6 +149,11 @@ pub fn recover_blob_files( return Err(crate::Error::Unrecoverable); } + // All blobs parsed successfully — commit FDs to the descriptor cache. + for (dt, global_id, file) in pending_cache_inserts { + dt.insert_for_blob_file(global_id, file); + } + log::debug!("Successfully recovered {} blob files", blob_files.len()); Ok((blob_files, orphaned_blob_files)) @@ -136,15 +163,53 @@ pub fn recover_blob_files( pub type BlobFileId = u64; #[cfg(test)] +#[expect(clippy::unwrap_used, reason = "test code")] mod tests { use super::*; use test_log::test; #[test] - fn vlog_recovery_missing_blob_file() { - assert!(matches!( - recover_blob_files(Path::new("."), &[(0, Checksum::from_raw(0))], 0, None), - Err(crate::Error::Unrecoverable), - )); + fn vlog_recovery_missing_blob_file_returns_unrecoverable() { + // Manifest says blob id=0 exists, but the blobs folder is empty. + // Recovery should fail with Unrecoverable because blob_files.len() < ids.len(). 
+ let dir = tempfile::tempdir().unwrap();
+ let result = recover_blob_files(
+ dir.path(),
+ &[(0, Checksum::from_raw(0))],
+ 0,
+ None,
+ &(Arc::new(crate::fs::StdFs) as Arc<dyn Fs>),
+ );
+ assert!(matches!(result, Err(crate::Error::Unrecoverable)));
+ }
+
+ #[test]
+ fn vlog_recovery_nonexistent_folder_no_ids_returns_empty() {
+ let dir = tempfile::tempdir().unwrap();
+ let missing = dir.path().join("no_such_dir");
+ let (blob_files, orphans) = recover_blob_files(
+ &missing,
+ &[],
+ 0,
+ None,
+ &(Arc::new(crate::fs::StdFs) as Arc<dyn Fs>),
+ )
+ .unwrap();
+ assert!(blob_files.is_empty());
+ assert!(orphans.is_empty());
+ }
+
+ #[test]
+ fn vlog_recovery_nonexistent_folder_with_ids_returns_unrecoverable() {
+ let dir = tempfile::tempdir().unwrap();
+ let missing = dir.path().join("no_such_dir");
+ let result = recover_blob_files(
+ &missing,
+ &[(0, Checksum::from_raw(0))],
+ 0,
+ None,
+ &(Arc::new(crate::fs::StdFs) as Arc<dyn Fs>),
+ );
+ assert!(matches!(result, Err(crate::Error::Unrecoverable)));
+ }
 }
diff --git a/tests/mem_fs_tree.rs b/tests/mem_fs_tree.rs
new file mode 100644
index 000000000..5896ce081
--- /dev/null
+++ b/tests/mem_fs_tree.rs
@@ -0,0 +1,162 @@
+use lsm_tree::fs::MemFs;
+// Guard trait import required for IterGuardImpl::into_inner() method dispatch.
+use lsm_tree::{AbstractTree, Config, Guard, SeqNo, SequenceNumberCounter}; +use test_log::test; + +#[test] +fn open_tree_with_memfs() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/tree", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + assert!(tree.is_empty(SeqNo::MAX, None)?); + + tree.insert("key1", "value1", 0); + tree.insert("key2", "value2", 1); + tree.insert("key3", "value3", 2); + + assert_eq!(tree.len(SeqNo::MAX, None)?, 3); + + let val = tree.get("key2", SeqNo::MAX)?.expect("key2 should exist"); + assert_eq!(&*val, b"value2"); + + Ok(()) +} + +#[test] +fn memfs_tree_flush_and_read() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/flush", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + for i in 0u64..100 { + tree.insert(format!("key_{i:05}"), format!("val_{i}"), i); + } + + // Flush memtable to SST (in-memory via MemFs) + tree.flush_active_memtable(0)?; + + // Reads should still work after flush (from SST) + let val = tree + .get("key_00050", SeqNo::MAX)? 
+ .expect("key should exist after flush"); + assert_eq!(&*val, b"val_50"); + + assert_eq!(tree.len(SeqNo::MAX, None)?, 100); + + Ok(()) +} + +#[test] +fn memfs_tree_delete_and_range() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/range", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + tree.insert("a", "1", 0); + tree.insert("b", "2", 1); + tree.insert("c", "3", 2); + tree.remove("b", 3); + + let items: Vec<_> = tree + .iter(SeqNo::MAX, None) + .map(|guard| guard.into_inner()) + .collect::>>()?; + + assert_eq!(items.len(), 2); + assert_eq!(&*items[0].0, b"a"); + assert_eq!(&*items[0].1, b"1"); + assert_eq!(&*items[1].0, b"c"); + assert_eq!(&*items[1].1, b"3"); + + Ok(()) +} + +#[test] +fn memfs_tree_multiple_flushes() -> lsm_tree::Result<()> { + let tree = Config::new( + "/virtual/multi_flush", + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_fs(MemFs::new()) + .open()?; + + // First batch + for i in 0u64..50 { + tree.insert(format!("key_{i:05}"), format!("batch1_{i}"), i); + } + tree.flush_active_memtable(0)?; + + // Second batch — some overwrites, some new + for i in 25u64..75 { + tree.insert(format!("key_{i:05}"), format!("batch2_{i}"), 50 + i); + } + tree.flush_active_memtable(0)?; + + // Verify latest values + let val = tree + .get("key_00030", SeqNo::MAX)? + .expect("overwritten key should exist"); + assert_eq!(&*val, b"batch2_30"); + + let val = tree + .get("key_00010", SeqNo::MAX)? + .expect("original key should exist"); + assert_eq!(&*val, b"batch1_10"); + + assert_eq!(tree.len(SeqNo::MAX, None)?, 75); + + Ok(()) +} + +#[test] +fn memfs_shared_across_trees() -> lsm_tree::Result<()> { + use std::sync::Arc; + + // Exercise Config::with_shared_fs(Arc) for shared backend reuse. 
+ let fs: Arc<dyn lsm_tree::fs::Fs> = Arc::new(MemFs::new());
+
+ let tree1 = Config::new(
+ "/virtual/tree1",
+ SequenceNumberCounter::default(),
+ SequenceNumberCounter::default(),
+ )
+ .with_shared_fs(Arc::clone(&fs))
+ .open()?;
+
+ let tree2 = Config::new(
+ "/virtual/tree2",
+ SequenceNumberCounter::default(),
+ SequenceNumberCounter::default(),
+ )
+ .with_shared_fs(fs)
+ .open()?;
+
+ tree1.insert("from_tree1", "hello", 0);
+ tree2.insert("from_tree2", "world", 0);
+
+ assert!(tree1.get("from_tree1", SeqNo::MAX)?.is_some());
+ assert!(tree1.get("from_tree2", SeqNo::MAX)?.is_none());
+ assert!(tree2.get("from_tree2", SeqNo::MAX)?.is_some());
+ assert!(tree2.get("from_tree1", SeqNo::MAX)?.is_none());
+
+ Ok(())
+}
+
+// NOTE: Compaction is not yet fully supported with MemFs.
+// There are remaining `std::fs` bypass points in the compaction
+// finalization path that produce ENOENT when running fully in-memory.
+// Tracked as a known limitation — see mem_fs.rs module docs.