Skip to content
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d051b11
feat(fs): add MemFs in-memory filesystem and pipe Fs through all I/O …
polaz Apr 3, 2026
303361e
build: bump rust-toolchain.toml from 1.94.0 to 1.94.1
polaz Apr 3, 2026
52f701a
fix(fs): retry temp file on AlreadyExists, validate open flags, corre…
polaz Apr 3, 2026
8d6033e
style(file): fix import ordering for rustfmt 1.94.1
polaz Apr 3, 2026
3520e08
fix(fs): return Option<bool> from FileAccessor for precise cache metrics
polaz Apr 3, 2026
afeccbd
fix(test): use byte comparisons in iter test, exercise with_shared_fs
polaz Apr 3, 2026
53d24f0
fix(fs): MemFs::open returns error when opening a directory path
polaz Apr 3, 2026
e8ce4f2
fix(fs): seed root dir, correct Seek arithmetic, validate sync_directory
polaz Apr 3, 2026
f5b1030
fix(fs): MemFile::set_len requires write access, read_at requires rea…
polaz Apr 3, 2026
0f599da
fix(fs): protect root dir invariant, distinguish file-vs-dir in read_…
polaz Apr 3, 2026
74352ee
fix(fs): guard write position overflow, distinguish file-as-parent er…
polaz Apr 4, 2026
f9c2a21
style(fs): multi-line expect attribute for CI rustfmt
polaz Apr 4, 2026
9451982
fix(fs): validate open flags before parent lookup, remove_file reject…
polaz Apr 4, 2026
e0b8973
fix(fs): improve create_dir_all error message for file-path conflicts
polaz Apr 4, 2026
7e0fe3c
fix(fs): delegate fsync_directory to Fs backend on all platforms
polaz Apr 4, 2026
8c33e29
test(fs): add read+append cursor regression test for MemFs
polaz Apr 4, 2026
adad93e
fix(fs): reject truncate+append combo, drop unwrap from Config::with_…
polaz Apr 4, 2026
76f3b67
test(fs): wrong-type error paths and StdFs rename-replace
polaz Apr 4, 2026
cf46569
fix(fs): use map(|_| ()) for FsFile open error unwrap in test
polaz Apr 4, 2026
fee8a89
refactor(fs): extract ensure_parent_dir helper, empty write no-op
polaz Apr 4, 2026
62e1fcf
fix(fs): reject empty paths, permission guard tests, doc clarity
polaz Apr 4, 2026
c20521e
docs(fs): document no-op behavior of FileAccessor remove methods
polaz Apr 4, 2026
1cc66b4
fix(vlog): fail-fast on missing blobs folder when manifest references…
polaz Apr 4, 2026
5662b67
fix(table,vlog): evict FD before remove_file, defer blob cache inserts
polaz Apr 4, 2026
e911497
fix(fs): replace as-usize casts with checked usize::try_from
polaz Apr 4, 2026
093765c
fix(table): close pinned file handle before remove_file in Drop
polaz Apr 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benches/index_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ fn build_table_for_point_read(restart_interval: u8) -> BenchTable {
0,
Arc::new(Cache::with_capacity_bytes(1_000_000)),
Some(Arc::new(DescriptorTable::new(8))),
Arc::new(lsm_tree::fs::StdFs),
false,
false,
None,
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.94.0"
channel = "1.94.1"
Comment thread
polaz marked this conversation as resolved.
components = ["rustfmt", "clippy"]
1 change: 1 addition & 0 deletions src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl<'a> BlobIngestion<'a> {
index.id,
index.config.cache.clone(),
index.config.descriptor_table.clone(),
self.table.level_fs.clone(),
false,
false,
index.config.encryption.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ pub struct BlobTree {
impl BlobTree {
pub(crate) fn open(config: Config) -> crate::Result<Self> {
use crate::file::{BLOBS_FOLDER, fsync_directory};
use crate::fs::Fs;

let index = crate::Tree::open(config)?;

Expand Down Expand Up @@ -425,7 +424,7 @@ impl AbstractTree for BlobTree {
self.index.table_id_counter.clone(),
64 * 1_024 * 1_024,
0,
level_fs,
level_fs.clone(),
)?
.set_comparator(self.index.config.comparator.clone())
.use_data_block_restart_interval(data_block_restart_interval)
Expand Down Expand Up @@ -545,6 +544,7 @@ impl AbstractTree for BlobTree {
self.index.id,
self.index.config.cache.clone(),
self.index.config.descriptor_table.clone(),
level_fs.clone(),
pin_filter,
pin_index,
self.index.config.encryption.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/compaction/flavour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ impl StandardCompaction {

fn consume_writer(self, opts: &Options, dst_lvl: usize) -> crate::Result<Vec<Table>> {
let table_base_folder = self.table_writer.base_path.clone();
let level_fs = self.table_writer.fs.clone();

let pin_filter = opts.config.filter_block_pinning_policy.get(dst_lvl);
let pin_index = opts.config.index_block_pinning_policy.get(dst_lvl);
Expand All @@ -386,6 +387,7 @@ impl StandardCompaction {
opts.tree_id,
opts.config.cache.clone(),
opts.config.descriptor_table.clone(),
level_fs.clone(),
pin_filter,
pin_index,
opts.config.encryption.clone(),
Expand Down
51 changes: 41 additions & 10 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,22 +216,17 @@ impl KvSeparationOptions {
}

/// Tree configuration builder
///
/// The generic parameter `F` selects the filesystem backend.
/// It defaults to [`StdFs`], so existing code that writes `Config`
/// without a type parameter continues to work unchanged.
pub struct Config<F: Fs = StdFs> {
pub struct Config {
/// Folder path
#[doc(hidden)]
pub path: PathBuf,

/// Filesystem backend
///
// All Config fields are `#[doc(hidden)] pub` by convention — callers use
// builder methods or `..Default::default()`, not struct literals directly.
// A `with_fs()` builder will be added when call-site refactoring lands.
/// Defaults to [`StdFs`]. Use [`Config::with_fs`] to plug in an
/// alternative backend such as [`MemFs`](crate::fs::MemFs).
#[doc(hidden)]
pub fs: Arc<F>,
pub fs: Arc<dyn Fs>,
Comment thread
polaz marked this conversation as resolved.
Outdated

/// Per-level filesystem routing for tiered storage.
///
Expand Down Expand Up @@ -447,6 +442,42 @@ impl Config {
}
}

/// Sets the filesystem backend.
///
/// Defaults to [`StdFs`]. Use [`MemFs`](crate::fs::MemFs) for
/// in-memory trees (testing, ephemeral indexes).
///
/// # Example
///
/// ```
/// use lsm_tree::{Config, SequenceNumberCounter};
/// use lsm_tree::fs::MemFs;
///
/// let tree = Config::new(
/// "/virtual/tree",
/// SequenceNumberCounter::default(),
/// SequenceNumberCounter::default(),
/// )
/// .with_fs(MemFs::new())
/// .open()
/// .unwrap();
Comment thread
polaz marked this conversation as resolved.
Outdated
/// ```
#[must_use]
pub fn with_fs<F: Fs>(self, fs: F) -> Self {
    // Wrap the concrete backend in a fresh `Arc` and store it on the
    // config before handing the config back to the caller.
    let mut config = self;
    config.fs = Arc::new(fs);
    config
}

Comment thread
polaz marked this conversation as resolved.
/// Replaces the filesystem backend with an already shared handle.
///
/// The given `Arc` is stored as-is (no new allocation), so several
/// configs can point at the same backend instance. This also covers
/// trait objects and backends that do not implement `Clone`.
#[must_use]
pub fn with_shared_fs(self, fs: Arc<dyn Fs>) -> Self {
    let mut config = self;
    config.fs = fs;
    config
}

/// Opens a tree using the config.
///
/// # Errors
Expand Down Expand Up @@ -582,7 +613,7 @@ mod tests {
}
}

impl<F: Fs> Config<F> {
impl Config {
/// Returns the tables folder path and [`Fs`] backend for the given level.
///
/// If [`level_routes`](Self::level_routes) has an entry covering this
Expand Down
70 changes: 44 additions & 26 deletions src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,40 +47,58 @@ pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Resul
Ok(builder.freeze().into())
}

/// Atomically rewrites a file.
/// Atomically rewrites a file via the [`Fs`] trait.
///
/// Writes `content` to a temporary file in the same directory, fsyncs it,
/// then renames over `path`. This ensures readers never see a partial write.
pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Result<()> {
use crate::fs::FsOpenOptions;
use std::sync::atomic::{AtomicU64, Ordering};

static TEMP_SEQ: AtomicU64 = AtomicU64::new(0);

#[expect(
clippy::expect_used,
reason = "every file should have a parent directory"
)]
let folder = path.parent().expect("should have a parent");

// NOTE: tempfile crate uses std::fs internally; migrating temp-file
// creation to Fs would require a custom implementation.
let mut temp_file = tempfile::NamedTempFile::new_in(folder)?;
temp_file.write_all(content)?;
temp_file.flush()?;
temp_file.as_file_mut().sync_all()?;
temp_file.persist(path)?;

// Suppress unused-variable warning on Windows where the post-persist
// sync block is skipped (directory fsync is unsupported).
let _ = &fs;

#[cfg(not(target_os = "windows"))]
{
use crate::fs::FsOpenOptions;

let file = fs.open(path, &FsOpenOptions::new().read(true))?;
FsFile::sync_all(&*file)?;

#[expect(
clippy::expect_used,
reason = "files should always have a parent directory"
)]
let folder = path.parent().expect("should have parent folder");
fs.sync_directory(folder)?;
let pid = std::process::id();

// Retry with incrementing seq on AlreadyExists — handles leftover temp
// files from a previous crash (PID can be reused, especially in containers).
let tmp_path = loop {
let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed);
let candidate = folder.join(format!(".tmp_{pid}_{seq}"));
Comment thread
polaz marked this conversation as resolved.
match fs.open(
&candidate,
&FsOpenOptions::new().write(true).create_new(true),
) {
Ok(mut file) => {
let write_result = file
.write_all(content)
.and_then(|()| file.flush())
.and_then(|()| FsFile::sync_all(&*file));
if let Err(e) = write_result {
drop(file);
let _ = fs.remove_file(&candidate);
return Err(e);
}
break candidate;
}
// Leftover temp file from a previous crash — retry with next seq.
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
Err(e) => return Err(e),
}
Comment thread
polaz marked this conversation as resolved.
};

// std::fs::rename overwrites existing destinations on all platforms
// (Rust uses MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows).
if let Err(e) = fs.rename(&tmp_path, path) {
let _ = fs.remove_file(&tmp_path);
return Err(e);
}
fsync_directory(folder, fs)?;

Ok(())
}
Expand Down
86 changes: 64 additions & 22 deletions src/file_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

use crate::GlobalTableId;
use crate::descriptor_table::DescriptorTable;
use crate::fs::FsFile;
use crate::fs::{Fs, FsFile, FsOpenOptions};
use std::path::Path;
use std::sync::Arc;

/// Allows accessing a file (either cached or pinned)
Expand All @@ -15,46 +16,87 @@ pub enum FileAccessor {
/// This is used in case file descriptor cache is `None` (to skip cache lookups)
File(Arc<dyn FsFile>),

/// Access to file descriptor cache
DescriptorTable(Arc<DescriptorTable>),
/// Access to file descriptor cache with [`Fs`]-based fallback for
/// cache misses.
DescriptorTable {
/// The FD cache.
table: Arc<DescriptorTable>,
/// Filesystem backend for opening files on cache miss.
fs: Arc<dyn Fs>,
},
}
Comment on lines 15 to 32
Copy link

Copilot AI Apr 4, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FileAccessor is a publicly exported enum (#[doc(hidden)] pub mod file_accessor), and this change adds a new Closed variant. Adding variants to a public enum is a semver-breaking API change and also allows external callers to construct FileAccessor::Closed, which is meant to be an internal sentinel (per the doc comment). Consider making FileAccessor pub(crate) (or moving it to a private module), or marking it #[non_exhaustive] and providing a crate-private sentinel type so external code cannot construct the internal-only state.

Copilot uses AI. Check for mistakes.

impl FileAccessor {
#[must_use]
pub fn as_descriptor_table(&self) -> Option<&DescriptorTable> {
match self {
Self::DescriptorTable(d) => Some(d),
Self::DescriptorTable { table, .. } => Some(table),
Self::File(_) => None,
}
}

#[must_use]
pub fn access_for_table(&self, table_id: &GlobalTableId) -> Option<Arc<dyn FsFile>> {
/// Returns a table FD, opening via [`Fs`] on descriptor-table cache miss.
///
/// Returns `(fd, None)` for pinned FDs (no cache involved),
/// `(fd, Some(true))` for descriptor-table cache hit,
/// `(fd, Some(false))` for cache miss (freshly opened and cached).
pub fn get_or_open_table(
&self,
table_id: &GlobalTableId,
path: &Path,
) -> std::io::Result<(Arc<dyn FsFile>, Option<bool>)> {
match self {
Self::File(fd) => Some(fd.clone()),
Self::DescriptorTable(descriptor_table) => descriptor_table.access_for_table(table_id),
}
}

pub fn insert_for_table(&self, table_id: GlobalTableId, fd: Arc<dyn FsFile>) {
if let Self::DescriptorTable(descriptor_table) = self {
descriptor_table.insert_for_table(table_id, fd);
Self::File(fd) => Ok((fd.clone(), None)),
Self::DescriptorTable { table, fs } => {
if let Some(fd) = table.access_for_table(table_id) {
return Ok((fd, Some(true)));
}
let fd: Arc<dyn FsFile> =
Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?);
table.insert_for_table(*table_id, fd.clone());
Ok((fd, Some(false)))
}
}
}

#[must_use]
pub fn access_for_blob_file(&self, table_id: &GlobalTableId) -> Option<Arc<dyn FsFile>> {
/// Returns a cached blob file FD or opens it via [`Fs`] on cache miss.
/// Returns a blob file FD. See [`get_or_open_table`](Self::get_or_open_table) for
/// semantics of the `Option<bool>` cache-hit indicator.
Comment thread
polaz marked this conversation as resolved.
Outdated
pub fn get_or_open_blob_file(
&self,
table_id: &GlobalTableId,
path: &Path,
) -> std::io::Result<(Arc<dyn FsFile>, Option<bool>)> {
match self {
Self::File(fd) => Some(fd.clone()),
Self::DescriptorTable(descriptor_table) => {
descriptor_table.access_for_blob_file(table_id)
Self::File(fd) => Ok((fd.clone(), None)),
Self::DescriptorTable { table, fs } => {
if let Some(fd) = table.access_for_blob_file(table_id) {
return Ok((fd, Some(true)));
}
let fd: Arc<dyn FsFile> =
Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?);
table.insert_for_blob_file(*table_id, fd.clone());
Ok((fd, Some(false)))
}
}
}

/// Pre-populates the blob file FD cache after creating a new blob file.
pub fn insert_for_blob_file(&self, table_id: GlobalTableId, fd: Arc<dyn FsFile>) {
if let Self::DescriptorTable(descriptor_table) = self {
descriptor_table.insert_for_blob_file(table_id, fd);
if let Self::DescriptorTable { table, .. } = self {
table.insert_for_blob_file(table_id, fd);
}
}

/// Forwards to the descriptor table's `remove_for_table` for `table_id`.
///
/// Only the [`FileAccessor::DescriptorTable`] variant holds a descriptor
/// table; for any other variant this method does nothing.
pub fn remove_for_table(&self, table_id: &GlobalTableId) {
    let Self::DescriptorTable { table, .. } = self else {
        // No descriptor table behind this accessor: nothing to remove.
        return;
    };
    table.remove_for_table(table_id);
}

/// Forwards to the descriptor table's `remove_for_blob_file` for `table_id`.
///
/// Only the [`FileAccessor::DescriptorTable`] variant holds a descriptor
/// table; for any other variant this method does nothing.
pub fn remove_for_blob_file(&self, table_id: &GlobalTableId) {
    let Self::DescriptorTable { table, .. } = self else {
        // No descriptor table behind this accessor: nothing to remove.
        return;
    };
    table.remove_for_blob_file(table_id);
}
Comment thread
polaz marked this conversation as resolved.
}
Expand All @@ -63,7 +105,7 @@ impl std::fmt::Debug for FileAccessor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::File(_) => write!(f, "FileAccessor::Pinned"),
Self::DescriptorTable(_) => {
Self::DescriptorTable { .. } => {
write!(f, "FileAccessor::Cached")
}
}
Expand Down
Loading
Loading