diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 727c42ecb..2917797ae 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -25,6 +25,7 @@ This is a **maintained fork** of [fjall-rs/lsm-tree](https://github.com/fjall-rs - **Unsizing coercions:** `Arc::new(ConcreteType) as Arc<dyn Trait>` is valid Rust — the compiler performs an unsizing coercion from `Arc<T>` to `Arc<dyn Trait>` when `T: Trait`. This is NOT an invalid cast. Do not flag it as a compile error. - **Feature gates:** Code behind `#[cfg(feature = "...")]` must compile with any combination of features. Variables used only in feature-gated branches must also be feature-gated. - **Definite initialization:** `let x: T;` without a default value is valid Rust — the compiler tracks initialization per control-flow path and does not drop uninitialized bindings; do not flag this as a bug when the variable is only used in paths that initialize it. +- **Unit structs as values:** `struct Foo;` (unit struct) is both a type AND a value (constructor). Writing `HashMap::with_capacity_and_hasher(n, FxBuildHasher)` is valid when `FxBuildHasher` is a unit struct — do not confuse with type aliases (`type Foo = Bar`) which cannot be used as values. 
## Testing Standards diff --git a/Cargo.toml b/Cargo.toml index f02cb281d..0dea1b8a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ interval-heap = "0.0.5" log = "0.4.27" lz4_flex = { version = "0.13.0", optional = true, default-features = false } zstd = { version = "0.13", optional = true, default-features = false } -structured-zstd = { version = "0.0.1", optional = true, default-features = false, features = ["std"] } +structured-zstd = { version = "0.0.7", optional = true, default-features = false, features = ["std"] } quick_cache = { version = "0.6.16", default-features = false, features = [] } rustc-hash = "2.1.1" self_cell = "1.2.0" @@ -90,3 +90,9 @@ name = "index_block" harness = false path = "benches/index_block.rs" required-features = [] + +[[bench]] +name = "batch_ops" +harness = false +path = "benches/batch_ops.rs" +required-features = [] diff --git a/README.md b/README.md index 0c52b527f..827b7fa07 100644 --- a/README.md +++ b/README.md @@ -12,7 +12,7 @@ [![License](https://img.shields.io/badge/license-Apache--2.0-blue)](#license) > LSM-tree engine for [CoordiNode](https://github.com/structured-world/coordinode), maintained by [Structured World Foundation](https://sw.foundation). -> Derivative work of [fjall-rs/lsm-tree](https://github.com/fjall-rs/lsm-tree), developed independently with diverging features: zstd dictionary compression, custom sequence number generators, multi_get, intra-L0 compaction, and security hardening. +> Derivative work of [fjall-rs/lsm-tree](https://github.com/fjall-rs/lsm-tree), developed independently with diverging features: zstd dictionary compression, custom sequence number generators, multi_get (batch-optimized), PinnableSlice zero-copy reads, WriteBatch seqno-grouped batch writes with caller-controlled atomic visibility, intra-L0 compaction, and security hardening. > [!IMPORTANT] > This fork now introduces a fork-specific **disk format V4** compatibility boundary. 
diff --git a/benches/batch_ops.rs b/benches/batch_ops.rs new file mode 100644 index 000000000..3a828a389 --- /dev/null +++ b/benches/batch_ops.rs @@ -0,0 +1,128 @@ +use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main}; +use lsm_tree::{AbstractTree, AnyTree, Config, SeqNo, SequenceNumberCounter, WriteBatch}; + +fn setup_tree_with_disk_data(n: u64) -> (AnyTree, tempfile::TempDir) { + let folder = tempfile::tempdir().unwrap(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open() + .unwrap(); + + for i in 0..n { + tree.insert(format!("key_{i:06}"), format!("value_{i}"), i); + } + tree.flush_active_memtable(0).unwrap(); + + (tree, folder) +} + +fn bench_multi_get(c: &mut Criterion) { + let mut group = c.benchmark_group("multi_get"); + + for count in [10, 50, 100, 500] { + let (tree, _folder) = setup_tree_with_disk_data(1000); + + let keys: Vec = (0..count).map(|i| format!("key_{i:06}")).collect(); + + group.bench_with_input(BenchmarkId::new("disk", count), &count, |b, _| { + b.iter(|| { + std::hint::black_box(tree.multi_get(&keys, SeqNo::MAX).unwrap()); + }); + }); + } + + group.finish(); +} + +fn bench_get_pinned(c: &mut Criterion) { + let mut group = c.benchmark_group("get_pinned"); + + let (tree, _folder) = setup_tree_with_disk_data(1000); + + group.bench_function("disk_hit", |b| { + b.iter(|| { + std::hint::black_box(tree.get_pinned("key_000500", SeqNo::MAX).unwrap()); + }); + }); + + // Compare with regular get + group.bench_function("get_regular", |b| { + b.iter(|| { + std::hint::black_box(tree.get("key_000500", SeqNo::MAX).unwrap()); + }); + }); + + group.finish(); +} + +fn setup_empty_tree() -> (AnyTree, tempfile::TempDir) { + let folder = tempfile::tempdir().unwrap(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open() + .unwrap(); + (tree, folder) +} + +fn bench_write_batch(c: &mut Criterion) { + let 
mut group = c.benchmark_group("write_batch"); + + for batch_size in [10, 50, 100, 500] { + group.bench_with_input( + BenchmarkId::new("insert", batch_size), + &batch_size, + |b, &size| { + let keys: Vec = (0..size).map(|i| format!("key_{i:04}")).collect(); + + b.iter_batched( + setup_empty_tree, + |(tree, _folder)| { + let mut batch = WriteBatch::with_capacity(size); + for k in &keys { + batch.insert(k.as_str(), "value"); + } + tree.apply_batch(batch, 0).unwrap(); + }, + criterion::BatchSize::SmallInput, + ); + }, + ); + } + + // Compare with individual inserts + for batch_size in [10, 50, 100, 500] { + group.bench_with_input( + BenchmarkId::new("individual_inserts", batch_size), + &batch_size, + |b, &size| { + let keys: Vec = (0..size).map(|i| format!("key_{i:04}")).collect(); + + b.iter_batched( + setup_empty_tree, + |(tree, _folder)| { + for k in &keys { + tree.insert(k.as_str(), "value", 0); + } + }, + criterion::BatchSize::SmallInput, + ); + }, + ); + } + + group.finish(); +} + +criterion_group!( + benches, + bench_multi_get, + bench_get_pinned, + bench_write_batch +); +criterion_main!(benches); diff --git a/src/abstract_tree.rs b/src/abstract_tree.rs index 213b1aa32..7378f3a3b 100644 --- a/src/abstract_tree.rs +++ b/src/abstract_tree.rs @@ -554,6 +554,44 @@ pub trait AbstractTree: sealed::Sealed { /// Will return `Err` if an IO error occurs. fn get>(&self, key: K, seqno: SeqNo) -> crate::Result>; + /// Retrieves an item from the tree as a [`PinnableSlice`]. + /// + /// When the value is backed by an on-disk data block, implementations + /// may return [`PinnableSlice::Pinned`] holding a reference to that block's + /// decompressed buffer (avoiding a data copy). Memtable and blob-resolved + /// values use [`PinnableSlice::Owned`]. The default implementation always + /// returns `Owned`; only [`Tree`] overrides with the pinned path. + /// + /// The existing [`AbstractTree::get`] method is unaffected. 
+ /// + /// # Examples + /// + /// ``` + /// # let folder = tempfile::tempdir()?; + /// use lsm_tree::{AbstractTree, Config, Tree}; + /// + /// let tree = Config::new(&folder, Default::default(), Default::default()).open()?; + /// tree.insert("a", "my_value", 0); + /// + /// let item = tree.get_pinned("a", 1)?; + /// assert_eq!(item.as_ref().map(|v| v.as_ref()), Some("my_value".as_bytes())); + /// # + /// # Ok::<(), lsm_tree::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + fn get_pinned>( + &self, + key: K, + seqno: SeqNo, + ) -> crate::Result> { + // Default: delegate to get() and wrap as Owned + self.get(key, seqno) + .map(|opt| opt.map(crate::PinnableSlice::owned)) + } + /// Returns `true` if the tree contains the specified key. /// /// # Examples @@ -656,6 +694,44 @@ pub trait AbstractTree: sealed::Sealed { keys.into_iter().map(|key| self.get(key, seqno)).collect() } + /// Applies a [`WriteBatch`] with the given sequence number. + /// + /// All entries share a single seqno. This is more efficient than individual + /// writes because the version-history lock and memtable size accounting + /// are performed only once for the entire batch. + /// + /// **Visibility:** entries become individually visible to concurrent readers + /// as they are inserted. For atomic batch visibility, the caller must + /// publish `seqno` (via `visible_seqno.fetch_max(seqno + 1)`) only + /// **after** this method returns. + /// + /// Returns the total bytes added and new size of the memtable. 
+ /// + /// # Examples + /// + /// ``` + /// # let folder = tempfile::tempdir()?; + /// use lsm_tree::{AbstractTree, Config, Tree, WriteBatch}; + /// + /// let tree = Config::new(&folder, Default::default(), Default::default()).open()?; + /// + /// let mut batch = WriteBatch::new(); + /// batch.insert("key1", "value1"); + /// batch.insert("key2", "value2"); + /// batch.remove("key3"); + /// + /// let (bytes_added, memtable_size) = tree.apply_batch(batch, 0)?; + /// assert!(bytes_added > 0); + /// # + /// # Ok::<(), lsm_tree::Error>(()) + /// ``` + /// + /// # Errors + /// + /// Returns [`Error::MixedOperationBatch`](crate::Error::MixedOperationBatch) + /// if the batch contains mixed operation types for the same user key. + fn apply_batch(&self, batch: crate::WriteBatch, seqno: SeqNo) -> crate::Result<(u64, u64)>; + /// Inserts a key-value pair into the tree. /// /// If the key already exists, the item will be overwritten. diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 53946d760..399208149 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -667,6 +667,10 @@ impl AbstractTree for BlobTree { self.index.get_highest_persisted_seqno() } + fn apply_batch(&self, batch: crate::WriteBatch, seqno: SeqNo) -> crate::Result<(u64, u64)> { + self.index.apply_batch(batch, seqno) + } + fn insert, V: Into>( &self, key: K, @@ -681,16 +685,119 @@ impl AbstractTree for BlobTree { self.resolve_key(&super_version, key.as_ref(), seqno) } + #[expect( + clippy::indexing_slicing, + reason = "indices are generated from 0..n range, always in bounds" + )] fn multi_get>( &self, keys: impl IntoIterator, seqno: SeqNo, ) -> crate::Result>> { + let keys: Vec<_> = keys.into_iter().collect(); + let n = keys.len(); + if n == 0 { + return Ok(Vec::new()); + } + let super_version = self.index.get_version_for_snapshot(seqno); + let comparator = self.index.config.comparator.as_ref(); + + // For small batches, use the simple per-key path + if n <= 2 { + return keys + .iter() + 
.map(|key| self.resolve_key(&super_version, key.as_ref(), seqno)) + .collect(); + } + + // Phase 1: Check memtables (unsorted — defer sort+hash for SST phase) + let mut internal_entries: Vec> = vec![None; n]; + let mut remaining: Vec = Vec::with_capacity(n); + + for idx in 0..n { + let key = keys[idx].as_ref(); + if let Some(entry) = super_version.active_memtable.get(key, seqno) { + internal_entries[idx] = Some(entry); + continue; + } + if let Some(entry) = + crate::Tree::get_internal_entry_from_sealed_memtables(&super_version, key, seqno) + { + internal_entries[idx] = Some(entry); + continue; + } + remaining.push(idx); + } + + // Phase 2: Sort + hash only if memtable misses exist + if !remaining.is_empty() { + remaining.sort_by(|&a, &b| comparator.compare(keys[a].as_ref(), keys[b].as_ref())); + + let miss_keys: Vec<(usize, u64)> = remaining + .iter() + .map(|&idx| { + let hash = + crate::table::filter::standard_bloom::Builder::get_hash(keys[idx].as_ref()); + (idx, hash) + }) + .collect(); + + crate::Tree::batch_get_from_tables( + &super_version.version, + &keys, + miss_keys, + seqno, + comparator, + &mut internal_entries, + )?; + } + + // Phase 3: Resolve each entry (tombstones, RT suppression, merge, blob indirections) + let mut results = vec![None; n]; + for idx in 0..n { + if let Some(item) = internal_entries[idx].take() { + if item.is_tombstone() { + continue; + } + if crate::Tree::is_suppressed_by_range_tombstones( + &super_version, + keys[idx].as_ref(), + item.key.seqno, + seqno, + comparator, + ) { + continue; + } + // Merge operand resolution. Merge operands in BlobTree are stored + // inline (not as blob indirection), so the pipeline result is a + // plain value. Without a merge operator, return raw operand value + // (same as resolve_key / resolve_pinned_entry behavior). 
+ if item.key.value_type.is_merge_operand() { + if let Some(merge_op) = &self.index.config.merge_operator { + results[idx] = crate::Tree::resolve_merge_via_pipeline( + super_version.clone(), + keys[idx].as_ref(), + seqno, + Arc::clone(merge_op), + )?; + } else { + results[idx] = Some(item.value); + } + continue; + } + let (_, v) = resolve_value_handle( + self.id(), + self.blobs_folder.as_path(), + &self.index.config.cache, + &super_version.version, + item, + )?; + results[idx] = Some(v); + } + } - keys.into_iter() - .map(|key| self.resolve_key(&super_version, key.as_ref(), seqno)) - .collect() + Ok(results) } fn merge, V: Into>( diff --git a/src/error.rs b/src/error.rs index e42a23675..13ff6b6cc 100644 --- a/src/error.rs +++ b/src/error.rs @@ -107,6 +107,17 @@ pub enum Error { offset: u64, }, + /// A [`WriteBatch`](crate::WriteBatch) contains mixed operation types + /// (e.g. insert + remove) for the same user key. + /// + /// Mixed ops at the same logical version are rejected because the + /// memtable/skiplist ordering ties on `(user_key, seqno)` and does not + /// include `value_type` as a tie-breaker. That would otherwise make + /// equal-key entries with different operation types ambiguous to later + /// reads and merges, yielding tie-break-dependent "last write wins" + /// semantics. + MixedOperationBatch, + /// Route-compatibility mismatch on reopen. /// /// Recovery found fewer tables on disk than the manifest expects, and all diff --git a/src/lib.rs b/src/lib.rs index 0ac3c22b1..3287b598f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -133,6 +133,7 @@ pub(crate) mod metrics; pub mod mvcc_stream; mod path; +mod pinnable_slice; mod prefix; #[doc(hidden)] @@ -161,6 +162,7 @@ pub mod util; mod value; mod value_type; +mod write_batch; /// Integrity verification for SST and blob files. 
pub mod verify; @@ -196,6 +198,9 @@ pub use encryption::EncryptionProvider; #[cfg(feature = "encryption")] pub use encryption::Aes256GcmProvider; +pub use pinnable_slice::PinnableSlice; +pub use write_batch::WriteBatch; + pub use { abstract_tree::AbstractTree, any_tree::AnyTree, diff --git a/src/memtable/mod.rs b/src/memtable/mod.rs index e2c72c7e9..c1d49ac6a 100644 --- a/src/memtable/mod.rs +++ b/src/memtable/mod.rs @@ -185,6 +185,61 @@ impl Memtable { self.items.is_empty() && self.range_tombstone_count() == 0 } + /// Inserts multiple items into the memtable in bulk. + /// + /// More efficient than calling [`Memtable::insert`] in a loop because it + /// performs a single `fetch_add` for the total size and a single + /// `fetch_max` for the highest seqno. + /// + /// Returns `(total_bytes_added, new_memtable_size)`. + #[doc(hidden)] + pub fn insert_batch(&self, items: Vec) -> (u64, u64) { + if items.is_empty() { + let size = self + .approximate_size + .load(std::sync::atomic::Ordering::Acquire); + return (0, size); + } + + let mut total_size: u64 = 0; + let mut max_seqno: u64 = 0; + + let overhead = + std::mem::size_of::() + std::mem::size_of::(); + + for item in &items { + #[expect( + clippy::expect_used, + reason = "keys are limited to 16-bit length + values are limited to 32-bit length" + )] + let item_size: u64 = (item.key.user_key.len() + item.value.len() + overhead) + .try_into() + .expect("should fit into u64"); + + total_size = total_size.saturating_add(item_size); + + if item.key.seqno > max_seqno { + max_seqno = item.key.seqno; + } + } + + let size_before = self + .approximate_size + .fetch_add(total_size, std::sync::atomic::Ordering::AcqRel); + + for item in items { + let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type); + self.items.insert(&key, &item.value); + } + + self.highest_seqno + .fetch_max(max_seqno, std::sync::atomic::Ordering::AcqRel); + + // fetch_add returns value BEFORE the add, so size_before + total_size + 
// = value AFTER add = new memtable size. Same pattern as Memtable::insert(). + (total_size, size_before + total_size) + } + /// Inserts an item into the memtable #[doc(hidden)] pub fn insert(&self, item: InternalValue) -> (u64, u64) { diff --git a/src/pinnable_slice.rs b/src/pinnable_slice.rs new file mode 100644 index 000000000..dbf7c76d1 --- /dev/null +++ b/src/pinnable_slice.rs @@ -0,0 +1,142 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! Zero-copy value reference that keeps the decompressed block buffer alive. +//! +//! [`PinnableSlice`] is inspired by `RocksDB`'s `PinnableSlice` +//! (`include/rocksdb/slice.h:179-263`). It wraps a value that was read from +//! the LSM tree and indicates whether the underlying data shares the +//! decompressed block buffer or is independently owned (e.g. from a memtable +//! or merge result). +//! +//! When the value comes from an on-disk data block, holding a +//! `PinnableSlice::Pinned` keeps the block's decompressed buffer alive +//! (via the refcounted [`Slice`] / `ByteView` backing) for the duration of +//! the reference. The value bytes are a sub-slice of that buffer — no copy +//! is performed. Note: this does **not** prevent the block cache from +//! evicting its entry; it only ensures the backing memory remains valid. +//! +//! Memtable and blob-resolved values use the `Owned` variant. + +use crate::{Slice, UserValue, table::Block}; + +/// A value reference that may share the decompressed block buffer. +/// +/// Use [`PinnableSlice::as_ref`] to access the raw bytes regardless of variant. +/// +/// # Lifetime +/// +/// The `Pinned` variant holds a [`Block`] clone whose `data` field is a +/// refcounted [`Slice`]. As long as the `PinnableSlice` is alive, the +/// decompressed block buffer remains valid. Dropping it releases the +/// reference count on the underlying `ByteView` allocation. 
+#[derive(Clone)] +pub enum PinnableSlice { + /// Value sharing the decompressed block buffer — zero copy. + /// + /// The [`Block`] keeps the decompressed data alive via refcounted + /// `Slice` / `ByteView`. `value` is a sub-slice created via + /// [`Slice::slice`], sharing the same backing allocation. + Pinned { + /// Keeps the decompressed block buffer alive via refcount. + _block: Block, + /// Zero-copy sub-slice into the block's decompressed data. + value: Slice, + }, + + /// Value owned independently (memtable, blob, merge result). + Owned(UserValue), +} + +impl PinnableSlice { + /// Creates a pinned value sharing the decompressed block buffer. + #[must_use] + pub fn pinned(block: Block, value: Slice) -> Self { + Self::Pinned { + _block: block, + value, + } + } + + /// Creates an owned value (not sharing any block buffer). + #[must_use] + pub fn owned(value: UserValue) -> Self { + Self::Owned(value) + } + + /// Returns `true` if this value shares the decompressed block buffer. + #[must_use] + pub fn is_pinned(&self) -> bool { + matches!(self, Self::Pinned { .. }) + } + + /// Returns the raw value bytes. + #[must_use] + pub fn value(&self) -> &[u8] { + self.as_ref() + } + + /// Returns the length of the value in bytes. + #[must_use] + pub fn len(&self) -> usize { + self.as_ref().len() + } + + /// Returns `true` if the value is empty. + #[must_use] + pub fn is_empty(&self) -> bool { + self.as_ref().is_empty() + } + + /// Converts this `PinnableSlice` into an owned `UserValue`. + /// + /// For the `Pinned` variant, the `Block` is dropped but the returned + /// `Slice` still shares the same `ByteView` backing allocation. + /// For the `Owned` variant, the value is returned directly. + #[must_use] + pub fn into_value(self) -> UserValue { + match self { + Self::Pinned { value, .. 
} => value, + Self::Owned(v) => v, + } + } +} + +impl std::fmt::Debug for PinnableSlice { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Pinned { value, .. } => { + f.debug_struct("Pinned").field("len", &value.len()).finish() + } + Self::Owned(v) => f.debug_tuple("Owned").field(&v.len()).finish(), + } + } +} + +impl AsRef<[u8]> for PinnableSlice { + fn as_ref(&self) -> &[u8] { + match self { + Self::Pinned { value, .. } => value.as_ref(), + Self::Owned(v) => v.as_ref(), + } + } +} + +impl PartialEq<[u8]> for PinnableSlice { + fn eq(&self, other: &[u8]) -> bool { + self.as_ref() == other + } +} + +impl PartialEq<&[u8]> for PinnableSlice { + fn eq(&self, other: &&[u8]) -> bool { + self.as_ref() == *other + } +} + +impl From for UserValue { + fn from(ps: PinnableSlice) -> Self { + ps.into_value() + } +} diff --git a/src/table/mod.rs b/src/table/mod.rs index 952d24bac..c2cd85736 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -93,6 +93,28 @@ impl std::fmt::Debug for Table { } } +/// Result of a bloom filter check. +enum BloomResult { + /// Bloom says key is definitely absent — skip point read. + Skip, + /// Point read should proceed. + Proceed { + /// Whether a filter was present (used for metrics accounting). 
+ has_filter: bool, + }, +} + +impl BloomResult { + fn should_skip(&self) -> bool { + matches!(self, Self::Skip) + } + + #[cfg(feature = "metrics")] + fn has_filter(&self) -> bool { + matches!(self, Self::Proceed { has_filter: true }) + } +} + impl Table { #[must_use] pub fn global_seqno(&self) -> SeqNo { @@ -234,27 +256,25 @@ impl Table { self.metadata.file_size } - pub fn get( - &self, - key: &[u8], - seqno: SeqNo, - key_hash: u64, - ) -> crate::Result> { - #[cfg(feature = "metrics")] - use std::sync::atomic::Ordering::Relaxed; - - // Translate seqno to "our" seqno - let seqno = seqno.saturating_sub(self.global_seqno()); - - if self.metadata.seqnos.0 >= seqno { - return Ok(None); - } + /// Loads the filter block (if any) and checks the bloom filter. + /// + /// Returns `Ok(BloomResult::Skip)` if the bloom filter says the key is definitely absent + /// (and updates metrics accordingly), `Ok(BloomResult::Proceed { has_filter })` otherwise. + fn check_bloom(&self, key: &[u8], key_hash: u64) -> crate::Result { + debug_assert_eq!( + key_hash, + crate::table::filter::standard_bloom::Builder::get_hash(key), + "key_hash must match the hash of the provided key" + ); let filter_block = if let Some(block) = &self.pinned_filter_block { Some(Cow::Borrowed(block)) } else if let Some(filter_idx) = &self.pinned_filter_index { let mut iter = filter_idx.iter(self.comparator.clone()); - iter.seek(key, seqno); + // Filter partitions are written with seqno=0, making the seqno + // parameter irrelevant to partition selection. Use MAX_SEQNO + // consistently to match the index-block seek in Table::range(). 
+ iter.seek(key, crate::seqno::MAX_SEQNO); if let Some(filter_block_handle) = iter.next() { let filter_block_handle = filter_block_handle.materialize(filter_idx.as_slice()); @@ -262,15 +282,20 @@ impl Table { let block = self.load_block( &filter_block_handle.into_inner(), BlockType::Filter, - CompressionType::None, // NOTE: We never write a filter block with compression + CompressionType::None, #[cfg(zstd_any)] None, )?; - let block = FilterBlock::new(block); - - Some(Cow::Owned(block)) + Some(Cow::Owned(FilterBlock::new(block))) } else { - None + // Key sorts past the last filter partition — definite miss. + #[cfg(feature = "metrics")] + { + use std::sync::atomic::Ordering::Relaxed; + self.metrics.filter_queries.fetch_add(1, Relaxed); + self.metrics.io_skipped_by_filter.fetch_add(1, Relaxed); + } + return Ok(BloomResult::Skip); } } else if let Some(_filter_tli_handle) = &self.regions.filter_tli { unimplemented!("unpinned filter TLI not supported"); @@ -278,55 +303,124 @@ impl Table { let block = self.load_block( filter_block_handle, BlockType::Filter, - CompressionType::None, // NOTE: We never write a filter block with compression + CompressionType::None, #[cfg(zstd_any)] None, )?; - let block = FilterBlock::new(block); - - Some(Cow::Owned(block)) + Some(Cow::Owned(FilterBlock::new(block))) } else { None }; + let has_filter = filter_block.is_some(); + if let Some(filter_block) = &filter_block && !filter_block.maybe_contains_hash(key_hash)? 
{ #[cfg(feature = "metrics")] { + use std::sync::atomic::Ordering::Relaxed; self.metrics.filter_queries.fetch_add(1, Relaxed); self.metrics.io_skipped_by_filter.fetch_add(1, Relaxed); } + return Ok(BloomResult::Skip); + } + + Ok(BloomResult::Proceed { has_filter }) + } + + pub fn get( + &self, + key: &[u8], + seqno: SeqNo, + key_hash: u64, + ) -> crate::Result> { + let global_seqno = self.global_seqno(); + let seqno = seqno.saturating_sub(global_seqno); + + if self.metadata.seqnos.0 >= seqno { + return Ok(None); + } + let bloom = self.check_bloom(key, key_hash)?; + if bloom.should_skip() { return Ok(None); } - let item = self.point_read(key, seqno); + let item = self.point_read(key, seqno)?; - #[cfg(not(feature = "metrics"))] + // Translate table-local seqno back to global coordinate so callers + // can compare across tables/memtables (L0 best-selection, RT suppression). + let item = item.map(|mut iv| { + iv.key.seqno = iv.key.seqno.saturating_add(global_seqno); + iv + }); + + #[cfg(feature = "metrics")] { - item + use std::sync::atomic::Ordering::Relaxed; + // NOTE: `check_bloom()` accounts for lookups rejected by the filter + // (skip I/O entirely). This path accounts for negative point lookups + // that still reached storage even though a filter was present, so + // `filter_queries` remains interpretable alongside `filter_efficiency()`. + // https://github.com/fjall-rs/lsm-tree/issues/246 + if item.is_none() && bloom.has_filter() { + self.metrics.filter_queries.fetch_add(1, Relaxed); + } + } + + Ok(item) + } + + /// Like [`Table::get`], but also returns the [`Block`] containing the value. + /// + /// Used by `get_pinned()` to construct `PinnableSlice::Pinned`. 
+ /// + pub(crate) fn get_with_block( + &self, + key: &[u8], + seqno: SeqNo, + key_hash: u64, + ) -> crate::Result> { + let global_seqno = self.global_seqno(); + let seqno = seqno.saturating_sub(global_seqno); + + if self.metadata.seqnos.0 >= seqno { + return Ok(None); + } + + let bloom = self.check_bloom(key, key_hash)?; + if bloom.should_skip() { + return Ok(None); } + let result = self.point_read_with_block(key, seqno)?; + + // Translate table-local seqno back to global coordinate (see Table::get). + let result = result.map(|(mut iv, block)| { + iv.key.seqno = iv.key.seqno.saturating_add(global_seqno); + (iv, block) + }); + #[cfg(feature = "metrics")] { - // NOTE: Only increment the filter queries when the filter reported a miss - // and we actually waste an I/O for a non-existing item. - // Otherwise, the filter efficiency decreases whenever an item is hit. - // https://github.com/fjall-rs/lsm-tree/issues/246 - item.inspect(|maybe_kv| { - if maybe_kv.is_none() && filter_block.is_some() { - self.metrics.filter_queries.fetch_add(1, Relaxed); - } - }) + use std::sync::atomic::Ordering::Relaxed; + if result.is_none() && bloom.has_filter() { + self.metrics.filter_queries.fetch_add(1, Relaxed); + } } + + Ok(result) } - // TODO: maybe we can skip Fuse costs of the user key - // TODO: because we just want to return the value - // TODO: we would need to return something like ValueType + Value - // TODO: so the caller can decide whether to return the value or not - fn point_read(&self, key: &[u8], seqno: SeqNo) -> crate::Result> { + /// Shared block-index walk for point reads. Returns the matching entry + /// together with the [`DataBlock`] it was found in, so callers that need + /// the block (e.g. for [`PinnableSlice`]) can keep it alive. 
+ fn point_read_inner( + &self, + key: &[u8], + seqno: SeqNo, + ) -> crate::Result> { let Some(iter) = self.block_index.forward_reader(key, seqno) else { return Ok(None); }; @@ -334,10 +428,10 @@ impl Table { for block_handle in iter { let block_handle = block_handle?; - let block = self.load_data_block(block_handle.as_ref())?; + let data_block = self.load_data_block(block_handle.as_ref())?; - if let Some(item) = block.point_read(key, seqno, &self.comparator)? { - return Ok(Some(item)); + if let Some(item) = data_block.point_read(key, seqno, &self.comparator)? { + return Ok(Some((item, data_block))); } // NOTE: If the last block key is higher than ours, @@ -350,6 +444,25 @@ impl Table { Ok(None) } + fn point_read(&self, key: &[u8], seqno: SeqNo) -> crate::Result> { + self.point_read_inner(key, seqno) + .map(|opt| opt.map(|(iv, _)| iv)) + } + + /// Like [`Table::point_read`], but also returns the underlying [`Block`]. + /// + /// Holding on to the returned [`Block`] (e.g. for [`PinnableSlice`]) keeps the + /// block data alive while the value is in use, but does not guarantee that the + /// cache will retain its own entry for that block. + fn point_read_with_block( + &self, + key: &[u8], + seqno: SeqNo, + ) -> crate::Result> { + self.point_read_inner(key, seqno) + .map(|opt| opt.map(|(iv, db)| (iv, db.inner))) + } + /// Creates a scanner over the `Table`. /// /// The scanner is logically the same as a normal iter(), diff --git a/src/tree/mod.rs b/src/tree/mod.rs index b344757df..4b3d61fe5 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -63,6 +63,66 @@ impl IterGuard for Guard { } } +/// Trait for monomorphized table point-read results. +/// +/// Allows `find_in_tables` to operate generically over `InternalValue` (for +/// `get`) and `(InternalValue, Block)` (for `get_pinned`), generating optimal +/// code for each path without runtime dispatch or extra refcount overhead. 
+trait TablePointLookup: Sized { + fn lookup( + table: &Table, + key: &[u8], + seqno: SeqNo, + key_hash: u64, + ) -> crate::Result>; + fn entry_seqno(&self) -> SeqNo; + fn filter_tombstone(self) -> Option; +} + +/// Lookup result for standard `get()` — entry only, no block retained. +type TableEntry = InternalValue; + +impl TablePointLookup for TableEntry { + fn lookup( + table: &Table, + key: &[u8], + seqno: SeqNo, + key_hash: u64, + ) -> crate::Result> { + table.get(key, seqno, key_hash) + } + + fn entry_seqno(&self) -> SeqNo { + self.key.seqno + } + + fn filter_tombstone(self) -> Option { + ignore_tombstone_value(self) + } +} + +/// Lookup result for `get_pinned()` — entry + block for zero-copy pinning. +type TableEntryWithBlock = (InternalValue, crate::table::Block); + +impl TablePointLookup for TableEntryWithBlock { + fn lookup( + table: &Table, + key: &[u8], + seqno: SeqNo, + key_hash: u64, + ) -> crate::Result> { + table.get_with_block(key, seqno, key_hash) + } + + fn entry_seqno(&self) -> SeqNo { + self.0.key.seqno + } + + fn filter_tombstone(self) -> Option { + ignore_tombstone_value(self.0).map(|iv| (iv, self.1)) + } +} + fn ignore_tombstone_value(item: InternalValue) -> Option { if item.is_tombstone() { None @@ -710,24 +770,138 @@ impl AbstractTree for Tree { ) } + fn get_pinned>( + &self, + key: K, + seqno: SeqNo, + ) -> crate::Result> { + let key = key.as_ref(); + + #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")] + let super_version = self + .version_history + .read() + .expect("lock is poisoned") + .get_version_for_snapshot(seqno); + + Self::resolve_or_passthrough_pinned( + &super_version, + key, + seqno, + self.config.merge_operator.as_ref(), + self.config.comparator.as_ref(), + ) + } + + #[expect( + clippy::indexing_slicing, + reason = "indices are generated from 0..n range, always in bounds" + )] fn multi_get>( &self, keys: impl IntoIterator, seqno: SeqNo, ) -> crate::Result>> { let super_version = 
self.get_version_for_snapshot(seqno); + let comparator = self.config.comparator.as_ref(); + let merge_operator = self.config.merge_operator.as_ref(); + + // Collect keys up front; bloom hashes computed lazily in Phase 2 + let keys: Vec<_> = keys.into_iter().collect(); + let n = keys.len(); + if n == 0 { + return Ok(Vec::new()); + } - keys.into_iter() - .map(|key| { - Self::resolve_or_passthrough( - &super_version, - key.as_ref(), - seqno, - self.config.merge_operator.as_ref(), - self.config.comparator.as_ref(), - ) - }) - .collect() + // For small batches, use the simple per-key path + if n <= 2 { + return keys + .iter() + .map(|key| { + Self::resolve_or_passthrough( + &super_version, + key.as_ref(), + seqno, + merge_operator, + comparator, + ) + }) + .collect(); + } + + // Phase 1: Check active + sealed memtables (unsorted — memtable lookup + // is O(log n) per key regardless of order, skip sort+hash overhead for + // memtable-only batches). + let mut internal_entries: Vec> = vec![None; n]; + let mut remaining: Vec = Vec::with_capacity(n); + + for idx in 0..n { + let key = keys[idx].as_ref(); + + // Active memtable + if let Some(entry) = super_version.active_memtable.get(key, seqno) { + internal_entries[idx] = Some(entry); + continue; + } + + // Sealed memtables (newest first) + if let Some(entry) = + Self::get_internal_entry_from_sealed_memtables(&super_version, key, seqno) + { + internal_entries[idx] = Some(entry); + continue; + } + + remaining.push(idx); + } + + // Phase 2: Sort remaining keys + compute bloom hashes only if needed + // (memtable-only batches skip this entirely). + if !remaining.is_empty() { + remaining.sort_by(|&a, &b| comparator.compare(keys[a].as_ref(), keys[b].as_ref())); + + // Build (idx, hash) pairs only for miss keys — O(remaining) not O(n). 
+ let miss_keys: Vec<(usize, u64)> = remaining + .iter() + .map(|&idx| { + let hash = + crate::table::filter::standard_bloom::Builder::get_hash(keys[idx].as_ref()); + (idx, hash) + }) + .collect(); + + Self::batch_get_from_tables( + &super_version.version, + &keys, + miss_keys, + seqno, + comparator, + &mut internal_entries, + )?; + } + + // Phase 3: Resolve entries (tombstones, RT suppression, merge operands) + let mut results = vec![None; n]; + for idx in 0..n { + let entry = internal_entries[idx].take(); + results[idx] = Self::resolve_entry( + &super_version, + keys[idx].as_ref(), + entry, + seqno, + merge_operator, + comparator, + )?; + } + + Ok(results) + } + + fn apply_batch(&self, batch: crate::WriteBatch, seqno: SeqNo) -> crate::Result<(u64, u64)> { + if batch.is_empty() { + return Ok((0, self.active_memtable().size())); + } + Ok(self.append_batch(batch.materialize(seqno)?)) } fn insert, V: Into>( @@ -816,6 +990,122 @@ impl Tree { } } + /// Shared post-lookup resolution for `get_pinned` and `multi_get`: + /// tombstone filter, range-tombstone suppression, merge operand resolution. + /// Returns `None` if entry is tombstoned or suppressed. + fn resolve_pinned_entry( + super_version: &SuperVersion, + key: &[u8], + entry: InternalValue, + seqno: SeqNo, + merge_operator: Option<&Arc>, + comparator: &dyn crate::comparator::UserComparator, + wrap: impl FnOnce(UserValue) -> crate::PinnableSlice, + ) -> crate::Result> { + use crate::PinnableSlice; + + let Some(entry) = ignore_tombstone_value(entry) else { + return Ok(None); + }; + if Self::is_suppressed_by_range_tombstones( + super_version, + key, + entry.key.seqno, + seqno, + comparator, + ) { + return Ok(None); + } + if entry.key.value_type == ValueType::MergeOperand + && let Some(merge_op) = merge_operator + { + // Merge resolution always produces Owned (pipeline result). 
+ return Self::resolve_merge_via_pipeline( + super_version.clone(), + key, + seqno, + Arc::clone(merge_op), + ) + .map(|opt| opt.map(PinnableSlice::owned)); + } + Ok(Some(wrap(entry.value))) + } + + /// Like [`Tree::resolve_or_passthrough`], but returns a [`PinnableSlice`] + /// that may keep the decompressed block buffer alive. + fn resolve_or_passthrough_pinned( + super_version: &SuperVersion, + key: &[u8], + seqno: SeqNo, + merge_operator: Option<&Arc>, + comparator: &dyn crate::comparator::UserComparator, + ) -> crate::Result> { + use crate::PinnableSlice; + + // Check memtables first — always Owned + if let Some(entry) = super_version.active_memtable.get(key, seqno) { + return Self::resolve_pinned_entry( + super_version, + key, + entry, + seqno, + merge_operator, + comparator, + PinnableSlice::owned, + ); + } + + // Sealed memtables — always Owned + if let Some(entry) = + Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno) + { + return Self::resolve_pinned_entry( + super_version, + key, + entry, + seqno, + merge_operator, + comparator, + PinnableSlice::owned, + ); + } + + // Tables — Pinned (value shares decompressed block buffer) + let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key); + + if let Some((entry, block)) = Self::get_internal_entry_with_block_from_tables( + &super_version.version, + key, + seqno, + key_hash, + comparator, + )? { + return Self::resolve_pinned_entry( + super_version, + key, + entry, + seqno, + merge_operator, + comparator, + |value| PinnableSlice::pinned(block, value), + ); + } + + Ok(None) + } + + /// Like [`Tree::get_internal_entry_from_tables`], but returns the block + /// along with the entry for pinned zero-copy access. 
+ fn get_internal_entry_with_block_from_tables( + version: &Version, + key: &[u8], + seqno: SeqNo, + key_hash: u64, + comparator: &dyn crate::comparator::UserComparator, + ) -> crate::Result> { + Self::find_in_tables::(version, key, seqno, key_hash, comparator) + } + /// Resolves merge operands for a point read via a bloom-filtered iterator pipeline. /// /// Builds a single-key range (`key..=key`) with bloom pre-filtering, wraps @@ -825,7 +1115,7 @@ impl Tree { /// /// Bloom pre-filtering can reject many disk tables at the filter level, /// which typically improves point-read performance on deep LSM trees. - fn resolve_merge_via_pipeline( + pub(crate) fn resolve_merge_via_pipeline( version: SuperVersion, key: &[u8], seqno: SeqNo, @@ -1001,7 +1291,7 @@ impl Tree { /// Checks if a key at `key_seqno` is suppressed by any range tombstone /// in the active memtable, sealed memtables, or SST tables, visible at `read_seqno`. - fn is_suppressed_by_range_tombstones( + pub(crate) fn is_suppressed_by_range_tombstones( super_version: &SuperVersion, key: &[u8], key_seqno: SeqNo, @@ -1066,43 +1356,178 @@ impl Tree { false } + /// Resolves a single internal entry into a user value for `multi_get`. + /// + /// Handles tombstones, range tombstone (RT) suppression, and merge operand + /// resolution by delegating to [`Tree::resolve_pinned_entry`] with + /// `Owned` wrapping, then extracts the value. 
+ fn resolve_entry( + super_version: &SuperVersion, + key: &[u8], + entry: Option, + seqno: SeqNo, + merge_operator: Option<&Arc>, + comparator: &dyn crate::comparator::UserComparator, + ) -> crate::Result> { + let Some(entry) = entry else { + return Ok(None); + }; + Self::resolve_pinned_entry( + super_version, + key, + entry, + seqno, + merge_operator, + comparator, + crate::PinnableSlice::owned, + ) + .map(|opt| opt.map(crate::PinnableSlice::into_value)) + } + + /// Queries tables for multiple keys using sorted access order. + /// + /// `miss_keys` contains `(key_index, bloom_hash)` pairs for keys not yet + /// found, in comparator-sorted order. Keys are looked up individually via + /// `Table::get`, but sorted order improves I/O locality. The precomputed + /// bloom hash in each pair is reused across all table probes. Per-SST + /// batched bloom checks and block walks are tracked in `#223`. + #[expect( + clippy::indexing_slicing, + reason = "miss_keys entries carry batch-local indices; callers must pass a results slice aligned with keys" + )] + pub(crate) fn batch_get_from_tables>( + version: &Version, + keys: &[K], + miss_keys: Vec<(usize, u64)>, + seqno: SeqNo, + comparator: &dyn crate::comparator::UserComparator, + results: &mut [Option], + ) -> crate::Result<()> { + debug_assert_eq!(results.len(), keys.len()); + debug_assert!(miss_keys.iter().all(|&(i, _)| i < keys.len())); + + // Consume the caller's Vec directly — no allocation+copy. + let mut still_remaining = miss_keys; + + for (level_idx, level) in version.iter_levels().enumerate() { + if still_remaining.is_empty() { + break; + } + + if level_idx == 0 { + // L0: must check ALL runs, keep highest seqno per key. + // Track keys at the seqno ceiling (seqno + 1 == read_seqno) — + // no other L0 run can beat them, so skip in subsequent runs. + // Bitmap: idx is always in 0..keys.len(), dense enough for Vec. 
+ let mut at_ceiling = vec![false; keys.len()]; + + for run in level.iter() { + for &(idx, hash) in &still_remaining { + if at_ceiling[idx] { + continue; + } + let key = keys[idx].as_ref(); + if let Some(table) = run.get_for_key_cmp(key, comparator) + && let Some(item) = table.get(key, seqno, hash)? + { + match &results[idx] { + Some(current) if current.key.seqno >= item.key.seqno => {} + _ => { + if item.key.seqno.checked_add(1) == Some(seqno) { + at_ceiling[idx] = true; + } + results[idx] = Some(item); + } + } + } + } + } + + // Remove found keys (both values and tombstones) + still_remaining.retain(|&(idx, _)| results[idx].is_none()); + } else { + // L1+ runs have non-overlapping key ranges within a level. + // Once get_for_key_cmp identifies a covering run for a key, + // no other run in this level can contain it. We split into: + // - `not_covered`: key range didn't match any run yet → try next run + // - `covered_miss`: covering run found but table.get returned None + // (bloom filter rejection or key absent) → skip remaining runs in + // this level, but keep for lower levels + let mut covered_miss: Vec<(usize, u64)> = Vec::new(); + + for run in level.iter() { + let mut not_covered = Vec::with_capacity(still_remaining.len()); + for &(idx, hash) in &still_remaining { + let key = keys[idx].as_ref(); + if let Some(table) = run.get_for_key_cmp(key, comparator) { + if let Some(item) = table.get(key, seqno, hash)? { + results[idx] = Some(item); + } else { + // Covering run found, but key not present — no other + // run in this level can have it. Keep for lower levels. + covered_miss.push((idx, hash)); + } + } else { + not_covered.push((idx, hash)); + } + } + still_remaining = not_covered; + } + + // Merge back: keys without a covering run + keys with a covering + // miss both proceed to lower levels. Re-sort to preserve + // comparator order for the next level's sequential scan. 
+ let needs_sort = !covered_miss.is_empty(); + still_remaining.extend(covered_miss); + if needs_sort { + still_remaining.sort_by(|&(a, _), &(b, _)| { + comparator.compare(keys[a].as_ref(), keys[b].as_ref()) + }); + } + } + } + + Ok(()) + } + fn get_internal_entry_from_tables( version: &Version, key: &[u8], seqno: SeqNo, comparator: &dyn crate::comparator::UserComparator, ) -> crate::Result> { - // NOTE: Create key hash for hash sharing - // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/ let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key); + Self::find_in_tables::(version, key, seqno, key_hash, comparator) + } - // L0: optimize_runs may merge disjoint SSTs from different temporal - // epochs into the same run, so run iteration order does not guarantee - // newest-first. We must check ALL runs and keep the highest seqno. - // - // L1+: key ranges within a level do not overlap, so at most one run - // can contain the key — return on the first match. - // - // Once a level yields a match, lower levels cannot contain newer data, - // so we stop early. + /// Generic level-walk for point reads, monomorphized over the lookup result type. + /// + /// L0: check ALL runs, keep highest seqno (runs may not be newest-first). + /// L1+: at most one run contains the key — return on first match. + /// Once a level yields a match, lower levels cannot have newer data. + fn find_in_tables( + version: &Version, + key: &[u8], + seqno: SeqNo, + key_hash: u64, + comparator: &dyn crate::comparator::UserComparator, + ) -> crate::Result> { for (level_idx, level) in version.iter_levels().enumerate() { if level_idx == 0 { - let mut best: Option = None; + let mut best: Option = None; for run in level.iter() { if let Some(table) = run.get_for_key_cmp(key, comparator) - && let Some(item) = table.get(key, seqno, key_hash)? + && let Some(item) = T::lookup(table, key, seqno, key_hash)? { match &best { - // >= keeps first-seen on tie. 
Seqno is monotonically - // unique per write; equal seqno for the same user key - // across tables is impossible in normal operation. - Some(current) if current.key.seqno >= item.key.seqno => {} + Some(current) if current.entry_seqno() >= item.entry_seqno() => {} _ => { - // Short-circuit: seqno is the read horizon, so no - // other run in this level can have a higher one. - if item.key.seqno == seqno { - return Ok(ignore_tombstone_value(item)); + // Short-circuit: point reads use exclusive upper bound, + // so the highest visible seqno is read_seqno - 1. + // If matched, no other L0 run can have a higher one. + if item.entry_seqno().checked_add(1) == Some(seqno) { + return Ok(item.filter_tombstone()); } best = Some(item); } @@ -1111,14 +1536,18 @@ impl Tree { } if let Some(entry) = best { - return Ok(ignore_tombstone_value(entry)); + return Ok(entry.filter_tombstone()); } } else { + // L1+ runs have non-overlapping key ranges. Once we find the + // covering run (get_for_key_cmp returns Some), no other run in + // this level can contain the key — break regardless of hit/miss. for run in level.iter() { - if let Some(table) = run.get_for_key_cmp(key, comparator) - && let Some(item) = table.get(key, seqno, key_hash)? - { - return Ok(ignore_tombstone_value(item)); + if let Some(table) = run.get_for_key_cmp(key, comparator) { + if let Some(item) = T::lookup(table, key, seqno, key_hash)? { + return Ok(item.filter_tombstone()); + } + break; } } } @@ -1127,7 +1556,7 @@ impl Tree { Ok(None) } - fn get_internal_entry_from_sealed_memtables( + pub(crate) fn get_internal_entry_from_sealed_memtables( super_version: &SuperVersion, key: &[u8], seqno: SeqNo, @@ -1347,6 +1776,27 @@ impl Tree { .insert(value) } + /// Adds multiple items to the active memtable in bulk. + /// + /// Acquires the version-history lock once and delegates to + /// [`Memtable::insert_batch`] for batch size accounting. + /// + /// Returns the total bytes added and new size of the memtable. 
+ #[doc(hidden)] + #[must_use] + pub(crate) fn append_batch(&self, items: Vec) -> (u64, u64) { + // Hold the read guard for the entire insert to prevent rotate_memtable() + // from sealing this memtable mid-batch (which could cause data loss if + // a concurrent flush persists only a prefix of the batch). + #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")] + self.version_history + .read() + .expect("lock is poisoned") + .latest_version() + .active_memtable + .insert_batch(items) + } + /// Recovers previous state, by loading the level manifest, tables and blob files. /// /// # Errors diff --git a/src/write_batch.rs b/src/write_batch.rs new file mode 100644 index 000000000..d6f2089ce --- /dev/null +++ b/src/write_batch.rs @@ -0,0 +1,193 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! Write batch for bulk memtable insertion with shared seqno. +//! +//! A [`WriteBatch`] collects multiple write operations (insert, remove, merge) +//! and applies them to the active memtable with a single seqno. +//! This reduces per-operation overhead: +//! +//! - **One version-history lock** acquisition instead of N +//! - **Batch size accounting**: single `fetch_add` for total size +//! - **Shared seqno**: all entries in a batch share the same sequence number +//! +//! **Visibility contract:** entries are inserted into the memtable one at a time +//! and become individually visible to concurrent readers as they are written. +//! Atomic batch visibility requires the **caller** to publish the batch seqno +//! (via `visible_seqno.fetch_max(batch_seqno + 1)`) only **after** +//! [`AbstractTree::apply_batch`] returns. This is the same pattern used by +//! fjall's keyspace for single-writer batches. + +use crate::{UserKey, UserValue, ValueType, value::InternalValue}; + +/// A single entry in a [`WriteBatch`]. 
+#[derive(Clone, Debug)] +enum WriteBatchEntry { + /// Insert or update a key-value pair. + Insert { key: UserKey, value: UserValue }, + + /// Delete a key (standard tombstone). + Remove { key: UserKey }, + + /// Delete a key (weak/single-delete tombstone). + RemoveWeak { key: UserKey }, + + /// Write a merge operand for a key. + Merge { key: UserKey, value: UserValue }, +} + +/// Batch of write operations applied with a shared seqno. +/// +/// **Duplicate keys:** all entries receive the same seqno. The memtable +/// skiplist orders by `(user_key, Reverse(seqno))` — `value_type` does NOT +/// break ties. Two entries with the same `(user_key, seqno)` compare equal +/// regardless of operation type, so one may silently overwrite the other. +/// +/// - **Repeated `merge()` on the same key:** safe. All merge operands are +/// collected during reads regardless of skiplist position. +/// - **Mixed ops on the same key** (e.g. `insert` + `remove`): not allowed. +/// `materialize()` rejects these batches with `Error::MixedOperationBatch` +/// in all builds. Callers must canonicalize mixed-op duplicates into a +/// single final operation before batching. +/// +/// # Examples +/// +/// ``` +/// use lsm_tree::WriteBatch; +/// +/// let mut batch = WriteBatch::new(); +/// batch.insert("key1", "value1"); +/// batch.insert("key2", "value2"); +/// batch.remove("key3"); +/// +/// assert_eq!(batch.len(), 3); +/// assert!(!batch.is_empty()); +/// ``` +#[derive(Clone, Debug, Default)] +pub struct WriteBatch { + entries: Vec, +} + +impl WriteBatch { + /// Creates an empty write batch. + #[must_use] + pub fn new() -> Self { + Self { + entries: Vec::new(), + } + } + + /// Creates an empty write batch with the given capacity. + #[must_use] + pub fn with_capacity(capacity: usize) -> Self { + Self { + entries: Vec::with_capacity(capacity), + } + } + + /// Inserts a key-value pair into the batch. 
+ pub fn insert, V: Into>(&mut self, key: K, value: V) { + self.entries.push(WriteBatchEntry::Insert { + key: key.into(), + value: value.into(), + }); + } + + /// Adds a delete (tombstone) for a key. + pub fn remove>(&mut self, key: K) { + self.entries + .push(WriteBatchEntry::Remove { key: key.into() }); + } + + /// Adds a weak delete (single-delete tombstone) for a key. + pub fn remove_weak>(&mut self, key: K) { + self.entries + .push(WriteBatchEntry::RemoveWeak { key: key.into() }); + } + + /// Adds a merge operand for a key. + /// + /// Multiple `merge()` calls for the same key within one batch are supported: + /// they produce distinct merge operands that are resolved together during + /// reads (via the configured [`MergeOperator`](crate::MergeOperator)). + /// The duplicate-key warning in the struct doc applies to mixed operation + /// types (e.g. `insert` + `remove` on the same key), not to multiple merges. + pub fn merge, V: Into>(&mut self, key: K, value: V) { + self.entries.push(WriteBatchEntry::Merge { + key: key.into(), + value: value.into(), + }); + } + + /// Returns the number of operations in the batch. + #[must_use] + pub fn len(&self) -> usize { + self.entries.len() + } + + /// Returns `true` if the batch contains no operations. + #[must_use] + pub fn is_empty(&self) -> bool { + self.entries.is_empty() + } + + /// Clears the batch, removing all entries. + pub fn clear(&mut self) { + self.entries.clear(); + } + + /// Materializes all entries into [`InternalValue`]s with the given seqno. + /// + /// # Errors + /// + /// Returns [`Error::MixedOperationBatch`](crate::Error::MixedOperationBatch) + /// if any user key appears with differing operation types (e.g. insert + remove), + /// which would make equal-key entries with different operation types ambiguous + /// to later reads and merges. 
+ #[doc(hidden)] + pub(crate) fn materialize(self, seqno: crate::SeqNo) -> crate::Result> { + // Reject mixed-op duplicates unconditionally — `InternalKey` ordering + // ties on `(user_key, seqno)` without `value_type` as tie-breaker, + // making the read/compaction outcome ambiguous. + { + let mut seen: std::collections::HashMap<&[u8], ValueType, rustc_hash::FxBuildHasher> = + std::collections::HashMap::with_capacity_and_hasher( + self.entries.len(), + rustc_hash::FxBuildHasher, + ); + for entry in &self.entries { + let (key_bytes, vtype): (&[u8], _) = match entry { + WriteBatchEntry::Insert { key, .. } => (key.as_ref(), ValueType::Value), + WriteBatchEntry::Remove { key } => (key.as_ref(), ValueType::Tombstone), + WriteBatchEntry::RemoveWeak { key } => (key.as_ref(), ValueType::WeakTombstone), + WriteBatchEntry::Merge { key, .. } => (key.as_ref(), ValueType::MergeOperand), + }; + if let Some(&prev_type) = seen.get(key_bytes) { + if prev_type != vtype { + return Err(crate::Error::MixedOperationBatch); + } + } else { + seen.insert(key_bytes, vtype); + } + } + } + + Ok(self + .entries + .into_iter() + .map(|entry| match entry { + WriteBatchEntry::Insert { key, value } => { + InternalValue::from_components(key, value, seqno, ValueType::Value) + } + WriteBatchEntry::Remove { key } => InternalValue::new_tombstone(key, seqno), + WriteBatchEntry::RemoveWeak { key } => { + InternalValue::new_weak_tombstone(key, seqno) + } + WriteBatchEntry::Merge { key, value } => { + InternalValue::new_merge_operand(key, value, seqno) + } + }) + .collect()) + } +} diff --git a/tests/multi_get.rs b/tests/multi_get.rs index d421d63f6..d0b275811 100644 --- a/tests/multi_get.rs +++ b/tests/multi_get.rs @@ -1,6 +1,8 @@ use lsm_tree::{ - AbstractTree, Config, KvSeparationOptions, SeqNo, SequenceNumberCounter, get_tmp_folder, + AbstractTree, Config, KvSeparationOptions, MergeOperator, SeqNo, SequenceNumberCounter, + UserValue, get_tmp_folder, }; +use std::sync::Arc; use test_log::test; #[test] 
@@ -235,3 +237,396 @@ fn multi_get_unsorted_and_duplicate_keys() -> lsm_tree::Result<()> { Ok(()) } + +#[test] +fn multi_get_with_range_tombstones() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("a", "val_a", 0); + tree.insert("b", "val_b", 1); + tree.insert("c", "val_c", 2); + tree.insert("d", "val_d", 3); + tree.remove_range("b", "d", 4); // deletes [b, d) + + let results = tree.multi_get(["a", "b", "c", "d"], 5)?; + assert_eq!(results[0].as_deref(), Some(b"val_a".as_slice())); + assert_eq!(results[1], None); // range tombstoned + assert_eq!(results[2], None); // range tombstoned + assert_eq!(results[3].as_deref(), Some(b"val_d".as_slice())); // end is exclusive + + Ok(()) +} + +#[test] +fn multi_get_spanning_multiple_levels() -> lsm_tree::Result<()> { + use lsm_tree::compaction::Leveled; + + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + // Write batch 1 → flush → compact to L1 + for i in 0..10u32 { + tree.insert(format!("key_{i:04}"), format!("batch1_{i}"), u64::from(i)); + } + tree.flush_active_memtable(0)?; + tree.compact(Arc::new(Leveled::default()), SeqNo::MAX)?; + + // Write batch 2 → flush (stays in L0) + for i in 5..15u32 { + tree.insert( + format!("key_{i:04}"), + format!("batch2_{i}"), + u64::from(i + 100), + ); + } + tree.flush_active_memtable(0)?; + + // Write batch 3 → memtable only + for i in 10..20u32 { + tree.insert( + format!("key_{i:04}"), + format!("batch3_{i}"), + u64::from(i + 200), + ); + } + + // multi_get with keys spanning memtable (10-19), L0 (5-14), L1 (0-9) + let keys: Vec = (0..25u32).map(|i| format!("key_{i:04}")).collect(); + let results = tree.multi_get(&keys, SeqNo::MAX)?; + + assert_eq!(results.len(), 25); + + // 0-4: from L1 only (batch1) + for i in 0..5u32 { + 
assert_eq!( + results[i as usize].as_deref(), + Some(format!("batch1_{i}").as_bytes()), + "key_{i:04} should come from L1", + ); + } + + // 5-9: from L0 (batch2 shadows batch1 in L1) + for i in 5..10u32 { + assert_eq!( + results[i as usize].as_deref(), + Some(format!("batch2_{i}").as_bytes()), + "key_{i:04} should come from L0 (shadowing L1)", + ); + } + + // 10-14: from memtable (batch3 shadows batch2 in L0) + for i in 10..15u32 { + assert_eq!( + results[i as usize].as_deref(), + Some(format!("batch3_{i}").as_bytes()), + "key_{i:04} should come from memtable (shadowing L0)", + ); + } + + // 15-19: from memtable (batch3, no shadowing) + for i in 15..20u32 { + assert_eq!( + results[i as usize].as_deref(), + Some(format!("batch3_{i}").as_bytes()), + "key_{i:04} should come from memtable", + ); + } + + // 20-24: missing + for i in 20..25u32 { + assert_eq!(results[i as usize], None, "key_{i:04} should not exist"); + } + + Ok(()) +} + +#[test] +fn multi_get_large_batch_all_from_disk() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + // Write 500 keys and flush to disk + for i in 0..500u64 { + tree.insert(format!("key_{i:05}"), format!("value_{i}"), i); + } + tree.flush_active_memtable(0)?; + + // Batch get all 500 in reverse order (exercises sorting) + let keys: Vec = (0..500u64).rev().map(|i| format!("key_{i:05}")).collect(); + let results = tree.multi_get(&keys, SeqNo::MAX)?; + + assert_eq!(results.len(), 500); + for (result_idx, i) in (0..500u64).rev().enumerate() { + let expected = format!("value_{i}"); + assert_eq!( + results[result_idx].as_deref(), + Some(expected.as_bytes()), + "mismatch at result index {result_idx} (key_{i:05})", + ); + } + + Ok(()) +} + +struct ConcatMerge; + +impl MergeOperator for ConcatMerge { + fn merge( + &self, + _key: &[u8], + base: Option<&[u8]>, + operands: &[&[u8]], + ) -> lsm_tree::Result { + 
let mut result = base.unwrap_or_default().to_vec(); + for op in operands { + result.extend_from_slice(op); + } + Ok(result.into()) + } +} + +#[test] +fn multi_get_with_merge_operands() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_merge_operator(Some(Arc::new(ConcatMerge))) + .open()?; + + tree.insert("a", "base_a", 0); + tree.merge("a", "_merged", 1); + tree.insert("b", "val_b", 2); + + // multi_get should resolve merge operand for "a" via pipeline + let results = tree.multi_get(["a", "b", "c"], 3)?; + assert_eq!(results[0].as_deref(), Some(b"base_a_merged".as_slice())); + assert_eq!(results[1].as_deref(), Some(b"val_b".as_slice())); + assert_eq!(results[2], None); + + Ok(()) +} + +#[test] +fn multi_get_with_merge_operands_on_disk() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_merge_operator(Some(Arc::new(ConcatMerge))) + .open()?; + + tree.insert("k1", "A", 0); + tree.merge("k1", "B", 1); + tree.insert("k2", "plain", 2); + tree.flush_active_memtable(0)?; + + // Use 3+ keys to exercise the batch code path (≤2 keys uses simple per-key path) + let results = tree.multi_get(["k1", "k2", "missing"], SeqNo::MAX)?; + assert_eq!(results[0].as_deref(), Some(b"AB".as_slice())); + assert_eq!(results[1].as_deref(), Some(b"plain".as_slice())); + assert_eq!(results[2], None); + + Ok(()) +} + +#[test] +fn multi_get_tombstones_on_disk_with_l0() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + // Flush batch 1 to L0 + for i in 0..5u32 { + tree.insert(format!("key_{i:04}"), format!("val_{i}"), u64::from(i)); + } + tree.flush_active_memtable(0)?; + + // Flush batch 2 to L0 
with tombstones for some keys + tree.remove("key_0001", 10); + tree.remove("key_0003", 11); + tree.insert("key_0002", "updated", 12); + tree.flush_active_memtable(0)?; + + // Multi-get: exercises L0 batch path with tombstones + let keys: Vec = (0..5u32).map(|i| format!("key_{i:04}")).collect(); + let results = tree.multi_get(&keys, SeqNo::MAX)?; + + assert_eq!(results[0].as_deref(), Some(b"val_0".as_slice())); + assert_eq!(results[1], None); // tombstoned + assert_eq!(results[2].as_deref(), Some(b"updated".as_slice())); + assert_eq!(results[3], None); // tombstoned + assert_eq!(results[4].as_deref(), Some(b"val_4".as_slice())); + + Ok(()) +} + +#[test] +fn multi_get_blob_tree_range_tombstone_suppresses() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions { + separation_threshold: 1, + ..Default::default() + })) + .open()?; + + let val_a = b"a".repeat(100); + let val_b = b"b".repeat(100); + let val_c = b"c".repeat(100); + let val_d = b"d".repeat(100); + + tree.insert("a", val_a.as_slice(), 0); + tree.insert("b", val_b.as_slice(), 1); + tree.insert("c", val_c.as_slice(), 2); + tree.insert("d", val_d.as_slice(), 3); + tree.flush_active_memtable(0)?; + + // RT suppresses [b, d) + tree.remove_range("b", "d", 4); + + // 4 keys → batch path (>2) + let results = tree.multi_get(["a", "b", "c", "d"], 5)?; + assert_eq!(results[0].as_deref(), Some(val_a.as_slice())); + assert_eq!(results[1], None, "b suppressed by RT"); + assert_eq!(results[2], None, "c suppressed by RT"); + assert_eq!( + results[3].as_deref(), + Some(val_d.as_slice()), + "d at exclusive end" + ); + + Ok(()) +} + +#[test] +fn multi_get_blob_tree_merge_operands() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + 
.with_kv_separation(Some(KvSeparationOptions { + separation_threshold: 100, + ..Default::default() + })) + .with_merge_operator(Some(Arc::new(ConcatMerge))) + .open()?; + + // Base insert (4 bytes) stays inline (< 100 threshold). + // Merge operands are always inline in BlobTree. + // k2 value (200 bytes) goes to blob. + tree.insert("k1", "BASE", 0); + tree.merge("k1", "_EXT", 1); + tree.insert("k2", b"x".repeat(200).as_slice(), 2); + tree.flush_active_memtable(0)?; + + // 3 keys → batch path; k1 has merge operand on disk + let results = tree.multi_get(["k1", "k2", "k3"], SeqNo::MAX)?; + + assert_eq!(results[0].as_deref(), Some(b"BASE_EXT".as_slice())); + assert_eq!(results[1].as_deref(), Some(b"x".repeat(200).as_slice())); + assert_eq!(results[2], None); + + Ok(()) +} + +#[test] +fn multi_get_blob_tree_memtable_hits_skip_sst() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions { + separation_threshold: 1, + ..Default::default() + })) + .open()?; + + // Some keys on disk + tree.insert("a", b"disk_a".repeat(50).as_slice(), 0); + tree.insert("b", b"disk_b".repeat(50).as_slice(), 1); + tree.flush_active_memtable(0)?; + + // Some keys in memtable (shadow disk) + tree.insert("a", b"mem_a".repeat(50).as_slice(), 2); + tree.insert("c", b"mem_c".repeat(50).as_slice(), 3); + + // 4 keys → batch path; "a" from memtable, "b" from disk, "c" from memtable, "d" missing + let results = tree.multi_get(["a", "b", "c", "d"], SeqNo::MAX)?; + assert_eq!(results[0].as_deref(), Some(b"mem_a".repeat(50).as_slice())); + assert_eq!(results[1].as_deref(), Some(b"disk_b".repeat(50).as_slice())); + assert_eq!(results[2].as_deref(), Some(b"mem_c".repeat(50).as_slice())); + assert_eq!(results[3], None); + + Ok(()) +} + +#[test] +fn multi_get_blob_tree_merge_without_operator_returns_raw() -> lsm_tree::Result<()> { + let folder = 
get_tmp_folder(); + + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions { + separation_threshold: 1, + ..Default::default() + })) + .open()?; + + // No merge operator configured — merge operand should return raw value + tree.insert("k1", b"x".repeat(100).as_slice(), 0); + tree.merge("k2", "raw_operand", 1); + tree.insert("k3", b"y".repeat(100).as_slice(), 2); + tree.flush_active_memtable(0)?; + + let results = tree.multi_get(["k1", "k2", "k3"], SeqNo::MAX)?; + assert_eq!(results[0].as_deref(), Some(b"x".repeat(100).as_slice())); + assert_eq!(results[1].as_deref(), Some(b"raw_operand".as_slice())); + assert_eq!(results[2].as_deref(), Some(b"y".repeat(100).as_slice())); + + Ok(()) +} diff --git a/tests/pinnable_slice.rs b/tests/pinnable_slice.rs new file mode 100644 index 000000000..aedeb32f3 --- /dev/null +++ b/tests/pinnable_slice.rs @@ -0,0 +1,465 @@ +use lsm_tree::{ + AbstractTree, AnyTree, Config, KvSeparationOptions, MergeOperator, PinnableSlice, SeqNo, + SequenceNumberCounter, UserValue, get_tmp_folder, +}; +use std::sync::Arc; +use test_log::test; + +fn setup_tree() -> lsm_tree::Result<(AnyTree, tempfile::TempDir)> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + Ok((tree, folder)) +} + +struct ConcatMerge; + +impl MergeOperator for ConcatMerge { + fn merge( + &self, + _key: &[u8], + base: Option<&[u8]>, + operands: &[&[u8]], + ) -> lsm_tree::Result { + let mut result = base.unwrap_or_default().to_vec(); + for op in operands { + result.extend_from_slice(op); + } + Ok(result.into()) + } +} + +#[test] +fn get_pinned_memtable_returns_owned() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("a", "value_a", 0); + + let result = tree.get_pinned("a", 1)?; + assert!(result.is_some()); + + let ps = result.expect("should 
exist"); + // Memtable values are always Owned + assert!(!ps.is_pinned()); + assert_eq!(ps.value(), b"value_a"); + assert_eq!(ps.len(), 7); + assert!(!ps.is_empty()); + + Ok(()) +} + +#[test] +fn get_pinned_disk_returns_pinned() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("a", "disk_value", 0); + tree.flush_active_memtable(0)?; + + let result = tree.get_pinned("a", SeqNo::MAX)?; + assert!(result.is_some()); + + let ps = result.expect("should exist"); + // Disk values should be Pinned (block cache) + assert!(ps.is_pinned()); + assert_eq!(ps.value(), b"disk_value"); + + Ok(()) +} + +#[test] +fn get_pinned_missing_key_returns_none() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + let result = tree.get_pinned("nonexistent", 1)?; + assert!(result.is_none()); + + Ok(()) +} + +#[test] +fn get_pinned_tombstoned_key_returns_none() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("a", "value", 0); + tree.remove("a", 1); + + let result = tree.get_pinned("a", 2)?; + assert!(result.is_none()); + + Ok(()) +} + +#[test] +fn get_pinned_into_value_conversion() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("a", "my_value", 0); + + let ps = tree.get_pinned("a", 1)?.expect("should exist"); + let user_value = ps.into_value(); + assert_eq!(&*user_value, b"my_value"); + + Ok(()) +} + +#[test] +fn get_pinned_matches_get() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + for i in 0..20u32 { + tree.insert(format!("key_{i:04}"), format!("val_{i}"), u64::from(i)); + } + tree.flush_active_memtable(0)?; + + // Insert some in memtable too + for i in 20..30u32 { + tree.insert(format!("key_{i:04}"), format!("val_{i}"), u64::from(i)); + } + + // Verify get_pinned returns same data as get for all keys + for i in 0..35u32 { + let key = format!("key_{i:04}"); + let regular = tree.get(&key, SeqNo::MAX)?; + let pinned = tree.get_pinned(&key, SeqNo::MAX)?; + + 
match (&regular, &pinned) { + (Some(r), Some(p)) => { + assert_eq!(r.as_ref(), p.value(), "mismatch at key {key}"); + } + (None, None) => {} + _ => panic!("get and get_pinned disagree for key {key}"), + } + } + + Ok(()) +} + +#[test] +fn pinnable_slice_partial_eq() { + let ps = PinnableSlice::owned(b"hello".as_slice().into()); + assert_eq!(ps.value(), b"hello"); + assert!(ps == b"hello".as_slice()); +} + +#[test] +fn pinnable_slice_debug_format() { + let ps = PinnableSlice::owned(b"hello".as_slice().into()); + let debug = format!("{ps:?}"); + assert!(debug.contains("Owned")); + + // Clone + let ps2 = ps.clone(); + assert_eq!(ps2.value(), b"hello"); +} + +#[test] +fn get_pinned_blob_tree_returns_owned() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions { + separation_threshold: 1, + ..Default::default() + })) + .open()?; + + let big_val = b"x".repeat(500); + tree.insert("a", big_val.as_slice(), 0); + tree.flush_active_memtable(0)?; + + // BlobTree uses default get_pinned impl → always Owned + let ps = tree.get_pinned("a", SeqNo::MAX)?; + assert!(ps.is_some()); + let ps = ps.expect("should exist"); + assert!(!ps.is_pinned()); // blob-resolved values are Owned + assert_eq!(ps.value(), big_val.as_slice()); + + Ok(()) +} + +#[test] +fn get_pinned_with_range_tombstone_returns_none() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("a", "val_a", 0); + tree.insert("b", "val_b", 1); + tree.insert("c", "val_c", 2); + tree.remove_range("a", "c", 3); // deletes [a, c) + + let result_a = tree.get_pinned("a", 4)?; + let result_b = tree.get_pinned("b", 4)?; + let result_c = tree.get_pinned("c", 4)?; + + assert!( + result_a.is_none(), + "a should be suppressed by range tombstone" + ); + assert!( + result_b.is_none(), + "b should be suppressed by range tombstone" + ); + assert!( +
result_c.is_some(), + "c is at the exclusive end, not suppressed" + ); + assert_eq!(result_c.expect("should exist").value(), b"val_c"); + + Ok(()) +} + +#[test] +fn get_pinned_after_compaction_returns_pinned() -> lsm_tree::Result<()> { + use lsm_tree::compaction::Leveled; + + let (tree, _folder) = setup_tree()?; + + // Write and flush multiple times to create L0 tables + for batch in 0..3u32 { + for i in 0..10u32 { + let key = format!("key_{i:04}"); + let val = format!("val_{batch}_{i}"); + tree.insert(key, val, u64::from(batch * 10 + i)); + } + tree.flush_active_memtable(0)?; + } + + // Compact to push data to L1+ + tree.compact(Arc::new(Leveled::default()), SeqNo::MAX)?; + + // Values from compacted L1+ tables should be Pinned + let ps = tree.get_pinned("key_0005", SeqNo::MAX)?; + assert!(ps.is_some()); + let ps = ps.expect("should exist"); + assert!(ps.is_pinned(), "compacted L1+ value should be Pinned"); + + Ok(()) +} + +#[test] +fn get_pinned_sealed_memtable_returns_owned() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("a", "val_sealed", 0); + + // Rotate memtable (seals it) without flushing + tree.rotate_memtable(); + + let ps = tree.get_pinned("a", 1)?; + assert!(ps.is_some()); + let ps = ps.expect("should exist"); + // Sealed memtable values are still Owned (in-memory) + assert!(!ps.is_pinned()); + assert_eq!(ps.value(), b"val_sealed"); + + Ok(()) +} + +#[test] +fn get_pinned_disk_exercises_pinned_methods() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("a", "pinned_value", 0); + tree.flush_active_memtable(0)?; + + let ps = tree.get_pinned("a", SeqNo::MAX)?.expect("should exist"); + assert!(ps.is_pinned()); + + // Exercise len/is_empty/value on Pinned variant + assert_eq!(ps.len(), 12); + assert!(!ps.is_empty()); + assert_eq!(ps.value(), b"pinned_value"); + + // Exercise AsRef<[u8]> on Pinned + let bytes: &[u8] = ps.as_ref(); + assert_eq!(bytes, b"pinned_value"); + + // Exercise 
PartialEq<&[u8]> on Pinned + assert!(ps == b"pinned_value".as_slice()); + + // Exercise Debug on Pinned + let debug = format!("{ps:?}"); + assert!(debug.contains("Pinned")); + + // Exercise Clone on Pinned + let ps2 = ps.clone(); + assert_eq!(ps2.value(), b"pinned_value"); + assert!(ps2.is_pinned()); + + // Exercise into_value on Pinned + let uv: lsm_tree::UserValue = ps.into_value(); + assert_eq!(&*uv, b"pinned_value"); + + // Exercise From<PinnableSlice> for UserValue on Pinned + let uv2: lsm_tree::UserValue = ps2.into(); + assert_eq!(&*uv2, b"pinned_value"); + + Ok(()) +} + +#[test] +fn get_pinned_empty_value_on_disk() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + // Insert with empty value + tree.insert("empty", "", 0); + tree.flush_active_memtable(0)?; + + let ps = tree.get_pinned("empty", SeqNo::MAX)?.expect("should exist"); + assert!(ps.is_pinned()); + assert!(ps.is_empty()); + assert_eq!(ps.len(), 0); + + Ok(()) +} + +#[test] +fn get_pinned_tombstone_on_disk_returns_none() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("a", "value", 0); + tree.remove("a", 1); + tree.flush_active_memtable(0)?; + + // Tombstone on disk — get_pinned should return None + let result = tree.get_pinned("a", SeqNo::MAX)?; + assert!(result.is_none()); + + Ok(()) +} + +#[test] +fn get_pinned_range_tombstone_on_disk_suppresses() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("a", "v1", 0); + tree.insert("b", "v2", 1); + tree.flush_active_memtable(0)?; + + // Range tombstone in a later SST suppresses older disk values + tree.remove_range("a", "c", 2); + tree.flush_active_memtable(0)?; + + let result_a = tree.get_pinned("a", 3)?; + let result_b = tree.get_pinned("b", 3)?; + assert!( + result_a.is_none(), + "disk value should be suppressed by on-disk RT" + ); + assert!( + result_b.is_none(), + "disk value should be suppressed by on-disk RT" + ); + + Ok(()) +} + +#[test] +fn
get_pinned_with_merge_operator_in_memtable() -> lsm_tree::Result<()> { + // ConcatMerge defined at module scope + + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_merge_operator(Some(Arc::new(ConcatMerge))) + .open()?; + + tree.insert("k", "A", 0); + tree.merge("k", "B", 1); + + // Merge operand in active memtable → get_pinned resolves via pipeline → Owned + let ps = tree.get_pinned("k", 2)?.expect("should resolve merge"); + assert!(!ps.is_pinned()); + assert_eq!(ps.value(), b"AB"); + + Ok(()) +} + +#[test] +fn get_pinned_with_merge_operator_in_sealed_memtable() -> lsm_tree::Result<()> { + // ConcatMerge defined at module scope + + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_merge_operator(Some(Arc::new(ConcatMerge))) + .open()?; + + tree.insert("k", "X", 0); + tree.merge("k", "Y", 1); + tree.rotate_memtable(); + + // Merge operand in sealed memtable → resolves via pipeline → Owned + let ps = tree.get_pinned("k", 2)?.expect("should resolve merge"); + assert!(!ps.is_pinned()); + assert_eq!(ps.value(), b"XY"); + + Ok(()) +} + +#[test] +fn get_pinned_with_merge_operator_on_disk() -> lsm_tree::Result<()> { + // ConcatMerge defined at module scope + + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_merge_operator(Some(Arc::new(ConcatMerge))) + .open()?; + + tree.insert("k", "D", 0); + tree.flush_active_memtable(0)?; + tree.merge("k", "E", 1); + tree.flush_active_memtable(0)?; + + // Cross-SST merge: base in first SST, operand in second → resolves via pipeline → Owned + let ps = tree.get_pinned("k", 2)?.expect("should resolve merge"); + assert!(!ps.is_pinned()); + assert_eq!(ps.value(), b"DE"); + + Ok(()) +} + +#[test] +fn 
get_pinned_sealed_memtable_tombstone_returns_none() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("a", "value", 0); + tree.remove("a", 1); + tree.rotate_memtable(); + + let result = tree.get_pinned("a", 2)?; + assert!(result.is_none()); + + Ok(()) +} + +#[test] +fn get_pinned_sealed_memtable_range_tombstone_suppresses() -> lsm_tree::Result<()> { + let (tree, _folder) = setup_tree()?; + + tree.insert("b", "value", 0); + tree.remove_range("a", "c", 1); + tree.rotate_memtable(); + + // Value and RT both in sealed memtable + let result = tree.get_pinned("b", 2)?; + assert!(result.is_none(), "sealed memtable value suppressed by RT"); + + Ok(()) +} diff --git a/tests/write_batch.rs b/tests/write_batch.rs new file mode 100644 index 000000000..6c38c0f72 --- /dev/null +++ b/tests/write_batch.rs @@ -0,0 +1,341 @@ +use lsm_tree::{ + AbstractTree, Config, KvSeparationOptions, MergeOperator, SeqNo, SequenceNumberCounter, + UserValue, WriteBatch, get_tmp_folder, +}; +use std::sync::Arc; +use test_log::test; + +#[test] +fn write_batch_insert_and_read() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + let mut batch = WriteBatch::new(); + batch.insert("a", "val_a"); + batch.insert("b", "val_b"); + batch.insert("c", "val_c"); + + let (bytes_added, _memtable_size) = tree.apply_batch(batch, 0)?; + assert!(bytes_added > 0); + + assert_eq!(tree.get("a", 1)?.as_deref(), Some(b"val_a".as_slice())); + assert_eq!(tree.get("b", 1)?.as_deref(), Some(b"val_b".as_slice())); + assert_eq!(tree.get("c", 1)?.as_deref(), Some(b"val_c".as_slice())); + + Ok(()) +} + +#[test] +fn write_batch_insert_and_remove_different_keys() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + // Pre-insert a value 
+ tree.insert("existing", "old_value", 0); + + let mut batch = WriteBatch::new(); + batch.insert("new_key", "new_value"); + batch.remove("existing"); + tree.apply_batch(batch, 1)?; + + assert_eq!( + tree.get("new_key", 2)?.as_deref(), + Some(b"new_value".as_slice()) + ); + assert_eq!(tree.get("existing", 2)?, None); // tombstoned + + Ok(()) +} + +#[test] +fn write_batch_empty_is_noop() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + let batch = WriteBatch::new(); + let (bytes_added, _) = tree.apply_batch(batch, 0)?; + assert_eq!(bytes_added, 0); + + Ok(()) +} + +#[test] +fn write_batch_shared_seqno_atomic_visibility() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + let mut batch = WriteBatch::new(); + batch.insert("x", "vx"); + batch.insert("y", "vy"); + batch.insert("z", "vz"); + tree.apply_batch(batch, 5)?; + + // At seqno=5, none should be visible (memtable uses seqno-1 as upper bound) + assert_eq!(tree.get("x", 5)?, None); + assert_eq!(tree.get("y", 5)?, None); + assert_eq!(tree.get("z", 5)?, None); + + // At seqno=6, all should be visible atomically + assert_eq!(tree.get("x", 6)?.as_deref(), Some(b"vx".as_slice())); + assert_eq!(tree.get("y", 6)?.as_deref(), Some(b"vy".as_slice())); + assert_eq!(tree.get("z", 6)?.as_deref(), Some(b"vz".as_slice())); + + Ok(()) +} + +#[test] +fn write_batch_with_capacity() { + let batch = WriteBatch::with_capacity(100); + assert!(batch.is_empty()); + assert_eq!(batch.len(), 0); +} + +#[test] +fn write_batch_clear() { + let mut batch = WriteBatch::new(); + batch.insert("a", "b"); + batch.remove("c"); + assert_eq!(batch.len(), 2); + + batch.clear(); + assert!(batch.is_empty()); +} + +#[test] +fn write_batch_survives_flush() -> lsm_tree::Result<()> { + let 
folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + let mut batch = WriteBatch::new(); + for i in 0..50u32 { + batch.insert(format!("key_{i:04}"), format!("val_{i}")); + } + tree.apply_batch(batch, 0)?; + tree.flush_active_memtable(0)?; + + for i in 0..50u32 { + let expected = format!("val_{i}"); + assert_eq!( + tree.get(format!("key_{i:04}"), 1)?.as_deref(), + Some(expected.as_bytes()), + "mismatch at key {i} after flush", + ); + } + + Ok(()) +} + +#[test] +fn write_batch_blob_tree_kv_separation() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_kv_separation(Some(KvSeparationOptions { + separation_threshold: 1, + ..Default::default() + })) + .open()?; + + let big_val = b"x".repeat(1000); + + let mut batch = WriteBatch::new(); + batch.insert("k1", big_val.as_slice()); + batch.insert("k2", big_val.as_slice()); + batch.remove("k3"); // tombstone for non-existent key + tree.apply_batch(batch, 0)?; + + tree.flush_active_memtable(0)?; + assert!(tree.blob_file_count() > 0); + + assert_eq!( + tree.get("k1", SeqNo::MAX)?.as_deref(), + Some(big_val.as_slice()) + ); + assert_eq!( + tree.get("k2", SeqNo::MAX)?.as_deref(), + Some(big_val.as_slice()) + ); + assert_eq!(tree.get("k3", SeqNo::MAX)?, None); + + Ok(()) +} + +struct ConcatMerge; + +impl MergeOperator for ConcatMerge { + fn merge( + &self, + _key: &[u8], + base: Option<&[u8]>, + operands: &[&[u8]], + ) -> lsm_tree::Result<UserValue> { + let mut result = base.unwrap_or_default().to_vec(); + for op in operands { + result.extend_from_slice(op); + } + Ok(result.into()) + } +} + +#[test] +fn write_batch_with_merge_operand() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) +
.with_merge_operator(Some(Arc::new(ConcatMerge))) + .open()?; + + // Base value + tree.insert("counter", "A", 0); + + // Batch with merge operands + let mut batch = WriteBatch::new(); + batch.merge("counter", "B"); + batch.merge("counter", "C"); + tree.apply_batch(batch, 1)?; + + let result = tree.get("counter", 2)?; + assert_eq!(result.as_deref(), Some(b"ABC".as_slice())); + + Ok(()) +} + +#[test] +fn write_batch_remove_weak() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + tree.insert("a", "val", 0); + + let mut batch = WriteBatch::new(); + batch.remove_weak("a"); + tree.apply_batch(batch, 1)?; + + assert_eq!(tree.get("a", 2)?, None); + + Ok(()) +} + +#[test] +fn write_batch_multi_get_after_batch() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + let mut batch = WriteBatch::new(); + for i in 0..20u32 { + batch.insert(format!("key_{i:04}"), format!("val_{i}")); + } + tree.apply_batch(batch, 0)?; + + // Flush half, keep half in memtable + tree.flush_active_memtable(0)?; + + let mut batch2 = WriteBatch::new(); + for i in 20..40u32 { + batch2.insert(format!("key_{i:04}"), format!("val_{i}")); + } + tree.apply_batch(batch2, 1)?; + + // multi_get spanning disk (0-19) + memtable (20-39) + missing (40-44) + let keys: Vec<String> = (0..45u32).map(|i| format!("key_{i:04}")).collect(); + let results = tree.multi_get(&keys, SeqNo::MAX)?; + + assert_eq!(results.len(), 45); + for i in 0..40u32 { + let expected = format!("val_{i}"); + assert_eq!( + results[i as usize].as_deref(), + Some(expected.as_bytes()), + "mismatch at key_{i:04}", + ); + } + for i in 40..45u32 { + assert_eq!(results[i as usize], None, "key_{i:04} should not exist"); + } + + Ok(()) +} + +#[test] +fn write_batch_mixed_ops_rejected() ->
lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open()?; + + let mut batch = WriteBatch::new(); + batch.insert("conflict", "value"); + batch.remove("conflict"); // mixed op on same key + + let result = tree.apply_batch(batch, 0); + assert!( + matches!(result, Err(lsm_tree::Error::MixedOperationBatch)), + "mixed insert+remove on same key must be rejected", + ); + + Ok(()) +} + +#[test] +fn write_batch_repeated_merge_accepted() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = Config::new( + &folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .with_merge_operator(Some(Arc::new(ConcatMerge))) + .open()?; + + let mut batch = WriteBatch::new(); + batch.merge("k", "A"); + batch.merge("k", "B"); // same op type on same key = ok + + tree.apply_batch(batch, 0)?; // must not error + + Ok(()) +}