Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 33 additions & 26 deletions src/abstract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,13 @@
// (found in the LICENSE-* files in the repository)

use crate::{
iter_guard::IterGuardImpl, table::Table, version::Version, vlog::BlobFile, AnyTree, BlobTree,
Config, Guard, InternalValue, KvPair, Memtable, SeqNo, TableId, Tree, UserKey, UserValue,
fs::FileSystem,
iter_guard::IterGuardImpl,
table::Table,
version::Version,
vlog::BlobFile,
AnyTree, BlobTree, Config, Guard, InternalValue, KvPair, Memtable, SeqNo, TableId, Tree,
UserKey, UserValue,
};
use std::{
ops::RangeBounds,
Expand All @@ -13,11 +18,11 @@ use std::{

pub type RangeItem = crate::Result<KvPair>;

type FlushToTablesResult = (Vec<Table>, Option<Vec<BlobFile>>);
type FlushToTablesResult<F> = (Vec<Table<F>>, Option<Vec<BlobFile<F>>>);

/// Generic Tree API
#[enum_dispatch::enum_dispatch]
pub trait AbstractTree {
pub trait AbstractTree<F: FileSystem + 'static> {
/// Returns the number of cached table file descriptors.
fn table_file_cache_size(&self) -> usize;

Expand All @@ -40,10 +45,12 @@ pub trait AbstractTree {
fn get_internal_entry(&self, key: &[u8], seqno: SeqNo) -> crate::Result<Option<InternalValue>>;

#[doc(hidden)]
fn current_version(&self) -> Version;
fn current_version(&self) -> Version<F>;

#[doc(hidden)]
fn get_version_history_lock(&self) -> RwLockWriteGuard<'_, crate::version::SuperVersions>;
fn get_version_history_lock(
&self,
) -> RwLockWriteGuard<'_, crate::version::SuperVersions<F>>;

/// Seals the active memtable and flushes to table(s).
///
Expand Down Expand Up @@ -120,7 +127,7 @@ pub trait AbstractTree {
&self,
seqno: SeqNo,
index: Option<(Arc<Memtable>, SeqNo)>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static> {
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<F>> + Send + 'static> {
self.range::<&[u8], _>(.., seqno, index)
}

Expand All @@ -132,7 +139,7 @@ pub trait AbstractTree {
prefix: K,
seqno: SeqNo,
index: Option<(Arc<Memtable>, SeqNo)>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<F>> + Send + 'static>;

/// Returns an iterator over a range of items.
///
Expand All @@ -142,7 +149,7 @@ pub trait AbstractTree {
range: R,
seqno: SeqNo,
index: Option<(Arc<Memtable>, SeqNo)>,
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl> + Send + 'static>;
) -> Box<dyn DoubleEndedIterator<Item = IterGuardImpl<F>> + Send + 'static>;

/// Returns the approximate number of tombstones in the tree.
fn tombstone_count(&self) -> u64;
Expand Down Expand Up @@ -216,7 +223,7 @@ pub trait AbstractTree {
fn flush_to_tables(
&self,
stream: impl Iterator<Item = crate::Result<InternalValue>>,
) -> crate::Result<Option<FlushToTablesResult>>;
) -> crate::Result<Option<FlushToTablesResult<F>>>;

/// Atomically registers flushed tables into the tree, removing their associated sealed memtables.
///
Expand All @@ -225,8 +232,8 @@ pub trait AbstractTree {
/// Will return `Err` if an IO error occurs.
fn register_tables(
&self,
tables: &[Table],
blob_files: Option<&[BlobFile]>,
tables: &[Table<F>],
blob_files: Option<&[BlobFile<F>]>,
frag_map: Option<crate::blob_tree::FragmentationMap>,
sealed_memtables_to_delete: &[crate::tree::inner::MemtableId],
gc_watermark: SeqNo,
Expand All @@ -245,15 +252,15 @@ pub trait AbstractTree {
/// Will return `Err` if an IO error occurs.
fn compact(
&self,
strategy: Arc<dyn crate::compaction::CompactionStrategy>,
strategy: Arc<dyn crate::compaction::CompactionStrategy<F>>,
seqno_threshold: SeqNo,
) -> crate::Result<()>;

/// Returns the next table's ID.
fn get_next_table_id(&self) -> TableId;

/// Returns the tree config.
fn tree_config(&self) -> &Config;
fn tree_config(&self) -> &Config<F>;

/// Returns the highest sequence number.
fn get_highest_seqno(&self) -> Option<SeqNo> {
Expand Down Expand Up @@ -315,7 +322,7 @@ pub trait AbstractTree {
/// use lsm_tree::{AbstractTree, Config, Tree};
///
/// let folder = tempfile::tempdir()?;
/// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
/// let tree = Config::<lsm_tree::fs::StdFileSystem>::new(folder, Default::default(), Default::default()).open()?;
///
/// assert_eq!(tree.len(0, None)?, 0);
/// tree.insert("1", "abc", 0);
Expand Down Expand Up @@ -350,7 +357,7 @@ pub trait AbstractTree {
/// # let folder = tempfile::tempdir()?;
/// use lsm_tree::{AbstractTree, Config, Tree};
///
/// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
/// let tree = Config::<lsm_tree::fs::StdFileSystem>::new(folder, Default::default(), Default::default()).open()?;
/// assert!(tree.is_empty(0, None)?);
///
/// tree.insert("a", "abc", 0);
Expand Down Expand Up @@ -380,7 +387,7 @@ pub trait AbstractTree {
/// # use lsm_tree::{AbstractTree, Config, Tree, Guard};
/// #
/// # let folder = tempfile::tempdir()?;
/// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
/// let tree = Config::<lsm_tree::fs::StdFileSystem>::new(folder, Default::default(), Default::default()).open()?;
///
/// tree.insert("1", "abc", 0);
/// tree.insert("3", "abc", 1);
Expand All @@ -399,7 +406,7 @@ pub trait AbstractTree {
&self,
seqno: SeqNo,
index: Option<(Arc<Memtable>, SeqNo)>,
) -> Option<IterGuardImpl> {
) -> Option<IterGuardImpl<F>> {
self.iter(seqno, index).next()
}

Expand All @@ -413,7 +420,7 @@ pub trait AbstractTree {
/// # use lsm_tree::{AbstractTree, Config, Tree, Guard};
/// #
/// # let folder = tempfile::tempdir()?;
/// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
/// # let tree = Config::<lsm_tree::fs::StdFileSystem>::new(folder, Default::default(), Default::default()).open()?;
/// #
/// tree.insert("1", "abc", 0);
/// tree.insert("3", "abc", 1);
Expand All @@ -432,7 +439,7 @@ pub trait AbstractTree {
&self,
seqno: SeqNo,
index: Option<(Arc<Memtable>, SeqNo)>,
) -> Option<IterGuardImpl> {
) -> Option<IterGuardImpl<F>> {
self.iter(seqno, index).next_back()
}

Expand All @@ -444,7 +451,7 @@ pub trait AbstractTree {
/// # let folder = tempfile::tempdir()?;
/// use lsm_tree::{AbstractTree, Config, Tree};
///
/// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
/// let tree = Config::<lsm_tree::fs::StdFileSystem>::new(folder, Default::default(), Default::default()).open()?;
/// tree.insert("a", "my_value", 0);
///
/// let size = tree.size_of("a", 1)?.unwrap_or_default();
Expand All @@ -469,7 +476,7 @@ pub trait AbstractTree {
/// # let folder = tempfile::tempdir()?;
/// use lsm_tree::{AbstractTree, Config, Tree};
///
/// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
/// let tree = Config::<lsm_tree::fs::StdFileSystem>::new(folder, Default::default(), Default::default()).open()?;
/// tree.insert("a", "my_value", 0);
///
/// let item = tree.get("a", 1)?;
Expand All @@ -491,7 +498,7 @@ pub trait AbstractTree {
/// # let folder = tempfile::tempdir()?;
/// # use lsm_tree::{AbstractTree, Config, Tree};
/// #
/// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
/// let tree = Config::<lsm_tree::fs::StdFileSystem>::new(folder, Default::default(), Default::default()).open()?;
/// assert!(!tree.contains_key("a", 0)?);
///
/// tree.insert("a", "abc", 0);
Expand Down Expand Up @@ -519,7 +526,7 @@ pub trait AbstractTree {
/// # let folder = tempfile::tempdir()?;
/// use lsm_tree::{AbstractTree, Config, Tree};
///
/// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
/// let tree = Config::<lsm_tree::fs::StdFileSystem>::new(folder, Default::default(), Default::default()).open()?;
/// tree.insert("a", "abc", 0);
/// #
/// # Ok::<(), lsm_tree::Error>(())
Expand All @@ -545,7 +552,7 @@ pub trait AbstractTree {
/// # let folder = tempfile::tempdir()?;
/// # use lsm_tree::{AbstractTree, Config, Tree};
/// #
/// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
/// # let tree = Config::<lsm_tree::fs::StdFileSystem>::new(folder, Default::default(), Default::default()).open()?;
/// tree.insert("a", "abc", 0);
///
/// let item = tree.get("a", 1)?.expect("should have item");
Expand Down Expand Up @@ -579,7 +586,7 @@ pub trait AbstractTree {
/// # let folder = tempfile::tempdir()?;
/// # use lsm_tree::{AbstractTree, Config, Tree};
/// #
/// # let tree = Config::new(folder, Default::default(), Default::default()).open()?;
/// # let tree = Config::<lsm_tree::fs::StdFileSystem>::new(folder, Default::default(), Default::default()).open()?;
/// tree.insert("a", "abc", 0);
///
/// let item = tree.get("a", 1)?.expect("should have item");
Expand Down
13 changes: 8 additions & 5 deletions src/any_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,19 @@
// This source code is licensed under both the Apache 2.0 and MIT License
// (found in the LICENSE-* files in the repository)

use crate::{BlobTree, Tree};
use crate::{
fs::{FileSystem, StdFileSystem},
BlobTree, Tree,
};
use enum_dispatch::enum_dispatch;

/// May be a standard [`Tree`] or a [`BlobTree`]
#[derive(Clone)]
#[enum_dispatch(AbstractTree)]
pub enum AnyTree {
#[enum_dispatch(AbstractTree<F>)]
pub enum AnyTree<F: FileSystem + 'static = StdFileSystem> {
/// Standard LSM-tree, see [`Tree`]
Standard(Tree),
Standard(Tree<F>),

/// Key-value separated LSM-tree, see [`BlobTree`]
Blob(BlobTree),
Blob(BlobTree<F>),
}
6 changes: 4 additions & 2 deletions src/blob_tree/gc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@

use crate::{
blob_tree::handle::BlobIndirection, coding::Decode, compaction::stream::ExpiredKvCallback,
version::BlobFileList, vlog::BlobFileId,
fs::FileSystem,
version::BlobFileList,
vlog::BlobFileId,
};

/// Tracks fragmentation information in a blob file
Expand Down Expand Up @@ -58,7 +60,7 @@ impl FragmentationMap {

/// Removes blob file entries that are not part of the value log (anymore)
/// to reduce linear memory growth.
pub fn prune(&mut self, value_log: &BlobFileList) {
pub fn prune<F: FileSystem>(&mut self, value_log: &BlobFileList<F>) {
self.0.retain(|&k, _| value_log.contains_key(k));
}

Expand Down
23 changes: 12 additions & 11 deletions src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use crate::{
blob_tree::handle::BlobIndirection,
file::BLOBS_FOLDER,
fs::{FileSystem, StdFileSystem},
table::Table,
tree::ingest::Ingestion as TableIngestion,
vlog::{BlobFileWriter, ValueHandle},
Expand All @@ -17,22 +18,22 @@ use crate::{
///
/// Uses table ingestion for the index and a blob file writer for large
/// values so both streams advance together.
pub struct BlobIngestion<'a> {
tree: &'a crate::BlobTree,
pub(crate) table: TableIngestion<'a>,
pub(crate) blob: BlobFileWriter,
pub struct BlobIngestion<'a, F: FileSystem = StdFileSystem> {
tree: &'a crate::BlobTree<F>,
pub(crate) table: TableIngestion<'a, F>,
pub(crate) blob: BlobFileWriter<F>,
seqno: SeqNo,
separation_threshold: u32,
last_key: Option<UserKey>,
}

impl<'a> BlobIngestion<'a> {
impl<'a, F: FileSystem + 'static> BlobIngestion<'a, F> {
/// Creates a new ingestion.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
pub fn new(tree: &'a crate::BlobTree) -> crate::Result<Self> {
pub fn new(tree: &'a crate::BlobTree<F>) -> crate::Result<Self> {
#[expect(
clippy::expect_used,
reason = "cannot define blob tree without kv separation options"
Expand All @@ -47,7 +48,7 @@ impl<'a> BlobIngestion<'a> {
let blob_file_size = kv.file_target_size;

let table = TableIngestion::new(&tree.index)?;
let blob = BlobFileWriter::new(
let blob = BlobFileWriter::<F>::new(
tree.index.0.blob_file_id_counter.clone(),
tree.index.config.path.join(BLOBS_FOLDER),
)?
Expand Down Expand Up @@ -224,8 +225,8 @@ impl<'a> BlobIngestion<'a> {
// pressure unnecessarily.
let created_tables = results
.into_iter()
.map(|(table_id, checksum)| -> crate::Result<Table> {
Table::recover(
.map(|(table_id, checksum)| -> crate::Result<Table<F>> {
Table::<F>::recover(
index
.config
.path
Expand Down Expand Up @@ -268,15 +269,15 @@ impl<'a> BlobIngestion<'a> {

// Perform maintenance on the version history (e.g., clean up old versions).
// We use gc_watermark=0 since ingestion doesn't affect sealed memtables.
if let Err(e) = version_lock.maintenance(&index.config.path, 0) {
if let Err(e) = version_lock.maintenance::<F>(&index.config.path, 0) {
log::warn!("Version GC failed: {e:?}");
}

Ok(())
}

#[inline]
fn index(&self) -> &crate::Tree {
fn index(&self) -> &crate::Tree<F> {
&self.tree.index
}
}
Loading