Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl<'a> BlobIngestion<'a> {

// Perform maintenance on the version history (e.g., clean up old versions).
// We use gc_watermark=0 since ingestion doesn't affect sealed memtables.
if let Err(e) = version_lock.maintenance(&index.config.path, 0) {
if let Err(e) = version_lock.maintenance(&index.config.path, 0, &*index.config.fs) {
log::warn!("Version GC failed: {e:?}");
}

Expand Down
14 changes: 11 additions & 3 deletions src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,11 @@ fn move_tables(
&*opts.config.fs,
)?;

if let Err(e) = version_history_lock.maintenance(&opts.config.path, opts.mvcc_gc_watermark) {
if let Err(e) = version_history_lock.maintenance(
&opts.config.path,
opts.mvcc_gc_watermark,
&*opts.config.fs,
) {
log::error!("Manifest maintenance failed: {e:?}");
return Err(e);
}
Expand Down Expand Up @@ -619,7 +623,7 @@ fn merge_tables(
.show(payload.table_ids.iter().copied());

version_history_lock
.maintenance(&opts.config.path, opts.mvcc_gc_watermark)
.maintenance(&opts.config.path, opts.mvcc_gc_watermark, &*opts.config.fs)
.inspect_err(|e| {
log::error!("Manifest maintenance failed: {e:?}");
})?;
Expand Down Expand Up @@ -698,7 +702,11 @@ fn drop_tables(
&*opts.config.fs,
)?;

if let Err(e) = version_history_lock.maintenance(&opts.config.path, opts.mvcc_gc_watermark) {
if let Err(e) = version_history_lock.maintenance(
&opts.config.path,
opts.mvcc_gc_watermark,
&*opts.config.fs,
) {
log::error!("Manifest maintenance failed: {e:?}");
return Err(e);
}
Expand Down
8 changes: 2 additions & 6 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,8 @@ pub struct Config {
/// Defaults to [`StdFs`]. Use [`Config::with_fs`] to plug in an
/// alternative backend such as [`MemFs`](crate::fs::MemFs).
///
/// **Note:** `Tree::open` still probes `CURRENT` via `std::fs`, so
/// reopening an existing tree on a non-`StdFs` backend is not yet
/// supported. Fresh tree creation works. Tracked in #209.
/// Both fresh tree creation and reopening (recovery) are supported
/// for any backend that implements [`Fs`].
Comment thread
polaz marked this conversation as resolved.
#[doc(hidden)]
pub fs: Arc<dyn Fs>,

Expand Down Expand Up @@ -451,8 +450,6 @@ impl Config {
/// Defaults to [`StdFs`]. Use [`MemFs`](crate::fs::MemFs) for
/// in-memory trees (testing, ephemeral indexes).
///
/// See [`Config::fs`] for the reopen limitation on non-`StdFs` backends.
///
/// # Example
///
/// ```
Expand Down Expand Up @@ -481,7 +478,6 @@ impl Config {
/// Useful when multiple configs should reuse the same backend
/// instance, including trait objects and backends that are not `Clone`.
///
/// See [`Config::fs`] for the reopen limitation on non-`StdFs` backends.
#[must_use]
pub fn with_shared_fs(mut self, fs: Arc<dyn Fs>) -> Self {
self.fs = fs;
Expand Down
7 changes: 0 additions & 7 deletions src/fs/mem_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@
//!
//! # Known limitations
//!
//! - **Tree reopen**: `Tree::open` checks for the `CURRENT` version file
//! via `Path::try_exists()` (bypasses `Fs`). New trees work correctly
//! (the check returns `false` → `create_new`), but reopening an
//! in-memory tree after drop is not supported.
//! - **Version GC**: Old version file cleanup in `SuperVersions::gc` uses
//! `std::fs` directly — stale version entries accumulate in memory
//! until the `MemFs` is dropped. Acceptable for testing / ephemeral use.
//! - **Compaction**: Some code paths in the compaction finalization still
//! bypass the `Fs` trait. Write + flush + point-read works; compaction
//! may fail with `ENOENT` on virtual paths.
Expand Down
18 changes: 18 additions & 0 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ pub trait Fs: Send + Sync + 'static {

/// Removes a single file.
///
/// If the file does not exist, implementations must return an error whose
/// kind is [`io::ErrorKind::NotFound`]. Callers such as version GC rely on
/// this to perform idempotent deletes.
///
/// # Errors
///
/// Returns an I/O error if the file cannot be removed.
Expand Down Expand Up @@ -313,3 +317,17 @@ pub trait Fs: Send + Sync + 'static {
/// (for example, due to permission issues or transient backend failures).
fn exists(&self, path: &Path) -> io::Result<bool>;
}

/// Builds a buffered reader over one section of an sfa archive, going through [`Fs`].
///
/// Stand-in for [`sfa::TocEntry::buf_reader`], which opens the archive via
/// [`std::fs`] directly and therefore bypasses the pluggable filesystem.
///
/// The file at `path` is opened read-only, positioned at `section.pos()`,
/// and wrapped so that at most `section.len()` bytes can be read from it.
///
/// # Errors
///
/// Returns an I/O error if opening the file or seeking to the section
/// start fails.
pub(crate) fn open_section_reader(
    fs: &dyn Fs,
    path: &Path,
    section: &sfa::TocEntry,
) -> io::Result<io::BufReader<io::Take<Box<dyn FsFile>>>> {
    let mut handle = fs.open(path, &FsOpenOptions::new().read(true))?;

    // Position the handle at the first byte of the requested section.
    let section_start = io::SeekFrom::Start(section.pos());
    handle.seek(section_start)?;

    // Cap the reader at the section length so reads stop at the section's end.
    let bounded = handle.take(section.len());
    Ok(io::BufReader::new(bounded))
}
120 changes: 93 additions & 27 deletions src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,34 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{FormatVersion, TreeType, checksum::ChecksumType};
use crate::{
FormatVersion, TreeType,
checksum::ChecksumType,
fs::{Fs, open_section_reader},
};
use byteorder::ReadBytesExt;
use std::{io::Read, path::Path};

pub struct Manifest {
pub version: FormatVersion,
#[expect(
dead_code,
reason = "deserialized from on-disk manifest, retained for validation"
#[cfg_attr(
not(test),
expect(
dead_code,
reason = "deserialized from on-disk manifest, retained for validation; read in tests"
)
)]
pub tree_type: TreeType,
pub level_count: u8,
pub comparator_name: String,
}

impl Manifest {
pub fn decode_from(path: &Path, reader: &sfa::Reader) -> Result<Self, crate::Error> {
pub fn decode_from(
path: &Path,
reader: &sfa::Reader,
fs: &dyn Fs,
) -> Result<Self, crate::Error> {
let toc = reader.toc();

let version = {
Expand All @@ -30,7 +41,7 @@ impl Manifest {
.section(b"format_version")
.expect("format_version section should exist in manifest");

let mut reader = section.buf_reader(path)?;
let mut reader = open_section_reader(fs, path, section)?;
let version = reader.read_u8()?;
FormatVersion::try_from(version).map_err(|()| crate::Error::InvalidVersion(version))?
};
Expand All @@ -44,7 +55,7 @@ impl Manifest {
.section(b"tree_type")
.expect("tree_type section should exist in manifest");

let mut reader = section.buf_reader(path)?;
let mut reader = open_section_reader(fs, path, section)?;
let tree_type = reader.read_u8()?;
tree_type
.try_into()
Expand All @@ -60,7 +71,7 @@ impl Manifest {
.section(b"level_count")
.expect("level_count section should exist in manifest");

let mut reader = section.buf_reader(path)?;
let mut reader = open_section_reader(fs, path, section)?;
reader.read_u8()?
};

Expand All @@ -77,8 +88,7 @@ impl Manifest {
.section(b"filter_hash_type")
.expect("filter_hash_type section should exist in manifest");

section
.buf_reader(path)?
open_section_reader(fs, path, section)?
.bytes()
.collect::<Result<Vec<_>, _>>()?
};
Expand Down Expand Up @@ -108,7 +118,7 @@ impl Manifest {
}

let mut bytes = Vec::new();
section.buf_reader(path)?.read_to_end(&mut bytes)?;
open_section_reader(fs, path, section)?.read_to_end(&mut bytes)?;

String::from_utf8(bytes).map_err(|e| crate::Error::Utf8(e.utf8_error()))?
}
Expand All @@ -127,17 +137,22 @@ impl Manifest {
#[cfg(test)]
mod tests {
use super::*;
use crate::fs::{Fs, FsOpenOptions, StdFs};
use byteorder::WriteBytesExt;
use std::io::Write;

/// Write the mandatory manifest sections (`format_version`, `tree_type`,
/// `level_count`, `filter_hash_type`) into an sfa archive at `path`.
/// If `comparator_name` is `Some`, also writes that section.
/// `level_count`, `filter_hash_type`) into an sfa archive via the [`Fs`]
/// trait. If `comparator_name` is `Some`, also writes that section.
fn write_test_manifest(
path: &std::path::Path,
comparator_name: Option<&str>,
fs: &dyn Fs,
) -> crate::Result<()> {
let file = std::fs::File::create(path)?;
let file = fs.open(
path,
&FsOpenOptions::new().write(true).create(true).truncate(true),
)?;
let mut writer = sfa::Writer::from_writer(std::io::BufWriter::new(file));

writer.start("format_version")?;
Expand All @@ -161,15 +176,25 @@ mod tests {
Ok(())
}

/// Test helper: opens `path` read-only through `fs`, builds an
/// `sfa::Reader` over the opened file, and decodes the [`Manifest`]
/// with the same backend.
fn decode_manifest(path: &std::path::Path, fs: &dyn Fs) -> crate::Result<Manifest> {
    let mut handle = fs.open(path, &FsOpenOptions::new().read(true))?;
    let archive = sfa::Reader::from_reader(&mut handle)?;

    Manifest::decode_from(path, &archive, fs)
}

// ------------------------------------------------------------------
// StdFs tests
// ------------------------------------------------------------------

#[test]
fn manifest_without_comparator_name_defaults_to_default() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let path = dir.path().join("manifest");

write_test_manifest(&path, None)?;
write_test_manifest(&path, None, &StdFs)?;

let reader = sfa::Reader::new(&path)?;
let manifest = Manifest::decode_from(&path, &reader)?;
let manifest = decode_manifest(&path, &StdFs)?;
assert_eq!(manifest.comparator_name, "default");
Ok(())
}
Expand All @@ -179,10 +204,9 @@ mod tests {
let dir = tempfile::tempdir()?;
let path = dir.path().join("manifest");

write_test_manifest(&path, Some("u64-big-endian"))?;
write_test_manifest(&path, Some("u64-big-endian"), &StdFs)?;

let reader = sfa::Reader::new(&path)?;
let manifest = Manifest::decode_from(&path, &reader)?;
let manifest = decode_manifest(&path, &StdFs)?;
assert_eq!(manifest.comparator_name, "u64-big-endian");
Ok(())
}
Expand All @@ -193,10 +217,9 @@ mod tests {
let path = dir.path().join("manifest");

let long_name = "x".repeat(300);
write_test_manifest(&path, Some(&long_name))?;
write_test_manifest(&path, Some(&long_name), &StdFs)?;

let reader = sfa::Reader::new(&path)?;
let result = Manifest::decode_from(&path, &reader);
let result = decode_manifest(&path, &StdFs);
assert!(
matches!(result, Err(crate::Error::DecompressedSizeTooLarge { .. })),
"expected DecompressedSizeTooLarge"
Expand All @@ -209,8 +232,13 @@ mod tests {
let dir = tempfile::tempdir()?;
let path = dir.path().join("manifest");

// Write manifest with invalid UTF-8 bytes in comparator_name
let file = std::fs::File::create(&path)?;
// Write manifest with invalid UTF-8 bytes in comparator_name.
// This needs raw Write access — write_test_manifest only handles
// valid strings, so we inline the sfa construction here.
let file = StdFs.open(
&path,
&FsOpenOptions::new().write(true).create(true).truncate(true),
)?;
let mut writer = sfa::Writer::from_writer(std::io::BufWriter::new(file));

writer.start("format_version")?;
Expand All @@ -226,12 +254,50 @@ mod tests {

writer.finish()?;

let reader = sfa::Reader::new(&path)?;
let result = Manifest::decode_from(&path, &reader);
let result = decode_manifest(&path, &StdFs);
assert!(
matches!(result, Err(crate::Error::Utf8(_))),
"expected Utf8 error"
);
Ok(())
}

// ------------------------------------------------------------------
// MemFs tests — verify decode_from works with non-StdFs backends
// ------------------------------------------------------------------

#[test]
fn manifest_memfs_default_comparator() -> crate::Result<()> {
    use crate::fs::MemFs;

    // Write the manifest into an in-memory backend, without a
    // comparator_name section.
    let mem = MemFs::new();
    let root = std::path::Path::new("/memfs");
    mem.create_dir_all(root)?;
    let manifest_path = root.join("manifest_default");
    write_test_manifest(&manifest_path, None, &mem)?;

    // Decode through the same backend and check every decoded field.
    let decoded = decode_manifest(&manifest_path, &mem)?;
    assert_eq!(decoded.comparator_name, "default");
    assert_eq!(decoded.level_count, 7);
    assert!(matches!(decoded.version, FormatVersion::V4));
    assert!(matches!(decoded.tree_type, TreeType::Standard));

    Ok(())
}

#[test]
fn manifest_memfs_custom_comparator_round_trips() -> crate::Result<()> {
    use crate::fs::MemFs;

    // Write a manifest carrying an explicit comparator name into an
    // in-memory backend.
    let mem = MemFs::new();
    let root = std::path::Path::new("/memfs");
    mem.create_dir_all(root)?;
    let manifest_path = root.join("manifest_custom");
    write_test_manifest(&manifest_path, Some("u64-big-endian"), &mem)?;

    // The name must come back unchanged when decoded via the same backend.
    let decoded = decode_manifest(&manifest_path, &mem)?;
    assert_eq!(decoded.comparator_name, "u64-big-endian");

    Ok(())
}
}
2 changes: 1 addition & 1 deletion src/tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ impl<'a> Ingestion<'a> {

// Perform maintenance on the version history (e.g., clean up old versions).
// We use gc_watermark=0 since ingestion doesn't affect sealed memtables.
if let Err(e) = version_lock.maintenance(&self.tree.config.path, 0) {
if let Err(e) = version_lock.maintenance(&self.tree.config.path, 0, &*self.tree.config.fs) {
log::warn!("Version GC failed: {e:?}");
}

Expand Down
Loading
Loading