Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
1941a12
feat(fs): add MemFs in-memory filesystem for testing and ephemeral trees
polaz Apr 3, 2026
043be5a
fix(fs): wire level-routed Fs to Table::recover and harden MemFs sema…
polaz Apr 3, 2026
b6ec642
fix(fs): fix writable flag for existing files, temp file cleanup, vlo…
polaz Apr 3, 2026
bc0eac1
refactor(vlog): reuse single Fs-opened handle for blob recovery sfa r…
polaz Apr 3, 2026
05e2f30
fix(vlog): skip directories in blob recovery instead of panicking
polaz Apr 3, 2026
531749f
fix(vlog): correct misleading log message in blob file recovery
polaz Apr 3, 2026
e05476c
fix(vlog): eliminate TOCTOU in blob recovery, skip dirs before parse
polaz Apr 3, 2026
cd87d76
docs(fs): add code comments for non-obvious MemFs design decisions
polaz Apr 3, 2026
95ec4e8
style: reformat imports for rustfmt 1.94.1
polaz Apr 3, 2026
9cf15f8
fix(fs): MemFs::remove_dir_all rejects file paths with InvalidInput
polaz Apr 3, 2026
f0eb42e
fix(fs): MemFs::rename rejects directory source and destination paths
polaz Apr 3, 2026
2cb7d82
build: bump rust-toolchain.toml to 1.94.1 to match CI rustfmt
polaz Apr 3, 2026
c5bb6e1
fix(fs): harden MemFs open/create_dir_all/rename against dir-file con…
polaz Apr 3, 2026
54e990a
style(vlog): reformat imports for rustfmt 1.94.1 across all vlog files
polaz Apr 3, 2026
292551a
fix(fs): document Fs::rename atomicity, harden MemFs create_dir_all a…
polaz Apr 3, 2026
5c629a3
fix(fs): MemFs::open rejects truncate/create without write access
polaz Apr 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benches/index_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ fn build_table_for_point_read(restart_interval: u8) -> BenchTable {
0,
Arc::new(Cache::with_capacity_bytes(1_000_000)),
Some(Arc::new(DescriptorTable::new(8))),
Arc::new(lsm_tree::fs::StdFs),
false,
false,
None,
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.94.0"
channel = "1.94.1"
components = ["rustfmt", "clippy"]
1 change: 1 addition & 0 deletions src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl<'a> BlobIngestion<'a> {
index.id,
index.config.cache.clone(),
index.config.descriptor_table.clone(),
self.table.level_fs.clone(),
false,
false,
index.config.encryption.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ pub struct BlobTree {
impl BlobTree {
pub(crate) fn open(config: Config) -> crate::Result<Self> {
use crate::file::{BLOBS_FOLDER, fsync_directory};
use crate::fs::Fs;

let index = crate::Tree::open(config)?;

Expand Down Expand Up @@ -425,7 +424,7 @@ impl AbstractTree for BlobTree {
self.index.table_id_counter.clone(),
64 * 1_024 * 1_024,
0,
level_fs,
level_fs.clone(),
)?
.set_comparator(self.index.config.comparator.clone())
.use_data_block_restart_interval(data_block_restart_interval)
Expand Down Expand Up @@ -545,6 +544,7 @@ impl AbstractTree for BlobTree {
self.index.id,
self.index.config.cache.clone(),
self.index.config.descriptor_table.clone(),
level_fs.clone(),
pin_filter,
pin_index,
self.index.config.encryption.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/compaction/flavour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ impl StandardCompaction {

fn consume_writer(self, opts: &Options, dst_lvl: usize) -> crate::Result<Vec<Table>> {
let table_base_folder = self.table_writer.base_path.clone();
let level_fs = self.table_writer.fs.clone();

let pin_filter = opts.config.filter_block_pinning_policy.get(dst_lvl);
let pin_index = opts.config.index_block_pinning_policy.get(dst_lvl);
Expand All @@ -386,6 +387,7 @@ impl StandardCompaction {
opts.tree_id,
opts.config.cache.clone(),
opts.config.descriptor_table.clone(),
level_fs.clone(),
pin_filter,
pin_index,
opts.config.encryption.clone(),
Expand Down
41 changes: 31 additions & 10 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,22 +216,17 @@ impl KvSeparationOptions {
}

/// Tree configuration builder
///
/// The generic parameter `F` selects the filesystem backend.
/// It defaults to [`StdFs`], so existing code that writes `Config`
/// without a type parameter continues to work unchanged.
pub struct Config<F: Fs = StdFs> {
pub struct Config {
/// Folder path
#[doc(hidden)]
pub path: PathBuf,

/// Filesystem backend
///
// All Config fields are `#[doc(hidden)] pub` by convention — callers use
// builder methods or `..Default::default()`, not struct literals directly.
// A `with_fs()` builder will be added when call-site refactoring lands.
/// Defaults to [`StdFs`]. Use [`Config::with_fs`] to plug in an
/// alternative backend such as [`MemFs`](crate::fs::MemFs).
#[doc(hidden)]
pub fs: Arc<F>,
pub fs: Arc<dyn Fs>,
Comment thread
polaz marked this conversation as resolved.

/// Per-level filesystem routing for tiered storage.
///
Expand Down Expand Up @@ -447,6 +442,32 @@ impl Config {
}
}

/// Sets the filesystem backend.
///
/// Defaults to [`StdFs`]. Use [`MemFs`](crate::fs::MemFs) for
/// in-memory trees (testing, ephemeral indexes).
///
/// # Example
///
/// ```
/// use lsm_tree::{Config, SequenceNumberCounter};
/// use lsm_tree::fs::MemFs;
///
/// let tree = Config::new(
/// "/virtual/tree",
/// SequenceNumberCounter::default(),
/// SequenceNumberCounter::default(),
/// )
/// .with_fs(MemFs::new())
/// .open()
/// .unwrap();
/// ```
#[must_use]
pub fn with_fs<F: Fs>(mut self, fs: F) -> Self {
self.fs = Arc::new(fs);
self
}

/// Opens a tree using the config.
///
/// # Errors
Expand Down Expand Up @@ -582,7 +603,7 @@ mod tests {
}
}

impl<F: Fs> Config<F> {
impl Config {
/// Returns the tables folder path and [`Fs`] backend for the given level.
///
/// If [`level_routes`](Self::level_routes) has an entry covering this
Expand Down
62 changes: 38 additions & 24 deletions src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,40 +47,54 @@ pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Resul
Ok(builder.freeze().into())
}

/// Atomically rewrites a file.
/// Atomically rewrites a file via the [`Fs`] trait.
///
/// Writes `content` to a temporary file in the same directory, fsyncs it,
/// then renames over `path`. This ensures readers never see a partial write.
pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Result<()> {
use crate::fs::FsOpenOptions;
use std::sync::atomic::{AtomicU64, Ordering};

static TEMP_SEQ: AtomicU64 = AtomicU64::new(0);

#[expect(
clippy::expect_used,
reason = "every file should have a parent directory"
)]
let folder = path.parent().expect("should have a parent");

// NOTE: tempfile crate uses std::fs internally; migrating temp-file
// creation to Fs would require a custom implementation.
let mut temp_file = tempfile::NamedTempFile::new_in(folder)?;
temp_file.write_all(content)?;
temp_file.flush()?;
temp_file.as_file_mut().sync_all()?;
temp_file.persist(path)?;

// Suppress unused-variable warning on Windows where the post-persist
// sync block is skipped (directory fsync is unsupported).
let _ = &fs;

#[cfg(not(target_os = "windows"))]
{
use crate::fs::FsOpenOptions;

let file = fs.open(path, &FsOpenOptions::new().read(true))?;
// PID + monotonic seq gives uniqueness within a process and across
// concurrent processes. A crash-then-PID-reuse collision is theoretically
// possible but vanishingly unlikely (requires exact PID reuse AND seq
// counter restart to same value). lsm-tree uses exclusive file locking
// so the same data directory is never written by two processes.
let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed);
let pid = std::process::id();
let tmp_path = folder.join(format!(".tmp_{pid}_{seq}"));

let result = (|| -> std::io::Result<()> {
let mut file = fs.open(
Comment thread
polaz marked this conversation as resolved.
&tmp_path,
&FsOpenOptions::new().write(true).create_new(true),
)?;
Comment thread
polaz marked this conversation as resolved.
Comment thread
polaz marked this conversation as resolved.
Comment thread
polaz marked this conversation as resolved.
file.write_all(content)?;
file.flush()?;
FsFile::sync_all(&*file)?;
drop(file);
// `fs.rename` has to replace an existing destination so the new content
// lands on `path`. std::fs::rename overwrites existing destinations on all
// platforms (Rust uses MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows).
fs.rename(&tmp_path, path)?;
Comment thread
polaz marked this conversation as resolved.
Ok(())
Comment thread
polaz marked this conversation as resolved.
})();

#[expect(
clippy::expect_used,
reason = "files should always have a parent directory"
)]
let folder = path.parent().expect("should have parent folder");
fs.sync_directory(folder)?;
if result.is_err() {
// Best-effort cleanup of the temp file on any failure path.
// Safe to call even if fs.open() failed (file never created) —
// remove_file will return NotFound which we ignore.
let _ = fs.remove_file(&tmp_path);
}
Comment thread
polaz marked this conversation as resolved.
result?;
fsync_directory(folder, fs)?;

Ok(())
Comment thread
polaz marked this conversation as resolved.
}
Expand Down
86 changes: 64 additions & 22 deletions src/file_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

use crate::GlobalTableId;
use crate::descriptor_table::DescriptorTable;
use crate::fs::FsFile;
use crate::fs::{Fs, FsFile, FsOpenOptions};
use std::path::Path;
use std::sync::Arc;

/// Allows accessing a file (either cached or pinned)
Expand All @@ -15,46 +16,87 @@ pub enum FileAccessor {
/// This is used in case file descriptor cache is `None` (to skip cache lookups)
File(Arc<dyn FsFile>),

/// Access to file descriptor cache
DescriptorTable(Arc<DescriptorTable>),
/// Access to file descriptor cache with [`Fs`]-based fallback for
/// cache misses.
DescriptorTable {
/// The FD cache.
table: Arc<DescriptorTable>,
/// Filesystem backend for opening files on cache miss.
fs: Arc<dyn Fs>,
},
}

impl FileAccessor {
#[must_use]
pub fn as_descriptor_table(&self) -> Option<&DescriptorTable> {
match self {
Self::DescriptorTable(d) => Some(d),
Self::DescriptorTable { table, .. } => Some(table),
Self::File(_) => None,
}
}

#[must_use]
pub fn access_for_table(&self, table_id: &GlobalTableId) -> Option<Arc<dyn FsFile>> {
/// Returns a cached table FD or opens the file via [`Fs`] on cache miss.
///
/// The returned `bool` is `true` when an existing handle was reused (the
/// pinned file or a cache hit) and `false` when the file was freshly opened.
pub fn get_or_open_table(
&self,
table_id: &GlobalTableId,
path: &Path,
) -> std::io::Result<(Arc<dyn FsFile>, bool)> {
match self {
Self::File(fd) => Some(fd.clone()),
Self::DescriptorTable(descriptor_table) => descriptor_table.access_for_table(table_id),
}
}

pub fn insert_for_table(&self, table_id: GlobalTableId, fd: Arc<dyn FsFile>) {
if let Self::DescriptorTable(descriptor_table) = self {
descriptor_table.insert_for_table(table_id, fd);
Self::File(fd) => Ok((fd.clone(), true)),
Self::DescriptorTable { table, fs } => {
if let Some(fd) = table.access_for_table(table_id) {
return Ok((fd, true));
}
let fd: Arc<dyn FsFile> =
Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?);
table.insert_for_table(*table_id, fd.clone());
Ok((fd, false))
}
}
}

#[must_use]
pub fn access_for_blob_file(&self, table_id: &GlobalTableId) -> Option<Arc<dyn FsFile>> {
/// Returns a cached blob file FD or opens it via [`Fs`] on cache miss.
///
/// The returned `bool` is `true` when an existing handle was reused (the
/// pinned file or a cache hit) and `false` when the file was freshly opened.
pub fn get_or_open_blob_file(
&self,
table_id: &GlobalTableId,
path: &Path,
) -> std::io::Result<(Arc<dyn FsFile>, bool)> {
match self {
Self::File(fd) => Some(fd.clone()),
Self::DescriptorTable(descriptor_table) => {
descriptor_table.access_for_blob_file(table_id)
Self::File(fd) => Ok((fd.clone(), true)),
Self::DescriptorTable { table, fs } => {
if let Some(fd) = table.access_for_blob_file(table_id) {
return Ok((fd, true));
}
let fd: Arc<dyn FsFile> =
Arc::from(fs.open(path, &FsOpenOptions::new().read(true))?);
table.insert_for_blob_file(*table_id, fd.clone());
Ok((fd, false))
}
}
}

/// Pre-populates the blob file FD cache after creating a new blob file.
pub fn insert_for_blob_file(&self, table_id: GlobalTableId, fd: Arc<dyn FsFile>) {
if let Self::DescriptorTable(descriptor_table) = self {
descriptor_table.insert_for_blob_file(table_id, fd);
if let Self::DescriptorTable { table, .. } = self {
table.insert_for_blob_file(table_id, fd);
}
}

pub fn remove_for_table(&self, table_id: &GlobalTableId) {
if let Self::DescriptorTable { table, .. } = self {
table.remove_for_table(table_id);
}
}

pub fn remove_for_blob_file(&self, table_id: &GlobalTableId) {
if let Self::DescriptorTable { table, .. } = self {
table.remove_for_blob_file(table_id);
}
}
}
Expand All @@ -63,7 +105,7 @@ impl std::fmt::Debug for FileAccessor {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self {
Self::File(_) => write!(f, "FileAccessor::Pinned"),
Self::DescriptorTable(_) => {
Self::DescriptorTable { .. } => {
write!(f, "FileAccessor::Cached")
}
}
Expand Down
Loading
Loading