Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ impl<'a> BlobIngestion<'a> {

// Perform maintenance on the version history (e.g., clean up old versions).
// We use gc_watermark=0 since ingestion doesn't affect sealed memtables.
if let Err(e) = version_lock.maintenance(&index.config.path, 0) {
if let Err(e) = version_lock.maintenance(&index.config.path, 0, &*index.config.fs) {
log::warn!("Version GC failed: {e:?}");
}

Expand Down
14 changes: 11 additions & 3 deletions src/compaction/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,11 @@ fn move_tables(
&*opts.config.fs,
)?;

if let Err(e) = version_history_lock.maintenance(&opts.config.path, opts.mvcc_gc_watermark) {
if let Err(e) = version_history_lock.maintenance(
&opts.config.path,
opts.mvcc_gc_watermark,
&*opts.config.fs,
) {
log::error!("Manifest maintenance failed: {e:?}");
return Err(e);
}
Expand Down Expand Up @@ -619,7 +623,7 @@ fn merge_tables(
.show(payload.table_ids.iter().copied());

version_history_lock
.maintenance(&opts.config.path, opts.mvcc_gc_watermark)
.maintenance(&opts.config.path, opts.mvcc_gc_watermark, &*opts.config.fs)
.inspect_err(|e| {
log::error!("Manifest maintenance failed: {e:?}");
})?;
Expand Down Expand Up @@ -698,7 +702,11 @@ fn drop_tables(
&*opts.config.fs,
)?;

if let Err(e) = version_history_lock.maintenance(&opts.config.path, opts.mvcc_gc_watermark) {
if let Err(e) = version_history_lock.maintenance(
&opts.config.path,
opts.mvcc_gc_watermark,
&*opts.config.fs,
) {
log::error!("Manifest maintenance failed: {e:?}");
return Err(e);
}
Expand Down
14 changes: 14 additions & 0 deletions src/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,17 @@ pub trait Fs: Send + Sync + 'static {
/// (for example, due to permission issues or transient backend failures).
fn exists(&self, path: &Path) -> io::Result<bool>;
}

/// Opens a section of an sfa archive for buffered reading via the [`Fs`] trait.
///
/// Replaces [`sfa::TocEntry::buf_reader`] which opens files through
/// [`std::fs`] directly, bypassing the pluggable filesystem.
pub(crate) fn open_section_reader(
fs: &dyn Fs,
path: &Path,
section: &sfa::TocEntry,
) -> io::Result<io::BufReader<io::Take<Box<dyn FsFile>>>> {
let mut file = fs.open(path, &FsOpenOptions::new().read(true))?;
file.seek(io::SeekFrom::Start(section.pos()))?;
Ok(io::BufReader::new(file.take(section.len())))
}
44 changes: 28 additions & 16 deletions src/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{FormatVersion, TreeType, checksum::ChecksumType};
use crate::{
FormatVersion, TreeType,
checksum::ChecksumType,
fs::{Fs, open_section_reader},
};
use byteorder::ReadBytesExt;
use std::{io::Read, path::Path};

Expand All @@ -18,7 +22,11 @@ pub struct Manifest {
}

impl Manifest {
pub fn decode_from(path: &Path, reader: &sfa::Reader) -> Result<Self, crate::Error> {
pub fn decode_from(
path: &Path,
reader: &sfa::Reader,
fs: &dyn Fs,
) -> Result<Self, crate::Error> {
let toc = reader.toc();

let version = {
Expand All @@ -30,7 +38,7 @@ impl Manifest {
.section(b"format_version")
.expect("format_version section should exist in manifest");

let mut reader = section.buf_reader(path)?;
let mut reader = open_section_reader(fs, path, section)?;
let version = reader.read_u8()?;
FormatVersion::try_from(version).map_err(|()| crate::Error::InvalidVersion(version))?
};
Expand All @@ -44,7 +52,7 @@ impl Manifest {
.section(b"tree_type")
.expect("tree_type section should exist in manifest");

let mut reader = section.buf_reader(path)?;
let mut reader = open_section_reader(fs, path, section)?;
let tree_type = reader.read_u8()?;
tree_type
.try_into()
Expand All @@ -60,7 +68,7 @@ impl Manifest {
.section(b"level_count")
.expect("level_count section should exist in manifest");

let mut reader = section.buf_reader(path)?;
let mut reader = open_section_reader(fs, path, section)?;
reader.read_u8()?
};

Expand All @@ -77,8 +85,7 @@ impl Manifest {
.section(b"filter_hash_type")
.expect("filter_hash_type section should exist in manifest");

section
.buf_reader(path)?
open_section_reader(fs, path, section)?
.bytes()
.collect::<Result<Vec<_>, _>>()?
};
Expand Down Expand Up @@ -108,7 +115,7 @@ impl Manifest {
}

let mut bytes = Vec::new();
section.buf_reader(path)?.read_to_end(&mut bytes)?;
open_section_reader(fs, path, section)?.read_to_end(&mut bytes)?;

String::from_utf8(bytes).map_err(|e| crate::Error::Utf8(e.utf8_error()))?
}
Expand All @@ -127,6 +134,7 @@ impl Manifest {
#[cfg(test)]
mod tests {
use super::*;
use crate::fs::StdFs;
use byteorder::WriteBytesExt;
use std::io::Write;

Expand Down Expand Up @@ -168,8 +176,9 @@ mod tests {

write_test_manifest(&path, None)?;

let reader = sfa::Reader::new(&path)?;
let manifest = Manifest::decode_from(&path, &reader)?;
let mut file = StdFs.open(&path, &crate::fs::FsOpenOptions::new().read(true))?;
let reader = sfa::Reader::from_reader(&mut file)?;
let manifest = Manifest::decode_from(&path, &reader, &StdFs)?;
assert_eq!(manifest.comparator_name, "default");
Comment thread
polaz marked this conversation as resolved.
Ok(())
}
Expand All @@ -181,8 +190,9 @@ mod tests {

write_test_manifest(&path, Some("u64-big-endian"))?;

let reader = sfa::Reader::new(&path)?;
let manifest = Manifest::decode_from(&path, &reader)?;
let mut file = StdFs.open(&path, &crate::fs::FsOpenOptions::new().read(true))?;
let reader = sfa::Reader::from_reader(&mut file)?;
let manifest = Manifest::decode_from(&path, &reader, &StdFs)?;
assert_eq!(manifest.comparator_name, "u64-big-endian");
Ok(())
}
Expand All @@ -195,8 +205,9 @@ mod tests {
let long_name = "x".repeat(300);
write_test_manifest(&path, Some(&long_name))?;

let reader = sfa::Reader::new(&path)?;
let result = Manifest::decode_from(&path, &reader);
let mut file = StdFs.open(&path, &crate::fs::FsOpenOptions::new().read(true))?;
let reader = sfa::Reader::from_reader(&mut file)?;
let result = Manifest::decode_from(&path, &reader, &StdFs);
assert!(
matches!(result, Err(crate::Error::DecompressedSizeTooLarge { .. })),
"expected DecompressedSizeTooLarge"
Expand Down Expand Up @@ -226,8 +237,9 @@ mod tests {

writer.finish()?;

let reader = sfa::Reader::new(&path)?;
let result = Manifest::decode_from(&path, &reader);
let mut file = StdFs.open(&path, &crate::fs::FsOpenOptions::new().read(true))?;
let reader = sfa::Reader::from_reader(&mut file)?;
let result = Manifest::decode_from(&path, &reader, &StdFs);
assert!(
matches!(result, Err(crate::Error::Utf8(_))),
"expected Utf8 error"
Expand Down
2 changes: 1 addition & 1 deletion src/tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ impl<'a> Ingestion<'a> {

// Perform maintenance on the version history (e.g., clean up old versions).
// We use gc_watermark=0 since ingestion doesn't affect sealed memtables.
if let Err(e) = version_lock.maintenance(&self.tree.config.path, 0) {
if let Err(e) = version_lock.maintenance(&self.tree.config.path, 0, &*self.tree.config.fs) {
log::warn!("Version GC failed: {e:?}");
}

Expand Down
41 changes: 19 additions & 22 deletions src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,8 @@ impl AbstractTree for Tree {
&*self.config.fs,
)?;

if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark) {
if let Err(e) = version_lock.maintenance(&self.config.path, gc_watermark, &*self.config.fs)
{
log::warn!("Version GC failed: {e:?}");
}

Expand Down Expand Up @@ -1201,20 +1202,15 @@ impl Tree {
pub(crate) fn open(config: Config) -> crate::Result<Self> {
log::debug!("Opening LSM-tree at {}", config.path.display());

// NOTE: try_exists() and recover() below use std::fs directly, bypassing
// the pluggable Fs trait. This means MemFs (and other non-StdFs backends)
// cannot reopen a tree after drop — only new tree creation works.
// Tracked in: #209

// Check for old version
if config.path.join("version").try_exists()? {
if config.fs.exists(&config.path.join("version"))? {
log::error!(
"It looks like you are trying to open a V1 database - the database needs a manual migration, however a migration tool is not provided, as V1 is extremely outdated."
);
return Err(crate::Error::InvalidVersion(FormatVersion::V1.into()));
}

let tree = if config.path.join(CURRENT_VERSION_FILE).try_exists()? {
let tree = if config.fs.exists(&config.path.join(CURRENT_VERSION_FILE))? {
Self::recover(config)
} else {
Self::create_new(config)
Expand Down Expand Up @@ -1366,10 +1362,14 @@ impl Tree {
// recover_levels for table/blob data). This is intentional — metadata
// validation must complete before any disk-mutating recovery work.
{
let version_id = crate::version::recovery::get_current_version(&config.path)?;
let version_id =
crate::version::recovery::get_current_version(&config.path, &*config.fs)?;
let manifest_path = config.path.join(format!("v{version_id}"));
let reader = sfa::Reader::new(&manifest_path)?;
let manifest = Manifest::decode_from(&manifest_path, &reader)?;
let mut manifest_file = config
.fs
.open(&manifest_path, &crate::fs::FsOpenOptions::new().read(true))?;
let reader = sfa::Reader::from_reader(&mut manifest_file)?;
let manifest = Manifest::decode_from(&manifest_path, &reader, &*config.fs)?;

if !matches!(manifest.version, FormatVersion::V3 | FormatVersion::V4) {
return Err(crate::Error::InvalidVersion(manifest.version.into()));
Expand Down Expand Up @@ -1497,7 +1497,7 @@ impl Tree {

let tree_path = tree_path.as_ref();

let recovery = recover(tree_path)?;
let recovery = recover(tree_path, &*config.fs)?;

let mut table_map = {
let mut result: crate::HashMap<TableId, (u8 /* Level index */, Checksum, SeqNo)> =
Expand Down Expand Up @@ -1693,7 +1693,7 @@ impl Tree {

// NOTE: Cleanup old versions
// But only after we definitely recovered the latest version
Self::cleanup_orphaned_version(tree_path, version.id())?;
Self::cleanup_orphaned_version(tree_path, version.id(), &*config.fs)?;

for (table_path, orphan_fs) in orphaned_tables {
log::debug!("Deleting orphaned table {}", table_path.display());
Expand All @@ -1711,21 +1711,18 @@ impl Tree {
fn cleanup_orphaned_version(
path: &Path,
latest_version_id: crate::version::VersionId,
fs: &dyn crate::fs::Fs,
) -> crate::Result<()> {
let version_str = format!("v{latest_version_id}");

for file in std::fs::read_dir(path)? {
let dirent = file?;

if dirent.file_type()?.is_dir() {
for dirent in fs.read_dir(path)? {
if dirent.is_dir {
continue;
}

let name = dirent.file_name();

if name.to_string_lossy().starts_with('v') && *name != *version_str {
log::trace!("Cleanup orphaned version {}", name.display());
std::fs::remove_file(dirent.path())?;
if dirent.file_name.starts_with('v') && dirent.file_name != version_str {
log::trace!("Cleanup orphaned version {}", dirent.file_name);
fs.remove_file(&dirent.path)?;
Comment thread
polaz marked this conversation as resolved.
Outdated
Comment thread
polaz marked this conversation as resolved.
Outdated
}
Comment thread
polaz marked this conversation as resolved.
}

Expand Down
Loading
Loading