diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index 411be9352..dcd9f9120 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -62,10 +62,24 @@ cargo fmt --all -- --check # Format check | `bytes_1` | Use `bytes` crate for Slice type | | `metrics` | Expose prometheus metrics | +## Design Decision Analysis (CRITICAL) + +**Before flagging any potential issue, trace the call chain visible in the PR diff:** + +1. **Read the caller** — does the caller already handle the case you're about to flag? If a function returns a value that looks problematic in isolation, check call sites visible in the diff. The caller may handle the edge case explicitly (e.g., empty collections, `None` vs `Some(empty)`). +2. **Check type-level guarantees** — does the type system prevent the issue? Different enum variants, wrapper types, or visibility modifiers may make a "collision" or "misuse" structurally impossible. +3. **Read adjacent comments** — comments starting with "NOTE:", "Use X instead of Y because...", or explaining WHY a specific approach was chosen document deliberate design decisions. If the comment accurately describes the code behavior, the design is intentional. + +**Suppress a finding on the basis of these checks only if it is Tier 3/Tier 4.** If the pattern still indicates a Tier 1 (logic/correctness) or Tier 2 (safety/crash) issue, flag it regardless of documented rationale or caller handling. + ## Architecture Notes - `src/table/block/` — On-disk block format (header + compressed payload) - `src/vlog/blob_file/` — Value log for large values (separate from LSM blocks) - `src/compaction/` — Compaction strategies (leveled, FIFO, tiered) - `src/seqno.rs` — Sequence number generator (MVCC versioning) +- `src/range_tombstone.rs` — Range tombstone data model and serialization +- `src/range_tombstone_filter.rs` — MVCC-aware range tombstone filtering for iterators +- `src/active_tombstone_set.rs` — Tracks active range tombstones during iteration +- `src/memtable/interval_tree.rs` — Interval tree for memtable range tombstone queries - Compression is pluggable via `CompressionType` enum with `#[cfg(feature)]` variants diff --git a/.github/instructions/rust.instructions.md b/.github/instructions/rust.instructions.md index c3bcf0684..2292370e4 100644 --- a/.github/instructions/rust.instructions.md +++ b/.github/instructions/rust.instructions.md @@ -43,6 +43,10 @@ Focus review effort on real bugs, not cosmetics. Stop after finding issues in hi These are not actionable review findings. Do not raise them: +- **Caller-handled edge cases**: Before flagging a function for not handling an edge case (empty collection, `None` vs `Some(empty)`, missing guard), check call sites visible in the PR diff. If all visible callers already handle the case, the function's behavior is part of a deliberate contract — not a bug. Only flag if the edge case is truly unhandled end-to-end within the scope of the PR. +- **Type-system-prevented issues**: Before flagging a potential collision, overlap, or misuse, check whether distinct enum variants, wrapper types, or visibility modifiers make the issue structurally impossible. A `WeakTombstone` variant that never appears in user-facing merge paths cannot collide with user data regardless of key/seqno overlap. +- **Documented design decisions** (Tier 3-4 only): When code has a comment explaining WHY a specific approach was chosen, trust the documented reasoning for style and API design choices. 
Flag only if the comment contradicts the actual code behavior — not if you would have chosen a different approach. This exclusion does NOT apply to Tier 1 (logic bugs, data corruption) or Tier 2 (safety, crash recovery) — always flag those regardless of documentation. + - **Comment wording vs code behavior**: If a comment says "flush when full" but the threshold is checked with `>=` not `>`, the intent is clear — the boundary condition is a design choice. Do not suggest rewording comments to match exact comparison operators. - **Comment precision**: "returns the block" when it technically returns `Result` — the comment conveys meaning, not type signature. - **Magic numbers with context**: `4` in `assert_eq!(header.len(), 4, "expected u32 checksum")` — the assertion message provides the context. Do not suggest a named constant when the value is used once in a test with an explanatory message. diff --git a/README.md b/README.md index 182126455..ff1e382aa 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,11 @@ > **Maintained fork** by [Structured World Foundation](https://sw.foundation) for the [CoordiNode](https://github.com/structured-world/coordinode) database engine. > Based on [fjall-rs/lsm-tree](https://github.com/fjall-rs/lsm-tree). We contribute patches upstream and maintain additional features needed for CoordiNode (zstd compression, custom sequence number generators, batch get, intra-L0 compaction, security hardening). +> [!IMPORTANT] +> This fork introduces its own **disk format V4** compatibility boundary. +> `V4` is a breaking on-disk change relative to `V3` because the fork persists new semantics such as range tombstones and merge operands. +> New code may continue reading supported `V3` databases, but databases written with these `V4` semantics must not be opened by older `V3` binaries. + A K.I.S.S. implementation of log-structured merge trees (LSM-trees/LSMTs) in Rust. > [!NOTE] diff --git a/src/abstract_tree.rs b/src/abstract_tree.rs index c6f1aee9b..eb39d92f3 100644 --- a/src/abstract_tree.rs +++ b/src/abstract_tree.rs @@ -15,9 +15,22 @@ pub type RangeItem = crate::Result; type FlushToTablesResult = (Vec<Table>, Option<Vec<BlobFile>>); +// Sealed on purpose: this trait is still public as a consumer-side bound +// (`&impl AbstractTree`), but external implementations are no longer part of +// the supported extension surface. Internal flush/version hooks keep evolving +// with crate-owned tree types and must not create downstream semver traps. +// +// `sealed` stays `pub` only so sibling modules in this crate can write +// `crate::abstract_tree::sealed::Sealed` in their impls. The parent module +// `abstract_tree` is not publicly exported from the crate root, so downstream +// crates still cannot name or implement this trait. +pub mod sealed { + pub trait Sealed {} +} + /// Generic Tree API #[enum_dispatch::enum_dispatch] -pub trait AbstractTree { +pub trait AbstractTree: sealed::Sealed { /// Debug method for tracing the MVCC history of a key. 
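Before the trait body continues, here is a minimal, self-contained sketch of the sealed-trait pattern the comment above describes (names are hypothetical, not this crate's real module layout):

```rust
// Sealed-trait sketch: the private parent module plays the role of the
// non-exported `abstract_tree` module described in the comment above.
mod tree {
    pub mod sealed {
        pub trait Sealed {}
    }

    // Usable as a bound by any code that can see `AbstractTree`...
    pub trait AbstractTree: sealed::Sealed {
        fn len(&self) -> usize;
    }

    pub struct StandardTree;
    impl sealed::Sealed for StandardTree {}
    impl AbstractTree for StandardTree {
        fn len(&self) -> usize {
            0
        }
    }
}

// Consumer-side bounds keep working...
fn item_count(tree: &impl tree::AbstractTree) -> usize {
    tree.len()
}

fn main() {
    assert_eq!(item_count(&tree::StandardTree), 0);
    // ...but a foreign crate cannot `impl AbstractTree for MyType`:
    // that requires implementing `sealed::Sealed`, whose path is not
    // nameable from outside the defining crate.
}
```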
#[doc(hidden)] fn print_trace(&self, key: &[u8]) -> crate::Result<()>; @@ -76,7 +89,9 @@ pub trait AbstractTree { _lock: &MutexGuard<'_, ()>, seqno_threshold: SeqNo, ) -> crate::Result> { - use crate::{compaction::stream::CompactionStream, merge::Merger}; + use crate::{ + compaction::stream::CompactionStream, merge::Merger, range_tombstone::RangeTombstone, + }; let version_history = self.get_version_history_lock(); let latest = version_history.latest_version(); @@ -93,6 +108,14 @@ pub trait AbstractTree { let flushed_size = latest.sealed_memtables.iter().map(|mt| mt.size()).sum(); + // Collect range tombstones from sealed memtables + let mut range_tombstones: Vec<RangeTombstone> = Vec::new(); + for mt in latest.sealed_memtables.iter() { + range_tombstones.extend(mt.range_tombstones_sorted()); + } + range_tombstones.sort(); + range_tombstones.dedup(); + let merger = Merger::new( latest .sealed_memtables @@ -104,7 +127,22 @@ pub trait AbstractTree { drop(version_history); - if let Some((tables, blob_files)) = self.flush_to_tables(stream)? { + // Clone needed: flush_to_tables_with_rt consumes the Vec, but on the + // RT-only path (no KV data, tables.is_empty()) we re-insert RTs into the + // active memtable. Flush is infrequent and RT count is small. + if let Some((tables, blob_files)) = + self.flush_to_tables_with_rt(stream, range_tombstones.clone())? + { + // If no tables were produced (RT-only memtable), re-insert RTs + // into active memtable so they aren't lost + if tables.is_empty() && !range_tombstones.is_empty() { + let active = self.active_memtable(); + for rt in &range_tombstones { + let _ = + active.insert_range_tombstone(rt.start.clone(), rt.end.clone(), rt.seqno); + } + } + self.register_tables( &tables, blob_files.as_deref(), @@ -216,10 +254,26 @@ pub trait AbstractTree { /// # Errors /// /// Will return `Err` if an IO error occurs. - #[warn(clippy::type_complexity)] fn flush_to_tables( &self, stream: impl Iterator<Item = crate::Result<InternalValue>>, + ) -> crate::Result<Option<FlushToTablesResult>> { + self.flush_to_tables_with_rt(stream, Vec::new()) + } + + /// Like [`AbstractTree::flush_to_tables`], but also writes range tombstones. + /// + /// This is an internal extension hook on the crate's sealed tree types and + /// is hidden from generated documentation. + /// + /// # Errors + /// + /// Will return `Err` if an IO error occurs. + #[doc(hidden)] + fn flush_to_tables_with_rt( + &self, + stream: impl Iterator<Item = crate::Result<InternalValue>>, + range_tombstones: Vec<crate::range_tombstone::RangeTombstone>, ) -> crate::Result<Option<FlushToTablesResult>>; /// Atomically registers flushed tables into the tree, removing their associated sealed memtables. @@ -680,4 +734,36 @@ pub trait AbstractTree { /// Will return `Err` if an IO error occurs. #[doc(hidden)] fn remove_weak<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> (u64, u64); + + /// Deletes all keys in the range `[start, end)` by inserting a range tombstone. + /// + /// This is much more efficient than deleting keys individually when + /// removing a contiguous range of keys. + /// + /// Returns the approximate size added to the memtable. + /// Returns 0 if `start >= end` (invalid interval is silently ignored). + /// + /// This is a required method on the crate's sealed tree types. + fn remove_range<K: AsRef<[u8]>>(&self, start: K, end: K, seqno: SeqNo) -> u64; + + /// Deletes all keys with the given prefix by inserting a range tombstone. + /// + /// This is sugar over [`AbstractTree::remove_range`] using prefix bounds. + /// + /// Returns the approximate size added to the memtable. + /// Returns 0 for empty prefixes or all-`0xFF` prefixes (cannot form valid half-open range). 
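For intuition about those two early-return cases, here is an illustrative stand-in for the bounds computation that `remove_prefix` (whose body follows) consumes via `prefix_to_range`; the helper below is a hypothetical reimplementation, not the crate's actual one:

```rust
use std::ops::Bound;

// Hypothetical stand-in for `prefix_to_range` (illustration only).
fn prefix_bounds(prefix: &[u8]) -> (Bound<Vec<u8>>, Bound<Vec<u8>>) {
    if prefix.is_empty() {
        // Empty prefix matches every key: no usable `[start, end)` pair.
        return (Bound::Unbounded, Bound::Unbounded);
    }
    let start = prefix.to_vec();
    let mut end = prefix.to_vec();
    // Strip trailing 0xFF bytes, then bump the last byte to get the
    // smallest key that no longer shares the prefix.
    while end.last() == Some(&0xFF) {
        end.pop();
    }
    match end.last_mut() {
        Some(byte) => {
            *byte += 1;
            (Bound::Included(start), Bound::Excluded(end))
        }
        // All-0xFF prefix: no representable exclusive upper bound.
        None => (Bound::Included(start), Bound::Unbounded),
    }
}

fn main() {
    let (lo, hi) = prefix_bounds(b"ab");
    assert_eq!(lo, Bound::Included(b"ab".to_vec()));
    assert_eq!(hi, Bound::Excluded(b"ac".to_vec()));
    // Both degenerate cases surface as bounds `remove_prefix` rejects with 0.
    assert!(matches!(prefix_bounds(b"").0, Bound::Unbounded));
    assert!(matches!(prefix_bounds(&[0xFF, 0xFF]).1, Bound::Unbounded));
}
```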
+ fn remove_prefix<K: AsRef<[u8]>>(&self, prefix: K, seqno: SeqNo) -> u64 { + use crate::range::prefix_to_range; + use std::ops::Bound; + + let (lo, hi) = prefix_to_range(prefix.as_ref()); + + let Bound::Included(start) = lo else { return 0 }; + + // Bound::Unbounded means the prefix is all 0xFF — no representable + // exclusive upper bound exists, so we cannot form a valid range tombstone. + let Bound::Excluded(end) = hi else { return 0 }; + + self.remove_range(start, end, seqno) + } } diff --git a/src/active_tombstone_set.rs b/src/active_tombstone_set.rs new file mode 100644 index 000000000..245a1fe1b --- /dev/null +++ b/src/active_tombstone_set.rs @@ -0,0 +1,411 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! Active tombstone sets for tracking range tombstones during iteration. +//! +//! During forward or reverse scans, range tombstones must be activated when +//! the scan enters their range and expired when it leaves. These sets use +//! a seqno multiset (`BTreeMap`) for O(log t) max-seqno queries, +//! and a heap for efficient expiry tracking. +//! +//! A unique monotonic `id` on each heap entry ensures total ordering in the +//! heap (no equality on the tuple), which makes expiry deterministic. + +use crate::{range_tombstone::RangeTombstone, SeqNo, UserKey}; +use std::cmp::Reverse; +use std::collections::{BTreeMap, BinaryHeap}; + +/// Tracks active range tombstones during forward iteration. +/// +/// Tombstones are activated when the scan reaches their `start` key, and +/// expired when the scan reaches or passes their `end` key. +/// +/// Uses a min-heap (via `Reverse`) keyed by `(end, id, seqno)` so the +/// tombstone expiring soonest (smallest `end`) is at the top. +pub struct ActiveTombstoneSet { + seqno_counts: BTreeMap<SeqNo, usize>, + pending_expiry: BinaryHeap<Reverse<(UserKey, u64, SeqNo)>>, + cutoff_seqno: SeqNo, + next_id: u64, +} + +impl ActiveTombstoneSet { + /// Creates a new forward active tombstone set. + /// + /// Only tombstones with `seqno < cutoff_seqno` will be activated. + #[must_use] + pub fn new(cutoff_seqno: SeqNo) -> Self { + Self { + seqno_counts: BTreeMap::new(), + pending_expiry: BinaryHeap::new(), + cutoff_seqno, + next_id: 0, + } + } + + /// Activates a range tombstone, adding it to the active set. + /// + /// The tombstone is only activated if it is visible at the cutoff seqno + /// (i.e., `rt.seqno < cutoff_seqno`). Duplicate activations (same seqno + /// from different sources) are handled correctly via multiset accounting. + pub fn activate(&mut self, rt: &RangeTombstone) { + if !rt.visible_at(self.cutoff_seqno) { + return; + } + let id = self.next_id; + self.next_id += 1; + *self.seqno_counts.entry(rt.seqno).or_insert(0) += 1; + self.pending_expiry + .push(Reverse((rt.end.clone(), id, rt.seqno))); + } + + /// Expires tombstones whose `end <= current_key`. + /// + /// In the half-open convention `[start, end)`, a tombstone stops covering + /// keys at `end`. So when `current_key >= end`, the tombstone no longer + /// applies and is removed from the active set. + /// + /// # Panics + /// + /// Panics if an expiry pop has no matching activation in the seqno multiset. 
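A quick standalone illustration of the `Reverse` min-heap trick this struct relies on; `expire_until`, right below, pops exactly this shape of entry (values illustrative):

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

fn main() {
    // std's BinaryHeap is a max-heap; wrapping entries in `Reverse`
    // flips the order, so the smallest `(end, id, seqno)` tuple — the
    // tombstone expiring soonest — surfaces first.
    let mut heap = BinaryHeap::new();
    heap.push(Reverse((b"m".to_vec(), 0u64, 10u64)));
    heap.push(Reverse((b"d".to_vec(), 1u64, 20u64)));
    heap.push(Reverse((b"z".to_vec(), 2u64, 5u64)));

    let Reverse((end, _id, _seqno)) = heap.pop().unwrap();
    assert_eq!(end, b"d"); // nearest `end` during a forward scan
}
```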
+ pub fn expire_until(&mut self, current_key: &[u8]) { + while let Some(Reverse((ref end, _, seqno))) = self.pending_expiry.peek() { + if current_key >= end.as_ref() { + let seqno = *seqno; + self.pending_expiry.pop(); + #[expect( + clippy::expect_used, + reason = "expiry pop must have matching activation" + )] + let count = self + .seqno_counts + .get_mut(&seqno) + .expect("expiry pop must have matching activation"); + *count -= 1; + if *count == 0 { + self.seqno_counts.remove(&seqno); + } + } else { + break; + } + } + } + + /// Returns the highest seqno among all active tombstones, or `None` if + /// no tombstones are active. + #[must_use] + pub fn max_active_seqno(&self) -> Option<SeqNo> { + self.seqno_counts.keys().next_back().copied() + } + + /// Returns `true` if a KV with the given seqno is suppressed by any + /// active tombstone (i.e., there exists an active tombstone with a + /// higher seqno). + #[must_use] + pub fn is_suppressed(&self, key_seqno: SeqNo) -> bool { + self.max_active_seqno().is_some_and(|max| key_seqno < max) + } + + /// Bulk-activates tombstones at a seek position. + /// + /// # Invariant + /// + /// At any iterator position, the active set contains only tombstones + /// where `start <= current_key < end` (and visible at `cutoff_seqno`). + /// Seek prefill must collect truly overlapping tombstones + /// (`start <= key < end`); `expire_until` immediately enforces the + /// `end` bound. + #[cfg_attr( + not(test), + expect(dead_code, reason = "used by iterator initialization logic") + )] + pub fn initialize_from(&mut self, tombstones: impl IntoIterator<Item = RangeTombstone>) { + for rt in tombstones { + self.activate(&rt); + } + } + + /// Returns `true` if there are no active tombstones. + #[cfg_attr( + not(test), + expect(dead_code, reason = "helper for callers to inspect active tombstones") + )] + #[must_use] + pub fn is_empty(&self) -> bool { + self.seqno_counts.is_empty() + } +} + +/// Tracks active range tombstones during reverse iteration. + /// + /// During reverse scans, tombstones are activated when the scan reaches + /// a key < `end` (strict `>`: `rt.end > current_key`), and expired when + /// `current_key < rt.start`. + /// + /// Uses a max-heap keyed by `(start, id, seqno)` so the tombstone + /// expiring soonest (largest `start`) is at the top. +pub struct ActiveTombstoneSetReverse { + seqno_counts: BTreeMap<SeqNo, usize>, + pending_expiry: BinaryHeap<(UserKey, u64, SeqNo)>, + cutoff_seqno: SeqNo, + next_id: u64, +} + +impl ActiveTombstoneSetReverse { + /// Creates a new reverse active tombstone set. + /// + /// Only tombstones with `seqno < cutoff_seqno` will be activated. + #[must_use] + pub fn new(cutoff_seqno: SeqNo) -> Self { + Self { + seqno_counts: BTreeMap::new(), + pending_expiry: BinaryHeap::new(), + cutoff_seqno, + next_id: 0, + } + } + + /// Activates a range tombstone, adding it to the active set. + /// + /// The tombstone is only activated if it is visible at the cutoff seqno + /// (i.e., `rt.seqno < cutoff_seqno`). Duplicate activations (same seqno + /// from different sources) are handled correctly via multiset accounting. + /// + /// For reverse iteration, activation uses strict `>`: tombstones with + /// `rt.end > current_key` are activated. `key == end` is NOT covered + /// (half-open). 
+ pub fn activate(&mut self, rt: &RangeTombstone) { + if !rt.visible_at(self.cutoff_seqno) { + return; + } + let id = self.next_id; + self.next_id += 1; + *self.seqno_counts.entry(rt.seqno).or_insert(0) += 1; + self.pending_expiry.push((rt.start.clone(), id, rt.seqno)); + } + + /// Expires tombstones whose `start > current_key`. + /// + /// During reverse iteration, when the scan moves to a key that is + /// before a tombstone's start, that tombstone no longer applies. + /// + /// # Panics + /// + /// Panics if an expiry pop has no matching activation in the seqno multiset. + pub fn expire_until(&mut self, current_key: &[u8]) { + while let Some((ref start, _, seqno)) = self.pending_expiry.peek() { + if current_key < start.as_ref() { + let seqno = *seqno; + self.pending_expiry.pop(); + #[expect( + clippy::expect_used, + reason = "expiry pop must have matching activation" + )] + let count = self + .seqno_counts + .get_mut(&seqno) + .expect("expiry pop must have matching activation"); + *count -= 1; + if *count == 0 { + self.seqno_counts.remove(&seqno); + } + } else { + break; + } + } + } + + /// Returns the highest seqno among all active tombstones, or `None` if + /// no tombstones are active. + #[must_use] + pub fn max_active_seqno(&self) -> Option<SeqNo> { + self.seqno_counts.keys().next_back().copied() + } + + /// Returns `true` if a KV with the given seqno is suppressed by any + /// active tombstone (i.e., there exists an active tombstone with a + /// higher seqno). + #[must_use] + pub fn is_suppressed(&self, key_seqno: SeqNo) -> bool { + self.max_active_seqno().is_some_and(|max| key_seqno < max) + } + + /// Bulk-activates tombstones at a seek position (for reverse). + #[cfg_attr( + not(test), + expect(dead_code, reason = "used by iterator initialization logic") + )] + pub fn initialize_from(&mut self, tombstones: impl IntoIterator<Item = RangeTombstone>) { + for rt in tombstones { + self.activate(&rt); + } + } + + /// Returns `true` if there are no active tombstones. 
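Before the remaining accessor and the test suite, a tiny sketch to make the forward/reverse asymmetry concrete (toy code, not crate API): coverage is the same half-open predicate in both directions; only the expiry trigger differs:

```rust
// Half-open [start, end) coverage, shared by both scan directions.
fn covers(key: &[u8], start: &[u8], end: &[u8]) -> bool {
    start <= key && key < end
}

fn main() {
    // Forward set: expire once current_key >= end.
    assert!(covers(b"l", b"a", b"m"));
    assert!(!covers(b"m", b"a", b"m")); // key == end: already expired

    // Reverse set: expire once current_key < start.
    assert!(covers(b"a", b"a", b"m")); // key == start: still active
    assert!(!covers(b"0", b"a", b"m"));
}
```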
+ #[cfg_attr( + not(test), + expect(dead_code, reason = "helper for callers to inspect active tombstones") + )] + #[must_use] + pub fn is_empty(&self) -> bool { + self.seqno_counts.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::UserKey; + + fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone { + RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno) + } + + // ──── Forward tests ──── + + #[test] + fn forward_activate_and_suppress() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + assert!(set.is_suppressed(5)); + assert!(!set.is_suppressed(10)); + assert!(!set.is_suppressed(15)); + } + + #[test] + fn forward_expire_at_end() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + assert!(set.is_suppressed(5)); + set.expire_until(b"m"); // key == end, tombstone expires + assert!(!set.is_suppressed(5)); + } + + #[test] + fn forward_expire_past_end() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + set.expire_until(b"z"); + assert!(set.is_empty()); + } + + #[test] + fn forward_not_expired_before_end() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + set.expire_until(b"l"); + assert!(set.is_suppressed(5)); // still active + } + + #[test] + fn forward_invisible_tombstone_not_activated() { + let mut set = ActiveTombstoneSet::new(5); + set.activate(&rt(b"a", b"m", 10)); // seqno 10 > cutoff 5 + assert!(!set.is_suppressed(1)); + assert!(set.is_empty()); + } + + #[test] + fn forward_multiple_tombstones_max_seqno() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + set.activate(&rt(b"b", b"n", 20)); + assert_eq!(set.max_active_seqno(), Some(20)); + assert!(set.is_suppressed(15)); // 15 < 20 + } + + #[test] + fn forward_duplicate_end_seqno_accounting() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + set.activate(&rt(b"b", b"m", 10)); + assert_eq!(set.max_active_seqno(), Some(10)); + + set.expire_until(b"m"); + assert_eq!(set.max_active_seqno(), None); + assert!(set.is_empty()); + } + + #[test] + fn forward_initialize_from() { + let mut set = ActiveTombstoneSet::new(100); + set.initialize_from(vec![rt(b"a", b"m", 10), rt(b"b", b"z", 20)]); + assert_eq!(set.max_active_seqno(), Some(20)); + } + + #[test] + fn forward_initialize_and_expire() { + let mut set = ActiveTombstoneSet::new(100); + set.initialize_from(vec![rt(b"a", b"d", 10), rt(b"b", b"f", 20)]); + set.expire_until(b"e"); // expires [a,d) but not [b,f) + assert_eq!(set.max_active_seqno(), Some(20)); + set.expire_until(b"f"); // expires [b,f) + assert!(set.is_empty()); + } + + // ──── Reverse tests ──── + + #[test] + fn reverse_activate_and_suppress() { + let mut set = ActiveTombstoneSetReverse::new(100); + set.activate(&rt(b"a", b"m", 10)); + assert!(set.is_suppressed(5)); + assert!(!set.is_suppressed(10)); + } + + #[test] + fn reverse_expire_before_start() { + let mut set = ActiveTombstoneSetReverse::new(100); + set.activate(&rt(b"d", b"m", 10)); + + set.expire_until(b"c"); + assert!(set.is_empty()); + } + + #[test] + fn reverse_initialize_from() { + let mut set = ActiveTombstoneSetReverse::new(100); + set.initialize_from(vec![rt(b"a", b"m", 10), rt(b"b", b"z", 20)]); + assert_eq!(set.max_active_seqno(), Some(20)); + } + + #[test] + fn reverse_not_expired_at_start() { + let mut set = ActiveTombstoneSetReverse::new(100); + set.activate(&rt(b"d", b"m", 10)); + + 
set.expire_until(b"d"); + assert!(set.is_suppressed(5)); + } + + #[test] + fn reverse_invisible_tombstone_not_activated() { + let mut set = ActiveTombstoneSetReverse::new(5); + set.activate(&rt(b"a", b"m", 10)); + assert!(set.is_empty()); + } + + #[test] + fn reverse_duplicate_end_seqno_accounting() { + let mut set = ActiveTombstoneSetReverse::new(100); + set.activate(&rt(b"d", b"m", 10)); + set.activate(&rt(b"d", b"n", 10)); + assert_eq!(set.max_active_seqno(), Some(10)); + + set.expire_until(b"c"); + assert_eq!(set.max_active_seqno(), None); + assert!(set.is_empty()); + } + + #[test] + fn reverse_multiple_tombstones() { + let mut set = ActiveTombstoneSetReverse::new(100); + set.activate(&rt(b"a", b"m", 10)); + set.activate(&rt(b"f", b"z", 20)); + assert_eq!(set.max_active_seqno(), Some(20)); + + set.expire_until(b"e"); + assert_eq!(set.max_active_seqno(), Some(10)); + } +} diff --git a/src/any_tree.rs b/src/any_tree.rs index 29cfbaf6b..7e3259df9 100644 --- a/src/any_tree.rs +++ b/src/any_tree.rs @@ -15,3 +15,5 @@ pub enum AnyTree { /// Key-value separated LSM-tree, see [`BlobTree`] Blob(BlobTree), } + +impl crate::abstract_tree::sealed::Sealed for AnyTree {} diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 58777024c..7de9ae4ed 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -188,6 +188,8 @@ impl BlobTree { } } +impl crate::abstract_tree::sealed::Sealed for BlobTree {} + impl AbstractTree for BlobTree { fn print_trace(&self, key: &[u8]) -> crate::Result<()> { self.index.print_trace(key) @@ -356,9 +358,10 @@ impl AbstractTree for BlobTree { } #[expect(clippy::too_many_lines, reason = "flush logic is inherently complex")] - fn flush_to_tables( + fn flush_to_tables_with_rt( &self, stream: impl Iterator>, + range_tombstones: Vec, ) -> crate::Result, Option>)>> { use crate::{ coding::Encode, file::BLOBS_FOLDER, file::TABLES_FOLDER, @@ -439,6 +442,11 @@ impl AbstractTree for BlobTree { let separation_threshold = kv_opts.separation_threshold; + // Set range tombstones BEFORE writing KV items so that if MultiWriter + // rotates to a new table during the write loop, earlier tables already + // carry the RT metadata. + table_writer.set_range_tombstones(range_tombstones); + for item in stream { let item = item?; @@ -503,6 +511,9 @@ impl AbstractTree for BlobTree { }) .collect::>>()?; + // Return Some even when tables is empty (RT-only flush): the caller + // (AbstractTree::flush) handles empty tables by re-inserting RTs into + // the active memtable and still needs to delete sealed memtables. Ok(Some((tables, Some(blob_files)))) } @@ -643,4 +654,8 @@ impl AbstractTree for BlobTree { fn remove_weak>(&self, key: K, seqno: SeqNo) -> (u64, u64) { self.index.remove_weak(key, seqno) } + + fn remove_range>(&self, start: K, end: K, seqno: SeqNo) -> u64 { + self.index.remove_range(start, end, seqno) + } } diff --git a/src/compaction/flavour.rs b/src/compaction/flavour.rs index 9a5b34723..816a4a42b 100644 --- a/src/compaction/flavour.rs +++ b/src/compaction/flavour.rs @@ -8,6 +8,7 @@ use crate::coding::{Decode, Encode}; use crate::compaction::worker::Options; use crate::compaction::Input as CompactionPayload; use crate::file::TABLES_FOLDER; +use crate::range_tombstone::RangeTombstone; use crate::table::multi_writer::MultiWriter; use crate::version::{SuperVersions, Version}; use crate::vlog::blob_file::scanner::ScanEntry; @@ -77,7 +78,9 @@ pub(super) fn prepare_table_writer( opts.table_id_generator.clone(), payload.target_size, payload.dest_level, - )?; + )? 
+ // Compaction consumes input tables, so clip RTs to each output table's key range. + .use_clip_range_tombstones(); if index_partitioning { table_writer = table_writer.use_partitioned_index(); } @@ -120,7 +123,9 @@ pub(super) trait CompactionFlavour { fn write(&mut self, item: InternalValue) -> crate::Result<()>; - #[warn(clippy::too_many_arguments)] + /// Writes range tombstones to the current output table. + fn write_range_tombstones(&mut self, tombstones: &[RangeTombstone]); + fn finish( self: Box<Self>, super_version: &mut SuperVersions, @@ -164,6 +169,10 @@ impl RelocatingCompaction { } impl CompactionFlavour for RelocatingCompaction { + fn write_range_tombstones(&mut self, tombstones: &[RangeTombstone]) { + self.inner.write_range_tombstones(tombstones); + } + fn write(&mut self, item: InternalValue) -> crate::Result<()> { if item.key.value_type.is_indirection() { let mut reader = &item.value[..]; @@ -371,6 +380,10 @@ impl StandardCompaction { } impl CompactionFlavour for StandardCompaction { + fn write_range_tombstones(&mut self, tombstones: &[RangeTombstone]) { + self.table_writer.set_range_tombstones(tombstones.to_vec()); + } + fn write(&mut self, item: InternalValue) -> crate::Result<()> { let indirection = if item.key.value_type.is_indirection() { Some({ diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 951fadbbf..e69bf6ba6 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -369,6 +369,16 @@ fn merge_tables( return Ok(()); }; + // Collect range tombstones from input tables before they are moved. + // Canonicalize to avoid duplicate RTs across input tables (MultiWriter + // rotation copies the same RT into every output table during flush). + let mut input_range_tombstones: Vec<RangeTombstone> = tables + .iter() + .flat_map(|t| t.range_tombstones().iter().cloned()) + .collect(); + input_range_tombstones.sort(); + input_range_tombstones.dedup(); + let mut blob_frag_map = FragmentationMap::default(); let Some(mut merge_iter) = create_compaction_stream( @@ -384,11 +394,7 @@ fn merge_tables( }; let dst_lvl = payload.canonical_level.into(); - let last_level = opts.config.level_count - 1; - - // NOTE: Only evict tombstones when reaching the last level, - // That way we don't resurrect data beneath the tombstone - let is_last_level = payload.dest_level == last_level; + let is_last_level = payload.dest_level == opts.config.level_count - 1; merge_iter = merge_iter .evict_tombstones(is_last_level) @@ -489,6 +495,21 @@ fn merge_tables( drop(compaction_state); hidden_guard(payload, opts, || { + // Propagate range tombstones to output tables BEFORE writing KV items, + // so that if the compactor rotates tables during the merge loop, + // earlier tables already carry the RT metadata. + // + // Keep RTs even at the last level until compaction itself becomes + // RT-aware and can physically drop covered KVs. Dropping the RT first + // would only remove the logical delete marker and can resurrect data. 
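The resurrection hazard that comment describes can be modeled with toy types before the code that applies it (illustrative seqnos; mirrors the `should_suppress` rule defined later in `range_tombstone.rs`):

```rust
struct Rt {
    start: Vec<u8>,
    end: Vec<u8>,
    seqno: u64,
}

// A KV is visible iff it is inside the snapshot and no visible range
// tombstone with a higher seqno covers its key.
fn visible(key: &[u8], kv_seqno: u64, rts: &[Rt], read_seqno: u64) -> bool {
    kv_seqno < read_seqno
        && !rts.iter().any(|rt| {
            rt.seqno < read_seqno
                && rt.start.as_slice() <= key
                && key < rt.end.as_slice()
                && kv_seqno < rt.seqno
        })
}

fn main() {
    // put ("k", v) @ 10 in some other table; remove_range("a".."z") @ 20.
    let rt = Rt { start: b"a".to_vec(), end: b"z".to_vec(), seqno: 20 };

    // With the RT propagated, the old KV stays dead at read seqno 21.
    assert!(!visible(b"k", 10, &[rt], 21));

    // Drop the RT at the last level while ("k", 10) still survives in a
    // table outside this compaction, and the delete is silently undone.
    assert!(visible(b"k", 10, &[], 21));
}
```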
+ if !input_range_tombstones.is_empty() { + log::debug!( + "Propagating {} range tombstones to compaction output", + input_range_tombstones.len(), + ); + compactor.write_range_tombstones(&input_range_tombstones); + } + for (idx, item) in merge_iter.enumerate() { let item = item?; diff --git a/src/format_version.rs b/src/format_version.rs index 2e0db97d3..b4c5db4f3 100644 --- a/src/format_version.rs +++ b/src/format_version.rs @@ -13,6 +13,9 @@ pub enum FormatVersion { /// Version for 3.x.x releases V3, + + /// Version for range-tombstone SST semantics + V4, } impl std::fmt::Display for FormatVersion { @@ -27,6 +30,7 @@ impl From<FormatVersion> for u8 { FormatVersion::V1 => 1, FormatVersion::V2 => 2, FormatVersion::V3 => 3, + FormatVersion::V4 => 4, } } } @@ -39,6 +43,7 @@ 1 => Ok(Self::V1), 2 => Ok(Self::V2), 3 => Ok(Self::V3), + 4 => Ok(Self::V4), _ => Err(()), } } diff --git a/src/lib.rs b/src/lib.rs index ea505e657..6c0ea7d95 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -125,6 +125,10 @@ mod path; #[doc(hidden)] pub mod range; +pub(crate) mod active_tombstone_set; +pub(crate) mod range_tombstone; +pub(crate) mod range_tombstone_filter; + #[doc(hidden)] pub mod table; diff --git a/src/memtable/interval_tree.rs b/src/memtable/interval_tree.rs new file mode 100644 index 000000000..9151f840f --- /dev/null +++ b/src/memtable/interval_tree.rs @@ -0,0 +1,492 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! AVL-balanced interval tree for efficient range tombstone queries in memtables. +//! +//! Keyed by `start`, augmented with `subtree_max_end`, `subtree_max_seqno`, +//! and `subtree_min_seqno` for pruning during queries. + +use crate::range_tombstone::CoveringRt; +use crate::range_tombstone::RangeTombstone; +use crate::{SeqNo, UserKey}; +use std::cmp::Ordering; + +/// An AVL-balanced BST keyed by range tombstone `start`, augmented with +/// subtree-level metadata for efficient interval queries. 
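Before the node layout that follows, a flat-scan model of what the `subtree_max_end` augmentation buys (toy code): any group of half-open intervals whose maximum `end` is `<=` the probe key can be skipped wholesale, which the tree applies per subtree:

```rust
// Stabbing query over half-open intervals with the pruning test inlined.
fn any_contains(intervals: &[(&[u8], &[u8])], key: &[u8]) -> bool {
    // Tree form: if subtree_max_end <= key, skip the entire subtree.
    let max_end = intervals.iter().map(|(_, end)| *end).max();
    if max_end.map_or(true, |end| end <= key) {
        return false; // pruned: nothing here can still contain `key`
    }
    intervals.iter().any(|(start, end)| *start <= key && key < *end)
}

fn main() {
    let intervals: [(&[u8], &[u8]); 2] = [(b"a", b"c"), (b"b", b"d")];
    assert!(any_contains(&intervals, b"b"));
    assert!(!any_contains(&intervals, b"e")); // max end "d" <= "e": pruned
}
```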
+pub struct IntervalTree { + root: Option<Box<Node>>, + len: usize, +} + +struct Node { + tombstone: RangeTombstone, + + // AVL metadata + height: i32, + left: Option<Box<Node>>, + right: Option<Box<Node>>, + + // Augmented metadata + subtree_max_end: UserKey, + subtree_max_seqno: SeqNo, + subtree_min_seqno: SeqNo, +} + +impl Node { + fn new(tombstone: RangeTombstone) -> Self { + let subtree_max_end = tombstone.end.clone(); + let seqno = tombstone.seqno; + Self { + tombstone, + height: 1, + left: None, + right: None, + subtree_max_end, + subtree_max_seqno: seqno, + subtree_min_seqno: seqno, + } + } + + fn update_augmentation(&mut self) { + self.subtree_max_end = self.tombstone.end.clone(); + self.subtree_max_seqno = self.tombstone.seqno; + self.subtree_min_seqno = self.tombstone.seqno; + self.height = 1; + + if let Some(ref left) = self.left { + if left.subtree_max_end > self.subtree_max_end { + self.subtree_max_end = left.subtree_max_end.clone(); + } + if left.subtree_max_seqno > self.subtree_max_seqno { + self.subtree_max_seqno = left.subtree_max_seqno; + } + if left.subtree_min_seqno < self.subtree_min_seqno { + self.subtree_min_seqno = left.subtree_min_seqno; + } + self.height = left.height + 1; + } + + if let Some(ref right) = self.right { + if right.subtree_max_end > self.subtree_max_end { + self.subtree_max_end = right.subtree_max_end.clone(); + } + if right.subtree_max_seqno > self.subtree_max_seqno { + self.subtree_max_seqno = right.subtree_max_seqno; + } + if right.subtree_min_seqno < self.subtree_min_seqno { + self.subtree_min_seqno = right.subtree_min_seqno; + } + let rh = right.height + 1; + if rh > self.height { + self.height = rh; + } + } + } + + fn balance_factor(&self) -> i32 { + let lh = self.left.as_ref().map_or(0, |n| n.height); + let rh = self.right.as_ref().map_or(0, |n| n.height); + lh - rh + } +} + +#[expect( + clippy::expect_used, + reason = "rotation invariant: left child must exist" +)] +fn rotate_right(mut node: Box<Node>) -> Box<Node> { + let mut new_root = node.left.take().expect("rotate_right requires left child"); + node.left = new_root.right.take(); + node.update_augmentation(); + new_root.right = Some(node); + new_root.update_augmentation(); + new_root +} + +#[expect( + clippy::expect_used, + reason = "rotation invariant: right child must exist" +)] +fn rotate_left(mut node: Box<Node>) -> Box<Node> { + let mut new_root = node.right.take().expect("rotate_left requires right child"); + node.right = new_root.left.take(); + node.update_augmentation(); + new_root.left = Some(node); + new_root.update_augmentation(); + new_root +} + +#[expect( + clippy::expect_used, + reason = "balance factor guarantees child existence" +)] +fn balance(mut node: Box<Node>) -> Box<Node> { + node.update_augmentation(); + let bf = node.balance_factor(); + + if bf > 1 { + // Left-heavy + if let Some(ref left) = node.left { + if left.balance_factor() < 0 { + // Left-Right case + node.left = Some(rotate_left(node.left.take().expect("just checked"))); + } + } + return rotate_right(node); + } + + if bf < -1 { + // Right-heavy + if let Some(ref right) = node.right { + if right.balance_factor() > 0 { + // Right-Left case + node.right = Some(rotate_right(node.right.take().expect("just checked"))); + } + } + return rotate_left(node); + } + + node +} + +/// Returns `(node, was_new)` — `was_new` is false when a duplicate was replaced. 
+fn insert_node(node: Option<Box<Node>>, tombstone: RangeTombstone) -> (Box<Node>, bool) { + let Some(mut node) = node else { + return (Box::new(Node::new(tombstone)), true); + }; + + let was_new; + match tombstone.cmp(&node.tombstone) { + Ordering::Less => { + let (child, new) = insert_node(node.left.take(), tombstone); + node.left = Some(child); + was_new = new; + } + Ordering::Greater => { + let (child, new) = insert_node(node.right.take(), tombstone); + node.right = Some(child); + was_new = new; + } + Ordering::Equal => { + // Duplicate — replace (shouldn't normally happen) + node.tombstone = tombstone; + node.update_augmentation(); + return (node, false); + } + } + + (balance(node), was_new) +} + +/// Like `collect_overlapping`, but returns `true` as soon as any overlapping +/// tombstone with `seqno > key_seqno` is found. Avoids Vec allocation on the +/// hot read path. +fn any_overlapping_suppresses( + node: Option<&Node>, + key: &[u8], + key_seqno: SeqNo, + read_seqno: SeqNo, +) -> bool { + let Some(n) = node else { return false }; + + if n.subtree_min_seqno >= read_seqno { + return false; + } + + if n.subtree_max_end.as_ref() <= key { + return false; + } + + if any_overlapping_suppresses(n.left.as_deref(), key, key_seqno, read_seqno) { + return true; + } + + if n.tombstone.start.as_ref() <= key { + if n.tombstone.contains_key(key) + && n.tombstone.visible_at(read_seqno) + && n.tombstone.seqno > key_seqno + { + return true; + } + return any_overlapping_suppresses(n.right.as_deref(), key, key_seqno, read_seqno); + } + + false +} + +/// In-order traversal to produce sorted output. +fn inorder(node: Option<&Node>, result: &mut Vec<RangeTombstone>) { + let Some(n) = node else { return }; + inorder(n.left.as_deref(), result); + result.push(n.tombstone.clone()); + inorder(n.right.as_deref(), result); +} + +/// Collects tombstones that fully cover `[min, max]` and are visible at `read_seqno`. +fn collect_covering( + node: Option<&Node>, + min: &[u8], + max: &[u8], + read_seqno: SeqNo, + best: &mut Option<CoveringRt>, +) { + let Some(n) = node else { return }; + + // Prune: no tombstone visible at this read_seqno + if n.subtree_min_seqno >= read_seqno { + return; + } + + // Prune: max_end <= max means no interval in subtree can fully cover [min, max] + // (need end > max, i.e., max_end > max for half-open covering) + if n.subtree_max_end.as_ref() <= max { + return; + } + + // Recurse left + collect_covering(n.left.as_deref(), min, max, read_seqno, best); + + // Check current node: must have start <= min AND max < end + if n.tombstone.start.as_ref() <= min + && n.tombstone.fully_covers(min, max) + && n.tombstone.visible_at(read_seqno) + { + let dominated = best.as_ref().is_some_and(|b| n.tombstone.seqno <= b.seqno); + if !dominated { + *best = Some(CoveringRt::from(&n.tombstone)); + } + } + + // Only go right if some right-subtree entry might have start <= min + if n.tombstone.start.as_ref() <= min { + collect_covering(n.right.as_deref(), min, max, read_seqno, best); + } +} + +impl IntervalTree { + /// Creates a new empty interval tree. + #[must_use] + pub fn new() -> Self { + Self { root: None, len: 0 } + } + + /// Inserts a range tombstone into the tree. O(log n). + pub fn insert(&mut self, tombstone: RangeTombstone) { + let (root, was_new) = insert_node(self.root.take(), tombstone); + self.root = Some(root); + if was_new { + self.len += 1; + } + } + + /// Returns `true` if the given key at the given seqno is suppressed by + /// any range tombstone visible at `read_seqno`. 
+ /// + /// O(log n + k) where k is the number of overlapping tombstones. + /// Uses early-exit traversal to avoid allocating a Vec. + pub fn query_suppression(&self, key: &[u8], key_seqno: SeqNo, read_seqno: SeqNo) -> bool { + any_overlapping_suppresses(self.root.as_deref(), key, key_seqno, read_seqno) + } + + /// Returns the highest-seqno visible tombstone that fully covers `[min, max]`, + /// or `None` if no such tombstone exists. + /// + /// Used for table-skip decisions. + #[cfg_attr(not(test), expect(dead_code, reason = "used for table-skip decisions"))] + pub fn query_covering_rt_for_range( + &self, + min: &[u8], + max: &[u8], + read_seqno: SeqNo, + ) -> Option<CoveringRt> { + let mut best = None; + collect_covering(self.root.as_deref(), min, max, read_seqno, &mut best); + best + } + + /// Returns all tombstones in sorted order (by `RangeTombstone::Ord`). + /// + /// Used for flush. + pub fn iter_sorted(&self) -> Vec<RangeTombstone> { + let mut result = Vec::with_capacity(self.len); + inorder(self.root.as_deref(), &mut result); + result + } + + /// Returns the number of tombstones in the tree. + #[must_use] + pub fn len(&self) -> usize { + self.len + } + + /// Returns `true` if the tree is empty. + #[expect( + dead_code, + reason = "tree may have tombstones but is_empty not called in all paths" + )] + #[must_use] + pub fn is_empty(&self) -> bool { + self.len == 0 + } +} + +impl Default for IntervalTree { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +#[expect( + clippy::unwrap_used, + clippy::indexing_slicing, + reason = "tests intentionally use direct unwraps and indexing for compact fixtures" +)] +mod tests { + use super::*; + + fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone { + RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno) + } + + #[test] + fn empty_tree_no_suppression() { + let tree = IntervalTree::new(); + assert!(!tree.query_suppression(b"key", 5, 100)); + } + + #[test] + fn single_tombstone_suppresses() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"b", b"y", 10)); + assert!(tree.query_suppression(b"c", 5, 100)); + } + + #[test] + fn single_tombstone_no_suppress_newer_kv() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"b", b"y", 10)); + assert!(!tree.query_suppression(b"c", 15, 100)); + } + + #[test] + fn single_tombstone_exclusive_end() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"b", b"y", 10)); + assert!(!tree.query_suppression(b"y", 5, 100)); + } + + #[test] + fn single_tombstone_before_start() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"b", b"y", 10)); + assert!(!tree.query_suppression(b"a", 5, 100)); + } + + #[test] + fn tombstone_not_visible() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"b", b"y", 10)); + assert!(!tree.query_suppression(b"c", 5, 9)); + } + + #[test] + fn multiple_tombstones() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"f", 10)); + tree.insert(rt(b"d", b"m", 20)); + tree.insert(rt(b"p", b"z", 5)); + + assert!(tree.query_suppression(b"e", 15, 100)); + assert!(tree.query_suppression(b"e", 5, 100)); + assert!(!tree.query_suppression(b"e", 25, 100)); + + assert!(tree.query_suppression(b"q", 3, 100)); + assert!(!tree.query_suppression(b"q", 10, 100)); + } + + #[test] + fn covering_rt_found() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"z", 50)); + tree.insert(rt(b"c", b"g", 10)); + + let crt = tree.query_covering_rt_for_range(b"b", b"y", 100); + assert!(crt.is_some()); + let crt = crt.unwrap(); + assert_eq!(crt.seqno, 50); + 
} + + #[test] + fn covering_rt_not_found_partial() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"c", b"g", 10)); + + let crt = tree.query_covering_rt_for_range(b"b", b"y", 100); + assert!(crt.is_none()); + } + + #[test] + fn covering_rt_highest_seqno() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"z", 50)); + tree.insert(rt(b"a", b"z", 100)); + + let crt = tree.query_covering_rt_for_range(b"b", b"y", 200); + assert!(crt.is_some()); + assert_eq!(crt.unwrap().seqno, 100); + } + + #[test] + fn iter_sorted_empty() { + let tree = IntervalTree::new(); + assert!(tree.iter_sorted().is_empty()); + } + + #[test] + fn iter_sorted_multiple() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"d", b"f", 10)); + tree.insert(rt(b"a", b"c", 20)); + tree.insert(rt(b"m", b"z", 5)); + + let sorted = tree.iter_sorted(); + assert_eq!(sorted.len(), 3); + assert_eq!(sorted[0].start.as_ref(), b"a"); + assert_eq!(sorted[1].start.as_ref(), b"d"); + assert_eq!(sorted[2].start.as_ref(), b"m"); + } + + #[test] + fn avl_balance_maintained() { + let mut tree = IntervalTree::new(); + for i in 0u8..20 { + let s = vec![i]; + let e = vec![i + 1]; + tree.insert(rt(&s, &e, u64::from(i))); + } + assert_eq!(tree.len(), 20); + if let Some(ref root) = tree.root { + assert!(root.height <= 6, "AVL height too large: {}", root.height); + } + } + + #[test] + fn seqno_pruning() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"z", 100)); + tree.insert(rt(b"b", b"y", 200)); + + assert!(!tree.query_suppression(b"c", 5, 50)); + } + + #[test] + fn max_end_pruning() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"c", 10)); + tree.insert(rt(b"b", b"d", 10)); + + assert!(!tree.query_suppression(b"e", 5, 100)); + } +} diff --git a/src/memtable/mod.rs b/src/memtable/mod.rs index ced9c12b7..d3771292d 100644 --- a/src/memtable/mod.rs +++ b/src/memtable/mod.rs @@ -2,14 +2,18 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) +pub mod interval_tree; + use crate::key::InternalKey; +use crate::range_tombstone::RangeTombstone; use crate::{ value::{InternalValue, SeqNo, UserValue}, - ValueType, + UserKey, ValueType, }; use crossbeam_skiplist::SkipMap; use std::ops::RangeBounds; use std::sync::atomic::{AtomicBool, AtomicU64}; +use std::sync::Mutex; pub use crate::tree::inner::MemtableId; @@ -24,6 +28,13 @@ pub struct Memtable { #[doc(hidden)] pub items: SkipMap<InternalKey, UserValue>, + /// Range tombstones stored in an interval tree. + /// + /// Protected by a `Mutex` since `IntervalTree` is not lock-free. + /// Contention is expected to be low — range deletes are infrequent. + /// Future optimization: `RwLock` or atomic RT count for lock-free empty check. + pub(crate) range_tombstones: Mutex<interval_tree::IntervalTree>, + /// Approximate active memtable size. /// /// If this grows too large, a flush is triggered. @@ -61,6 +72,7 @@ impl Memtable { Self { id, items: SkipMap::default(), + range_tombstones: Mutex::new(interval_tree::IntervalTree::new()), + approximate_size: AtomicU64::default(), highest_seqno: AtomicU64::default(), requested_rotation: AtomicBool::default(), } @@ -135,10 +147,10 @@ impl Memtable { self.items.len() } - /// Returns `true` if the memtable is empty. + /// Returns `true` if the memtable has no KV items and no range tombstones. 
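This widened `is_empty` is exactly where the `Mutex` cost shows up on hot paths; the "future optimization" mentioned in the field comment above could look roughly like this sketch (hypothetical, not part of this PR): shadow the tree length in an atomic so readers skip the lock when no range tombstones exist:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

struct RtIndex {
    tree: Mutex<Vec<u64>>, // stand-in for the interval tree
    count: AtomicUsize,    // shadow of tree.len()
}

impl RtIndex {
    fn insert(&self, seqno: u64) {
        self.tree.lock().expect("poisoned").push(seqno);
        // Publish only after the tree insert, so a reader that observes
        // count > 0 is guaranteed to find the tombstone under the lock.
        self.count.fetch_add(1, Ordering::Release);
    }

    fn has_tombstones(&self) -> bool {
        if self.count.load(Ordering::Acquire) == 0 {
            return false; // fast path: lock never taken
        }
        !self.tree.lock().expect("poisoned").is_empty()
    }
}

fn main() {
    let idx = RtIndex { tree: Mutex::new(Vec::new()), count: AtomicUsize::new(0) };
    assert!(!idx.has_tombstones());
    idx.insert(10);
    assert!(idx.has_tombstones());
}
```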
#[must_use] pub fn is_empty(&self) -> bool { - self.items.is_empty() + self.items.is_empty() && self.range_tombstone_count() == 0 } /// Inserts an item into the memtable @@ -166,6 +178,98 @@ impl Memtable { (item_size, size_before + item_size) } + /// Inserts a range tombstone covering `[start, end)` at the given seqno. + /// + /// Returns the approximate size added to the memtable. + /// + /// Returns 0 if `start >= end` or if either bound exceeds `u16::MAX` bytes. + /// + /// # Panics + /// + /// Panics if the internal mutex is poisoned. + #[must_use] + pub fn insert_range_tombstone(&self, start: UserKey, end: UserKey, seqno: SeqNo) -> u64 { + // Reject invalid intervals in release builds (debug_assert is not enough) + if start >= end { + return 0; + } + + // On-disk RT format writes key lengths as u16, enforce at insertion time. + // Emit a warning when rejecting an oversized bound so this failure is diagnosable. + if u16::try_from(start.len()).is_err() || u16::try_from(end.len()).is_err() { + log::warn!( + "insert_range_tombstone: rejecting oversized range tombstone \ + bounds (start_len = {}, end_len = {}, max = {})", + start.len(), + end.len(), + u16::MAX, + ); + return 0; + } + + let size = (start.len() + end.len() + std::mem::size_of::<SeqNo>()) as u64; + + #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")] + self.range_tombstones + .lock() + .expect("lock is poisoned") + .insert(RangeTombstone::new(start, end, seqno)); + + self.approximate_size + .fetch_add(size, std::sync::atomic::Ordering::AcqRel); + + self.highest_seqno + .fetch_max(seqno, std::sync::atomic::Ordering::AcqRel); + + size + } + + /// Returns `true` if the key at `key_seqno` is suppressed by a range tombstone + /// visible at `read_seqno`. + /// + /// # Panics + /// + /// Panics if the internal mutex is poisoned. + pub(crate) fn is_key_suppressed_by_range_tombstone( + &self, + key: &[u8], + key_seqno: SeqNo, + read_seqno: SeqNo, + ) -> bool { + #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")] + self.range_tombstones + .lock() + .expect("lock is poisoned") + .query_suppression(key, key_seqno, read_seqno) + } + + /// Returns all range tombstones in sorted order (for flush). + /// + /// # Panics + /// + /// Panics if the internal mutex is poisoned. + pub(crate) fn range_tombstones_sorted(&self) -> Vec<RangeTombstone> { + #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")] + self.range_tombstones + .lock() + .expect("lock is poisoned") + .iter_sorted() + } + + /// Returns the number of range tombstones. + /// + /// # Panics + /// + /// Panics if the internal mutex is poisoned. + #[must_use] + pub fn range_tombstone_count(&self) -> usize { + #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")] + self.range_tombstones + .lock() + .expect("lock is poisoned") + .len() + } + /// Returns the highest sequence number in the memtable. 
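An in-crate usage sketch tying those methods together (the `Memtable` construction is assumed for illustration; seqnos are made up):

```rust
#[test]
fn range_tombstone_hides_covered_kv() {
    // Hypothetical in-crate test; assumes a way to obtain a fresh Memtable.
    let mt = Memtable::default();

    // Delete [b"a", b"m") at seqno 10; the size delta is reported back.
    let added = mt.insert_range_tombstone(
        UserKey::from(b"a" as &[u8]),
        UserKey::from(b"m" as &[u8]),
        10,
    );
    assert!(added > 0);

    // A KV written at seqno 5 under the range is hidden at read seqno 11
    // (the exclusive boundary makes the seqno-10 tombstone visible there)...
    assert!(mt.is_key_suppressed_by_range_tombstone(b"c", 5, 11));
    // ...while a snapshot at read seqno 10 predates the tombstone.
    assert!(!mt.is_key_suppressed_by_range_tombstone(b"c", 5, 10));
}
```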
pub fn get_highest_seqno(&self) -> Option<SeqNo> { if self.is_empty() { diff --git a/src/range.rs b/src/range.rs index c0cd5df94..4cac370d7 100644 --- a/src/range.rs +++ b/src/range.rs @@ -7,6 +7,8 @@ use crate::{ memtable::Memtable, merge::Merger, mvcc_stream::MvccStream, + range_tombstone::RangeTombstone, + range_tombstone_filter::RangeTombstoneFilter, run_reader::RunReader, value::{SeqNo, UserKey}, version::SuperVersion, @@ -95,57 +97,94 @@ impl DoubleEndedIterator for TreeIter { } } +fn range_tombstone_overlaps_bounds( + rt: &RangeTombstone, + bounds: &(Bound<UserKey>, Bound<UserKey>), +) -> bool { + let overlaps_lo = match &bounds.0 { + Bound::Included(key) | Bound::Excluded(key) => rt.end.as_ref() > key.as_ref(), + Bound::Unbounded => true, + }; + + let overlaps_hi = match &bounds.1 { + Bound::Included(key) => rt.start.as_ref() <= key.as_ref(), + Bound::Excluded(key) => rt.start.as_ref() < key.as_ref(), + Bound::Unbounded => true, + }; + + overlaps_lo && overlaps_hi +} + impl TreeIter { + #[expect( + clippy::too_many_lines, + reason = "create_range wires up multiple iterator sources, filters, and tombstone handling; splitting further would reduce clarity" + )] pub fn create_range<K: AsRef<[u8]>, R: RangeBounds<K>>( guard: IterState, range: R, seqno: SeqNo, ) -> Self { Self::new(guard, |lock| { - let lo = match range.start_bound() { - // NOTE: See memtable.rs for range explanation - Bound::Included(key) => Bound::Included(InternalKey::new( - key.as_ref(), - SeqNo::MAX, - crate::ValueType::Tombstone, - )), - Bound::Excluded(key) => Bound::Excluded(InternalKey::new( - key.as_ref(), - 0, - crate::ValueType::Tombstone, - )), - Bound::Unbounded => Bound::Unbounded, - }; - - let hi = match range.end_bound() { - // NOTE: See memtable.rs for range explanation, this is the reverse case - // where we need to go all the way to the last seqno of an item - // - // Example: We search for (Unbounded..Excluded(abdef)) - // - // key -> seqno - // - // a -> 7 <<< This is the lowest key that matches the range - // abc -> 5 - // abc -> 4 - // abc -> 3 <<< This is the highest key that matches the range - // abcdef -> 6 - // abcdef -> 5 - // - Bound::Included(key) => { - Bound::Included(InternalKey::new(key.as_ref(), 0, crate::ValueType::Value)) - } - Bound::Excluded(key) => Bound::Excluded(InternalKey::new( - key.as_ref(), - SeqNo::MAX, - crate::ValueType::Value, - )), - Bound::Unbounded => Bound::Unbounded, - }; - - let range = (lo, hi); + let user_range = ( + match range.start_bound() { + Bound::Included(key) => Bound::Included(UserKey::from(key.as_ref())), + Bound::Excluded(key) => Bound::Excluded(UserKey::from(key.as_ref())), + Bound::Unbounded => Bound::Unbounded, + }, + match range.end_bound() { + Bound::Included(key) => Bound::Included(UserKey::from(key.as_ref())), + Bound::Excluded(key) => Bound::Excluded(UserKey::from(key.as_ref())), + Bound::Unbounded => Bound::Unbounded, + }, + ); + + let range = ( + match &user_range.0 { + // NOTE: See memtable.rs for range explanation + Bound::Included(key) => Bound::Included(InternalKey::new( + key.as_ref(), + SeqNo::MAX, + crate::ValueType::Tombstone, + )), + Bound::Excluded(key) => Bound::Excluded(InternalKey::new( + key.as_ref(), + 0, + crate::ValueType::Tombstone, + )), + Bound::Unbounded => Bound::Unbounded, + }, + match &user_range.1 { + // NOTE: See memtable.rs for range explanation, this is the reverse case + // where we need to go all the way to the last seqno of an item + // + // Example: We search for (Unbounded..Excluded(abdef)) + // + // key -> seqno + // + // a -> 7 <<< This is the 
lowest key that matches the range + // abc -> 5 + // abc -> 4 + // abc -> 3 <<< This is the highest key that matches the range + // abcdef -> 6 + // abcdef -> 5 + // + Bound::Included(key) => { + Bound::Included(InternalKey::new(key.as_ref(), 0, crate::ValueType::Value)) + } + Bound::Excluded(key) => Bound::Excluded(InternalKey::new( + key.as_ref(), + SeqNo::MAX, + crate::ValueType::Value, + )), + Bound::Unbounded => Bound::Unbounded, + }, + ); let mut iters: Vec> = Vec::with_capacity(5); + let mut all_range_tombstones: Vec<RangeTombstone> = Vec::new(); + let mut single_tables = Vec::new(); + let mut multi_runs = Vec::new(); for run in lock .version @@ -161,42 +200,85 @@ impl TreeIter { #[expect(clippy::expect_used, reason = "we checked for length")] let table = run.first().expect("should exist"); + all_range_tombstones.extend( + table + .range_tombstones() + .iter() + .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)) + .cloned(), + ); + + // Check key range overlap first (cheap metadata check) before + // running the O(rt_count) table-skip scan. if table.check_key_range_overlap(&( - range.start_bound().map(|x| &*x.user_key), - range.end_bound().map(|x| &*x.user_key), + user_range.0.as_ref().map(std::convert::AsRef::as_ref), + user_range.1.as_ref().map(std::convert::AsRef::as_ref), )) { - let reader = table - .range(( - range.start_bound().map(|x| &x.user_key).cloned(), - range.end_bound().map(|x| &x.user_key).cloned(), - )) - .filter(move |item| match item { - Ok(item) => seqno_filter(item.key.seqno, seqno), - Err(_) => true, - }); - - iters.push(Box::new(reader)); + single_tables.push(table.clone()); } } _ => { - if let Some(reader) = RunReader::new( - run.clone(), - ( - range.start_bound().map(|x| &x.user_key).cloned(), - range.end_bound().map(|x| &x.user_key).cloned(), - ), - ) { - iters.push(Box::new(reader.filter(move |item| match item { - Ok(item) => seqno_filter(item.key.seqno, seqno), - Err(_) => true, - }))); + for table in run.iter() { + all_range_tombstones.extend( + table + .range_tombstones() + .iter() + .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)) + .cloned(), + ); } + + multi_runs.push(run.clone()); } } } + for table in single_tables { + // Table-skip: if a range tombstone fully covers this table + // with a higher seqno, skip it entirely (avoid I/O). + // NOTE: get_highest_seqno() includes RT seqnos, so a covering + // RT stored in the same table won't trigger skip (conservative + // but correct). Separate KV/RT seqno bounds would improve this. 
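A worked instance of the skip predicate implemented just below (all values illustrative): a table covering keys `["b", "y"]` whose newest entry has seqno 40, against a collected tombstone `["a", "z") @ 50`:

```rust
let rt = RangeTombstone::new(
    UserKey::from(b"a" as &[u8]),
    UserKey::from(b"z" as &[u8]),
    50,
);
assert!(rt.visible_at(60)); // read snapshot sits above the RT
assert!(rt.fully_covers(b"b", b"y")); // inclusive max "y" < exclusive end "z"
assert!(rt.seqno > 40); // strictly newer than everything in the table
// => `is_covered` is true and the table is skipped without any I/O.
```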
+ // key_range.max() is inclusive, fully_covers uses half-open: max < rt.end + let is_covered = all_range_tombstones.iter().any(|rt| { + rt.visible_at(seqno) + && rt.fully_covers( + table.metadata.key_range.min(), + table.metadata.key_range.max(), + ) + && rt.seqno > table.get_highest_seqno() + }); + + if !is_covered { + let reader = table + .range(user_range.clone()) + .filter(move |item| match item { + Ok(item) => seqno_filter(item.key.seqno, seqno), + Err(_) => true, + }); + + iters.push(Box::new(reader)); + } + } + + for run in multi_runs { + if let Some(reader) = RunReader::new(run, user_range.clone()) { + iters.push(Box::new(reader.filter(move |item| match item { + Ok(item) => seqno_filter(item.key.seqno, seqno), + Err(_) => true, + }))); + } + } + // Sealed memtables for memtable in lock.version.sealed_memtables.iter() { + all_range_tombstones.extend( + memtable + .range_tombstones_sorted() + .into_iter() + .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)), + ); + let iter = memtable.range(range.clone()); iters.push(Box::new( @@ -207,6 +289,14 @@ impl TreeIter { // Active memtable { + all_range_tombstones.extend( + lock.version + .active_memtable + .range_tombstones_sorted() + .into_iter() + .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)), + ); + let iter = lock.version.active_memtable.range(range.clone()); iters.push(Box::new( @@ -216,6 +306,12 @@ impl TreeIter { } if let Some((mt, seqno)) = &lock.ephemeral { + all_range_tombstones.extend( + mt.range_tombstones_sorted() + .into_iter() + .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)), + ); + let iter = Box::new( mt.range(range) .filter(move |item| seqno_filter(item.key.seqno, *seqno)) @@ -227,10 +323,24 @@ impl TreeIter { let merged = Merger::new(iters); let iter = MvccStream::new(merged); - Box::new(iter.filter(|x| match x { + let iter = iter.filter(|x| match x { Ok(value) => !value.key.is_tombstone(), Err(_) => true, - })) + }); + + // Deduplicate: MultiWriter rotation copies the same RTs into each + // output table, so collected tombstones can contain duplicates. + all_range_tombstones.sort(); + all_range_tombstones.dedup(); + + // Fast path: skip filter wrapping when no tombstone is visible at this + // read seqno. We collect RTs while building the iterator inputs to + // avoid a separate pre-scan over every memtable and SST. + if all_range_tombstones.iter().all(|rt| !rt.visible_at(seqno)) { + Box::new(iter) + } else { + Box::new(RangeTombstoneFilter::new(iter, all_range_tombstones, seqno)) + } }) } } diff --git a/src/range_tombstone.rs b/src/range_tombstone.rs new file mode 100644 index 000000000..4861f86a2 --- /dev/null +++ b/src/range_tombstone.rs @@ -0,0 +1,396 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +use crate::{SeqNo, UserKey}; +use std::cmp::Reverse; + +/// A range tombstone that deletes all keys in `[start, end)` at a given sequence number. +/// +/// Half-open interval: `start` is inclusive, `end` is exclusive. +/// A key `k` is covered iff `start <= k < end`. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct RangeTombstone { + /// Inclusive start bound + pub start: UserKey, + /// Exclusive end bound + pub end: UserKey, + /// Sequence number at which this tombstone was written + pub seqno: SeqNo, +} + +impl RangeTombstone { + /// Creates a new range tombstone for `[start, end)` at the given seqno. 
+ /// + /// # Panics (debug only) + /// + /// Debug-asserts that `start < end`. Callers must validate untrusted input + /// before constructing a `RangeTombstone`. + #[must_use] + pub fn new(start: UserKey, end: UserKey, seqno: SeqNo) -> Self { + debug_assert!(start < end, "range tombstone start must be < end"); + Self { start, end, seqno } + } + + /// Returns `true` if `key` is within `[start, end)`. + #[must_use] + pub fn contains_key(&self, key: &[u8]) -> bool { + self.start.as_ref() <= key && key < self.end.as_ref() + } + + /// Returns `true` if this tombstone is visible at the given read seqno. + /// + /// Uses exclusive boundary (`self.seqno < read_seqno`) consistent with + /// the codebase convention where `seqno` is an exclusive snapshot boundary. + #[must_use] + pub fn visible_at(&self, read_seqno: SeqNo) -> bool { + self.seqno < read_seqno + } + + /// Returns `true` if this tombstone should suppress a KV with the given seqno + /// at the given read snapshot. + /// + /// Suppress iff: `kv_seqno < self.seqno AND self.contains_key(key) AND self.visible_at(read_seqno)` + #[must_use] + pub fn should_suppress(&self, key: &[u8], kv_seqno: SeqNo, read_seqno: SeqNo) -> bool { + self.visible_at(read_seqno) && self.contains_key(key) && kv_seqno < self.seqno + } + + /// Returns the intersection of this tombstone with `[min, max)`, or `None` + /// if the ranges do not overlap. + /// + /// The resulting tombstone has the same seqno as `self`. + #[must_use] + pub fn intersect_opt(&self, min: &[u8], max: &[u8]) -> Option<Self> { + let new_start_ref = if self.start.as_ref() > min { + self.start.as_ref() + } else { + min + }; + let new_end_ref = if self.end.as_ref() < max { + self.end.as_ref() + } else { + max + }; + + if new_start_ref < new_end_ref { + Some(Self { + start: UserKey::from(new_start_ref), + end: UserKey::from(new_end_ref), + seqno: self.seqno, + }) + } else { + None + } + } + + /// Returns `true` if this tombstone fully covers the key range `[min, max]`. + /// + /// "Fully covers" means `self.start <= min` AND `max < self.end`. + /// This uses the half-open convention: the inclusive `max` must be + /// strictly less than the exclusive `end`. + #[must_use] + pub fn fully_covers(&self, min: &[u8], max: &[u8]) -> bool { + self.start.as_ref() <= min && max < self.end.as_ref() + } +} + +/// Ordered by `(start asc, seqno desc, end asc)`. +/// +/// The `end` tiebreaker ensures deterministic ordering for debug output +/// and property tests. +impl Ord for RangeTombstone { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (&self.start, Reverse(self.seqno), &self.end).cmp(&( + &other.start, + Reverse(other.seqno), + &other.end, + )) + } +} + +impl PartialOrd for RangeTombstone { + fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> { + Some(self.cmp(other)) + } +} + +/// Information about a covering range tombstone, used for table-skip decisions. +/// +/// A covering tombstone fully covers a table's key range and has a seqno +/// greater than the table's max seqno, meaning the entire table can be skipped. +#[derive(Clone, Debug)] +pub struct CoveringRt { + /// The start key of the covering tombstone (inclusive) + pub start: UserKey, + /// The end key of the covering tombstone (exclusive) + pub end: UserKey, + /// The seqno of the covering tombstone + pub seqno: SeqNo, +} + +impl CoveringRt { + /// Returns `true` if this covering tombstone fully covers the given + /// key range `[min, max]` and has a higher seqno than the table's max. 
+ #[must_use]
+ #[cfg_attr(
+ not(test),
+ expect(dead_code, reason = "wired up in table-skip optimization")
+ )]
+ pub fn covers_table(&self, table_min: &[u8], table_max: &[u8], table_max_seqno: SeqNo) -> bool {
+ self.start.as_ref() <= table_min
+ && table_max < self.end.as_ref()
+ && self.seqno > table_max_seqno
+ }
+}
+
+impl From<&RangeTombstone> for CoveringRt {
+ fn from(rt: &RangeTombstone) -> Self {
+ Self {
+ start: rt.start.clone(),
+ end: rt.end.clone(),
+ seqno: rt.seqno,
+ }
+ }
+}
+
+/// Computes the upper bound exclusive key for use in range queries.
+///
+/// Given a key, returns the next key in lexicographic order by appending `0x00`.
+/// This is useful for converting inclusive upper bounds to exclusive ones
+/// in range-cover queries.
+///
+/// Returns `None` only when the key is already the lexicographically largest
+/// encodable user key in this bounded key domain.
+#[must_use]
+pub fn upper_bound_exclusive(key: &[u8]) -> Option<UserKey> {
+ // The codebase enforces that user keys fit in a u16 length
+ // (see `InternalKey::new`). For shorter keys, appending `0x00`
+ // yields the immediate strict upper bound while preserving prefix order.
+ if key.len() < usize::from(u16::MAX) {
+ let mut result = Vec::with_capacity(key.len() + 1);
+ result.extend_from_slice(key);
+ result.push(0x00);
+ return Some(UserKey::from(result));
+ }
+
+ // At max length we cannot append, but a strict upper bound still exists
+ // unless the key is already the absolute maximum (all `0xFF`).
+ let mut result = key.to_vec();
+
+ for (idx, byte) in result.iter_mut().enumerate().rev() {
+ if *byte < 0xFF {
+ *byte += 1;
+ result.truncate(idx + 1);
+ return Some(UserKey::from(result));
+ }
+ }
+
+ None
+}
+
+#[cfg(test)]
+#[expect(
+ clippy::unwrap_used,
+ clippy::expect_used,
+ reason = "tests use unwrap/expect on controlled fixtures for brevity"
+)]
+mod tests {
+ use super::*;
+
+ fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone {
+ RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno)
+ }
+
+ #[test]
+ fn contains_key_inclusive_start() {
+ let t = rt(b"b", b"d", 10);
+ assert!(t.contains_key(b"b"));
+ }
+
+ #[test]
+ fn contains_key_exclusive_end() {
+ let t = rt(b"b", b"d", 10);
+ assert!(!t.contains_key(b"d"));
+ }
+
+ #[test]
+ fn contains_key_middle() {
+ let t = rt(b"b", b"d", 10);
+ assert!(t.contains_key(b"c"));
+ }
+
+ #[test]
+ fn contains_key_before_start() {
+ let t = rt(b"b", b"d", 10);
+ assert!(!t.contains_key(b"a"));
+ }
+
+ #[test]
+ fn not_visible_at_equal() {
+ // Exclusive boundary: tombstone@10 is NOT visible at read_seqno=10
+ let t = rt(b"a", b"z", 10);
+ assert!(!t.visible_at(10));
+ }
+
+ #[test]
+ fn visible_at_higher() {
+ let t = rt(b"a", b"z", 10);
+ assert!(t.visible_at(20));
+ }
+
+ #[test]
+ fn not_visible_at_lower() {
+ let t = rt(b"a", b"z", 10);
+ assert!(!t.visible_at(9));
+ }
+
+ #[test]
+ fn should_suppress_yes() {
+ let t = rt(b"b", b"d", 10);
+ // read_seqno=11 (exclusive: tombstone@10 visible at 11)
+ assert!(t.should_suppress(b"c", 5, 11));
+ }
+
+ #[test]
+ fn should_suppress_no_at_equal_seqno() {
+ let t = rt(b"b", b"d", 10);
+ // read_seqno=10: tombstone@10 NOT visible (exclusive boundary)
+ assert!(!t.should_suppress(b"c", 5, 10));
+ }
+
+ #[test]
+ fn should_suppress_no_newer_kv() {
+ let t = rt(b"b", b"d", 10);
+ assert!(!t.should_suppress(b"c", 15, 20));
+ }
+
+ #[test]
+ fn should_suppress_no_not_visible() {
+ let t = rt(b"b", b"d", 10);
+ assert!(!t.should_suppress(b"c", 5, 9));
+ }
+
+ #[test]
+ fn 
should_suppress_no_outside_range() { + let t = rt(b"b", b"d", 10); + assert!(!t.should_suppress(b"e", 5, 11)); + } + + #[test] + fn ordering_by_start_asc() { + let a = rt(b"a", b"z", 10); + let b = rt(b"b", b"z", 10); + assert!(a < b); + } + + #[test] + fn ordering_by_seqno_desc() { + let a = rt(b"a", b"z", 20); + let b = rt(b"a", b"z", 10); + assert!(a < b); // higher seqno comes first + } + + #[test] + fn ordering_by_end_asc_tiebreaker() { + let a = rt(b"a", b"m", 10); + let b = rt(b"a", b"z", 10); + assert!(a < b); + } + + #[test] + fn intersect_overlap() { + let t = rt(b"b", b"y", 10); + let clipped = t.intersect_opt(b"d", b"g").unwrap(); + assert_eq!(clipped.start.as_ref(), b"d"); + assert_eq!(clipped.end.as_ref(), b"g"); + assert_eq!(clipped.seqno, 10); + } + + #[test] + fn intersect_no_overlap() { + let t = rt(b"b", b"d", 10); + assert!(t.intersect_opt(b"e", b"g").is_none()); + } + + #[test] + fn intersect_partial_left() { + let t = rt(b"b", b"f", 10); + let clipped = t.intersect_opt(b"a", b"d").unwrap(); + assert_eq!(clipped.start.as_ref(), b"b"); + assert_eq!(clipped.end.as_ref(), b"d"); + } + + #[test] + fn intersect_partial_right() { + let t = rt(b"b", b"f", 10); + let clipped = t.intersect_opt(b"d", b"z").unwrap(); + assert_eq!(clipped.start.as_ref(), b"d"); + assert_eq!(clipped.end.as_ref(), b"f"); + } + + #[test] + fn fully_covers_yes() { + let t = rt(b"a", b"z", 10); + assert!(t.fully_covers(b"b", b"y")); + } + + #[test] + fn fully_covers_exact_start() { + let t = rt(b"a", b"z", 10); + assert!(t.fully_covers(b"a", b"y")); + } + + #[test] + fn fully_covers_no_end_equal() { + let t = rt(b"a", b"z", 10); + assert!(!t.fully_covers(b"a", b"z")); + } + + #[test] + fn fully_covers_no_start_before() { + let t = rt(b"b", b"z", 10); + assert!(!t.fully_covers(b"a", b"y")); + } + + #[test] + fn covering_rt_covers_table() { + let crt = CoveringRt { + start: UserKey::from(b"a" as &[u8]), + end: UserKey::from(b"z" as &[u8]), + seqno: 100, + }; + assert!(crt.covers_table(b"b", b"y", 50)); + } + + #[test] + fn covering_rt_no_cover_seqno_too_low() { + let crt = CoveringRt { + start: UserKey::from(b"a" as &[u8]), + end: UserKey::from(b"z" as &[u8]), + seqno: 50, + }; + assert!(!crt.covers_table(b"b", b"y", 100)); + } + + #[test] + fn upper_bound_exclusive_appends_zero() { + let key = b"hello"; + let result = upper_bound_exclusive(key).unwrap(); + assert_eq!(result.as_ref(), b"hello\x00"); + } + + #[test] + fn upper_bound_exclusive_max_length_non_max_key_has_successor() { + let key = vec![0xAA; usize::from(u16::MAX)]; + let successor = upper_bound_exclusive(&key).expect("non-max key should have successor"); + assert!(key.as_slice() < successor.as_ref()); + assert!(successor.len() <= usize::from(u16::MAX)); + } + + #[test] + fn upper_bound_exclusive_true_max_returns_none() { + let key = vec![0xFF; usize::from(u16::MAX)]; + assert!(upper_bound_exclusive(&key).is_none()); + } +} diff --git a/src/range_tombstone_filter.rs b/src/range_tombstone_filter.rs new file mode 100644 index 000000000..41e0b5d22 --- /dev/null +++ b/src/range_tombstone_filter.rs @@ -0,0 +1,262 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! Bidirectional range tombstone filter for iteration. +//! +//! Wraps a sorted KV stream and suppresses entries covered by range tombstones. +//! Forward: tombstones sorted by `(start asc, seqno desc)`, activated when +//! `start <= key`, expired when `end <= key`. 
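+//! (E.g. for tombstone `[b, d)@10` over the key stream `a b c d`: forward,
+//! it activates at `b` and expires at `d`, so `a` and `d` survive while
+//! older entries at `b` and `c` are suppressed.)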
+//! Reverse: tombstones sorted by `(end desc, seqno desc)`, activated when
+//! `end > key`, expired when `key < start`.
+
+use crate::active_tombstone_set::{ActiveTombstoneSet, ActiveTombstoneSetReverse};
+use crate::range_tombstone::RangeTombstone;
+use crate::{InternalValue, SeqNo};
+
+/// Wraps a bidirectional KV stream and suppresses entries covered by range tombstones.
+pub struct RangeTombstoneFilter<I> {
+ inner: I,
+
+ // Forward state
+ fwd_tombstones: Vec<RangeTombstone>,
+ fwd_idx: usize,
+ fwd_active: ActiveTombstoneSet,
+
+ // Reverse state
+ rev_tombstones: Vec<RangeTombstone>,
+ rev_idx: usize,
+ rev_active: ActiveTombstoneSetReverse,
+}
+
+impl<I> RangeTombstoneFilter<I> {
+ /// Creates a new bidirectional filter.
+ ///
+ /// `fwd_tombstones` need not be pre-sorted — the constructor sorts internally by natural Ord.
+ /// Internally, a second copy sorted by `(end desc, seqno desc)` is created for reverse.
+ #[must_use]
+ pub fn new(inner: I, mut fwd_tombstones: Vec<RangeTombstone>, read_seqno: SeqNo) -> Self {
+ // Ensure forward tombstones are sorted by natural order (start asc, seqno desc, end asc)
+ fwd_tombstones.sort();
+
+ // Build reverse-sorted copy: (end desc, seqno desc)
+ let mut rev_tombstones = fwd_tombstones.clone();
+ rev_tombstones.sort_by(|a, b| (&b.end, &b.seqno).cmp(&(&a.end, &a.seqno)));
+
+ Self {
+ inner,
+ fwd_tombstones,
+ fwd_idx: 0,
+ fwd_active: ActiveTombstoneSet::new(read_seqno),
+ rev_tombstones,
+ rev_idx: 0,
+ rev_active: ActiveTombstoneSetReverse::new(read_seqno),
+ }
+ }
+
+ /// Activates forward tombstones whose start <= `current_key`.
+ fn fwd_activate_up_to(&mut self, key: &[u8]) {
+ while let Some(rt) = self.fwd_tombstones.get(self.fwd_idx) {
+ if rt.start.as_ref() <= key {
+ self.fwd_active.activate(rt);
+ self.fwd_idx += 1;
+ } else {
+ break;
+ }
+ }
+ }
+
+ /// Activates reverse tombstones whose end > `current_key`.
+ fn rev_activate_up_to(&mut self, key: &[u8]) {
+ while let Some(rt) = self.rev_tombstones.get(self.rev_idx) {
+ if rt.end.as_ref() > key {
+ self.rev_active.activate(rt);
+ self.rev_idx += 1;
+ } else {
+ break;
+ }
+ }
+ }
+}
+
+impl<I: Iterator<Item = crate::Result<InternalValue>>> Iterator for RangeTombstoneFilter<I> {
+ type Item = crate::Result<InternalValue>;
+
+ fn next(&mut self) -> Option<Self::Item> {
+ loop {
+ let item = self.inner.next()?;
+
+ let Ok(kv) = &item else { return Some(item) };
+
+ let key = kv.key.user_key.as_ref();
+ let kv_seqno = kv.key.seqno;
+
+ // Activate tombstones whose start <= this key
+ self.fwd_activate_up_to(key);
+
+ // Expire tombstones whose end <= this key
+ self.fwd_active.expire_until(key);
+
+ // Check suppression
+ if self.fwd_active.is_suppressed(kv_seqno) {
+ continue;
+ }
+
+ return Some(item);
+ }
+ }
+}
+
+impl<I: DoubleEndedIterator<Item = crate::Result<InternalValue>>> DoubleEndedIterator
+ for RangeTombstoneFilter<I>
+{
+ fn next_back(&mut self) -> Option<Self::Item> {
+ loop {
+ let item = self.inner.next_back()?;
+
+ let Ok(kv) = &item else { return Some(item) };
+
+ let key = kv.key.user_key.as_ref();
+ let kv_seqno = kv.key.seqno;
+
+ // Activate tombstones whose end > this key (strict >)
+ self.rev_activate_up_to(key);
+
+ // Expire tombstones whose start > this key (key < start)
+ self.rev_active.expire_until(key);
+
+ // Check suppression
+ if self.rev_active.is_suppressed(kv_seqno) {
+ continue;
+ }
+
+ return Some(item);
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{UserKey, ValueType};
+
+ fn kv(key: &[u8], seqno: SeqNo) -> InternalValue {
+ InternalValue::from_components(key, b"v", seqno, ValueType::Value)
+ }
+
+ fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone {
+ RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno)
+ }
+
+ #[test]
+ fn items_no_tombstones_return_all() {
+ let items: Vec<crate::Result<InternalValue>> =
+ vec![Ok(kv(b"a", 1)), Ok(kv(b"b", 2)), Ok(kv(b"c", 3))];
+
+ let filter = RangeTombstoneFilter::new(items.into_iter(), vec![], SeqNo::MAX);
+ let results: Vec<_> = filter.flatten().collect();
+ assert_eq!(results.len(), 3);
+ }
+
+ #[test]
+ fn items_with_range_tombstone_suppress_covered_keys() {
+ let items: Vec<crate::Result<InternalValue>> = vec![
+ Ok(kv(b"a", 5)),
+ Ok(kv(b"b", 5)),
+ Ok(kv(b"c", 5)),
+ Ok(kv(b"d", 5)),
+ Ok(kv(b"e", 5)),
+ ];
+
+ let tombstones = vec![rt(b"b", b"d", 10)];
+ let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX);
+ let results: Vec<_> = filter.flatten().collect();
+
+ let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect();
+ assert_eq!(keys, vec![b"a".as_ref(), b"d", b"e"]);
+ }
+
+ #[test]
+ fn items_newer_than_tombstone_survive() {
+ let items: Vec<crate::Result<InternalValue>> = vec![Ok(kv(b"b", 10)), Ok(kv(b"c", 3))];
+
+ let tombstones = vec![rt(b"a", b"z", 5)];
+ let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX);
+ let results: Vec<_> = filter.flatten().collect();
+
+ let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect();
+ assert_eq!(keys, vec![b"b".as_ref()]);
+ }
+
+ #[test]
+ fn range_end_exclusive_preserves_boundary_key() {
+ let items: Vec<crate::Result<InternalValue>> =
+ vec![Ok(kv(b"b", 5)), Ok(kv(b"c", 5)), Ok(kv(b"d", 5))];
+
+ let tombstones = vec![rt(b"b", b"d", 10)];
+ let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX);
+ let results: Vec<_> = filter.flatten().collect();
+
+ let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect();
+ assert_eq!(keys, vec![b"d".as_ref()]);
+ }
+
+ #[test]
+ fn overlapping_tombstones_suppress_union_of_ranges() {
+ let items: Vec<crate::Result<InternalValue>> = vec![
+ Ok(kv(b"a", 1)),
 Ok(kv(b"b", 3)),
+ Ok(kv(b"c", 6)),
+ Ok(kv(b"d", 1)),
+ ];
+
+ let tombstones = vec![rt(b"a", b"c", 5), rt(b"b", b"e", 4)];
+ let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX);
+ let results: Vec<_> = filter.flatten().collect();
+
+ let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect();
+ assert_eq!(keys, vec![b"c".as_ref()]);
+ }
+
+ #[test]
+ fn tombstone_newer_than_read_seqno_not_visible() {
+ let items: Vec<crate::Result<InternalValue>> = vec![Ok(kv(b"b", 3))];
+
+ let tombstones = vec![rt(b"a", b"z", 10)];
+ let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, 5);
+ let results: Vec<_> = filter.flatten().collect();
+
+ assert_eq!(results.len(), 1);
+ }
+
+ #[test]
+ fn rev_items_with_range_tombstone_suppress_covered_keys() {
+ let items: Vec<crate::Result<InternalValue>> = vec![
+ Ok(kv(b"a", 5)),
+ Ok(kv(b"b", 5)),
+ Ok(kv(b"c", 5)),
+ Ok(kv(b"d", 5)),
+ Ok(kv(b"e", 5)),
+ ];
+
+ let tombstones = vec![rt(b"b", b"d", 10)];
+ let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX);
+ let results: Vec<_> = filter.rev().flatten().collect();
+
+ let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect();
+ assert_eq!(keys, vec![b"e".as_ref(), b"d", b"a"]);
+ }
+
+ #[test]
+ fn rev_range_end_exclusive_preserves_boundary_key() {
+ let items: Vec<crate::Result<InternalValue>> =
+ vec![Ok(kv(b"a", 5)), Ok(kv(b"l", 5)), Ok(kv(b"m", 5))];
+
+ let tombstones = vec![rt(b"a", b"m", 10)];
+ let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX);
+ let results: Vec<_> = filter.rev().flatten().collect();
+
+ let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect();
+ assert_eq!(keys, vec![b"m".as_ref()]);
+ }
+}
diff --git a/src/table/block/type.rs b/src/table/block/type.rs
index 82eadf11b..1a90a0104 100644
--- a/src/table/block/type.rs
+++ b/src/table/block/type.rs
@@ -8,6 +8,7 @@ pub enum BlockType {
 Index,
 Filter,
 Meta,
+ RangeTombstone,
 }

 impl From<BlockType> for u8 {
@@ -17,6 +18,7 @@
 BlockType::Index => 1,
 BlockType::Filter => 2,
 BlockType::Meta => 3,
+ BlockType::RangeTombstone => 4,
 }
 }
 }
@@ -30,6 +32,7 @@ impl TryFrom<u8> for BlockType {
 1 => Ok(Self::Index),
 2 => Ok(Self::Filter),
 3 => Ok(Self::Meta),
+ 4 => Ok(Self::RangeTombstone),
 _ => Err(crate::Error::InvalidTag(("BlockType", value))),
 }
 }
diff --git a/src/table/inner.rs b/src/table/inner.rs
index 5611ca135..e59d47170 100644
--- a/src/table/inner.rs
+++ b/src/table/inner.rs
@@ -9,6 +9,7 @@ use super::{block_index::BlockIndexImpl, meta::ParsedMeta, regions::ParsedRegion
 use crate::{
 cache::Cache,
 file_accessor::FileAccessor,
+ range_tombstone::RangeTombstone,
 table::{filter::block::FilterBlock, IndexBlock},
 tree::inner::TreeId,
 Checksum, GlobalTableId, SeqNo,
@@ -65,6 +66,9 @@ pub struct Inner {
 /// Cached sum of referenced blob file bytes for this table.
 /// Lazily computed on first access to avoid repeated I/O in compaction decisions.
 pub(crate) cached_blob_bytes: OnceLock<u64>,
+
+ /// Range tombstones stored in this table. Loaded on open.
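+ /// Empty for tables that carry no RT block.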
+ pub(crate) range_tombstones: Vec<RangeTombstone>,
 }

 impl Inner {
diff --git a/src/table/iter.rs b/src/table/iter.rs
index b8114c2b5..53e4767c8 100644
--- a/src/table/iter.rs
+++ b/src/table/iter.rs
@@ -119,9 +119,13 @@ pub struct Iter {
 }

 impl Iter {
- #[expect(
- clippy::too_many_arguments,
- reason = "cfg(metrics) adds an extra parameter"
+ // cfg_attr: expect only fires when metrics feature adds the extra parameter
+ #[cfg_attr(
+ feature = "metrics",
+ expect(
+ clippy::too_many_arguments,
+ reason = "metrics adds the extra parameter; without that feature this stays at the lint threshold"
+ )
 )]
 pub fn new(
 table_id: GlobalTableId,
diff --git a/src/table/mod.rs b/src/table/mod.rs
index 10ac8a2eb..02a1fef42 100644
--- a/src/table/mod.rs
+++ b/src/table/mod.rs
@@ -31,6 +31,7 @@ use crate::{
 cache::Cache,
 descriptor_table::DescriptorTable,
 file_accessor::FileAccessor,
+ range_tombstone::RangeTombstone,
 table::{
 block::{BlockType, ParsedItem},
 block_index::{BlockIndex, FullBlockIndex, TwoLevelBlockIndex, VolatileBlockIndex},
@@ -566,6 +567,23 @@ impl Table {
 None
 };

+ // Load range tombstones (if present)
+ let range_tombstones = if let Some(rt_handle) = regions.range_tombstones {
+ log::trace!("Loading range tombstone block, with rt_ptr={rt_handle:?}");
+ let block = Block::from_file(&file, rt_handle, crate::CompressionType::None)?;
+
+ if block.header.block_type != BlockType::RangeTombstone {
+ return Err(crate::Error::InvalidTag((
+ "BlockType",
+ block.header.block_type.into(),
+ )));
+ }
+
+ Self::decode_range_tombstones(&block)?
+ } else {
+ Vec::new()
+ };
+
 log::debug!(
 "Recovered table #{} from {}",
 metadata.id,
@@ -598,6 +616,7 @@ impl Table {
 metrics,

 cached_blob_bytes: std::sync::OnceLock::new(),
+ range_tombstones,
 })))
 }

@@ -606,6 +625,86 @@ impl Table {
 self.0.checksum
 }

+ /// Decodes range tombstones from a raw block.
+ ///
+ /// Wire format (repeated): `[start_len:u16_le][start][end_len:u16_le][end][seqno:u64_le]`
+ ///
+ /// # Errors
+ ///
+ /// Will return `Err` if the block data is malformed.
+ fn decode_range_tombstones(block: &Block) -> crate::Result<Vec<RangeTombstone>> {
+ use byteorder::{ReadBytesExt, LE};
+ use std::io::{Cursor, Read};
+
+ let mut tombstones = Vec::new();
+ let data = block.data.as_ref();
+ let mut cursor = Cursor::new(data);
+
+ #[expect(
+ clippy::cast_possible_truncation,
+ reason = "block size always fits in usize"
+ )]
+ while (cursor.position() as usize) < data.len() {
+ let start_len = cursor
+ .read_u16::<LE>()
+ .map_err(|_| crate::Error::Unrecoverable)? as usize;
+
+ // Validate length against remaining data before allocating
+ let remaining = data.len() - cursor.position() as usize;
+ if start_len > remaining {
+ log::error!(
+ "Range tombstone block: start_len {start_len} exceeds remaining {remaining}"
+ );
+ return Err(crate::Error::Unrecoverable);
+ }
+
+ let mut start_buf = vec![0u8; start_len];
+ cursor
+ .read_exact(&mut start_buf)
+ .map_err(|_| crate::Error::Unrecoverable)?;
+
+ let end_len = cursor
+ .read_u16::<LE>()
+ .map_err(|_| crate::Error::Unrecoverable)?
 as usize;
+
+ let remaining = data.len() - cursor.position() as usize;
+ if end_len > remaining {
+ log::error!(
+ "Range tombstone block: end_len {end_len} exceeds remaining {remaining}"
+ );
+ return Err(crate::Error::Unrecoverable);
+ }
+
+ let mut end_buf = vec![0u8; end_len];
+ cursor
+ .read_exact(&mut end_buf)
+ .map_err(|_| crate::Error::Unrecoverable)?;
+
+ let seqno = cursor
+ .read_u64::<LE>()
+ .map_err(|_| crate::Error::Unrecoverable)?;
+
+ let start = UserKey::from(start_buf);
+ let end = UserKey::from(end_buf);
+
+ // Validate invariant: start < end (reject corrupted data)
+ if start >= end {
+ log::error!("Range tombstone block: invalid interval (start >= end)");
+ return Err(crate::Error::Unrecoverable);
+ }
+
+ tombstones.push(RangeTombstone::new(start, end, seqno));
+ }
+
+ Ok(tombstones)
+ }
+
+ /// Returns the range tombstones stored in this table.
+ #[must_use]
+ pub(crate) fn range_tombstones(&self) -> &[RangeTombstone] {
+ &self.0.range_tombstones
+ }
+
 pub(crate) fn mark_as_deleted(&self) {
 self.0
 .is_deleted
diff --git a/src/table/multi_writer.rs b/src/table/multi_writer.rs
index fefed7ddf..f860e97a6 100644
--- a/src/table/multi_writer.rs
+++ b/src/table/multi_writer.rs
@@ -4,8 +4,9 @@

 use super::{filter::BloomConstructionPolicy, writer::Writer};
 use crate::{
- blob_tree::handle::BlobIndirection, table::writer::LinkedFile, value::InternalValue,
- vlog::BlobFileId, Checksum, CompressionType, HashMap, SequenceNumberCounter, TableId, UserKey,
+ blob_tree::handle::BlobIndirection, range_tombstone::RangeTombstone, table::writer::LinkedFile,
+ value::InternalValue, vlog::BlobFileId, Checksum, CompressionType, HashMap,
+ SequenceNumberCounter, TableId, UserKey,
 };
 use std::path::PathBuf;
@@ -46,6 +47,16 @@ pub struct MultiWriter {
 linked_blobs: HashMap<BlobFileId, LinkedFile>,

+ /// Range tombstones to distribute across output tables.
+ /// During compaction these are clipped to each table's key range;
+ /// during flush they are written unmodified (they must cover keys in older SSTs).
+ range_tombstones: Vec<RangeTombstone>,
+
+ /// When true, range tombstones are clipped to each output table's KV key range
+ /// via `intersect_opt`. This is correct for compaction (input tables are consumed)
+ /// but wrong for flush (RTs must cover keys in older SSTs outside the memtable's range).
+ clip_range_tombstones: bool,
+
 /// Level the tables are written to
 initial_level: u8,
 }
@@ -91,9 +102,110 @@ impl MultiWriter {
 current_key: None,

 linked_blobs: HashMap::default(),
+ range_tombstones: Vec::new(),
+ clip_range_tombstones: false,
 })
 }

+ /// Enables RT clipping: each tombstone is intersected with the output
+ /// table's KV key range. Use this for compaction where input tables are
+ /// consumed; do NOT use for flush where RTs must cover older SSTs.
+ #[must_use]
+ pub fn use_clip_range_tombstones(mut self) -> Self {
+ self.clip_range_tombstones = true;
+ self
+ }
+
+ /// Sets range tombstones to be distributed across output tables.
+ pub fn set_range_tombstones(&mut self, tombstones: Vec<RangeTombstone>) {
+ self.range_tombstones = tombstones;
+ }
+
+ /// Writes range tombstones to the given writer, respecting the clip mode.
+ ///
+ /// - **clip=true** (compaction): intersect each RT with the table's KV key range.
+ /// - **clip=false** (flush): write all overlapping RTs unmodified so they cover
+ /// keys in older SSTs outside this memtable's key range.
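+ ///
+ /// E.g. in clip mode, an input RT `[a, z)@10` written into an output table
+ /// whose KV keys span `[f, k]` is persisted as `[f, k\0)@10`; the exclusive
+ /// upper bound comes from `upper_bound_exclusive`.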
+ fn write_rts_to_writer(tombstones: &[RangeTombstone], clip: bool, writer: &mut Writer) { + if let (Some(first_key), Some(last_key)) = + (writer.meta.first_key.clone(), writer.meta.last_key.clone()) + { + if clip { + // Compaction mode: clip RTs to this table's key range. + if let Some(max_exclusive) = + crate::range_tombstone::upper_bound_exclusive(last_key.as_ref()) + { + for rt in tombstones { + if let Some(clipped) = + rt.intersect_opt(first_key.as_ref(), max_exclusive.as_ref()) + { + writer.write_range_tombstone(clipped); + } + } + } else { + // `last_key` is the lexicographically maximal encodable user + // key, so there is no strict successor. In that case clip + // only on the lower bound and keep the persisted key_range + // unchanged; widening it during compaction would break the + // disjoint-run invariant that point reads rely on. + for rt in tombstones { + let clipped_start = if rt.start.as_ref() > first_key.as_ref() { + rt.start.as_ref() + } else { + first_key.as_ref() + }; + + if clipped_start < rt.end.as_ref() { + writer.write_range_tombstone(RangeTombstone::new( + UserKey::from(clipped_start), + rt.end.clone(), + rt.seqno, + )); + } + } + } + } else { + // Flush mode: write ALL RTs without clipping so they cover keys + // in older SSTs outside this memtable's key range. No overlap + // filter — an RT disjoint from this table's KV range (e.g., + // delete_range on keys only in older SSTs) must still be persisted. + // + // Conservatively widen key_range to include RT coverage so leveled + // compaction overlap selection can discover these RTs. Using rt.end + // (exclusive) as an inclusive upper bound over-approximates the + // actual KV max but does not lose entries. + for rt in tombstones { + match &mut writer.meta.first_key { + Some(existing) => { + if rt.start.as_ref() < existing.as_ref() { + *existing = rt.start.clone(); + } + } + None => { + writer.meta.first_key = Some(rt.start.clone()); + } + } + match &mut writer.meta.last_key { + Some(existing) => { + if rt.end.as_ref() > existing.as_ref() { + *existing = rt.end.clone(); + } + } + None => { + writer.meta.last_key = Some(rt.end.clone()); + } + } + writer.write_range_tombstone(rt.clone()); + } + } + } else { + // RT-only table (no KV items yet) — write all tombstones unclipped. + for rt in tombstones { + writer.write_range_tombstone(rt.clone()); + } + } + } + pub fn register_blob(&mut self, indirection: BlobIndirection) { self.linked_blobs .entry(indirection.vhandle.blob_file_id) @@ -201,6 +313,20 @@ impl MultiWriter { } let mut old_writer = std::mem::replace(&mut self.writer, new_writer); + old_writer.spill_block()?; + + // Write range tombstones to the finishing writer. + // In flush mode (clip=false) tombstones are written unmodified because + // they must cover keys in older SSTs outside this memtable's key range. + // In compaction mode (clip=true) tombstones are clipped to the output + // table's KV range because the input tables are consumed. + if !self.range_tombstones.is_empty() { + Self::write_rts_to_writer( + &self.range_tombstones, + self.clip_range_tombstones, + &mut old_writer, + ); + } for linked in self.linked_blobs.values() { old_writer.link_blob_file( @@ -240,6 +366,17 @@ impl MultiWriter { /// /// Returns the metadata of created tables pub fn finish(mut self) -> crate::Result> { + self.writer.spill_block()?; + + // Write range tombstones to the last writer (same logic as rotate). 
+ if !self.range_tombstones.is_empty() {
+ Self::write_rts_to_writer(
+ &self.range_tombstones,
+ self.clip_range_tombstones,
+ &mut self.writer,
+ );
+ }
+
 for linked in self.linked_blobs.values() {
 self.writer.link_blob_file(
 linked.blob_file_id,
diff --git a/src/table/regions.rs b/src/table/regions.rs
index 5644d2f2b..da0a46f9b 100644
--- a/src/table/regions.rs
+++ b/src/table/regions.rs
@@ -31,7 +31,7 @@ fn toc_entry_to_handle(entry: &TocEntry) -> BlockHandle {
 /// |--------------|
 /// | filter | <- may not exist
 /// |--------------|
-/// | ... |
+/// | range_tomb | <- may not exist
 /// |--------------|
 /// | linked blobs | <- may not exist
 /// |--------------|
@@ -47,6 +47,7 @@ pub struct ParsedRegions {
 pub index: Option<BlockHandle>,
 pub filter_tli: Option<BlockHandle>,
 pub filter: Option<BlockHandle>,
+ pub range_tombstones: Option<BlockHandle>,
 pub linked_blob_files: Option<BlockHandle>,
 pub metadata: BlockHandle,
 }
@@ -64,6 +65,7 @@ impl ParsedRegions {
 })?,
 index: toc.section(b"index").map(toc_entry_to_handle),
 filter: toc.section(b"filter").map(toc_entry_to_handle),
+ range_tombstones: toc.section(b"range_tombstones").map(toc_entry_to_handle),
 linked_blob_files: toc.section(b"linked_blob_files").map(toc_entry_to_handle),
 metadata: toc
 .section(b"meta")
diff --git a/src/table/util.rs b/src/table/util.rs
index a85b31e2e..c4aa913aa 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -28,9 +28,13 @@ pub struct SliceIndexes(pub usize, pub usize);
 /// Loads a block from disk or block cache, if cached.
 ///
 /// Also handles file descriptor opening and caching.
-#[expect(
- clippy::too_many_arguments,
- reason = "block loading requires many context parameters"
+// cfg_attr: expect only fires when metrics feature adds the extra parameter
+#[cfg_attr(
+ feature = "metrics",
+ expect(
+ clippy::too_many_arguments,
+ reason = "metrics adds the extra parameter; without that feature this stays at the lint threshold"
+ )
 )]
 pub fn load_block(
 table_id: GlobalTableId,
@@ -56,7 +60,9 @@
 BlockType::Index => {
 metrics.index_block_load_cached.fetch_add(1, Relaxed);
 }
- BlockType::Data | BlockType::Meta => {
+ // TODO(#34): RangeTombstone counted under data_block metrics — add
+ // dedicated range_tombstone_block_load_cached/miss counters
+ BlockType::Data | BlockType::Meta | BlockType::RangeTombstone => {
 metrics.data_block_load_cached.fetch_add(1, Relaxed);
 }
 }
@@ -103,7 +109,8 @@
 .index_block_io_requested
 .fetch_add(handle.size().into(), Relaxed);
 }
- BlockType::Data | BlockType::Meta => {
+ // TODO(#34): same as above — RangeTombstone uses data_block IO counters
+ BlockType::Data | BlockType::Meta | BlockType::RangeTombstone => {
 metrics.data_block_load_io.fetch_add(1, Relaxed);
 metrics
diff --git a/src/table/writer/mod.rs b/src/table/writer/mod.rs
index f85d8c845..b0439ef80 100644
--- a/src/table/writer/mod.rs
+++ b/src/table/writer/mod.rs
@@ -14,6 +14,7 @@ use crate::{
 checksum::{ChecksumType, ChecksummedWriter},
 coding::Encode,
 file::fsync_directory,
+ range_tombstone::RangeTombstone,
 table::{
 writer::{
 filter::{FilterWriter, FullFilterWriter},
@@ -91,6 +92,9 @@ pub struct Writer {
 linked_blob_files: Vec<LinkedFile>,

+ /// Range tombstones to be written as a separate block
+ range_tombstones: Vec<RangeTombstone>,
+
 initial_level: u8,
 }
@@ -140,6 +144,7 @@ impl Writer {
 previous_item: None,

 linked_blob_files: Vec::new(),
+ range_tombstones: Vec::new(),
 })
 }
@@ -233,6 +238,13 @@ impl Writer {
 self
 }

+ /// Adds a range tombstone to be written into this table's RT block.
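+ /// Also widens the writer's seqno bounds (`lowest_seqno`/`highest_seqno`)
+ /// to include the tombstone's seqno.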
+ pub(crate) fn write_range_tombstone(&mut self, rt: RangeTombstone) {
+ self.meta.lowest_seqno = self.meta.lowest_seqno.min(rt.seqno);
+ self.meta.highest_seqno = self.meta.highest_seqno.max(rt.seqno);
+ self.range_tombstones.push(rt);
+ }
+
 /// Writes an item.
 ///
 /// # Note
@@ -373,12 +385,83 @@
 self.spill_block()?;

- // No items written! Just delete table file and return nothing
- if self.meta.item_count == 0 {
+ // No items and no range tombstones — delete the empty table file.
+ if self.meta.item_count == 0 && self.range_tombstones.is_empty() {
 std::fs::remove_file(&self.path)?;
 return Ok(None);
 }

+ // If we have range tombstones but no KV items, write a synthetic
+ // weak tombstone at the first RT's start key to produce a valid index.
+ // Preserve seqno bounds for real entries by saving/restoring metadata
+ // around the sentinel write. The sentinel uses the table's lowest RT
+ // seqno and should not influence user-visible metadata.
+ // Also ensure the table metadata key range covers all range tombstones.
+ if self.meta.item_count == 0 {
+ // Compute the coverage of all range tombstones.
+ let mut min_start: Option<UserKey> = None;
+ let mut max_end: Option<UserKey> = None;
+ let mut sentinel_start: Option<UserKey> = None;
+ let mut sentinel_seqno: Option<SeqNo> = None;
+ for rt in &self.range_tombstones {
+ match &min_start {
+ None => min_start = Some(rt.start.clone()),
+ Some(cur_min) if rt.start < *cur_min => min_start = Some(rt.start.clone()),
+ _ => {}
+ }
+ match &max_end {
+ None => max_end = Some(rt.end.clone()),
+ Some(cur_max) if rt.end > *cur_max => max_end = Some(rt.end.clone()),
+ _ => {}
+ }
+
+ match (sentinel_seqno, &sentinel_start) {
+ (None, _) => {
+ sentinel_seqno = Some(rt.seqno);
+ sentinel_start = Some(rt.start.clone());
+ }
+ (Some(cur_seqno), Some(cur_start))
+ if rt.seqno < cur_seqno
+ || (rt.seqno == cur_seqno && rt.start < *cur_start) =>
+ {
+ sentinel_seqno = Some(rt.seqno);
+ sentinel_start = Some(rt.start.clone());
+ }
+ _ => {}
+ }
+ }
+
+ if let (Some(start), Some(end), Some(sentinel_key), Some(sentinel_seqno)) =
+ (min_start, max_end, sentinel_start, sentinel_seqno)
+ {
+ let saved_lo = self.meta.lowest_seqno;
+ let saved_hi = self.meta.highest_seqno;
+
+ // Write a sentinel key to force index block creation in RT-only
+ // tables. The sentinel must use the start key of the same
+ // tombstone that contributes the lowest seqno; otherwise it can
+ // become visible at a key that is not yet covered by any visible
+ // range tombstone and incorrectly mask older values.
+ self.write(InternalValue::new_weak_tombstone(
+ sentinel_key,
+ sentinel_seqno,
+ ))?;
+ self.spill_block()?;
+
+ // Restore seqno bounds — sentinel seqno is derived from RT
+ // metadata, not from user data, so it should not shift the
+ // table's seqno range. Item/key counts are NOT decremented:
+ // the sentinel IS an on-disk entry and counts must match
+ // actual block contents for consistency with recovery/tests.
+ self.meta.lowest_seqno = saved_lo;
+ self.meta.highest_seqno = saved_hi;
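+
+ // E.g. with RTs `[m, p)@7` and `[a, c)@5`, the sentinel is `a@5`:
+ // seqno 5 is the lowest across all RTs and `a` is the start key of
+ // the tombstone contributing it, so whenever the sentinel is
+ // visible, the RT covering its key is visible as well.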
+
+ // Ensure the table's key range covers all range tombstones.
+ self.meta.first_key = Some(start);
+ self.meta.last_key = Some(end);
+ }
+ }
+
 // Write index
 log::trace!("Finishing index writer");
 let index_block_count = self.index_writer.finish(&mut self.file_writer)?;
@@ -387,6 +470,43 @@
 log::trace!("Finishing filter writer");
 let filter_block_count = self.filter_writer.finish(&mut self.file_writer)?;

+ // Write range tombstones block (if any)
+ if !self.range_tombstones.is_empty() {
+ use byteorder::{WriteBytesExt, LE};
+
+ self.file_writer.start("range_tombstones")?;
+
+ // Wire format (repeated): [start_len:u16_le][start][end_len:u16_le][end][seqno:u64_le]
+ self.block_buffer.clear();
+ for rt in &self.range_tombstones {
+ let start_len = u16::try_from(rt.start.len()).map_err(|_| {
+ std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "range tombstone start key length exceeds u16::MAX",
+ )
+ })?;
+ let end_len = u16::try_from(rt.end.len()).map_err(|_| {
+ std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ "range tombstone end key length exceeds u16::MAX",
+ )
+ })?;
+
+ self.block_buffer.write_u16::<LE>(start_len)?;
+ self.block_buffer.extend_from_slice(&rt.start);
+ self.block_buffer.write_u16::<LE>(end_len)?;
+ self.block_buffer.extend_from_slice(&rt.end);
+ self.block_buffer.write_u64::<LE>(rt.seqno)?;
+ }
+
+ Block::write_into(
+ &mut self.file_writer,
+ &self.block_buffer,
+ crate::table::block::BlockType::RangeTombstone,
+ CompressionType::None,
+ )?;
+ }
+
 if !self.linked_blob_files.is_empty() {
 use byteorder::{WriteBytesExt, LE};
@@ -466,6 +586,10 @@
 meta("key_count", &(self.meta.key_count as u64).to_le_bytes()),
 meta("prefix_truncation#data", &[1]), // NOTE: currently prefix truncation can not be disabled
 meta("prefix_truncation#index", &[1]), // NOTE: currently prefix truncation can not be disabled
+ meta(
+ "range_tombstone_count",
+ &(self.range_tombstones.len() as u64).to_le_bytes(),
+ ),
 meta(
 "restart_interval#data",
 &self.data_block_restart_interval.to_le_bytes(),
diff --git a/src/tree/mod.rs b/src/tree/mod.rs
index e63b61031..9f8979718 100644
--- a/src/tree/mod.rs
+++ b/src/tree/mod.rs
@@ -84,6 +84,8 @@ impl std::ops::Deref for Tree {
 }
 }

+impl crate::abstract_tree::sealed::Sealed for Tree {}
+
 impl AbstractTree for Tree {
 fn table_file_cache_size(&self) -> usize {
 self.config
@@ -338,9 +340,10 @@ impl AbstractTree for Tree {
- fn flush_to_tables(
+ fn flush_to_tables_with_rt(
 &self,
 stream: impl Iterator<Item = crate::Result<InternalValue>>,
+ range_tombstones: Vec<RangeTombstone>,
 ) -> crate::Result<Option<(Vec<Table>, Option<Vec<BlobFile>>)>> {
 use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter};
 use std::time::Instant;
@@ -396,6 +399,11 @@
 table_writer = table_writer.use_partitioned_filter();
 }

+ // Set range tombstones BEFORE writing KV items so that if MultiWriter
+ // rotates to a new table during the write loop, earlier tables already
+ // carry the RT metadata.
+ table_writer.set_range_tombstones(range_tombstones);
+
 for item in stream {
 table_writer.write(item?)?;
 }
@@ -426,6 +434,9 @@
 })
 .collect::<crate::Result<Vec<_>>>()?;

+ // Return Some even when tables is empty (RT-only flush): the caller
+ // (AbstractTree::flush) handles empty tables by re-inserting RTs into
+ // the active memtable and still needs to delete sealed memtables.
 Ok(Some((tables, None)))
 }

@@ -681,6 +692,20 @@ impl AbstractTree for Tree {
 let value = InternalValue::new_weak_tombstone(key, seqno);
 self.append_entry(value)
 }
+
+ fn remove_range<K: Into<UserKey>>(&self, start: K, end: K, seqno: SeqNo) -> u64 {
+ #[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
+ let memtable = Arc::clone(
+ &self
+ .version_history
+ .read()
+ .expect("lock is poisoned")
+ .latest_version()
+ .active_memtable,
+ );
+
+ memtable.insert_range_tombstone(start.into(), end.into(), seqno)
+ }
 }

 impl Tree {
@@ -718,19 +743,95 @@ impl Tree {
 key: &[u8],
 seqno: SeqNo,
 ) -> crate::Result<Option<InternalValue>> {
+ // Search order: active → sealed → SST (newest first). A point
+ // tombstone in a newer source is authoritative — no older source
+ // can contain a newer value, so returning None is correct.
 if let Some(entry) = super_version.active_memtable.get(key, seqno) {
- return Ok(ignore_tombstone_value(entry));
+ let Some(entry) = ignore_tombstone_value(entry) else {
+ return Ok(None);
+ };
+
+ // Check if any range tombstone suppresses this entry
+ if Self::is_suppressed_by_range_tombstones(super_version, key, entry.key.seqno, seqno) {
+ return Ok(None);
+ }
+ return Ok(Some(entry));
 }

 // Now look in sealed memtables
 if let Some(entry) =
 Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
 {
- return Ok(ignore_tombstone_value(entry));
+ let Some(entry) = ignore_tombstone_value(entry) else {
+ return Ok(None);
+ };
+
+ if Self::is_suppressed_by_range_tombstones(super_version, key, entry.key.seqno, seqno) {
+ return Ok(None);
+ }
+ return Ok(Some(entry));
 }

 // Now look in tables... this may involve disk I/O
- Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
+ let entry = Self::get_internal_entry_from_tables(&super_version.version, key, seqno)?;
+
+ if let Some(entry) = entry {
+ if Self::is_suppressed_by_range_tombstones(super_version, key, entry.key.seqno, seqno) {
+ return Ok(None);
+ }
+ return Ok(Some(entry));
+ }
+
+ Ok(None)
+ }
+
+ /// Checks if a key at `key_seqno` is suppressed by any range tombstone
+ /// in the active memtable, sealed memtables, or SST tables, visible at `read_seqno`.
+ fn is_suppressed_by_range_tombstones(
+ super_version: &SuperVersion,
+ key: &[u8],
+ key_seqno: SeqNo,
+ read_seqno: SeqNo,
+ ) -> bool {
+ // Check active memtable range tombstones.
+ // Future optimization: skip lock when memtable has no RTs (atomic count).
+ if super_version
+ .active_memtable
+ .is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno)
+ {
+ return true;
+ }
+
+ // Check sealed memtable range tombstones
+ for mt in super_version.sealed_memtables.iter().rev() {
+ if mt.is_key_suppressed_by_range_tombstone(key, key_seqno, read_seqno) {
+ return true;
+ }
+ }
+
+ // Check SST table range tombstones.
+ //
+ // Flush/RT-only writes widen persisted table key ranges to include RT
+ // coverage, and compaction either clips RTs to the output table range
+ // or widens metadata in the inclusive-upper-bound fallback. That makes
+ // `metadata.key_range.contains_key(key)` a sound early reject here and
+ // avoids scanning RT blocks for unrelated SSTs on point reads.
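+ // (Cost is linear in the number of RT-bearing tables whose key range
+ // contains `key`, times their RT counts.)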
+ for table in super_version
+ .version
+ .iter_levels()
+ .flat_map(|lvl| lvl.iter())
+ .flat_map(|run| run.iter())
+ .filter(|t| !t.range_tombstones().is_empty())
+ .filter(|t| t.metadata.key_range.contains_key(key))
+ {
+ for rt in table.range_tombstones() {
+ if rt.should_suppress(key, key_seqno, read_seqno) {
+ return true;
+ }
+ }
+ }
+
+ false
 }

 fn get_internal_entry_from_tables(
@@ -961,7 +1062,7 @@ impl Tree {
 let reader = sfa::Reader::new(&manifest_path)?;
 let manifest = Manifest::decode_from(&manifest_path, &reader)?;

- if manifest.version != FormatVersion::V3 {
+ if !matches!(manifest.version, FormatVersion::V3 | FormatVersion::V4) {
 return Err(crate::Error::InvalidVersion(manifest.version.into()));
 }

diff --git a/src/version/mod.rs b/src/version/mod.rs
index 6586783c5..4abb70db2 100644
--- a/src/version/mod.rs
+++ b/src/version/mod.rs
@@ -350,11 +350,20 @@ impl Version {
 })
 .collect::<Vec<_>>();

- let mut runs = Vec::with_capacity(prev_runs.len() + 1);
+ let mut runs = Vec::with_capacity(prev_runs.len() + run.len());

- if let Some(run) = Run::new(run.to_vec()) {
- runs.push(run);
- }
+ // Start each freshly flushed table as its own run. `optimize_runs`
+ // will fuse them back together when their key ranges stay truly
+ // disjoint, but RT-bearing flush tables may intentionally widen
+ // their persisted key_range and must not be forced into a single
+ // run where `Run::get_for_key` assumes non-overlap.
+ runs.extend(run.iter().cloned().map(|table| {
+ let Some(run) = Run::new(vec![table]) else {
+ unreachable!("single-table run should never be empty");
+ };
+
+ run
+ }));

 runs.extend(prev_runs);

@@ -632,7 +641,7 @@ impl Version {
 //
 writer.start("format_version")?;
- writer.write_u8(FormatVersion::V3.into())?;
+ writer.write_u8(FormatVersion::V4.into())?;

 writer.start("crate_version")?;
 writer.write_all(env!("CARGO_PKG_VERSION").as_bytes())?;

diff --git a/tests/range_tombstone.rs b/tests/range_tombstone.rs
new file mode 100644
index 000000000..82fac4c9e
--- /dev/null
+++ b/tests/range_tombstone.rs
@@ -0,0 +1,1183 @@
+// Guard: trait import required for .key() method on iterator items (IterGuard trait)
+use lsm_tree::{get_tmp_folder, AbstractTree, AnyTree, Config, Guard, SequenceNumberCounter};
+use test_log::test;
+
+fn open_tree(path: &std::path::Path) -> AnyTree {
+ Config::new(
+ path,
+ SequenceNumberCounter::default(),
+ SequenceNumberCounter::default(),
+ )
+ .open()
+ .expect("should open")
+}
+
+/// Helper to collect keys from a forward iterator.
+/// Returns `Vec<Vec<u8>>` which compares correctly with `vec![b"a", b"b"]`
+/// via Rust's `PartialEq` blanket impl for `Vec<T>` where `T: PartialEq<U>`.
+fn collect_keys(tree: &AnyTree, seqno: u64) -> lsm_tree::Result<Vec<Vec<u8>>> {
+ let mut keys = Vec::new();
+ for item in tree.iter(seqno, None) {
+ let k = item.key()?;
+ keys.push(k.to_vec());
+ }
+ Ok(keys)
+}
+
+/// Helper to collect keys from a bounded range iterator.
+fn collect_range_keys<R>(tree: &AnyTree, range: R, seqno: u64) -> lsm_tree::Result<Vec<Vec<u8>>>
+where
+ R: std::ops::RangeBounds<&'static str>,
+{
+ let mut keys = Vec::new();
+ for item in tree.range(range, seqno, None) {
+ let k = item.key()?;
+ keys.push(k.to_vec());
+ }
+ Ok(keys)
+}
+
+/// Helper to collect keys from a reverse iterator.
+fn collect_keys_rev(tree: &AnyTree, seqno: u64) -> lsm_tree::Result<Vec<Vec<u8>>> {
+ let mut keys = Vec::new();
+ for item in tree.iter(seqno, None).rev() {
+ let k = item.key()?;
+ keys.push(k.to_vec());
+ }
+ Ok(keys)
+}
+
+/// Helper to collect keys from a bounded reverse range iterator.
+fn collect_range_keys_rev<R>(tree: &AnyTree, range: R, seqno: u64) -> lsm_tree::Result<Vec<Vec<u8>>>
+where
+ R: std::ops::RangeBounds<&'static str>,
+{
+ let mut keys = Vec::new();
+ for item in tree.range(range, seqno, None).rev() {
+ let k = item.key()?;
+ keys.push(k.to_vec());
+ }
+ Ok(keys)
+}
+
+fn find_rt_table(tree: &AnyTree) -> lsm_tree::Table {
+ tree.current_version()
+ .iter_tables()
+ .find(|table| table.regions.range_tombstones.is_some())
+ .expect("expected RT-bearing table")
+ .clone()
+}
+
+// --- Test A: Point reads suppressed by memtable range tombstone ---
+#[test]
+fn range_tombstone_suppresses_point_read_in_memtable() -> lsm_tree::Result<()> {
+ let folder = get_tmp_folder();
+ let tree = open_tree(folder.path());
+
+ tree.insert("a", "val_a", 1);
+ tree.insert("b", "val_b", 2);
+ tree.insert("c", "val_c", 3);
+ tree.insert("d", "val_d", 4);
+
+ // Range tombstone [b, d) at seqno 10 suppresses b and c
+ tree.remove_range("b", "d", 10);
+
+ // a is outside range — visible
+ assert_eq!(Some("val_a".as_bytes().into()), tree.get("a", 11)?);
+ // b is inside range — suppressed
+ assert_eq!(None, tree.get("b", 11)?);
+ // c is inside range — suppressed
+ assert_eq!(None, tree.get("c", 11)?);
+ // d is at exclusive end — visible
+ assert_eq!(Some("val_d".as_bytes().into()), tree.get("d", 11)?);
+
+ Ok(())
+}
+
+// --- Test B: Range tombstone respects MVCC ---
+#[test]
+fn range_tombstone_mvcc_visibility() -> lsm_tree::Result<()> {
+ let folder = get_tmp_folder();
+ let tree = open_tree(folder.path());
+
+ tree.insert("a", "val_a", 1);
+ tree.insert("b", "val_b", 2);
+
+ // Range tombstone at seqno 10
+ tree.remove_range("a", "z", 10);
+
+ // Reading at seqno 5 — tombstone not visible (seqno 10 > 5)
+ assert_eq!(Some("val_a".as_bytes().into()), tree.get("a", 5)?);
+ assert_eq!(Some("val_b".as_bytes().into()), tree.get("b", 5)?);
+
+ // Reading at seqno 11 — tombstone visible, values suppressed
+ assert_eq!(None, tree.get("a", 11)?);
+ assert_eq!(None, tree.get("b", 11)?);
+
+ Ok(())
+}
+
+// --- Test C: Range tombstone does not suppress newer values ---
+#[test]
+fn range_tombstone_does_not_suppress_newer_values() -> lsm_tree::Result<()> {
+ let folder = get_tmp_folder();
+ let tree = open_tree(folder.path());
+
+ tree.insert("a", "old_a", 1);
+ tree.remove_range("a", "z", 5);
+ tree.insert("a", "new_a", 10);
+
+ // new_a at seqno 10 is newer than tombstone at seqno 5
+ assert_eq!(Some("new_a".as_bytes().into()), tree.get("a", 11)?);
+
+ Ok(())
+}
+
+// --- Test D: Range iteration suppressed by range tombstone ---
+#[test]
+fn range_tombstone_suppresses_range_iteration() -> lsm_tree::Result<()> {
+ let folder = get_tmp_folder();
+ let tree = open_tree(folder.path());
+
+ tree.insert("a", "1", 1);
+ tree.insert("b", "2", 2);
+ tree.insert("c", "3", 3);
+ tree.insert("d", "4", 4);
+ tree.insert("e", "5", 5);
+
+ // Delete [b, d) at seqno 10
+ tree.remove_range("b", "d", 10);
+
+ let keys = collect_keys(&tree, 11)?;
+ assert_eq!(keys, vec![b"a", b"d", b"e"]);
+
+ Ok(())
+}
+
+// --- Test E: Reverse iteration with range tombstone ---
+#[test]
+fn range_tombstone_suppresses_reverse_iteration() -> lsm_tree::Result<()> {
+ let folder = get_tmp_folder();
+ let tree = open_tree(folder.path());
+
+ tree.insert("a", "1", 1);
+ tree.insert("b", "2", 2);
+ tree.insert("c", "3", 3);
+ tree.insert("d", "4", 4);
+ tree.insert("e", "5", 5);
+
+ tree.remove_range("b", "d", 10);
+
+ let keys = collect_keys_rev(&tree, 11)?;
+ assert_eq!(keys, vec![b"e", b"d", b"a"]);
+
+ Ok(())
+}
+
+// --- Test F: Range
tombstone in memtable suppresses SST data --- +#[test] +fn range_tombstone_suppresses_across_memtable_and_sst() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Insert data and flush to SST + tree.insert("a", "val_a", 1); + tree.insert("b", "val_b", 2); + tree.insert("c", "val_c", 3); + tree.flush_active_memtable(0)?; + + // Range tombstone in memtable suppresses SST data + tree.remove_range("a", "d", 10); + + assert_eq!(None, tree.get("a", 11)?); + assert_eq!(None, tree.get("b", 11)?); + assert_eq!(None, tree.get("c", 11)?); + + Ok(()) +} + +// --- Test G: Range tombstone in sealed memtable --- +#[test] +fn range_tombstone_in_sealed_memtable() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Insert range tombstone then seal the memtable + tree.remove_range("a", "z", 10); + assert!( + tree.rotate_memtable().is_some(), + "memtable with RT should seal" + ); + assert!(tree.sealed_memtable_count() > 0); + + // Insert new data in active memtable (lower seqno) + tree.insert("b", "val_b", 5); + + // b@5 is suppressed by sealed tombstone@10 + assert_eq!(None, tree.get("b", 11)?); + + // Insert newer data + tree.insert("b", "val_b_new", 15); + // b@15 survives (newer than tombstone@10) + assert_eq!(Some("val_b_new".as_bytes().into()), tree.get("b", 16)?); + + Ok(()) +} + +// --- Test H: remove_prefix --- +#[test] +fn remove_prefix_suppresses_matching_keys() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("user:1", "alice", 1); + tree.insert("user:2", "bob", 2); + tree.insert("user:3", "carol", 3); + tree.insert("order:1", "pizza", 4); + + // Delete all "user:" prefixed keys + tree.remove_prefix("user:", 10); + + assert_eq!(None, tree.get("user:1", 11)?); + assert_eq!(None, tree.get("user:2", 11)?); + assert_eq!(None, tree.get("user:3", 11)?); + // "order:" is not affected + assert_eq!(Some("pizza".as_bytes().into()), tree.get("order:1", 11)?); + + Ok(()) +} + +#[test] +fn remove_prefix_rejects_unbounded_prefix_without_partial_delete() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let path = folder.path(); + + { + let tree = open_tree(path); + + tree.insert(vec![0xFF], "ff", 1); + tree.insert(vec![0xFF, 0x01], "ff01", 2); + tree.insert("plain", "plain", 3); + + assert_eq!(0, tree.remove_prefix([], 10)); + assert_eq!(0, tree.remove_prefix(vec![0xFF], 11)); + + assert_eq!(Some("ff".as_bytes().into()), tree.get(vec![0xFF], 12)?); + assert_eq!( + Some("ff01".as_bytes().into()), + tree.get(vec![0xFF, 0x01], 12)? + ); + assert_eq!(Some("plain".as_bytes().into()), tree.get("plain", 12)?); + + tree.flush_active_memtable(0)?; + } + + { + let tree = open_tree(path); + + assert_eq!(Some("ff".as_bytes().into()), tree.get(vec![0xFF], 12)?); + assert_eq!( + Some("ff01".as_bytes().into()), + tree.get(vec![0xFF, 0x01], 12)? 
+ ); + assert_eq!(Some("plain".as_bytes().into()), tree.get("plain", 12)?); + } + + Ok(()) +} + +// --- Test I: Overlapping range tombstones --- +#[test] +fn overlapping_range_tombstones() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + tree.insert("d", "4", 4); + tree.insert("e", "5", 5); + + // Two overlapping tombstones + tree.remove_range("a", "c", 10); // [a, c) + tree.remove_range("b", "e", 15); // [b, e) + + // a: suppressed by [a,c)@10 + assert_eq!(None, tree.get("a", 20)?); + // b: suppressed by both + assert_eq!(None, tree.get("b", 20)?); + // c: suppressed by [b,e)@15 only + assert_eq!(None, tree.get("c", 20)?); + // d: suppressed by [b,e)@15 + assert_eq!(None, tree.get("d", 20)?); + // e: NOT suppressed (exclusive end of [b,e)) + assert_eq!(Some("5".as_bytes().into()), tree.get("e", 20)?); + + Ok(()) +} + +// --- Test J: Range iteration with sealed tombstone and SST data --- +#[test] +fn range_iteration_with_sealed_tombstone_and_sst_data() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Data in SST + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + tree.insert("d", "4", 4); + tree.flush_active_memtable(0)?; + + // Range tombstone in sealed memtable + tree.remove_range("b", "d", 10); + assert!( + tree.rotate_memtable().is_some(), + "memtable with RT should seal" + ); + assert!(tree.sealed_memtable_count() > 0); + + // New data in active memtable + tree.insert("e", "5", 11); + + let keys = collect_keys(&tree, 12)?; + assert_eq!(keys, vec![b"a", b"d", b"e"]); + + Ok(()) +} + +// --- Test K: Range tombstone persists through flush to SST --- +#[test] +fn range_tombstone_persists_through_flush() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Insert data + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + + // Insert range tombstone in same memtable + tree.remove_range("a", "c", 10); + + // Flush everything to SST (both data and range tombstone) + tree.flush_active_memtable(0)?; + + // After flush: range tombstone should still suppress from SST + assert_eq!(None, tree.get("a", 11)?); + assert_eq!(None, tree.get("b", 11)?); + assert_eq!(Some("3".as_bytes().into()), tree.get("c", 11)?); // c is at exclusive end + + // Verify via range iteration too + let keys = collect_keys(&tree, 11)?; + assert_eq!(keys, vec![b"c"]); + + Ok(()) +} + +// --- Test K2: Range tombstone survives compaction --- +#[test] +fn range_tombstone_survives_compaction() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Batch 1: data + range tombstone in same memtable + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + tree.insert("d", "4", 4); + tree.remove_range("b", "d", 10); + tree.flush_active_memtable(0)?; + + // Batch 2: more data to force a second table + tree.insert("e", "5", 11); + tree.flush_active_memtable(0)?; + + // Both tables in L0 — compact them + assert_eq!(2, tree.table_count()); + tree.major_compact(64_000_000, 0)?; + assert!(tree.table_count() <= 1, "major_compact should merge tables"); + + // After compaction, range tombstone should still suppress + assert_eq!(Some("1".as_bytes().into()), tree.get("a", 12)?); + assert_eq!(None, tree.get("b", 12)?); + assert_eq!(None, tree.get("c", 12)?); + 
assert_eq!(Some("4".as_bytes().into()), tree.get("d", 12)?); + assert_eq!(Some("5".as_bytes().into()), tree.get("e", 12)?); + + // Verify via iteration + let keys = collect_keys(&tree, 12)?; + assert_eq!(keys, vec![b"a", b"d", b"e"]); + + Ok(()) +} + +// --- Test L: Range tombstone persists through recovery --- +#[test] +fn range_tombstone_persists_through_recovery() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + + { + let tree = open_tree(folder.path()); + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + tree.remove_range("a", "c", 10); + tree.flush_active_memtable(0)?; + } + + // Reopen the tree — range tombstones should be recovered from SST + { + let tree = open_tree(folder.path()); + assert_eq!(None, tree.get("a", 11)?); + assert_eq!(None, tree.get("b", 11)?); + assert_eq!(Some("3".as_bytes().into()), tree.get("c", 11)?); + } + + Ok(()) +} + +#[test] +fn range_tombstone_tampered_rt_block_fails_recovery() -> lsm_tree::Result<()> { + use std::{ + fs::OpenOptions, + io::{Seek, SeekFrom, Write}, + }; + + let folder = get_tmp_folder(); + + { + let tree = open_tree(folder.path()); + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + tree.remove_range("a", "c", 10); + tree.flush_active_memtable(0)?; + + let rt_table = find_rt_table(&tree); + let rt_handle = rt_table + .regions + .range_tombstones + .expect("expected range tombstone block"); + + let mut file = OpenOptions::new().write(true).open(&*rt_table.path)?; + let payload_pos = *rt_handle.offset() + + u64::try_from(lsm_tree::table::block::Header::serialized_len()) + .expect("header size should fit in u64"); + file.seek(SeekFrom::Start(payload_pos))?; + file.write_all(&u16::MAX.to_le_bytes())?; + file.flush()?; + } + + match Config::new( + folder.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open() + { + Err(lsm_tree::Error::ChecksumMismatch { .. }) | Err(lsm_tree::Error::Unrecoverable) => {} + Err(other) => panic!("expected ChecksumMismatch or Unrecoverable, got: {other:?}"), + Ok(_) => panic!("tampered RT block must fail recovery, not reopen successfully"), + } + + Ok(()) +} + +// --- Regression: RT-only table with sentinel survives recovery --- +// The sentinel WeakTombstone inflates item_count/tombstone_count metadata. +// Recovery must accept these tables without validation errors, and the +// RT must still suppress covered keys after reopen. 
+#[test] +fn rt_only_table_sentinel_survives_recovery() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let path = folder.path(); + + { + let tree = open_tree(path); + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.flush_active_memtable(0)?; + + // RT-only flush: sentinel written, counts inflated by +1 + tree.remove_range("a", "c", 10); + tree.flush_active_memtable(0)?; + } + + // Reopen — recovery must succeed despite sentinel in metadata + let tree = open_tree(path); + + // RT must still suppress after recovery + assert_eq!(None, tree.get("a", 11)?); + assert_eq!(None, tree.get("b", 11)?); + assert_eq!(Some("1".as_bytes().into()), tree.get("a", 5)?); + + Ok(()) +} + +// --- Test M: RT-only memtable flush creates a valid table --- +#[test] +fn range_tombstone_only_flush() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // First: insert data and flush + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + tree.flush_active_memtable(0)?; + + let tables_before = tree.table_count(); + + // Second: insert only a range tombstone and flush + // RT-only flush writes synthetic sentinel tombstones to create a valid SST + tree.remove_range("a", "c", 10); + tree.flush_active_memtable(0)?; + + assert!( + tree.table_count() > tables_before, + "RT-only flush should produce a table with sentinel tombstones" + ); + + // The range tombstone in the SST should suppress + assert_eq!(None, tree.get("a", 11)?); + assert_eq!(None, tree.get("b", 11)?); + assert_eq!(Some("3".as_bytes().into()), tree.get("c", 11)?); + + Ok(()) +} + +// --- Test N: Bottom-level compaction must keep RT-deleted keys hidden --- +#[test] +fn range_tombstone_bottom_level_compaction_keeps_deleted_keys_hidden() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + tree.remove_range("a", "d", 10); + tree.flush_active_memtable(0)?; + + // Before compaction: range tombstone suppresses all + assert_eq!(None, tree.get("a", 11)?); + assert_eq!(None, tree.get("b", 11)?); + + // Major compact with GC watermark ABOVE the tombstone seqno. + // We must not drop RTs yet because compaction does not physically remove + // all covered KVs based on RT coverage alone. + tree.major_compact(64_000_000, 11)?; + + // Deleted keys must stay hidden after bottom-level compaction. + assert_eq!(None, tree.get("a", 11)?); + assert_eq!(None, tree.get("b", 11)?); + + // Newer writes must still win over the older RT. 
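+ // (seqno 15 > RT seqno 10, so this write is not suppressed)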
+ tree.insert("a", "new_a", 15); + assert_eq!(Some("new_a".as_bytes().into()), tree.get("a", 16)?); + + Ok(()) +} + +// --- Test O: Prefix iteration with range tombstone in SST --- +#[test] +fn range_tombstone_prefix_iteration_with_sst() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("user:1", "alice", 1); + tree.insert("user:2", "bob", 2); + tree.insert("user:3", "carol", 3); + tree.insert("order:1", "pizza", 4); + tree.remove_prefix("user:", 10); + tree.flush_active_memtable(0)?; + + // Prefix iteration over "user:" should yield nothing + let mut user_keys = Vec::new(); + for item in tree.prefix("user:", 11, None) { + let k = item.key()?; + user_keys.push(k.to_vec()); + } + assert!(user_keys.is_empty()); + + // Prefix iteration over "order:" should yield "order:1" + let mut order_keys = Vec::new(); + for item in tree.prefix("order:", 11, None) { + let k = item.key()?; + order_keys.push(k.to_vec()); + } + assert_eq!(order_keys, vec![b"order:1"]); + + Ok(()) +} + +// --- Test P: Compaction with MultiWriter rotation preserves RTs across tables --- +#[test] +fn range_tombstone_survives_compaction_with_rotation() -> lsm_tree::Result<()> { + use lsm_tree::config::CompressionPolicy; + use lsm_tree::CompressionType; + + let folder = get_tmp_folder(); + + // Disable compression: repetitive payloads compress too well under lz4, + // preventing MultiWriter rotation with the small 1 KiB target below. + let tree = Config::new( + folder.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .data_block_compression_policy(CompressionPolicy::all(CompressionType::None)) + .open()?; + + // Insert enough data to produce multiple tables on compaction + for i in 0u8..20 { + let key = format!("key_{i:03}"); + let val = "x".repeat(4000); + tree.insert(key.as_bytes(), val.as_bytes(), u64::from(i)); + } + // Range tombstone covering a subset + tree.remove_range("key_005", "key_015", 50); + tree.flush_active_memtable(0)?; + + // Force compaction with small target_size to trigger rotation + tree.major_compact(1024, 0)?; + assert!( + tree.table_count() > 1, + "compaction with 1 KiB target should produce multiple output tables" + ); + + // After compaction: keys inside [key_005, key_015) should be suppressed + assert_eq!(None, tree.get("key_005", 51)?); + assert_eq!(None, tree.get("key_010", 51)?); + assert_eq!(None, tree.get("key_014", 51)?); + + // Keys outside range should survive + assert!(tree.get("key_000", 51)?.is_some()); + assert!(tree.get("key_015", 51)?.is_some()); + assert!(tree.get("key_019", 51)?.is_some()); + + Ok(()) +} + +// --- Test Q: Table-skip optimization triggers for fully-covered tables --- +#[test] +fn range_tombstone_table_skip_optimization() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Create a table with keys a-c + tree.insert("a", "1", 1); + tree.insert("b", "2", 2); + tree.insert("c", "3", 3); + tree.flush_active_memtable(0)?; + + // Create a range tombstone that fully covers the table's key range + // with higher seqno than any key in the table + tree.remove_range("a", "d", 100); + tree.flush_active_memtable(0)?; + assert!( + tree.table_count() >= 2, + "table-skip regression should exercise SST-backed RT path" + ); + + // The table [a,c] is fully covered by [a,d)@100 (100 > max_seqno=3) + // Table-skip should allow skipping the entire table during iteration + let keys = collect_keys(&tree, 101)?; + assert!(keys.is_empty()); + + // 
+    // Reverse iteration should also skip
+    let keys = collect_keys_rev(&tree, 101)?;
+    assert!(keys.is_empty());
+
+    Ok(())
+}
+
+// --- Test R: BlobTree range tombstone support ---
+#[test]
+fn range_tombstone_blob_tree() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+
+    let tree = Config::new(
+        folder.path(),
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .with_kv_separation(Some(
+        lsm_tree::KvSeparationOptions::default()
+            .separation_threshold(1)
+            .compression(lsm_tree::CompressionType::None),
+    ))
+    .open()?;
+
+    tree.insert("a", "value_a", 1);
+    tree.insert("b", "value_b", 2);
+    tree.insert("c", "value_c", 3);
+
+    // Range tombstone in BlobTree
+    tree.remove_range("a", "c", 10);
+
+    assert_eq!(None, tree.get("a", 11)?);
+    assert_eq!(None, tree.get("b", 11)?);
+    assert_eq!(Some("value_c".as_bytes().into()), tree.get("c", 11)?);
+
+    // Flush and verify persistence
+    tree.flush_active_memtable(0)?;
+
+    assert_eq!(None, tree.get("a", 11)?);
+    assert_eq!(None, tree.get("b", 11)?);
+    assert_eq!(Some("value_c".as_bytes().into()), tree.get("c", 11)?);
+
+    Ok(())
+}
+
+// --- Test S: Invalid interval silently returns 0 ---
+#[test]
+fn range_tombstone_invalid_interval() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("a", "1", 1);
+
+    // start >= end — should be silently ignored
+    let size = tree.remove_range("z", "a", 10);
+    assert_eq!(0, size);
+
+    // Equal start and end — also invalid
+    let size = tree.remove_range("a", "a", 10);
+    assert_eq!(0, size);
+
+    // Data should still be visible
+    assert_eq!(Some("1".as_bytes().into()), tree.get("a", 11)?);
+
+    Ok(())
+}
+
+// --- Test T: Multiple compaction rounds preserve range tombstones ---
+#[test]
+fn range_tombstone_multiple_compaction_rounds() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    // Round 1: data + RT + flush + compact
+    tree.insert("a", "1", 1);
+    tree.insert("b", "2", 2);
+    tree.insert("c", "3", 3);
+    tree.remove_range("a", "c", 10);
+    tree.flush_active_memtable(0)?;
+    tree.major_compact(64_000_000, 0)?;
+
+    // Round 2: add more data + flush + compact again
+    tree.insert("d", "4", 11);
+    tree.flush_active_memtable(0)?;
+    tree.major_compact(64_000_000, 0)?;
+
+    // RT should survive both compaction rounds
+    assert_eq!(None, tree.get("a", 12)?);
+    assert_eq!(None, tree.get("b", 12)?);
+    assert_eq!(Some("3".as_bytes().into()), tree.get("c", 12)?);
+    assert_eq!(Some("4".as_bytes().into()), tree.get("d", 12)?);
+
+    Ok(())
+}
+
+// --- Regression: RT-only table sentinel must not mask values in older SSTs ---
+// When a memtable has only range tombstones (no KV data), flush produces an
+// RT-only table with a synthetic sentinel at min(rt.start). If the sentinel
+// seqno makes it visible before the RT's own seqno, point reads at intermediate
+// snapshots incorrectly see a tombstone for that key, hiding real values in
+// older tables.
+#[test]
+fn rt_only_sentinel_does_not_mask_older_values() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    // Older SST: real value at key "m" with seqno=5
+    // Key "m" is chosen intentionally — it will be min(rt.start) for the RT below,
+    // so the sentinel key collides with this real value.
+    tree.insert("m", "real_value", 5);
+    tree.flush_active_memtable(0)?;
+
+    // RT-only memtable: delete_range [m, z) at seqno=20.
+    // With a single RT, the sentinel uses that tombstone's start key "m".
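+    // (Sketch of the expected sentinel for this flush: key "m" paired with
+    // the RT's own seqno, i.e. roughly `("m", 20)`. Key and seqno must come
+    // from the same tombstone; the next regression test pins that down.)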
+ tree.remove_range("m", "z", 20); + tree.flush_active_memtable(0)?; + + // Read at seqno=10: RT [m,z)@20 is NOT visible (20 > 10), so "m"@5 + // should be visible. The sentinel at ("m", sentinel_seqno) must NOT + // act as a tombstone that hides the real value when the RT itself + // is not yet visible. + assert_eq!( + Some("real_value".as_bytes().into()), + tree.get("m", 10)?, + "sentinel must not mask real value at key 'm' when RT is not yet visible" + ); + + // Read at seqno=21: RT [m,z)@20 IS visible → "m" suppressed by RT + assert_eq!(None, tree.get("m", 21)?); + + Ok(()) +} + +// --- Regression: sentinel key/seqno must come from the same tombstone --- +// If the sentinel key comes from min(rt.start) but the seqno comes from a +// different tombstone with a lower seqno, the sentinel can become visible at a +// key that is not yet covered by any visible RT and mask older data. +#[test] +fn rt_only_sentinel_uses_lowest_seqno_tombstones_start_key() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("a", "real_value", 5); + tree.flush_active_memtable(0)?; + + // Two RTs in an RT-only flush: + // - [m, z) @20 is the earliest visible tombstone + // - [a, b) @30 provides the lexicographically smallest start key + // + // The sentinel must be written at "m"@20, not "a"@20. + tree.remove_range("a", "b", 30); + tree.remove_range("m", "z", 20); + tree.flush_active_memtable(0)?; + + // At seqno 25 only [m, z) is visible, so key "a" must remain visible. + assert_eq!(Some("real_value".as_bytes().into()), tree.get("a", 25)?); + + // Once [a, b) becomes visible, key "a" is suppressed as expected. + assert_eq!(None, tree.get("a", 31)?); + + Ok(()) +} + +// --- Test: RT disjoint from memtable KV range persists through flush --- +// Regression test: delete_range targeting keys only in older SSTs must not be +// dropped during flush just because it doesn't overlap the memtable's KV range. +#[test] +fn range_tombstone_disjoint_from_flush_kv_range() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Write keys [x, y, z] and flush to SST (older data) + tree.insert("x", "1", 1); + tree.insert("y", "2", 2); + tree.insert("z", "3", 3); + tree.flush_active_memtable(0)?; + + // Now write keys [a, b] + delete_range("x", "zz") in a new memtable. + // The RT is disjoint from the KV range [a, b] of this memtable. + tree.insert("a", "4", 4); + tree.insert("b", "5", 5); + tree.remove_range("x", "zz", 10); + tree.flush_active_memtable(0)?; + + // The RT must have survived flush and suppress [x, y, z] in the older SST + assert_eq!(Some("4".as_bytes().into()), tree.get("a", 11)?); + assert_eq!(Some("5".as_bytes().into()), tree.get("b", 11)?); + assert_eq!(None, tree.get("x", 11)?); + assert_eq!(None, tree.get("y", 11)?); + assert_eq!(None, tree.get("z", 11)?); + + Ok(()) +} + +// --- Regression: compaction clip must not drop RT covering gap between output tables --- +// major_compact merges all tables together so the gap scenario doesn't arise. +// Leveled compaction with overlap-based selection could produce a gap — tracked in #32. 
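+//
+// Shape of the guarded scenario (sketch; levels and ranges are illustrative):
+//
+//     output A: [a .. l]               output B: [q .. z]
+//                        RT [m, p) falls in the gap
+//     older level:   m    n    o    <- must stay suppressed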
+#[test]
+fn compaction_clip_preserves_rt_covering_gap_between_tables() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    // Bottom level: keys in the gap that the RT should suppress
+    tree.insert("m", "old_m", 1);
+    tree.insert("n", "old_n", 2);
+    tree.insert("o", "old_o", 3);
+    tree.flush_active_memtable(0)?;
+    tree.major_compact(64_000_000, 0)?;
+
+    // Newer table: keys on both sides of the gap, plus RT covering the gap
+    tree.insert("a", "val_a", 10);
+    tree.insert("l", "val_l", 11);
+    tree.insert("q", "val_q", 12);
+    tree.insert("z", "val_z", 13);
+    tree.remove_range("m", "p", 20);
+    tree.flush_active_memtable(0)?;
+
+    // Compact everything. major_compact merges into a single output, so no
+    // real gap forms here (see note above; the leveled-compaction gap case is
+    // tracked in #32), but the RT [m,p) must survive the clip regardless.
+    tree.major_compact(64_000_000, 0)?;
+
+    // Keys in the gap must still be suppressed by RT
+    assert_eq!(
+        None,
+        tree.get("m", 21)?,
+        "RT [m,p)@20 must suppress 'm' after compaction"
+    );
+    assert_eq!(
+        None,
+        tree.get("n", 21)?,
+        "RT [m,p)@20 must suppress 'n' after compaction"
+    );
+    assert_eq!(
+        None,
+        tree.get("o", 21)?,
+        "RT [m,p)@20 must suppress 'o' after compaction"
+    );
+
+    // Keys outside the gap must be fine
+    assert_eq!(Some("val_a".as_bytes().into()), tree.get("a", 21)?);
+    assert_eq!(Some("val_q".as_bytes().into()), tree.get("q", 21)?);
+
+    Ok(())
+}
+
+// Regression: flush must finalize the last buffered KV block before widening
+// table metadata for RT coverage. Otherwise Writer::finish would overwrite the
+// widened key_range with the buffered block's last KV key, and point reads
+// that skip tables by metadata.key_range would then miss the RT-bearing table,
+// resurrecting keys the RT is meant to suppress.
+#[test]
+fn range_tombstone_disjoint_flush_key_range_tracks_rt_coverage() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("x", "1", 1);
+    tree.flush_active_memtable(0)?;
+
+    tree.insert("a", "2", 2);
+    tree.insert("b", "3", 3);
+    tree.remove_range("x", "zz", 10);
+    tree.flush_active_memtable(0)?;
+
+    let rt_table = find_rt_table(&tree);
+
+    assert!(
+        rt_table.metadata.key_range.contains_key(b"x"),
+        "RT-bearing table metadata must conservatively include RT coverage"
+    );
+    assert_eq!(None, tree.get("x", 11)?);
+
+    Ok(())
+}
+
+// --- Test: RT disjoint from KV range survives compaction ---
+// Regression: disjoint RT (key range outside KV data) must survive
+// multiple compaction rounds. Without key_range widening in flush mode,
+// leveled compaction overlap selection would never pick up the table
+// carrying the disjoint RT, leaving it permanently stuck.
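+//
+// A minimal sketch of why widening matters (hypothetical picker code, not
+// the crate's actual compaction internals; `overlaps` and the bindings are
+// assumptions):
+//
+//     // overlap-based selection only consults metadata.key_range
+//     let picked: Vec<_> = level
+//         .iter()
+//         .filter(|t| t.metadata.key_range.overlaps(&compaction_key_range))
+//         .collect();
+//     // without widening, a table whose KVs span [a, b] but whose RT covers
+//     // [x, z) never overlaps an [x, z) selection, so its RT never moves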
+#[test] +fn range_tombstone_disjoint_survives_multiple_compactions() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Older data in SST at low seqno + tree.insert("x", "1", 1); + tree.insert("y", "2", 2); + tree.flush_active_memtable(0)?; + + // New memtable: KV in [a, b], RT covering [x, z) — disjoint from KV + tree.insert("a", "3", 3); + tree.insert("b", "4", 4); + tree.remove_range("x", "z", 10); + tree.flush_active_memtable(0)?; + + // Multiple compaction rounds — RT must propagate through all of them + tree.major_compact(64_000_000, 0)?; + tree.major_compact(64_000_000, 0)?; + + // After two compaction rounds, disjoint RT must still suppress [x, y] + assert_eq!(Some("3".as_bytes().into()), tree.get("a", 11)?); + assert_eq!(Some("4".as_bytes().into()), tree.get("b", 11)?); + assert_eq!(None, tree.get("x", 11)?); + assert_eq!(None, tree.get("y", 11)?); + + // Also verify via range iteration + let keys = collect_keys(&tree, 11)?; + assert_eq!(keys, vec![b"a", b"b"]); + + Ok(()) +} + +#[test] +#[ignore = "allocates ~68 MiB to force MultiWriter rotation — run with --ignored"] +fn range_tombstone_multi_table_flush_keeps_newer_values_reachable() -> lsm_tree::Result<()> { + use lsm_tree::config::CompressionPolicy; + use lsm_tree::CompressionType; + + let folder = get_tmp_folder(); + // Disable compression: large repetitive payloads compress to almost nothing + // under lz4/zstd, preventing MultiWriter rotation and making the test flaky. + let tree = Config::new( + folder.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .data_block_compression_policy(CompressionPolicy::all(CompressionType::None)) + .open() + .expect("should open"); + + // Standard-tree flush uses a fixed 64 MiB MultiWriter target. Multiple + // large early values force rotation before the later "y" write, reproducing + // the widened-key-range bug on the actual flush path instead of a synthetic + // test-only configuration. Reuse one large buffer to keep the test lighter + // on CI memory. + let mi = 1_024 * 1_024; + let large_value = "a".repeat(17 * mi); + + tree.insert("a0", &large_value, 1); + tree.insert("a1", &large_value, 2); + tree.insert("b0", &large_value, 3); + tree.insert("b1", &large_value, 4); + tree.remove_range("x", "zz", 10); + tree.insert("y", "visible_newer", 20); + tree.flush_active_memtable(0)?; + + assert!(tree.table_count() > 1, "test requires a rotated flush"); + assert_eq!(Some("visible_newer".as_bytes().into()), tree.get("y", 21)?); + + Ok(()) +} + +#[test] +fn range_tombstone_suppresses_bulk_ingested_values() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + { + let mut ingestion = tree.ingestion()?; + ingestion.write("k", "old")?; + ingestion.finish()?; + } + + tree.remove_range("k", "l", 10); + tree.flush_active_memtable(0)?; + + assert_eq!(None, tree.get("k", 11)?); + + Ok(()) +} + +// After flush preserves the RT, compaction should merge it with the older SST +// and either suppress the keys or propagate the RT. 
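+//
+// Either merge outcome is observably equivalent for reads: the output may
+// drop the covered KVs outright (when the GC watermark allows) or keep
+// carrying the RT forward; the assertions below only check visible
+// suppression, not which of the two happened.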
+#[test]
+fn range_tombstone_disjoint_survives_compaction() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    // Older data in SST
+    tree.insert("x", "1", 1);
+    tree.insert("y", "2", 2);
+    tree.flush_active_memtable(0)?;
+
+    // New memtable: KV in [a, b], RT covering [x, z) — disjoint from KV
+    tree.insert("a", "3", 3);
+    tree.insert("b", "4", 4);
+    tree.remove_range("x", "z", 10);
+    tree.flush_active_memtable(0)?;
+
+    // Compact everything
+    tree.major_compact(64_000_000, 0)?;
+
+    // After compaction, [x, y] should still be suppressed
+    assert_eq!(Some("3".as_bytes().into()), tree.get("a", 11)?);
+    assert_eq!(Some("4".as_bytes().into()), tree.get("b", 11)?);
+    assert_eq!(None, tree.get("x", 11)?);
+    assert_eq!(None, tree.get("y", 11)?);
+
+    Ok(())
+}
+
+// Regression: range iteration should only carry RTs that overlap the requested
+// range. Narrow scans over untouched keys must keep returning those keys, while
+// overlapping scans still honor the persisted RT.
+#[test]
+fn range_tombstone_narrow_range_queries_respect_overlap() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("x", "1", 1);
+    tree.insert("y", "2", 2);
+    tree.flush_active_memtable(0)?;
+
+    tree.insert("a", "3", 3);
+    tree.insert("b", "4", 4);
+    tree.remove_range("x", "z", 10);
+    tree.flush_active_memtable(0)?;
+
+    assert_eq!(collect_range_keys(&tree, "a"..="b", 11)?, vec![b"a", b"b"]);
+    assert_eq!(
+        collect_range_keys(&tree, "x"..="z", 11)?,
+        Vec::<Vec<u8>>::new()
+    );
+
+    Ok(())
+}
+
+#[test]
+fn range_tombstone_memtable_narrow_range_queries_ignore_disjoint_rt() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let tree = open_tree(folder.path());
+
+    tree.insert("x", "1", 1);
+    tree.insert("y", "2", 2);
+    tree.remove_range("x", "z", 10);
+
+    tree.insert("a", "3", 3);
+    tree.insert("b", "4", 4);
+
+    assert_eq!(collect_range_keys(&tree, "a"..="b", 11)?, vec![b"a", b"b"]);
+    assert_eq!(
+        collect_range_keys(&tree, "x"..="z", 11)?,
+        Vec::<Vec<u8>>::new()
+    );
+    assert_eq!(
+        collect_range_keys_rev(&tree, "a"..="b", 11)?,
+        vec![b"b", b"a"]
+    );
+    assert_eq!(
+        collect_range_keys_rev(&tree, "x"..="z", 11)?,
+        Vec::<Vec<u8>>::new()
+    );
+
+    Ok(())
+}
+
+#[test]
+fn range_tombstone_disjoint_survives_recovery_for_narrow_scans() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let path = folder.path();
+
+    {
+        let tree = open_tree(path);
+
+        tree.insert("x", "1", 1);
+        tree.insert("y", "2", 2);
+        tree.flush_active_memtable(0)?;
+
+        tree.insert("a", "3", 3);
+        tree.insert("b", "4", 4);
+        tree.remove_range("x", "z", 10);
+        tree.flush_active_memtable(0)?;
+    }
+
+    let tree = open_tree(path);
+
+    assert_eq!(Some("3".as_bytes().into()), tree.get("a", 11)?);
+    assert_eq!(Some("4".as_bytes().into()), tree.get("b", 11)?);
+    assert_eq!(None, tree.get("x", 11)?);
+    assert_eq!(None, tree.get("y", 11)?);
+
+    assert_eq!(collect_range_keys(&tree, "a"..="b", 11)?, vec![b"a", b"b"]);
+    assert_eq!(
+        collect_range_keys(&tree, "x"..="z", 11)?,
+        Vec::<Vec<u8>>::new()
+    );
+    assert_eq!(
+        collect_range_keys_rev(&tree, "a"..="b", 11)?,
+        vec![b"b", b"a"]
+    );
+    assert_eq!(
+        collect_range_keys_rev(&tree, "x"..="z", 11)?,
+        Vec::<Vec<u8>>::new()
+    );
+
+    Ok(())
+}
diff --git a/tests/tree_recovery_versions.rs b/tests/tree_recovery_versions.rs
index b832f5554..700d893ee 100644
--- a/tests/tree_recovery_versions.rs
+++ b/tests/tree_recovery_versions.rs
@@ -1,6 +1,164 @@
+use byteorder::{LittleEndian, ReadBytesExt};
 use lsm_tree::{get_tmp_folder, AbstractTree, Config, SequenceNumberCounter};
+use std::{
+    fs::File,
+    io::{Seek, SeekFrom, Write},
+    path::Path,
+};
 use test_log::test;

+fn read_manifest_format_version(path: &Path) -> lsm_tree::Result<u8> {
+    // read_u64 takes &mut self, but calling it on the owned File returned by
+    // `?` is valid Rust: the method call takes a mutable auto-borrow of the
+    // owned temporary.
+    let curr_version_id = File::open(path.join("current"))?.read_u64::<LittleEndian>()?;
+    let manifest_path = path.join(format!("v{curr_version_id}"));
+    let reader = sfa::Reader::new(&manifest_path)?;
+
+    #[expect(
+        clippy::expect_used,
+        reason = "test fixture should contain format_version"
+    )]
+    let section = reader
+        .toc()
+        .section(b"format_version")
+        .expect("format_version section should exist");
+
+    Ok(section.buf_reader(&manifest_path)?.read_u8()?)
+}
+
+fn rewrite_manifest_format_version(path: &Path, version: u8) -> lsm_tree::Result<()> {
+    let curr_version_id = File::open(path.join("current"))?.read_u64::<LittleEndian>()?;
+    let manifest_path = path.join(format!("v{curr_version_id}"));
+    let reader = sfa::Reader::new(&manifest_path)?;
+
+    #[expect(
+        clippy::expect_used,
+        reason = "test fixture should contain format_version"
+    )]
+    let section = reader
+        .toc()
+        .section(b"format_version")
+        .expect("format_version section should exist");
+
+    let mut file = std::fs::OpenOptions::new()
+        .write(true)
+        .open(&manifest_path)?;
+    file.seek(SeekFrom::Start(section.pos()))?;
+    file.write_all(&[version])?;
+    file.flush()?;
+
+    Ok(())
+}
+
+#[test]
+fn tree_writes_v4_manifest_and_recovers_it() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let path = folder.path();
+
+    {
+        let tree = Config::new(
+            path,
+            SequenceNumberCounter::default(),
+            SequenceNumberCounter::default(),
+        )
+        .open()?;
+
+        tree.insert("a", "a", 0);
+        tree.flush_active_memtable(0)?;
+
+        assert_eq!(4, read_manifest_format_version(path)?);
+    }
+
+    {
+        let tree = Config::new(
+            path,
+            SequenceNumberCounter::default(),
+            SequenceNumberCounter::default(),
+        )
+        .open()?;
+
+        assert_eq!(Some("a".as_bytes().into()), tree.get("a", 1)?);
+        assert_eq!(4, read_manifest_format_version(path)?);
+    }
+
+    Ok(())
+}
+
+#[test]
+fn tree_recovers_safe_v3_manifest() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let path = folder.path();
+
+    {
+        let tree = Config::new(
+            path,
+            SequenceNumberCounter::default(),
+            SequenceNumberCounter::default(),
+        )
+        .open()?;
+
+        tree.insert("a", "a", 0);
+        tree.flush_active_memtable(0)?;
+
+        assert_eq!(4, read_manifest_format_version(path)?);
+        rewrite_manifest_format_version(path, 3)?;
+        assert_eq!(3, read_manifest_format_version(path)?);
+    }
+
+    {
+        let tree = Config::new(
+            path,
+            SequenceNumberCounter::default(),
+            SequenceNumberCounter::default(),
+        )
+        .open()?;
+
+        assert_eq!(Some("a".as_bytes().into()), tree.get("a", 1)?);
+        assert_eq!(3, read_manifest_format_version(path)?);
+    }
+
+    Ok(())
+}
+
+#[test]
+fn tree_rejects_unsupported_manifest_version() -> lsm_tree::Result<()> {
+    let folder = get_tmp_folder();
+    let path = folder.path();
+
+    {
+        let tree = Config::new(
+            path,
+            SequenceNumberCounter::default(),
+            SequenceNumberCounter::default(),
+        )
+        .open()?;
+
+        tree.insert("a", "a", 0);
+        tree.flush_active_memtable(0)?;
+
+        assert_eq!(4, read_manifest_format_version(path)?);
+        rewrite_manifest_format_version(path, 99)?;
+        assert_eq!(99, read_manifest_format_version(path)?);
+    }
+
+    let reopened = Config::new(
+        path,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open();
+
+    match reopened {
+        Err(lsm_tree::Error::InvalidVersion(v)) => {
+            assert_eq!(v, 99, "rejected version should match the tampered value");
+        }
+        Err(other) => panic!("expected InvalidVersion(99), got: {other:?}"),
+        Ok(_) => panic!("unsupported manifest version must be rejected"),
+    }
+
+    Ok(())
+}
+
 #[test]
 #[ignore = "restore Version history maintenance"]
 fn tree_recovery_version_free_list() -> lsm_tree::Result<()> {