Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
d051b11
feat(fs): add MemFs in-memory filesystem and pipe Fs through all I/O …
polaz Apr 3, 2026
303361e
build: bump rust-toolchain.toml from 1.94.0 to 1.94.1
polaz Apr 3, 2026
52f701a
fix(fs): retry temp file on AlreadyExists, validate open flags, corre…
polaz Apr 3, 2026
8d6033e
style(file): fix import ordering for rustfmt 1.94.1
polaz Apr 3, 2026
3520e08
fix(fs): return Option<bool> from FileAccessor for precise cache metrics
polaz Apr 3, 2026
afeccbd
fix(test): use byte comparisons in iter test, exercise with_shared_fs
polaz Apr 3, 2026
53d24f0
fix(fs): MemFs::open returns error when opening a directory path
polaz Apr 3, 2026
e8ce4f2
fix(fs): seed root dir, correct Seek arithmetic, validate sync_directory
polaz Apr 3, 2026
f5b1030
fix(fs): MemFile::set_len requires write access, read_at requires rea…
polaz Apr 3, 2026
0f599da
fix(fs): protect root dir invariant, distinguish file-vs-dir in read_…
polaz Apr 3, 2026
74352ee
fix(fs): guard write position overflow, distinguish file-as-parent er…
polaz Apr 4, 2026
f9c2a21
style(fs): multi-line expect attribute for CI rustfmt
polaz Apr 4, 2026
9451982
fix(fs): validate open flags before parent lookup, remove_file reject…
polaz Apr 4, 2026
e0b8973
fix(fs): improve create_dir_all error message for file-path conflicts
polaz Apr 4, 2026
7e0fe3c
fix(fs): delegate fsync_directory to Fs backend on all platforms
polaz Apr 4, 2026
8c33e29
test(fs): add read+append cursor regression test for MemFs
polaz Apr 4, 2026
adad93e
fix(fs): reject truncate+append combo, drop unwrap from Config::with_…
polaz Apr 4, 2026
76f3b67
test(fs): wrong-type error paths and StdFs rename-replace
polaz Apr 4, 2026
cf46569
fix(fs): use map(|_| ()) for FsFile open error unwrap in test
polaz Apr 4, 2026
fee8a89
refactor(fs): extract ensure_parent_dir helper, empty write no-op
polaz Apr 4, 2026
62e1fcf
fix(fs): reject empty paths, permission guard tests, doc clarity
polaz Apr 4, 2026
c20521e
docs(fs): document no-op behavior of FileAccessor remove methods
polaz Apr 4, 2026
1cc66b4
fix(vlog): fail-fast on missing blobs folder when manifest references…
polaz Apr 4, 2026
5662b67
fix(table,vlog): evict FD before remove_file, defer blob cache inserts
polaz Apr 4, 2026
e911497
fix(fs): replace as-usize casts with checked usize::try_from
polaz Apr 4, 2026
093765c
fix(table): close pinned file handle before remove_file in Drop
polaz Apr 4, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions benches/index_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ fn build_table_for_point_read(restart_interval: u8) -> BenchTable {
0,
Arc::new(Cache::with_capacity_bytes(1_000_000)),
Some(Arc::new(DescriptorTable::new(8))),
Arc::new(lsm_tree::fs::StdFs),
false,
false,
None,
Expand Down
2 changes: 1 addition & 1 deletion rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
[toolchain]
channel = "1.94.0"
channel = "1.94.1"
Comment thread
polaz marked this conversation as resolved.
components = ["rustfmt", "clippy"]
1 change: 1 addition & 0 deletions src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ impl<'a> BlobIngestion<'a> {
index.id,
index.config.cache.clone(),
index.config.descriptor_table.clone(),
self.table.level_fs.clone(),
false,
false,
index.config.encryption.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,6 @@ pub struct BlobTree {
impl BlobTree {
pub(crate) fn open(config: Config) -> crate::Result<Self> {
use crate::file::{BLOBS_FOLDER, fsync_directory};
use crate::fs::Fs;

let index = crate::Tree::open(config)?;

Expand Down Expand Up @@ -425,7 +424,7 @@ impl AbstractTree for BlobTree {
self.index.table_id_counter.clone(),
64 * 1_024 * 1_024,
0,
level_fs,
level_fs.clone(),
)?
.set_comparator(self.index.config.comparator.clone())
.use_data_block_restart_interval(data_block_restart_interval)
Expand Down Expand Up @@ -545,6 +544,7 @@ impl AbstractTree for BlobTree {
self.index.id,
self.index.config.cache.clone(),
self.index.config.descriptor_table.clone(),
level_fs.clone(),
pin_filter,
pin_index,
self.index.config.encryption.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/compaction/flavour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ impl StandardCompaction {

fn consume_writer(self, opts: &Options, dst_lvl: usize) -> crate::Result<Vec<Table>> {
let table_base_folder = self.table_writer.base_path.clone();
let level_fs = self.table_writer.fs.clone();

let pin_filter = opts.config.filter_block_pinning_policy.get(dst_lvl);
let pin_index = opts.config.index_block_pinning_policy.get(dst_lvl);
Expand All @@ -386,6 +387,7 @@ impl StandardCompaction {
opts.tree_id,
opts.config.cache.clone(),
opts.config.descriptor_table.clone(),
level_fs.clone(),
pin_filter,
pin_index,
opts.config.encryption.clone(),
Expand Down
60 changes: 48 additions & 12 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,22 +216,17 @@ impl KvSeparationOptions {
}

/// Tree configuration builder
///
/// The generic parameter `F` selects the filesystem backend.
/// It defaults to [`StdFs`], so existing code that writes `Config`
/// without a type parameter continues to work unchanged.
pub struct Config<F: Fs = StdFs> {
pub struct Config {
/// Folder path
#[doc(hidden)]
pub path: PathBuf,

/// Filesystem backend
/// Default filesystem backend for levels without an explicit route.
///
// All Config fields are `#[doc(hidden)] pub` by convention — callers use
// builder methods or `..Default::default()`, not struct literals directly.
// A `with_fs()` builder will be added when call-site refactoring lands.
/// Defaults to [`StdFs`]. Use [`Config::with_fs`] to plug in an
/// alternative backend such as [`MemFs`](crate::fs::MemFs).
#[doc(hidden)]
pub fs: Arc<F>,
pub fs: Arc<dyn Fs>,

/// Per-level filesystem routing for tiered storage.
///
Expand Down Expand Up @@ -447,6 +442,44 @@ impl Config {
}
}

/// Installs `fs` as the default filesystem backend.
///
/// The default backend serves every level that has no explicit route.
/// Out of the box this is [`StdFs`]; pass [`MemFs`](crate::fs::MemFs)
/// to keep a tree entirely in memory (testing, ephemeral indexes).
///
/// # Example
///
/// ```
/// # fn main() -> lsm_tree::Result<()> {
/// use lsm_tree::{Config, SequenceNumberCounter};
/// use lsm_tree::fs::MemFs;
///
/// let tree = Config::new(
///     "/virtual/tree",
///     SequenceNumberCounter::default(),
///     SequenceNumberCounter::default(),
/// )
/// .with_fs(MemFs::new())
/// .open()?;
/// # Ok(())
/// # }
/// ```
#[must_use]
pub fn with_fs<F: Fs>(self, fs: F) -> Self {
    // Erase the concrete backend type once, then reuse the shared-handle
    // setter so both builders store the backend the same way.
    let shared: Arc<dyn Fs> = Arc::new(fs);
    self.with_shared_fs(shared)
}

Comment thread
polaz marked this conversation as resolved.
/// Installs an already-shared handle as the default filesystem backend.
///
/// Use this when several configs should point at the same backend
/// instance; it also accepts trait objects and backends that are not
/// `Clone`, since only the `Arc` is stored.
#[must_use]
pub fn with_shared_fs(self, fs: Arc<dyn Fs>) -> Self {
    let mut config = self;
    config.fs = fs;
    config
}

/// Opens a tree using the config.
///
/// # Errors
Expand Down Expand Up @@ -582,7 +615,7 @@ mod tests {
}
}

impl<F: Fs> Config<F> {
impl Config {
/// Returns the tables folder path and [`Fs`] backend for the given level.
///
/// If [`level_routes`](Self::level_routes) has an entry covering this
Expand Down Expand Up @@ -612,7 +645,10 @@ impl<F: Fs> Config<F> {
for route in routes {
let folder = route.path.join(TABLES_FOLDER);
// Dedup by path: scanning the same directory twice would cause
// already-recovered tables to be classified as orphans and deleted.
// already-recovered tables to be classified as orphans and
// deleted. Routing the same path through different Fs backends
// is a configuration error (level_routes validation in
// Config::level_routes rejects overlapping ranges).
if !folders.iter().any(|(p, _)| *p == folder) {
folders.push((folder, route.fs.clone()));
}
Expand Down
112 changes: 79 additions & 33 deletions src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,55 +47,71 @@ pub fn read_exact(file: &dyn FsFile, offset: u64, size: usize) -> std::io::Resul
Ok(builder.freeze().into())
}

/// Atomically rewrites a file.
/// Atomically rewrites a file via the [`Fs`] trait.
///
/// Writes `content` to a temporary file in the same directory, fsyncs it,
/// then renames over `path`. This ensures readers never see a partial write.
pub fn rewrite_atomic(path: &Path, content: &[u8], fs: &dyn Fs) -> std::io::Result<()> {
use crate::fs::FsOpenOptions;
use std::sync::atomic::{AtomicU64, Ordering};

static TEMP_SEQ: AtomicU64 = AtomicU64::new(0);

#[expect(
clippy::expect_used,
reason = "every file should have a parent directory"
)]
let folder = path.parent().expect("should have a parent");

// NOTE: tempfile crate uses std::fs internally; migrating temp-file
// creation to Fs would require a custom implementation.
let mut temp_file = tempfile::NamedTempFile::new_in(folder)?;
temp_file.write_all(content)?;
temp_file.flush()?;
temp_file.as_file_mut().sync_all()?;
temp_file.persist(path)?;

// Suppress unused-variable warning on Windows where the post-persist
// sync block is skipped (directory fsync is unsupported).
let _ = &fs;

#[cfg(not(target_os = "windows"))]
{
use crate::fs::FsOpenOptions;

let file = fs.open(path, &FsOpenOptions::new().read(true))?;
FsFile::sync_all(&*file)?;

#[expect(
clippy::expect_used,
reason = "files should always have a parent directory"
)]
let folder = path.parent().expect("should have parent folder");
fs.sync_directory(folder)?;
let pid = std::process::id();

// Retry with incrementing seq on AlreadyExists — handles leftover temp
// files from a previous crash (PID can be reused, especially in containers).
let tmp_path = loop {
let seq = TEMP_SEQ.fetch_add(1, Ordering::Relaxed);
let candidate = folder.join(format!(".tmp_{pid}_{seq}"));
Comment thread
polaz marked this conversation as resolved.
match fs.open(
&candidate,
&FsOpenOptions::new().write(true).create_new(true),
) {
Ok(mut file) => {
let write_result = file
.write_all(content)
.and_then(|()| file.flush())
.and_then(|()| FsFile::sync_all(&*file));
if let Err(e) = write_result {
drop(file);
let _ = fs.remove_file(&candidate);
return Err(e);
}
break candidate;
}
// Leftover temp file from a previous crash — retry with next seq.
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {}
Err(e) => return Err(e),
}
Comment thread
polaz marked this conversation as resolved.
};

// std::fs::rename overwrites existing destinations on all platforms
// (Rust uses MoveFileExW with MOVEFILE_REPLACE_EXISTING on Windows).
if let Err(e) = fs.rename(&tmp_path, path) {
let _ = fs.remove_file(&tmp_path);
return Err(e);
}
fsync_directory(folder, fs)?;

Ok(())
}

#[cfg(not(target_os = "windows"))]
/// Delegates directory sync to the backend.
///
/// On Windows, `StdFs::sync_directory` already returns `Ok(())` (directory
/// fsync is unsupported), but non-`StdFs` backends (e.g., `MemFs`) may use
/// this call for path validation. Always delegate rather than short-circuiting.
pub fn fsync_directory(path: &Path, fs: &dyn Fs) -> std::io::Result<()> {
fs.sync_directory(path)
}

#[cfg(target_os = "windows")]
pub fn fsync_directory(_path: &Path, _fs: &dyn Fs) -> std::io::Result<()> {
// Cannot fsync directory on Windows
Ok(())
}

#[cfg(test)]
#[allow(
clippy::unwrap_used,
Expand Down Expand Up @@ -144,4 +160,34 @@ mod tests {

Ok(())
}

/// Checks the contract `rewrite_atomic` depends on: `StdFs::rename`
/// onto an existing destination replaces it with the source file.
#[test]
fn std_fs_rename_replaces_existing_file() -> crate::Result<()> {
    use crate::fs::{Fs, FsOpenOptions};

    let tmp = tempfile::tempdir()?;
    let source = tmp.path().join("src.txt");
    let target = tmp.path().join("dst.txt");

    // Populate both files through the Fs trait; each handle is closed at
    // the end of its iteration, before the rename happens.
    let create_opts = FsOpenOptions::new().write(true).create(true);
    let fixtures = [
        (&source, b"new".as_slice()),
        (&target, b"old".as_slice()),
    ];
    for (file_path, payload) in fixtures {
        let mut handle = StdFs.open(file_path, &create_opts)?;
        handle.write_all(payload)?;
    }

    StdFs.rename(&source, &target)?;

    // The destination now carries the source's bytes and the source is gone.
    let replaced = std::fs::read_to_string(&target)?;
    assert_eq!("new", replaced);
    assert!(!source.exists());

    Ok(())
}
}
Loading
Loading