diff --git a/src/abstract.rs b/src/abstract.rs index 1bcd487ff..fd5095c18 100644 --- a/src/abstract.rs +++ b/src/abstract.rs @@ -98,9 +98,15 @@ pub trait AbstractTree { ); let stream = CompactionStream::new(merger, seqno_threshold); + // Collect range tombstones from sealed memtables + let mut range_tombstones = Vec::new(); + for mt in latest.sealed_memtables.iter() { + range_tombstones.extend(mt.range_tombstones_by_start()); + } + drop(version_history); - if let Some((tables, blob_files)) = self.flush_to_tables(stream)? { + if let Some((tables, blob_files)) = self.flush_to_tables(stream, range_tombstones)? { self.register_tables( &tables, blob_files.as_deref(), @@ -216,6 +222,7 @@ pub trait AbstractTree { fn flush_to_tables( &self, stream: impl Iterator>, + range_tombstones: Vec, ) -> crate::Result>; /// Atomically registers flushed tables into the tree, removing their associated sealed memtables. @@ -598,4 +605,9 @@ pub trait AbstractTree { /// Will return `Err` if an IO error occurs. #[doc(hidden)] fn remove_weak>(&self, key: K, seqno: SeqNo) -> (u64, u64); + + /// Inserts a range tombstone that suppresses all keys in `[start, end)` with + /// sequence numbers strictly less than `seqno`. + #[doc(hidden)] + fn remove_range>(&self, start: K, end: K, seqno: SeqNo); } diff --git a/src/active_tombstone_set.rs b/src/active_tombstone_set.rs new file mode 100644 index 000000000..8512d8a97 --- /dev/null +++ b/src/active_tombstone_set.rs @@ -0,0 +1,403 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! Active tombstone sets for tracking range tombstones during iteration. +//! +//! During forward or reverse scans, range tombstones must be activated when +//! the scan enters their range and expired when it leaves. These sets use +//! a seqno multiset (`BTreeMap`) for O(log t) max-seqno queries, +//! and a heap for efficient expiry tracking. +//! +//! A unique monotonic `id` on each heap entry ensures total ordering in the +//! heap (no equality on the tuple), which makes expiry deterministic. + +use crate::{range_tombstone::RangeTombstone, SeqNo, UserKey}; +use std::cmp::Reverse; +use std::collections::{BTreeMap, BinaryHeap}; + +/// Tracks active range tombstones during forward iteration. +/// +/// Tombstones are activated when the scan reaches their `start` key, and +/// expired when the scan reaches or passes their `end` key. +/// +/// Uses a min-heap (via `Reverse`) keyed by `(end, id, seqno)` so the +/// tombstone expiring soonest (smallest `end`) is at the top. +pub struct ActiveTombstoneSet { + seqno_counts: BTreeMap, + pending_expiry: BinaryHeap>, + cutoff_seqno: SeqNo, + next_id: u64, +} + +impl ActiveTombstoneSet { + /// Creates a new forward active tombstone set. + /// + /// Only tombstones with `seqno <= cutoff_seqno` will be activated. + #[must_use] + pub fn new(cutoff_seqno: SeqNo) -> Self { + Self { + seqno_counts: BTreeMap::new(), + pending_expiry: BinaryHeap::new(), + cutoff_seqno, + next_id: 0, + } + } + + /// Activates a range tombstone, adding it to the active set. + /// + /// The tombstone is only activated if it is visible at the cutoff seqno + /// (i.e., `rt.seqno <= cutoff_seqno`). Duplicate activations (same seqno + /// from different sources) are handled correctly via multiset accounting. 
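+    ///
+    /// Sketch of typical use (not compiled as a doc-test; uses the same
+    /// `UserKey::from(&[u8])` construction as this module's tests):
+    ///
+    /// ```ignore
+    /// let mut set = ActiveTombstoneSet::new(100);
+    /// // [b, m) deleted at seqno 10; activated because 10 <= cutoff 100
+    /// set.activate(&RangeTombstone::new(
+    ///     UserKey::from(b"b" as &[u8]),
+    ///     UserKey::from(b"m" as &[u8]),
+    ///     10,
+    /// ));
+    /// assert!(set.is_suppressed(5));   // older KV is suppressed
+    /// assert!(!set.is_suppressed(10)); // equal or newer KV is not
+    /// ```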
+ pub fn activate(&mut self, rt: &RangeTombstone) { + if !rt.visible_at(self.cutoff_seqno) { + return; + } + let id = self.next_id; + self.next_id += 1; + *self.seqno_counts.entry(rt.seqno).or_insert(0) += 1; + self.pending_expiry + .push(Reverse((rt.end.clone(), id, rt.seqno))); + } + + /// Expires tombstones whose `end <= current_key`. + /// + /// In the half-open convention `[start, end)`, a tombstone stops covering + /// keys at `end`. So when `current_key >= end`, the tombstone no longer + /// applies and is removed from the active set. + /// + /// # Panics + /// + /// Panics if an expiry pop has no matching activation in the seqno multiset. + pub fn expire_until(&mut self, current_key: &[u8]) { + while let Some(Reverse((ref end, _, seqno))) = self.pending_expiry.peek() { + if current_key >= end.as_ref() { + let seqno = *seqno; + self.pending_expiry.pop(); + #[expect( + clippy::expect_used, + reason = "expiry pop must have matching activation" + )] + let count = self + .seqno_counts + .get_mut(&seqno) + .expect("expiry pop must have matching activation"); + *count -= 1; + if *count == 0 { + self.seqno_counts.remove(&seqno); + } + } else { + break; + } + } + } + + /// Returns the highest seqno among all active tombstones, or `None` if + /// no tombstones are active. + #[must_use] + pub fn max_active_seqno(&self) -> Option { + self.seqno_counts.keys().next_back().copied() + } + + /// Returns `true` if a KV with the given seqno is suppressed by any + /// active tombstone (i.e., there exists an active tombstone with a + /// higher seqno). + #[must_use] + pub fn is_suppressed(&self, key_seqno: SeqNo) -> bool { + self.max_active_seqno().is_some_and(|max| key_seqno < max) + } + + /// Bulk-activates tombstones at a seek position. + /// + /// # Invariant + /// + /// At any iterator position, the active set contains only tombstones + /// where `start <= current_key < end` (and visible at `cutoff_seqno`). + /// Seek prefill must collect truly overlapping tombstones + /// (`start <= key < end`); `expire_until` immediately enforces the + /// `end` bound. + pub fn initialize_from(&mut self, tombstones: impl IntoIterator) { + for rt in tombstones { + self.activate(&rt); + } + } + + /// Returns `true` if there are no active tombstones. + #[must_use] + pub fn is_empty(&self) -> bool { + self.seqno_counts.is_empty() + } +} + +/// Tracks active range tombstones during reverse iteration. +/// +/// During reverse scans, tombstones are activated when the scan reaches +/// a key < `end` (strict `>`: `rt.end > current_key`), and expired when +/// `current_key < rt.start`. +/// +/// Uses a max-heap keyed by `(start, id, seqno)` so the tombstone +/// expiring soonest (largest `start`) is at the top. +/// +/// # Future extension: cache invalidation +/// +/// If a per-table `tombstone_global_max_end` cache is ever added for +/// reverse init bounding, it must be invalidated on block reload +/// (rare, but necessary for correctness if blocks can be evicted +/// and reloaded with different contents during the lifetime of a +/// `SuperVersion`). +pub struct ActiveTombstoneSetReverse { + seqno_counts: BTreeMap, + pending_expiry: BinaryHeap<(UserKey, u64, SeqNo)>, + cutoff_seqno: SeqNo, + next_id: u64, +} + +impl ActiveTombstoneSetReverse { + /// Creates a new reverse active tombstone set. + /// + /// Only tombstones with `seqno <= cutoff_seqno` will be activated. 
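+    ///
+    /// Sketch of the cutoff behaviour (not compiled as a doc-test):
+    ///
+    /// ```ignore
+    /// let mut set = ActiveTombstoneSetReverse::new(5);
+    /// // seqno 10 > cutoff 5, so the tombstone is ignored
+    /// set.activate(&RangeTombstone::new(
+    ///     UserKey::from(b"a" as &[u8]),
+    ///     UserKey::from(b"m" as &[u8]),
+    ///     10,
+    /// ));
+    /// assert!(set.is_empty());
+    /// ```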
+ #[must_use] + pub fn new(cutoff_seqno: SeqNo) -> Self { + Self { + seqno_counts: BTreeMap::new(), + pending_expiry: BinaryHeap::new(), + cutoff_seqno, + next_id: 0, + } + } + + /// Activates a range tombstone, adding it to the active set. + /// + /// The tombstone is only activated if it is visible at the cutoff seqno + /// (i.e., `rt.seqno <= cutoff_seqno`). Duplicate activations (same seqno + /// from different sources) are handled correctly via multiset accounting. + /// + /// For reverse iteration, activation uses strict `>`: tombstones with + /// `rt.end > current_key` are activated. `key == end` is NOT covered + /// (half-open). + pub fn activate(&mut self, rt: &RangeTombstone) { + if !rt.visible_at(self.cutoff_seqno) { + return; + } + let id = self.next_id; + self.next_id += 1; + *self.seqno_counts.entry(rt.seqno).or_insert(0) += 1; + self.pending_expiry.push((rt.start.clone(), id, rt.seqno)); + } + + /// Expires tombstones whose `start > current_key`. + /// + /// During reverse iteration, when the scan moves to a key that is + /// before a tombstone's start, that tombstone no longer applies. + /// + /// # Panics + /// + /// Panics if an expiry pop has no matching activation in the seqno multiset. + pub fn expire_until(&mut self, current_key: &[u8]) { + while let Some((ref start, _, seqno)) = self.pending_expiry.peek() { + if current_key < start.as_ref() { + let seqno = *seqno; + self.pending_expiry.pop(); + #[expect( + clippy::expect_used, + reason = "expiry pop must have matching activation" + )] + let count = self + .seqno_counts + .get_mut(&seqno) + .expect("expiry pop must have matching activation"); + *count -= 1; + if *count == 0 { + self.seqno_counts.remove(&seqno); + } + } else { + break; + } + } + } + + /// Returns the highest seqno among all active tombstones, or `None` if + /// no tombstones are active. + #[must_use] + pub fn max_active_seqno(&self) -> Option { + self.seqno_counts.keys().next_back().copied() + } + + /// Returns `true` if a KV with the given seqno is suppressed by any + /// active tombstone (i.e., there exists an active tombstone with a + /// higher seqno). + #[must_use] + pub fn is_suppressed(&self, key_seqno: SeqNo) -> bool { + self.max_active_seqno().is_some_and(|max| key_seqno < max) + } + + /// Bulk-activates tombstones at a seek position (for reverse). + pub fn initialize_from(&mut self, tombstones: impl IntoIterator) { + for rt in tombstones { + self.activate(&rt); + } + } + + /// Returns `true` if there are no active tombstones. 
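+    ///
+    /// Sketch (not compiled as a doc-test): the set becomes empty once the
+    /// reverse scan has moved before a tombstone's `start`.
+    ///
+    /// ```ignore
+    /// let mut set = ActiveTombstoneSetReverse::new(100);
+    /// set.activate(&RangeTombstone::new(
+    ///     UserKey::from(b"d" as &[u8]),
+    ///     UserKey::from(b"m" as &[u8]),
+    ///     10,
+    /// ));
+    /// set.expire_until(b"c"); // "c" < start "d" -> tombstone expires
+    /// assert!(set.is_empty());
+    /// ```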
+ #[must_use] + pub fn is_empty(&self) -> bool { + self.seqno_counts.is_empty() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::UserKey; + + fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone { + RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno) + } + + // ──── Forward tests ──── + + #[test] + fn forward_activate_and_suppress() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + assert!(set.is_suppressed(5)); + assert!(!set.is_suppressed(10)); + assert!(!set.is_suppressed(15)); + } + + #[test] + fn forward_expire_at_end() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + assert!(set.is_suppressed(5)); + set.expire_until(b"m"); // key == end, tombstone expires + assert!(!set.is_suppressed(5)); + } + + #[test] + fn forward_expire_past_end() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + set.expire_until(b"z"); + assert!(set.is_empty()); + } + + #[test] + fn forward_not_expired_before_end() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + set.expire_until(b"l"); + assert!(set.is_suppressed(5)); // still active + } + + #[test] + fn forward_invisible_tombstone_not_activated() { + let mut set = ActiveTombstoneSet::new(5); + set.activate(&rt(b"a", b"m", 10)); // seqno 10 > cutoff 5 + assert!(!set.is_suppressed(1)); + assert!(set.is_empty()); + } + + #[test] + fn forward_multiple_tombstones_max_seqno() { + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + set.activate(&rt(b"b", b"n", 20)); + assert_eq!(set.max_active_seqno(), Some(20)); + assert!(set.is_suppressed(15)); // 15 < 20 + } + + #[test] + fn forward_duplicate_end_seqno_accounting() { + // Test H: Two tombstones with same end + seqno + let mut set = ActiveTombstoneSet::new(100); + set.activate(&rt(b"a", b"m", 10)); + set.activate(&rt(b"b", b"m", 10)); + assert_eq!(set.max_active_seqno(), Some(10)); + + // Expire at "m" — both should be removed + set.expire_until(b"m"); + assert_eq!(set.max_active_seqno(), None); + assert!(set.is_empty()); + } + + #[test] + fn forward_initialize_from() { + let mut set = ActiveTombstoneSet::new(100); + set.initialize_from(vec![rt(b"a", b"m", 10), rt(b"b", b"z", 20)]); + assert_eq!(set.max_active_seqno(), Some(20)); + } + + #[test] + fn forward_initialize_and_expire() { + let mut set = ActiveTombstoneSet::new(100); + set.initialize_from(vec![rt(b"a", b"d", 10), rt(b"b", b"f", 20)]); + set.expire_until(b"e"); // expires [a,d) but not [b,f) + assert_eq!(set.max_active_seqno(), Some(20)); + set.expire_until(b"f"); // expires [b,f) + assert!(set.is_empty()); + } + + // ──── Reverse tests ──── + + #[test] + fn reverse_activate_and_suppress() { + let mut set = ActiveTombstoneSetReverse::new(100); + set.activate(&rt(b"a", b"m", 10)); + assert!(set.is_suppressed(5)); + assert!(!set.is_suppressed(10)); + } + + #[test] + fn reverse_expire_before_start() { + let mut set = ActiveTombstoneSetReverse::new(100); + set.activate(&rt(b"d", b"m", 10)); + + // Key before start — tombstone expires + set.expire_until(b"c"); + assert!(set.is_empty()); + } + + #[test] + fn reverse_not_expired_at_start() { + let mut set = ActiveTombstoneSetReverse::new(100); + set.activate(&rt(b"d", b"m", 10)); + + // Key == start — tombstone still active (key is covered) + set.expire_until(b"d"); + assert!(set.is_suppressed(5)); + } + + #[test] + fn reverse_invisible_tombstone_not_activated() { + let mut set = 
ActiveTombstoneSetReverse::new(5); + set.activate(&rt(b"a", b"m", 10)); + assert!(set.is_empty()); + } + + #[test] + fn reverse_duplicate_end_seqno_accounting() { + // Symmetric to forward Test H + let mut set = ActiveTombstoneSetReverse::new(100); + set.activate(&rt(b"d", b"m", 10)); + set.activate(&rt(b"d", b"n", 10)); // same start + seqno + assert_eq!(set.max_active_seqno(), Some(10)); + + // Expire before start + set.expire_until(b"c"); + assert_eq!(set.max_active_seqno(), None); + assert!(set.is_empty()); + } + + #[test] + fn reverse_multiple_tombstones() { + let mut set = ActiveTombstoneSetReverse::new(100); + set.activate(&rt(b"a", b"m", 10)); + set.activate(&rt(b"f", b"z", 20)); + assert_eq!(set.max_active_seqno(), Some(20)); + + // Moving to 'e' expires [f,z) but keeps [a,m) + set.expire_until(b"e"); + assert_eq!(set.max_active_seqno(), Some(10)); + } +} diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index a2c793213..9da1e9e3e 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -335,6 +335,7 @@ impl AbstractTree for BlobTree { fn flush_to_tables( &self, stream: impl Iterator>, + range_tombstones: Vec, ) -> crate::Result, Option>)>> { use crate::{ coding::Encode, file::BLOBS_FOLDER, file::TABLES_FOLDER, @@ -393,6 +394,9 @@ impl AbstractTree for BlobTree { table_writer = table_writer.use_partitioned_filter(); } + // Pass range tombstones to be written into all output tables + table_writer.add_range_tombstones(range_tombstones); + #[expect( clippy::expect_used, reason = "cannot create blob tree without defining kv separation options" @@ -622,4 +626,8 @@ impl AbstractTree for BlobTree { fn remove_weak>(&self, key: K, seqno: SeqNo) -> (u64, u64) { self.index.remove_weak(key, seqno) } + + fn remove_range>(&self, start: K, end: K, seqno: SeqNo) { + self.index.remove_range(start, end, seqno) + } } diff --git a/src/compaction/flavour.rs b/src/compaction/flavour.rs index b3d11815a..078e40c45 100644 --- a/src/compaction/flavour.rs +++ b/src/compaction/flavour.rs @@ -8,6 +8,7 @@ use crate::coding::{Decode, Encode}; use crate::compaction::worker::Options; use crate::compaction::Input as CompactionPayload; use crate::file::TABLES_FOLDER; +use crate::range_tombstone::RangeTombstone; use crate::table::multi_writer::MultiWriter; use crate::version::{SuperVersions, Version}; use crate::vlog::blob_file::scanner::ScanEntry; @@ -120,6 +121,8 @@ pub(super) fn prepare_table_writer( pub(super) trait CompactionFlavour { fn write(&mut self, item: InternalValue) -> crate::Result<()>; + fn add_range_tombstones(&mut self, tombstones: Vec); + #[warn(clippy::too_many_arguments)] fn finish( self: Box, @@ -163,6 +166,10 @@ impl RelocatingCompaction { } impl CompactionFlavour for RelocatingCompaction { + fn add_range_tombstones(&mut self, tombstones: Vec) { + self.inner.table_writer.add_range_tombstones(tombstones); + } + fn write(&mut self, item: InternalValue) -> crate::Result<()> { if item.key.value_type.is_indirection() { let mut reader = &item.value[..]; @@ -360,6 +367,10 @@ impl StandardCompaction { } impl CompactionFlavour for StandardCompaction { + fn add_range_tombstones(&mut self, tombstones: Vec) { + self.table_writer.add_range_tombstones(tombstones); + } + fn write(&mut self, item: InternalValue) -> crate::Result<()> { let indirection = if item.key.value_type.is_indirection() { Some({ diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index 0e7db6506..ebc595cb9 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -391,6 +391,31 @@ fn merge_tables( 
let table_writer = super::flavour::prepare_table_writer(¤t_super_version.version, opts, payload)?; + // Collect range tombstones from all input tables + let mut range_tombstones = Vec::new(); + for table in &tables { + if let Ok(tombstones) = table.range_tombstones_by_start_iter() { + range_tombstones.extend(tombstones); + } + } + + // Evict range tombstones at the last level if they are below the GC watermark + if is_last_level { + range_tombstones.retain(|rt| rt.seqno >= opts.mvcc_gc_watermark); + } + + // Deduplicate range tombstones: remove tombstones fully covered by a + // retained tombstone with equal or higher seqno. + // With Ord (start asc, seqno desc, end asc), consecutive pairs where + // the retained element (b) has start <= candidate (a) and end >= a + // and seqno >= a means a is fully redundant. + range_tombstones.sort(); + range_tombstones.dedup_by(|a, b| { + b.start.as_ref() <= a.start.as_ref() + && b.end.as_ref() >= a.end.as_ref() + && b.seqno >= a.seqno + }); + let start = Instant::now(); let mut compactor = match &opts.config.kv_separation_opts { @@ -457,6 +482,11 @@ fn merge_tables( // IMPORTANT: Unlock exclusive compaction lock as we are now doing the actual (CPU-intensive) compaction drop(compaction_state); + // Pass collected range tombstones to the output tables + if !range_tombstones.is_empty() { + compactor.add_range_tombstones(range_tombstones); + } + hidden_guard(payload, opts, || { for (idx, item) in merge_iter.enumerate() { let item = item?; diff --git a/src/error.rs b/src/error.rs index 530da2a56..ae234e2bd 100644 --- a/src/error.rs +++ b/src/error.rs @@ -40,6 +40,9 @@ pub enum Error { /// UTF-8 error Utf8(std::str::Utf8Error), + + /// Invalid or corrupt block data + InvalidBlock(&'static str), } impl std::fmt::Display for Error { diff --git a/src/lib.rs b/src/lib.rs index baf45da79..a0e744fbb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -67,6 +67,9 @@ macro_rules! unwrap { pub(crate) use unwrap; +/// Active tombstone set for tracking range tombstones during iteration +pub mod active_tombstone_set; + mod any_tree; mod r#abstract; @@ -124,6 +127,11 @@ mod path; #[doc(hidden)] pub mod range; +/// Range tombstone types for deleting key ranges +pub mod range_tombstone; + +pub(crate) mod range_tombstone_filter; + #[doc(hidden)] pub mod table; @@ -182,6 +190,7 @@ pub use { iter_guard::IterGuard as Guard, memtable::{Memtable, MemtableId}, r#abstract::AbstractTree, + range_tombstone::{CoveringRt, RangeTombstone}, seqno::SequenceNumberCounter, slice::Slice, tree::Tree, diff --git a/src/memtable/interval_tree.rs b/src/memtable/interval_tree.rs new file mode 100644 index 000000000..9c09bb05f --- /dev/null +++ b/src/memtable/interval_tree.rs @@ -0,0 +1,513 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! AVL-balanced interval tree for efficient range tombstone queries in memtables. +//! +//! Keyed by `start`, augmented with `subtree_max_end`, `subtree_max_seqno`, +//! and `subtree_min_seqno` for pruning during queries. + +use crate::range_tombstone::{CoveringRt, RangeTombstone}; +use crate::{SeqNo, UserKey}; +use std::cmp::Ordering; + +/// An AVL-balanced BST keyed by range tombstone `start`, augmented with +/// subtree-level metadata for efficient interval queries. 
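+///
+/// Sketch of typical use (not compiled as a doc-test):
+///
+/// ```ignore
+/// let mut tree = IntervalTree::new();
+/// // delete [b, y) at seqno 10
+/// tree.insert(RangeTombstone::new(
+///     UserKey::from(b"b" as &[u8]),
+///     UserKey::from(b"y" as &[u8]),
+///     10,
+/// ));
+/// assert!(tree.query_suppression(b"c", 5, 100));   // older KV in range
+/// assert!(!tree.query_suppression(b"c", 15, 100)); // newer KV survives
+/// assert!(!tree.query_suppression(b"y", 5, 100));  // end is exclusive
+/// ```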
+pub struct IntervalTree { + root: Option>, + len: usize, +} + +struct Node { + tombstone: RangeTombstone, + + // AVL metadata + height: i32, + left: Option>, + right: Option>, + + // Augmented metadata + subtree_max_end: UserKey, + subtree_max_seqno: SeqNo, + subtree_min_seqno: SeqNo, +} + +impl Node { + fn new(tombstone: RangeTombstone) -> Self { + let subtree_max_end = tombstone.end.clone(); + let seqno = tombstone.seqno; + Self { + tombstone, + height: 1, + left: None, + right: None, + subtree_max_end, + subtree_max_seqno: seqno, + subtree_min_seqno: seqno, + } + } + + fn update_augmentation(&mut self) { + self.subtree_max_end = self.tombstone.end.clone(); + self.subtree_max_seqno = self.tombstone.seqno; + self.subtree_min_seqno = self.tombstone.seqno; + self.height = 1; + + if let Some(ref left) = self.left { + if left.subtree_max_end > self.subtree_max_end { + self.subtree_max_end = left.subtree_max_end.clone(); + } + if left.subtree_max_seqno > self.subtree_max_seqno { + self.subtree_max_seqno = left.subtree_max_seqno; + } + if left.subtree_min_seqno < self.subtree_min_seqno { + self.subtree_min_seqno = left.subtree_min_seqno; + } + self.height = left.height + 1; + } + + if let Some(ref right) = self.right { + if right.subtree_max_end > self.subtree_max_end { + self.subtree_max_end = right.subtree_max_end.clone(); + } + if right.subtree_max_seqno > self.subtree_max_seqno { + self.subtree_max_seqno = right.subtree_max_seqno; + } + if right.subtree_min_seqno < self.subtree_min_seqno { + self.subtree_min_seqno = right.subtree_min_seqno; + } + let rh = right.height + 1; + if rh > self.height { + self.height = rh; + } + } + } + + fn balance_factor(&self) -> i32 { + let lh = self.left.as_ref().map_or(0, |n| n.height); + let rh = self.right.as_ref().map_or(0, |n| n.height); + lh - rh + } +} + +#[expect( + clippy::expect_used, + reason = "rotation invariant: left child must exist" +)] +fn rotate_right(mut node: Box) -> Box { + let mut new_root = node.left.take().expect("rotate_right requires left child"); + node.left = new_root.right.take(); + node.update_augmentation(); + new_root.right = Some(node); + new_root.update_augmentation(); + new_root +} + +#[expect( + clippy::expect_used, + reason = "rotation invariant: right child must exist" +)] +fn rotate_left(mut node: Box) -> Box { + let mut new_root = node.right.take().expect("rotate_left requires right child"); + node.right = new_root.left.take(); + node.update_augmentation(); + new_root.left = Some(node); + new_root.update_augmentation(); + new_root +} + +#[expect( + clippy::expect_used, + reason = "balance factor guarantees child existence" +)] +fn balance(mut node: Box) -> Box { + node.update_augmentation(); + let bf = node.balance_factor(); + + if bf > 1 { + // Left-heavy + if let Some(ref left) = node.left { + if left.balance_factor() < 0 { + // Left-Right case + node.left = Some(rotate_left(node.left.take().expect("just checked"))); + } + } + return rotate_right(node); + } + + if bf < -1 { + // Right-heavy + if let Some(ref right) = node.right { + if right.balance_factor() > 0 { + // Right-Left case + node.right = Some(rotate_right(node.right.take().expect("just checked"))); + } + } + return rotate_left(node); + } + + node +} + +fn insert_node(node: Option>, tombstone: RangeTombstone) -> Box { + let Some(mut node) = node else { + return Box::new(Node::new(tombstone)); + }; + + match tombstone.cmp(&node.tombstone) { + Ordering::Less => { + node.left = Some(insert_node(node.left.take(), tombstone)); + } + Ordering::Greater => { + 
node.right = Some(insert_node(node.right.take(), tombstone)); + } + Ordering::Equal => { + // Duplicate — replace (shouldn't normally happen) + node.tombstone = tombstone; + node.update_augmentation(); + return node; + } + } + + balance(node) +} + +/// Collects all overlapping tombstones: those where `start <= key < end` +/// and `seqno <= read_seqno`. +fn collect_overlapping( + node: &Option>, + key: &[u8], + read_seqno: SeqNo, + result: &mut Vec, +) { + let Some(n) = node else { return }; + + // Prune: no tombstone in subtree is visible at this read_seqno + if n.subtree_min_seqno > read_seqno { + return; + } + + // Prune: max_end <= key means no interval in this subtree covers key + if n.subtree_max_end.as_ref() <= key { + return; + } + + // Recurse left (may have tombstones with start <= key) + collect_overlapping(&n.left, key, read_seqno, result); + + // Check current node + if n.tombstone.start.as_ref() <= key { + if n.tombstone.contains_key(key) && n.tombstone.visible_at(read_seqno) { + result.push(n.tombstone.clone()); + } + // Recurse right (may also have tombstones with start <= key, up to key) + collect_overlapping(&n.right, key, read_seqno, result); + } + // If start > key, no need to go right (all entries there have start > key too) +} + +/// In-order traversal to produce sorted output. +fn inorder(node: &Option>, result: &mut Vec) { + let Some(n) = node else { return }; + inorder(&n.left, result); + result.push(n.tombstone.clone()); + inorder(&n.right, result); +} + +/// Collects tombstones that fully cover `[min, max]` and are visible at `read_seqno`. +fn collect_covering( + node: &Option>, + min: &[u8], + max: &[u8], + read_seqno: SeqNo, + best: &mut Option, +) { + let Some(n) = node else { return }; + + // Prune: no tombstone visible at this read_seqno + if n.subtree_min_seqno > read_seqno { + return; + } + + // Prune: max_end <= max means no interval in subtree can fully cover [min, max] + // (need end > max, i.e., max_end > max for half-open covering) + if n.subtree_max_end.as_ref() <= max { + return; + } + + // Recurse left + collect_covering(&n.left, min, max, read_seqno, best); + + // Check current node: must have start <= min AND max < end + if n.tombstone.start.as_ref() <= min + && n.tombstone.fully_covers(min, max) + && n.tombstone.visible_at(read_seqno) + { + let dominated = best.as_ref().is_some_and(|b| n.tombstone.seqno <= b.seqno); + if !dominated { + *best = Some(CoveringRt::from(&n.tombstone)); + } + } + + // Only go right if some right-subtree entry might have start <= min + if n.tombstone.start.as_ref() <= min { + collect_covering(&n.right, min, max, read_seqno, best); + } +} + +impl IntervalTree { + /// Creates a new empty interval tree. + #[must_use] + pub fn new() -> Self { + Self { root: None, len: 0 } + } + + /// Inserts a range tombstone into the tree. O(log n). + pub fn insert(&mut self, tombstone: RangeTombstone) { + self.root = Some(insert_node(self.root.take(), tombstone)); + self.len += 1; + } + + /// Returns `true` if the given key at the given seqno is suppressed by + /// any range tombstone visible at `read_seqno`. + /// + /// O(log n + k) where k is the number of overlapping tombstones. + pub fn query_suppression(&self, key: &[u8], key_seqno: SeqNo, read_seqno: SeqNo) -> bool { + let mut result = Vec::new(); + collect_overlapping(&self.root, key, read_seqno, &mut result); + result.iter().any(|rt| rt.seqno > key_seqno) + } + + /// Returns all tombstones overlapping with `key` and visible at `read_seqno`. 
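+    ///
+    /// Sketch (not compiled as a doc-test), mirroring the tests below:
+    ///
+    /// ```ignore
+    /// let mut tree = IntervalTree::new();
+    /// tree.insert(RangeTombstone::new(UserKey::from(b"a" as &[u8]), UserKey::from(b"f" as &[u8]), 10));
+    /// tree.insert(RangeTombstone::new(UserKey::from(b"d" as &[u8]), UserKey::from(b"m" as &[u8]), 20));
+    /// tree.insert(RangeTombstone::new(UserKey::from(b"p" as &[u8]), UserKey::from(b"z" as &[u8]), 5));
+    /// // "e" lies in [a, f) and [d, m), but not in [p, z)
+    /// assert_eq!(tree.overlapping_tombstones(b"e", 100).len(), 2);
+    /// ```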
+ /// + /// Used for seek initialization: returns tombstones where `start <= key < end` + /// and `seqno <= read_seqno`. + pub fn overlapping_tombstones(&self, key: &[u8], read_seqno: SeqNo) -> Vec { + let mut result = Vec::new(); + collect_overlapping(&self.root, key, read_seqno, &mut result); + result + } + + /// Returns the highest-seqno visible tombstone that fully covers `[min, max]`, + /// or `None` if no such tombstone exists. + /// + /// Used for table-skip decisions. + pub fn query_covering_rt_for_range( + &self, + min: &[u8], + max: &[u8], + read_seqno: SeqNo, + ) -> Option { + let mut best = None; + collect_covering(&self.root, min, max, read_seqno, &mut best); + best + } + + /// Returns all tombstones in sorted order (by `RangeTombstone::Ord`). + /// + /// Used for flush. + pub fn iter_sorted(&self) -> Vec { + let mut result = Vec::with_capacity(self.len); + inorder(&self.root, &mut result); + result + } + + /// Returns the number of tombstones in the tree. + #[must_use] + pub fn len(&self) -> usize { + self.len + } + + /// Returns `true` if the tree is empty. + #[must_use] + pub fn is_empty(&self) -> bool { + self.len == 0 + } +} + +impl Default for IntervalTree { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +#[expect(clippy::unwrap_used, clippy::indexing_slicing)] +mod tests { + use super::*; + + fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone { + RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno) + } + + #[test] + fn empty_tree_no_suppression() { + let tree = IntervalTree::new(); + assert!(!tree.query_suppression(b"key", 5, 100)); + } + + #[test] + fn single_tombstone_suppresses() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"b", b"y", 10)); + assert!(tree.query_suppression(b"c", 5, 100)); + } + + #[test] + fn single_tombstone_no_suppress_newer_kv() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"b", b"y", 10)); + assert!(!tree.query_suppression(b"c", 15, 100)); + } + + #[test] + fn single_tombstone_exclusive_end() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"b", b"y", 10)); + assert!(!tree.query_suppression(b"y", 5, 100)); + } + + #[test] + fn single_tombstone_before_start() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"b", b"y", 10)); + assert!(!tree.query_suppression(b"a", 5, 100)); + } + + #[test] + fn tombstone_not_visible() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"b", b"y", 10)); + assert!(!tree.query_suppression(b"c", 5, 9)); // read_seqno < tombstone_seqno + } + + #[test] + fn multiple_tombstones() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"f", 10)); + tree.insert(rt(b"d", b"m", 20)); + tree.insert(rt(b"p", b"z", 5)); + + // "e" covered by both [a,f)@10 and [d,m)@20 + assert!(tree.query_suppression(b"e", 15, 100)); // 15 < 20 + assert!(tree.query_suppression(b"e", 5, 100)); // 5 < 20 + assert!(!tree.query_suppression(b"e", 25, 100)); // 25 > 20 + + // "q" covered by [p,z)@5 + assert!(tree.query_suppression(b"q", 3, 100)); + assert!(!tree.query_suppression(b"q", 10, 100)); + } + + #[test] + fn overlapping_tombstones_query() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"f", 10)); + tree.insert(rt(b"d", b"m", 20)); + tree.insert(rt(b"p", b"z", 5)); + + let overlaps = tree.overlapping_tombstones(b"e", 100); + assert_eq!(overlaps.len(), 2); + } + + #[test] + fn overlapping_tombstones_none() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"d", b"f", 10)); + let overlaps = tree.overlapping_tombstones(b"a", 
100); + assert!(overlaps.is_empty()); + } + + #[test] + fn covering_rt_found() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"z", 50)); + tree.insert(rt(b"c", b"g", 10)); + + let crt = tree.query_covering_rt_for_range(b"b", b"y", 100); + assert!(crt.is_some()); + let crt = crt.unwrap(); + assert_eq!(crt.seqno, 50); + } + + #[test] + fn covering_rt_not_found_partial() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"c", b"g", 10)); + + // [b, y] is not fully covered by [c, g) + let crt = tree.query_covering_rt_for_range(b"b", b"y", 100); + assert!(crt.is_none()); + } + + #[test] + fn covering_rt_highest_seqno() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"a", b"z", 50)); + tree.insert(rt(b"a", b"z", 100)); + + let crt = tree.query_covering_rt_for_range(b"b", b"y", 200); + assert!(crt.is_some()); + assert_eq!(crt.unwrap().seqno, 100); + } + + #[test] + fn iter_sorted_empty() { + let tree = IntervalTree::new(); + assert!(tree.iter_sorted().is_empty()); + } + + #[test] + fn iter_sorted_multiple() { + let mut tree = IntervalTree::new(); + tree.insert(rt(b"d", b"f", 10)); + tree.insert(rt(b"a", b"c", 20)); + tree.insert(rt(b"m", b"z", 5)); + + let sorted = tree.iter_sorted(); + assert_eq!(sorted.len(), 3); + // Should be sorted by RangeTombstone::Ord (start asc, seqno desc, end asc) + assert_eq!(sorted[0].start.as_ref(), b"a"); + assert_eq!(sorted[1].start.as_ref(), b"d"); + assert_eq!(sorted[2].start.as_ref(), b"m"); + } + + #[test] + fn avl_balance_maintained() { + let mut tree = IntervalTree::new(); + // Insert in sorted order — should trigger rotations + for i in 0u8..20 { + let s = vec![i]; + let e = vec![i + 1]; + tree.insert(rt(&s, &e, u64::from(i))); + } + assert_eq!(tree.len(), 20); + // If AVL is working, height should be bounded ~log2(20) ≈ 5 + if let Some(ref root) = tree.root { + assert!(root.height <= 6, "AVL height too large: {}", root.height); + } + } + + #[test] + fn seqno_pruning() { + let mut tree = IntervalTree::new(); + // Insert tombstones with high seqno only + tree.insert(rt(b"a", b"z", 100)); + tree.insert(rt(b"b", b"y", 200)); + + // Query with read_seqno < all tombstone seqnos — should find nothing + assert!(!tree.query_suppression(b"c", 5, 50)); + let overlaps = tree.overlapping_tombstones(b"c", 50); + assert!(overlaps.is_empty()); + } + + #[test] + fn max_end_pruning() { + let mut tree = IntervalTree::new(); + // Tombstones with limited end + tree.insert(rt(b"a", b"c", 10)); + tree.insert(rt(b"b", b"d", 10)); + + // Key past all ends — should find nothing + assert!(!tree.query_suppression(b"e", 5, 100)); + } +} diff --git a/src/memtable/mod.rs b/src/memtable/mod.rs index fff36fbdc..4f6f53188 100644 --- a/src/memtable/mod.rs +++ b/src/memtable/mod.rs @@ -2,14 +2,21 @@ // This source code is licensed under both the Apache 2.0 and MIT License // (found in the LICENSE-* files in the repository) +pub mod interval_tree; + use crate::key::InternalKey; +use crate::range_tombstone::{CoveringRt, RangeTombstone}; use crate::{ value::{InternalValue, SeqNo, UserValue}, - ValueType, + UserKey, ValueType, }; use crossbeam_skiplist::SkipMap; +use interval_tree::IntervalTree; +use std::cmp::Reverse; +use std::collections::BTreeMap; use std::ops::RangeBounds; use std::sync::atomic::{AtomicBool, AtomicU64}; +use std::sync::RwLock; pub use crate::tree::inner::MemtableId; @@ -35,6 +42,24 @@ pub struct Memtable { pub(crate) highest_seqno: AtomicU64, pub(crate) requested_rotation: AtomicBool, + + /// Fast-path flag: set to `true` once any range 
tombstone is inserted. + /// Checked before acquiring the interval tree RwLock. + pub(crate) has_range_tombstones: AtomicBool, + + /// Range tombstones indexed by start for efficient point queries + /// and overlap collection (used for forward scans and seek init). + #[doc(hidden)] + pub range_tombstones: RwLock, + + /// Range tombstones indexed by `(Reverse(end), Reverse(seqno))` for + /// reverse iteration. Yields tombstones in end-desc order. + #[expect( + clippy::type_complexity, + reason = "compound key is inherent to the dual-index design" + )] + pub(crate) tombstones_by_end: + RwLock, Reverse), RangeTombstone>>, } impl Memtable { @@ -64,6 +89,9 @@ impl Memtable { approximate_size: AtomicU64::default(), highest_seqno: AtomicU64::default(), requested_rotation: AtomicBool::default(), + has_range_tombstones: AtomicBool::new(false), + range_tombstones: RwLock::new(IntervalTree::new()), + tombstones_by_end: RwLock::new(BTreeMap::new()), } } @@ -168,7 +196,7 @@ impl Memtable { /// Returns the highest sequence number in the memtable. pub fn get_highest_seqno(&self) -> Option { - if self.is_empty() { + if self.is_empty() && self.range_tombstone_count() == 0 { None } else { Some( @@ -177,6 +205,118 @@ impl Memtable { ) } } + + /// Inserts a range tombstone into the memtable. + /// + /// Updates both the interval tree (for point queries / seek init) and the + /// end-desc index (for reverse iteration). + #[doc(hidden)] + pub fn insert_range_tombstone(&self, rt: RangeTombstone) { + let size_contribution = (rt.start.len() + rt.end.len() + 8 + 64) as u64; + + self.approximate_size + .fetch_add(size_contribution, std::sync::atomic::Ordering::AcqRel); + + self.highest_seqno + .fetch_max(rt.seqno, std::sync::atomic::Ordering::AcqRel); + + self.has_range_tombstones + .store(true, std::sync::atomic::Ordering::Release); + + { + #[expect(clippy::expect_used, reason = "lock poisoning is unrecoverable")] + let mut tree = self.range_tombstones.write().expect("lock poisoned"); + tree.insert(rt.clone()); + } + + { + #[expect(clippy::expect_used, reason = "lock poisoning is unrecoverable")] + let mut by_end = self.tombstones_by_end.write().expect("lock poisoned"); + by_end.insert((Reverse(rt.end.clone()), Reverse(rt.seqno)), rt); + } + } + + /// Returns `true` if the given key at the given seqno is suppressed + /// by a range tombstone visible at `read_seqno`. + pub fn is_suppressed_by_range_tombstone( + &self, + key: &[u8], + key_seqno: SeqNo, + read_seqno: SeqNo, + ) -> bool { + if !self + .has_range_tombstones + .load(std::sync::atomic::Ordering::Acquire) + { + return false; + } + + #[expect(clippy::expect_used, reason = "lock poisoning is unrecoverable")] + let tree = self.range_tombstones.read().expect("lock poisoned"); + tree.query_suppression(key, key_seqno, read_seqno) + } + + /// Returns all range tombstones overlapping with `key` and visible at `read_seqno`. + /// + /// Used for seek initialization. + pub fn overlapping_tombstones(&self, key: &[u8], read_seqno: SeqNo) -> Vec { + #[expect(clippy::expect_used, reason = "lock poisoning is unrecoverable")] + let tree = self.range_tombstones.read().expect("lock poisoned"); + tree.overlapping_tombstones(key, read_seqno) + } + + /// Returns the highest-seqno covering tombstone for `[min, max]`, if any. + /// + /// Used for table-skip decisions. 
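+    ///
+    /// Sketch (not compiled as a doc-test); construction of the memtable is
+    /// elided, only the two calls shown here are assumed:
+    ///
+    /// ```ignore
+    /// // given some `memtable: Memtable`
+    /// memtable.insert_range_tombstone(RangeTombstone::new(
+    ///     UserKey::from(b"a" as &[u8]),
+    ///     UserKey::from(b"z" as &[u8]),
+    ///     50,
+    /// ));
+    /// let covering = memtable.query_covering_rt_for_range(b"b", b"y", 100);
+    /// assert_eq!(covering.map(|c| c.seqno), Some(50));
+    /// ```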
+ pub fn query_covering_rt_for_range( + &self, + min: &[u8], + max: &[u8], + read_seqno: SeqNo, + ) -> Option { + #[expect(clippy::expect_used, reason = "lock poisoning is unrecoverable")] + let tree = self.range_tombstones.read().expect("lock poisoned"); + tree.query_covering_rt_for_range(min, max, read_seqno) + } + + /// Returns all range tombstones sorted by `(start asc, seqno desc, end asc)`. + /// + /// Used for flush / encoding the ByStart block. + pub fn range_tombstones_by_start(&self) -> Vec { + if !self + .has_range_tombstones + .load(std::sync::atomic::Ordering::Acquire) + { + return Vec::new(); + } + + #[expect(clippy::expect_used, reason = "lock poisoning is unrecoverable")] + let tree = self.range_tombstones.read().expect("lock poisoned"); + tree.iter_sorted() + } + + /// Returns all range tombstones sorted by `(end desc, seqno desc)`. + /// + /// Used for flush / encoding the ByEndDesc block. + pub fn range_tombstones_by_end_desc(&self) -> Vec { + if !self + .has_range_tombstones + .load(std::sync::atomic::Ordering::Acquire) + { + return Vec::new(); + } + + #[expect(clippy::expect_used, reason = "lock poisoning is unrecoverable")] + let by_end = self.tombstones_by_end.read().expect("lock poisoned"); + by_end.values().cloned().collect() + } + + /// Returns the number of range tombstones in the memtable. + pub fn range_tombstone_count(&self) -> usize { + #[expect(clippy::expect_used, reason = "lock poisoning is unrecoverable")] + let tree = self.range_tombstones.read().expect("lock poisoned"); + tree.len() + } } #[cfg(test)] diff --git a/src/range.rs b/src/range.rs index c0cd5df94..1278749e3 100644 --- a/src/range.rs +++ b/src/range.rs @@ -7,7 +7,10 @@ use crate::{ memtable::Memtable, merge::Merger, mvcc_stream::MvccStream, + range_tombstone::RangeTombstone, + range_tombstone_filter::RangeTombstoneFilter, run_reader::RunReader, + table::Table, value::{SeqNo, UserKey}, version::SuperVersion, BoxedIterator, InternalValue, @@ -23,6 +26,25 @@ pub fn seqno_filter(item_seqno: SeqNo, seqno: SeqNo) -> bool { item_seqno < seqno } +/// Returns `true` if a table is fully covered by a range tombstone visible at `read_seqno`. +/// +/// A table can be skipped when a tombstone fully covers its key range `[min, max]` +/// and has a seqno greater than the table's highest seqno, meaning every entry +/// in the table is suppressed. +#[must_use] +fn is_table_fully_covered(table: &Table, tombstones: &[RangeTombstone], read_seqno: SeqNo) -> bool { + let key_range = &table.metadata.key_range; + let table_min = key_range.min().as_ref(); + let table_max = key_range.max().as_ref(); + let table_max_seqno = table.get_highest_seqno(); + + tombstones.iter().any(|rt| { + rt.visible_at(read_seqno) + && rt.fully_covers(table_min, table_max) + && rt.seqno > table_max_seqno + }) +} + /// Calculates the prefix's upper range. 
/// /// # Panics @@ -145,6 +167,36 @@ impl TreeIter { let range = (lo, hi); + // Collect range tombstones from all sources for forward suppression + let mut all_tombstones: Vec = Vec::new(); + + // From active memtable + all_tombstones.extend(lock.version.active_memtable.range_tombstones_by_start()); + + // From sealed memtables + for memtable in lock.version.sealed_memtables.iter() { + all_tombstones.extend(memtable.range_tombstones_by_start()); + } + + // From SST tables (skip if no SST has range tombstones) + if lock.version.has_sst_range_tombstones { + for run in lock + .version + .version + .iter_levels() + .flat_map(|lvl| lvl.iter()) + { + for table in run.iter() { + if let Ok(tombstones) = table.range_tombstones_by_start_iter() { + all_tombstones.extend(tombstones); + } + } + } + } + + // Sort by (start asc, seqno desc, end asc) — the Ord impl on RangeTombstone + all_tombstones.sort(); + let mut iters: Vec> = Vec::with_capacity(5); for run in lock @@ -165,6 +217,11 @@ impl TreeIter { range.start_bound().map(|x| &*x.user_key), range.end_bound().map(|x| &*x.user_key), )) { + // Skip table if fully covered by a range tombstone + if is_table_fully_covered(table, &all_tombstones, seqno) { + continue; + } + let reader = table .range(( range.start_bound().map(|x| &x.user_key).cloned(), @@ -227,10 +284,19 @@ impl TreeIter { let merged = Merger::new(iters); let iter = MvccStream::new(merged); - Box::new(iter.filter(|x| match x { - Ok(value) => !value.key.is_tombstone(), - Err(_) => true, - })) + if all_tombstones.is_empty() { + Box::new(iter.filter(|x| match x { + Ok(value) => !value.key.is_tombstone(), + Err(_) => true, + })) + } else { + let iter = RangeTombstoneFilter::new(iter, all_tombstones, seqno); + + Box::new(iter.filter(|x| match x { + Ok(value) => !value.key.is_tombstone(), + Err(_) => true, + })) + } }) } } diff --git a/src/range_tombstone.rs b/src/range_tombstone.rs new file mode 100644 index 000000000..4b4b387ff --- /dev/null +++ b/src/range_tombstone.rs @@ -0,0 +1,343 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +use crate::{SeqNo, UserKey}; +use std::cmp::Reverse; + +/// A range tombstone that deletes all keys in `[start, end)` at a given sequence number. +/// +/// Half-open interval: `start` is inclusive, `end` is exclusive. +/// A key `k` is covered iff `start <= k < end`. +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct RangeTombstone { + /// Inclusive start bound + pub start: UserKey, + /// Exclusive end bound + pub end: UserKey, + /// Sequence number at which this tombstone was written + pub seqno: SeqNo, +} + +impl RangeTombstone { + /// Creates a new range tombstone for `[start, end)` at the given seqno. + /// + /// # Panics (debug only) + /// + /// Debug-asserts that `start < end`. Callers must validate untrusted input + /// before constructing a `RangeTombstone`. + #[must_use] + pub fn new(start: UserKey, end: UserKey, seqno: SeqNo) -> Self { + debug_assert!(start < end, "range tombstone start must be < end"); + Self { start, end, seqno } + } + + /// Returns `true` if `key` is within `[start, end)`. + #[must_use] + pub fn contains_key(&self, key: &[u8]) -> bool { + self.start.as_ref() <= key && key < self.end.as_ref() + } + + /// Returns `true` if this tombstone is visible at the given read seqno. + /// + /// A tombstone is visible when `self.seqno <= read_seqno`. 
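+    ///
+    /// Sketch (not compiled as a doc-test):
+    ///
+    /// ```ignore
+    /// let rt = RangeTombstone::new(
+    ///     UserKey::from(b"a" as &[u8]),
+    ///     UserKey::from(b"z" as &[u8]),
+    ///     10,
+    /// );
+    /// assert!(rt.visible_at(10));  // equal seqno: visible
+    /// assert!(rt.visible_at(20));  // newer snapshot: visible
+    /// assert!(!rt.visible_at(9));  // older snapshot: not yet visible
+    /// ```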
+ #[must_use] + pub fn visible_at(&self, read_seqno: SeqNo) -> bool { + self.seqno <= read_seqno + } + + /// Returns `true` if this tombstone should suppress a KV with the given seqno + /// at the given read snapshot. + /// + /// Suppress iff: `kv_seqno < self.seqno AND self.contains_key(key) AND self.visible_at(read_seqno)` + #[must_use] + pub fn should_suppress(&self, key: &[u8], kv_seqno: SeqNo, read_seqno: SeqNo) -> bool { + self.visible_at(read_seqno) && self.contains_key(key) && kv_seqno < self.seqno + } + + /// Returns the intersection of this tombstone with `[min, max)`, or `None` + /// if the ranges do not overlap. + /// + /// The resulting tombstone has the same seqno as `self`. + #[must_use] + pub fn intersect_opt(&self, min: &[u8], max: &[u8]) -> Option { + let new_start_ref = if self.start.as_ref() > min { + self.start.as_ref() + } else { + min + }; + let new_end_ref = if self.end.as_ref() < max { + self.end.as_ref() + } else { + max + }; + + if new_start_ref < new_end_ref { + Some(Self { + start: UserKey::from(new_start_ref), + end: UserKey::from(new_end_ref), + seqno: self.seqno, + }) + } else { + None + } + } + + /// Returns `true` if this tombstone fully covers the key range `[min, max]`. + /// + /// "Fully covers" means `self.start <= min` AND `max < self.end`. + /// This uses the half-open convention: the inclusive `max` must be + /// strictly less than the exclusive `end`. + #[must_use] + pub fn fully_covers(&self, min: &[u8], max: &[u8]) -> bool { + self.start.as_ref() <= min && max < self.end.as_ref() + } +} + +/// Ordered by `(start asc, seqno desc, end asc)`. +/// +/// The `end` tiebreaker ensures deterministic ordering for debug output +/// and property tests. +impl Ord for RangeTombstone { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + (&self.start, Reverse(self.seqno), &self.end).cmp(&( + &other.start, + Reverse(other.seqno), + &other.end, + )) + } +} + +impl PartialOrd for RangeTombstone { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +/// Information about a covering range tombstone, used for table-skip decisions. +/// +/// A covering tombstone fully covers a table's key range and has a seqno +/// greater than the table's max seqno, meaning the entire table can be skipped. +#[derive(Clone, Debug)] +pub struct CoveringRt { + /// The start key of the covering tombstone (inclusive) + pub start: UserKey, + /// The end key of the covering tombstone (exclusive) + pub end: UserKey, + /// The seqno of the covering tombstone + pub seqno: SeqNo, +} + +impl CoveringRt { + /// Returns `true` if this covering tombstone fully covers the given + /// key range `[min, max]` and has a higher seqno than the table's max. + #[must_use] + pub fn covers_table(&self, table_min: &[u8], table_max: &[u8], table_max_seqno: SeqNo) -> bool { + self.start.as_ref() <= table_min + && table_max < self.end.as_ref() + && self.seqno > table_max_seqno + } +} + +impl From<&RangeTombstone> for CoveringRt { + fn from(rt: &RangeTombstone) -> Self { + Self { + start: rt.start.clone(), + end: rt.end.clone(), + seqno: rt.seqno, + } + } +} + +/// Computes the upper bound exclusive key for use in range queries. +/// +/// Given a key, returns the next key in lexicographic order by appending `0x00`. +/// This is useful for converting inclusive upper bounds to exclusive ones +/// in range-cover queries. 
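+///
+/// Sketch (not compiled as a doc-test):
+///
+/// ```ignore
+/// let ub = upper_bound_exclusive(b"hello");
+/// assert_eq!(ub.as_ref(), b"hello\x00");
+/// // an inclusive [min, max] can then be queried as [min, upper_bound_exclusive(max))
+/// ```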
+#[must_use] +pub fn upper_bound_exclusive(key: &[u8]) -> UserKey { + let mut result = Vec::with_capacity(key.len() + 1); + result.extend_from_slice(key); + result.push(0x00); + UserKey::from(result) +} + +#[cfg(test)] +#[expect(clippy::unwrap_used)] +mod tests { + use super::*; + + fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone { + RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno) + } + + #[test] + fn contains_key_inclusive_start() { + let t = rt(b"b", b"d", 10); + assert!(t.contains_key(b"b")); + } + + #[test] + fn contains_key_exclusive_end() { + let t = rt(b"b", b"d", 10); + assert!(!t.contains_key(b"d")); + } + + #[test] + fn contains_key_middle() { + let t = rt(b"b", b"d", 10); + assert!(t.contains_key(b"c")); + } + + #[test] + fn contains_key_before_start() { + let t = rt(b"b", b"d", 10); + assert!(!t.contains_key(b"a")); + } + + #[test] + fn visible_at_equal() { + let t = rt(b"a", b"z", 10); + assert!(t.visible_at(10)); + } + + #[test] + fn visible_at_higher() { + let t = rt(b"a", b"z", 10); + assert!(t.visible_at(20)); + } + + #[test] + fn not_visible_at_lower() { + let t = rt(b"a", b"z", 10); + assert!(!t.visible_at(9)); + } + + #[test] + fn should_suppress_yes() { + let t = rt(b"b", b"d", 10); + assert!(t.should_suppress(b"c", 5, 10)); + } + + #[test] + fn should_suppress_no_newer_kv() { + let t = rt(b"b", b"d", 10); + assert!(!t.should_suppress(b"c", 15, 20)); + } + + #[test] + fn should_suppress_no_not_visible() { + let t = rt(b"b", b"d", 10); + assert!(!t.should_suppress(b"c", 5, 9)); + } + + #[test] + fn should_suppress_no_outside_range() { + let t = rt(b"b", b"d", 10); + assert!(!t.should_suppress(b"e", 5, 10)); + } + + #[test] + fn ordering_by_start_asc() { + let a = rt(b"a", b"z", 10); + let b = rt(b"b", b"z", 10); + assert!(a < b); + } + + #[test] + fn ordering_by_seqno_desc() { + let a = rt(b"a", b"z", 20); + let b = rt(b"a", b"z", 10); + assert!(a < b); // higher seqno comes first + } + + #[test] + fn ordering_by_end_asc_tiebreaker() { + let a = rt(b"a", b"m", 10); + let b = rt(b"a", b"z", 10); + assert!(a < b); + } + + #[test] + fn intersect_overlap() { + let t = rt(b"b", b"y", 10); + let clipped = t.intersect_opt(b"d", b"g").unwrap(); + assert_eq!(clipped.start.as_ref(), b"d"); + assert_eq!(clipped.end.as_ref(), b"g"); + assert_eq!(clipped.seqno, 10); + } + + #[test] + fn intersect_no_overlap() { + let t = rt(b"b", b"d", 10); + assert!(t.intersect_opt(b"e", b"g").is_none()); + } + + #[test] + fn intersect_partial_left() { + let t = rt(b"b", b"f", 10); + let clipped = t.intersect_opt(b"a", b"d").unwrap(); + assert_eq!(clipped.start.as_ref(), b"b"); + assert_eq!(clipped.end.as_ref(), b"d"); + } + + #[test] + fn intersect_partial_right() { + let t = rt(b"b", b"f", 10); + let clipped = t.intersect_opt(b"d", b"z").unwrap(); + assert_eq!(clipped.start.as_ref(), b"d"); + assert_eq!(clipped.end.as_ref(), b"f"); + } + + #[test] + fn fully_covers_yes() { + let t = rt(b"a", b"z", 10); + assert!(t.fully_covers(b"b", b"y")); + } + + #[test] + fn fully_covers_exact_start() { + let t = rt(b"a", b"z", 10); + assert!(t.fully_covers(b"a", b"y")); + } + + #[test] + fn fully_covers_no_end_equal() { + let t = rt(b"a", b"z", 10); + // max == end is not covered (half-open) + assert!(!t.fully_covers(b"a", b"z")); + } + + #[test] + fn fully_covers_no_start_before() { + let t = rt(b"b", b"z", 10); + assert!(!t.fully_covers(b"a", b"y")); + } + + #[test] + fn covering_rt_covers_table() { + let crt = CoveringRt { + start: UserKey::from(b"a" as &[u8]), + end: 
UserKey::from(b"z" as &[u8]), + seqno: 100, + }; + assert!(crt.covers_table(b"b", b"y", 50)); + } + + #[test] + fn covering_rt_no_cover_seqno_too_low() { + let crt = CoveringRt { + start: UserKey::from(b"a" as &[u8]), + end: UserKey::from(b"z" as &[u8]), + seqno: 50, + }; + assert!(!crt.covers_table(b"b", b"y", 100)); + } + + #[test] + fn upper_bound_exclusive_appends_zero() { + let key = b"hello"; + let result = upper_bound_exclusive(key); + assert_eq!(result.as_ref(), b"hello\x00"); + } +} diff --git a/src/range_tombstone_filter.rs b/src/range_tombstone_filter.rs new file mode 100644 index 000000000..cff9a0f08 --- /dev/null +++ b/src/range_tombstone_filter.rs @@ -0,0 +1,281 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! Bidirectional range tombstone filter for iteration. +//! +//! Wraps a sorted KV stream and suppresses entries covered by range tombstones. +//! Forward: tombstones sorted by `(start asc, seqno desc)`, activated when +//! `start <= key`, expired when `end <= key`. +//! Reverse: tombstones sorted by `(end desc, seqno desc)`, activated when +//! `end > key`, expired when `key < start`. + +use crate::active_tombstone_set::{ActiveTombstoneSet, ActiveTombstoneSetReverse}; +use crate::range_tombstone::RangeTombstone; +use crate::{InternalValue, SeqNo}; +use std::cmp::Reverse; + +/// Wraps a bidirectional KV stream and suppresses entries covered by range tombstones. +pub struct RangeTombstoneFilter { + inner: I, + + // Forward state + fwd_tombstones: Vec, + fwd_idx: usize, + fwd_active: ActiveTombstoneSet, + + // Reverse state + rev_tombstones: Vec, + rev_idx: usize, + rev_active: ActiveTombstoneSetReverse, +} + +impl RangeTombstoneFilter { + /// Creates a new bidirectional filter. + /// + /// `tombstones` is sorted by `(start asc, seqno desc, end asc)` (the natural Ord). + /// Internally, a second copy sorted by `(end desc, seqno desc)` is created for reverse. + #[must_use] + pub fn new(inner: I, fwd_tombstones: Vec, read_seqno: SeqNo) -> Self { + // Build reverse-sorted copy: (end desc, seqno desc) + let mut rev_tombstones = fwd_tombstones.clone(); + rev_tombstones.sort_by(|a, b| (&b.end, Reverse(b.seqno)).cmp(&(&a.end, Reverse(a.seqno)))); + + Self { + inner, + fwd_tombstones, + fwd_idx: 0, + fwd_active: ActiveTombstoneSet::new(read_seqno), + rev_tombstones, + rev_idx: 0, + rev_active: ActiveTombstoneSetReverse::new(read_seqno), + } + } + + /// Activates forward tombstones whose start <= current_key. + fn fwd_activate_up_to(&mut self, key: &[u8]) { + while let Some(rt) = self.fwd_tombstones.get(self.fwd_idx) { + if rt.start.as_ref() <= key { + self.fwd_active.activate(rt); + self.fwd_idx += 1; + } else { + break; + } + } + } + + /// Activates reverse tombstones whose end > current_key. 
+ fn rev_activate_up_to(&mut self, key: &[u8]) { + while let Some(rt) = self.rev_tombstones.get(self.rev_idx) { + if rt.end.as_ref() > key { + self.rev_active.activate(rt); + self.rev_idx += 1; + } else { + break; + } + } + } +} + +impl>> Iterator for RangeTombstoneFilter { + type Item = crate::Result; + + fn next(&mut self) -> Option { + loop { + let item = self.inner.next()?; + + let kv = match &item { + Ok(kv) => kv, + Err(_) => return Some(item), + }; + + let key = kv.key.user_key.as_ref(); + let kv_seqno = kv.key.seqno; + + // Activate tombstones whose start <= this key + self.fwd_activate_up_to(key); + + // Expire tombstones whose end <= this key + self.fwd_active.expire_until(key); + + // Check suppression + if self.fwd_active.is_suppressed(kv_seqno) { + continue; + } + + return Some(item); + } + } +} + +impl>> DoubleEndedIterator + for RangeTombstoneFilter +{ + fn next_back(&mut self) -> Option { + loop { + let item = self.inner.next_back()?; + + let kv = match &item { + Ok(kv) => kv, + Err(_) => return Some(item), + }; + + let key = kv.key.user_key.as_ref(); + let kv_seqno = kv.key.seqno; + + // Activate tombstones whose end > this key (strict >) + self.rev_activate_up_to(key); + + // Expire tombstones whose start > this key (key < start) + self.rev_active.expire_until(key); + + // Check suppression + if self.rev_active.is_suppressed(kv_seqno) { + continue; + } + + return Some(item); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{UserKey, ValueType}; + + fn kv(key: &[u8], seqno: SeqNo) -> InternalValue { + InternalValue::from_components(key, b"v", seqno, ValueType::Value) + } + + fn rt(start: &[u8], end: &[u8], seqno: SeqNo) -> RangeTombstone { + RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno) + } + + #[test] + fn no_tombstones() { + let items: Vec> = + vec![Ok(kv(b"a", 1)), Ok(kv(b"b", 2)), Ok(kv(b"c", 3))]; + + let filter = RangeTombstoneFilter::new(items.into_iter(), vec![], SeqNo::MAX); + let results: Vec<_> = filter.flatten().collect(); + assert_eq!(results.len(), 3); + } + + #[test] + fn basic_suppression() { + // Tombstone [b, d) at seqno 10 suppresses KVs at b and c (seqno < 10) + let items: Vec> = vec![ + Ok(kv(b"a", 5)), + Ok(kv(b"b", 5)), + Ok(kv(b"c", 5)), + Ok(kv(b"d", 5)), + Ok(kv(b"e", 5)), + ]; + + let tombstones = vec![rt(b"b", b"d", 10)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let results: Vec<_> = filter.flatten().collect(); + + let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); + assert_eq!(keys, vec![b"a".as_ref(), b"d", b"e"]); + } + + #[test] + fn tombstone_does_not_suppress_newer_kv() { + // Tombstone [a, z) at seqno 5 does NOT suppress KV at seqno 10 + let items: Vec> = vec![Ok(kv(b"b", 10)), Ok(kv(b"c", 3))]; + + let tombstones = vec![rt(b"a", b"z", 5)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let results: Vec<_> = filter.flatten().collect(); + + let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); + // b@10 survives (newer than tombstone), c@3 suppressed (older) + assert_eq!(keys, vec![b"b".as_ref()]); + } + + #[test] + fn half_open_end_exclusive() { + // Tombstone [b, d) at seqno 10. Key "d" is NOT covered. 
+ let items: Vec> = + vec![Ok(kv(b"b", 5)), Ok(kv(b"c", 5)), Ok(kv(b"d", 5))]; + + let tombstones = vec![rt(b"b", b"d", 10)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let results: Vec<_> = filter.flatten().collect(); + + let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); + assert_eq!(keys, vec![b"d".as_ref()]); // only d survives + } + + #[test] + fn multiple_overlapping_tombstones() { + let items: Vec> = vec![ + Ok(kv(b"a", 1)), + Ok(kv(b"b", 3)), + Ok(kv(b"c", 6)), + Ok(kv(b"d", 1)), + ]; + + // Two tombstones: [a,c)@5 and [b,e)@4 + let tombstones = vec![rt(b"a", b"c", 5), rt(b"b", b"e", 4)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let results: Vec<_> = filter.flatten().collect(); + + let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); + // a@1 suppressed by [a,c)@5 + // b@3 suppressed by [a,c)@5 (max active seqno = 5) + // c@6 NOT suppressed (seqno 6 > max active 4, since [a,c) expired at c) + // d@1 suppressed by [b,e)@4 + assert_eq!(keys, vec![b"c".as_ref()]); + } + + #[test] + fn tombstone_not_visible_at_read_seqno() { + // Tombstone at seqno 10, but read_seqno is 5, so tombstone not visible + let items: Vec> = vec![Ok(kv(b"b", 3))]; + + let tombstones = vec![rt(b"a", b"z", 10)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, 5); + let results: Vec<_> = filter.flatten().collect(); + + // b@3 survives because tombstone@10 is not visible at read_seqno=5 + assert_eq!(results.len(), 1); + } + + #[test] + fn reverse_basic_suppression() { + // Tombstone [b, d) at seqno 10 + let items: Vec> = vec![ + Ok(kv(b"a", 5)), + Ok(kv(b"b", 5)), + Ok(kv(b"c", 5)), + Ok(kv(b"d", 5)), + Ok(kv(b"e", 5)), + ]; + + let tombstones = vec![rt(b"b", b"d", 10)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let results: Vec<_> = filter.rev().flatten().collect(); + + let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); + // Reverse order: e, d, a (b and c suppressed) + assert_eq!(keys, vec![b"e".as_ref(), b"d", b"a"]); + } + + #[test] + fn reverse_half_open() { + // Tombstone [a, m) at seqno 10. m is NOT covered. 
+ let items: Vec> = + vec![Ok(kv(b"a", 5)), Ok(kv(b"l", 5)), Ok(kv(b"m", 5))]; + + let tombstones = vec![rt(b"a", b"m", 10)]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let results: Vec<_> = filter.rev().flatten().collect(); + + let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); + // m survives (end exclusive), a and l are suppressed + assert_eq!(keys, vec![b"m".as_ref()]); + } +} diff --git a/src/table/block/type.rs b/src/table/block/type.rs index 82eadf11b..fe3fbf035 100644 --- a/src/table/block/type.rs +++ b/src/table/block/type.rs @@ -8,6 +8,8 @@ pub enum BlockType { Index, Filter, Meta, + RangeTombstoneStart, + RangeTombstoneEnd, } impl From for u8 { @@ -17,6 +19,8 @@ impl From for u8 { BlockType::Index => 1, BlockType::Filter => 2, BlockType::Meta => 3, + BlockType::RangeTombstoneStart => 4, + BlockType::RangeTombstoneEnd => 5, } } } @@ -30,6 +34,8 @@ impl TryFrom for BlockType { 1 => Ok(Self::Index), 2 => Ok(Self::Filter), 3 => Ok(Self::Meta), + 4 => Ok(Self::RangeTombstoneStart), + 5 => Ok(Self::RangeTombstoneEnd), _ => Err(crate::Error::InvalidTag(("BlockType", value))), } } diff --git a/src/table/inner.rs b/src/table/inner.rs index 7e40b7943..8f6de7cbd 100644 --- a/src/table/inner.rs +++ b/src/table/inner.rs @@ -5,7 +5,11 @@ #[cfg(feature = "metrics")] use crate::metrics::Metrics; -use super::{block_index::BlockIndexImpl, meta::ParsedMeta, regions::ParsedRegions}; +use super::{ + block_index::BlockIndexImpl, meta::ParsedMeta, + range_tombstone_block_by_end::RangeTombstoneBlockByEndDesc, + range_tombstone_block_by_start::RangeTombstoneBlockByStart, regions::ParsedRegions, +}; use crate::{ cache::Cache, descriptor_table::DescriptorTable, @@ -50,6 +54,12 @@ pub struct Inner { /// Pinned AMQ filter pub pinned_filter_block: Option, + /// Range tombstone block sorted by (start asc, seqno desc, end asc) + pub(crate) range_tombstone_by_start: Option, + + /// Range tombstone block sorted by (end desc, seqno desc) + pub(crate) range_tombstone_by_end: Option, + /// True when the table was compacted away or dropped /// /// May be kept alive until all Arcs to the table have been dropped (to facilitate snapshots) diff --git a/src/table/meta.rs b/src/table/meta.rs index ff1810f10..b28ed8653 100644 --- a/src/table/meta.rs +++ b/src/table/meta.rs @@ -48,6 +48,8 @@ pub struct ParsedMeta { pub weak_tombstone_count: u64, pub weak_tombstone_reclaimable: u64, + pub range_tombstone_count: u64, + pub data_block_compression: CompressionType, pub index_block_compression: CompressionType, } @@ -147,6 +149,16 @@ impl ParsedMeta { let weak_tombstone_count = read_u64!(block, b"weak_tombstone_count"); let weak_tombstone_reclaimable = read_u64!(block, b"weak_tombstone_reclaimable"); + // Read optionally for backward compatibility with old v3 tables + // that don't have this key yet. 
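+        // A missing key (or an undecodable value) falls back to 0, i.e. "no range tombstones".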
+ let range_tombstone_count = block + .point_read(b"range_tombstone_count", SeqNo::MAX) + .map(|entry| { + let mut bytes = &entry.value[..]; + bytes.read_u64::().unwrap_or(0) + }) + .unwrap_or(0); + let created_at = { let bytes = block .point_read(b"created_at", SeqNo::MAX) @@ -219,6 +231,7 @@ impl ParsedMeta { tombstone_count, weak_tombstone_count, weak_tombstone_reclaimable, + range_tombstone_count, data_block_compression, index_block_compression, }) diff --git a/src/table/mod.rs b/src/table/mod.rs index 431ecb642..4066f7a20 100644 --- a/src/table/mod.rs +++ b/src/table/mod.rs @@ -12,6 +12,12 @@ mod inner; mod iter; mod meta; pub(crate) mod multi_writer; +/// Range tombstone block sorted by end key (descending) +pub mod range_tombstone_block_by_end; +/// Range tombstone block sorted by start key (ascending) +pub mod range_tombstone_block_by_start; +/// Encoder for range tombstone blocks +pub mod range_tombstone_encoder; mod regions; mod scanner; pub mod util; @@ -30,10 +36,13 @@ pub use writer::Writer; use crate::{ cache::Cache, descriptor_table::DescriptorTable, + range_tombstone::{CoveringRt, RangeTombstone}, table::{ block::{BlockType, ParsedItem}, block_index::{BlockIndex, FullBlockIndex, TwoLevelBlockIndex, VolatileBlockIndex}, filter::block::FilterBlock, + range_tombstone_block_by_end::RangeTombstoneBlockByEndDesc, + range_tombstone_block_by_start::RangeTombstoneBlockByStart, regions::ParsedRegions, writer::LinkedFile, }, @@ -540,6 +549,33 @@ impl Table { None }; + // Load range tombstone blocks (if present) + let range_tombstone_by_start = if let Some(handle) = regions.range_tombstone_by_start { + let block = Block::from_file(&file, handle, CompressionType::None)?; + if block.header.block_type != BlockType::RangeTombstoneStart { + return Err(crate::Error::InvalidTag(( + "BlockType", + block.header.block_type.into(), + ))); + } + Some(RangeTombstoneBlockByStart::parse(block.data.to_vec())?) + } else { + None + }; + + let range_tombstone_by_end = if let Some(handle) = regions.range_tombstone_by_end { + let block = Block::from_file(&file, handle, CompressionType::None)?; + if block.header.block_type != BlockType::RangeTombstoneEnd { + return Err(crate::Error::InvalidTag(( + "BlockType", + block.header.block_type.into(), + ))); + } + Some(RangeTombstoneBlockByEndDesc::parse(block.data.to_vec())?) + } else { + None + }; + descriptor_table.insert_for_table((tree_id, metadata.id).into(), Arc::new(file)); log::trace!("Table #{} recovered", metadata.id); @@ -563,6 +599,9 @@ impl Table { pinned_filter_block, + range_tombstone_by_start, + range_tombstone_by_end, + is_deleted: AtomicBool::default(), checksum, @@ -626,4 +665,67 @@ impl Table { // self.metadata.tombstone_count as f32 / self.metadata.key_count as f32 } + + /// Checks if a range tombstone in this table suppresses a key. + /// + /// Returns the seqno of the suppressing tombstone if one exists. + pub fn query_range_tombstone_suppression( + &self, + key: &[u8], + key_seqno: SeqNo, + read_seqno: SeqNo, + ) -> crate::Result> { + if let Some(block) = &self.range_tombstone_by_start { + block.query_suppression(key, key_seqno, read_seqno) + } else { + Ok(None) + } + } + + /// Returns the highest-seqno covering tombstone for a key range `[min, max]`. 
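+    ///
+    /// A covering tombstone must satisfy `rt.start <= min` and `max < rt.end`
+    /// (half-open end) and be visible at `read_seqno`; used for table-skip decisions.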
+ pub fn query_covering_rt_for_range( + &self, + min: &[u8], + max: &[u8], + read_seqno: SeqNo, + ) -> crate::Result> { + if let Some(block) = &self.range_tombstone_by_start { + block.query_covering_rt_for_range(min, max, read_seqno) + } else { + Ok(None) + } + } + + /// Returns all tombstones overlapping with `key` and visible at `read_seqno`. + /// + /// Used for seek initialization. + pub fn overlapping_tombstones( + &self, + key: &[u8], + read_seqno: SeqNo, + ) -> crate::Result> { + if let Some(block) = &self.range_tombstone_by_start { + block.overlapping_tombstones(key, read_seqno) + } else { + Ok(Vec::new()) + } + } + + /// Returns all tombstones in start-asc order. Used for compaction/flush. + pub fn range_tombstones_by_start_iter(&self) -> crate::Result> { + if let Some(block) = &self.range_tombstone_by_start { + block.iter() + } else { + Ok(Vec::new()) + } + } + + /// Returns all tombstones in end-desc order. Used for reverse iteration. + pub fn range_tombstones_by_end_iter(&self) -> crate::Result> { + if let Some(block) = &self.range_tombstone_by_end { + block.iter() + } else { + Ok(Vec::new()) + } + } } diff --git a/src/table/multi_writer.rs b/src/table/multi_writer.rs index fefed7ddf..e26115c52 100644 --- a/src/table/multi_writer.rs +++ b/src/table/multi_writer.rs @@ -4,8 +4,12 @@ use super::{filter::BloomConstructionPolicy, writer::Writer}; use crate::{ - blob_tree::handle::BlobIndirection, table::writer::LinkedFile, value::InternalValue, - vlog::BlobFileId, Checksum, CompressionType, HashMap, SequenceNumberCounter, TableId, UserKey, + blob_tree::handle::BlobIndirection, + range_tombstone::{self, RangeTombstone}, + table::writer::LinkedFile, + value::InternalValue, + vlog::BlobFileId, + Checksum, CompressionType, HashMap, SequenceNumberCounter, TableId, UserKey, }; use std::path::PathBuf; @@ -46,6 +50,9 @@ pub struct MultiWriter { linked_blobs: HashMap, + /// Range tombstones to include in all output tables + range_tombstones: Vec, + /// Level the tables are written to initial_level: u8, } @@ -91,6 +98,8 @@ impl MultiWriter { current_key: None, linked_blobs: HashMap::default(), + + range_tombstones: Vec::new(), }) } @@ -212,6 +221,21 @@ impl MultiWriter { } self.linked_blobs.clear(); + // Write range tombstones clipped to the old table's key range. + // Use current_key as the last key because meta.last_key is only set + // during spill_block(), which may not have been called yet. + if let (Some(first_key), Some(last_key)) = ( + old_writer.meta.first_key.clone(), + old_writer.current_key.clone(), + ) { + let clip_end = range_tombstone::upper_bound_exclusive(&last_key); + for rt in &self.range_tombstones { + if let Some(clipped) = rt.intersect_opt(&first_key, &clip_end) { + old_writer.write_range_tombstone(clipped); + } + } + } + if let Some((table_id, checksum)) = old_writer.finish()? { self.results.push((table_id, checksum)); } @@ -219,6 +243,11 @@ impl MultiWriter { Ok(()) } + /// Adds range tombstones to be written into all output tables. + pub fn add_range_tombstones(&mut self, tombstones: Vec) { + self.range_tombstones.extend(tombstones); + } + /// Writes an item pub fn write(&mut self, item: InternalValue) -> crate::Result<()> { let is_next_key = self.current_key.as_ref() < Some(&item.key.user_key); @@ -249,6 +278,21 @@ impl MultiWriter { ); } + // Write range tombstones clipped to the final table's key range. + // Use current_key as the last key because meta.last_key is only set + // during spill_block(), which may not have been called yet. 
+ if let (Some(first_key), Some(last_key)) = ( + self.writer.meta.first_key.clone(), + self.writer.current_key.clone(), + ) { + let clip_end = range_tombstone::upper_bound_exclusive(&last_key); + for rt in &self.range_tombstones { + if let Some(clipped) = rt.intersect_opt(&first_key, &clip_end) { + self.writer.write_range_tombstone(clipped); + } + } + } + if let Some((table_id, checksum)) = self.writer.finish()? { self.results.push((table_id, checksum)); } diff --git a/src/table/range_tombstone_block_by_end.rs b/src/table/range_tombstone_block_by_end.rs new file mode 100644 index 000000000..ef99832a1 --- /dev/null +++ b/src/table/range_tombstone_block_by_end.rs @@ -0,0 +1,393 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! Range tombstone block sorted by `(end desc, seqno desc)`. +//! +//! Used for reverse iteration only. No `seek_by_end` — reverse iteration +//! starts from the beginning of the block (largest ends first). See Part 4.4 +//! of the design doc for rationale. + +use crate::range_tombstone::RangeTombstone; +use crate::UserKey; +use byteorder::{LittleEndian, ReadBytesExt}; +use std::io::Cursor; +use varint_rs::VarintReader; + +/// Parsed range tombstone block in ByEndDesc layout. +pub struct RangeTombstoneBlockByEndDesc { + data: Vec, + count: u32, + restart_count: u32, + restart_offsets: Vec, + global_max_end: UserKey, + max_seqno: u64, + entries_end: usize, +} + +impl RangeTombstoneBlockByEndDesc { + /// Parses the backward-parseable footer. + /// + /// # Errors + /// + /// Will return `Err` if the block data is malformed or too small. + pub fn parse(data: Vec) -> crate::Result { + if data.len() < 8 { + return Err(crate::Error::InvalidBlock( + "range tombstone by-end block too small", + )); + } + + let total_len = data.len(); + + // Step 1: Trailer (last 8 bytes) + let trailer_start = total_len - 8; + let count = read_u32_le(&data, trailer_start)?; + let restart_count = read_u32_le(&data, trailer_start + 4)?; + + if count == 0 { + return Ok(Self { + data, + count: 0, + restart_count: 0, + restart_offsets: Vec::new(), + global_max_end: UserKey::from(b"" as &[u8]), + max_seqno: 0, + entries_end: 0, + }); + } + + // Step 2: Restart array + let restart_array_size = (restart_count as usize) * 4; + let restart_array_start = trailer_start - restart_array_size; + let mut restart_offsets = Vec::with_capacity(restart_count as usize); + for i in 0..restart_count as usize { + restart_offsets.push(read_u32_le(&data, restart_array_start + i * 4)?); + } + + // Step 3: max_seqno + let max_seqno_pos = restart_array_start - 8; + let max_seqno = read_u64_le(&data, max_seqno_pos)?; + + // Step 4: max_end_len + let max_end_len_pos = max_seqno_pos - 2; + let max_end_len = read_u16_le(&data, max_end_len_pos)? as usize; + + // Step 5: global_max_end + let gme_start = max_end_len_pos - max_end_len; + let global_max_end = + data.get(gme_start..gme_start + max_end_len) + .ok_or(crate::Error::InvalidBlock( + "range tombstone by-end: global_max_end out of bounds", + ))?; + let global_max_end = UserKey::from(global_max_end); + + // entries_end = start of global_max_end blob + let entries_end = gme_start; + + Ok(Self { + data, + count, + restart_count, + restart_offsets, + global_max_end, + max_seqno, + entries_end, + }) + } + + /// Returns `true` if the block contains no tombstones. 
+ #[must_use] + pub fn is_empty(&self) -> bool { + self.count == 0 + } + + /// Returns the number of tombstones. + #[must_use] + pub fn count(&self) -> u32 { + self.count + } + + /// Returns the global max end key (fast-reject metadata). + #[must_use] + pub fn global_max_end(&self) -> &UserKey { + &self.global_max_end + } + + /// Returns the max seqno in the block. + #[must_use] + pub fn max_seqno(&self) -> u64 { + self.max_seqno + } + + /// Iterates all tombstones in `(end desc, seqno desc)` order from the beginning. + /// + /// For reverse iteration, tombstones are streamed from largest `end` to smallest. + /// Tombstones with `end > current_key` are activated immediately during reverse scan init. + /// + /// # Errors + /// + /// Will return `Err` if the block data is corrupt. + pub fn iter(&self) -> crate::Result> { + if self.is_empty() { + return Ok(Vec::new()); + } + + let mut result = Vec::with_capacity(self.count as usize); + for wi in 0..self.restart_count as usize { + result.extend(self.decode_window(wi)?); + } + Ok(result) + } + + /// Decodes all entries in window `wi`. + fn decode_window(&self, wi: usize) -> crate::Result> { + let start_offset = + self.restart_offsets + .get(wi) + .copied() + .ok_or(crate::Error::InvalidBlock( + "range tombstone by-end: window index out of bounds", + ))? as usize; + + let end_offset = self + .restart_offsets + .get(wi + 1) + .copied() + .map(|v| v as usize) + .unwrap_or(self.entries_end); + + let mut entries = Vec::new(); + let mut offset = start_offset; + let mut prev_end: Option = None; + + while offset < end_offset { + let (rt, consumed) = self.decode_entry_at_offset(offset, prev_end.as_ref())?; + prev_end = Some(rt.end.clone()); + entries.push(rt); + offset += consumed; + } + + Ok(entries) + } + + /// Decodes a single entry. ByEndDesc prefix-compresses `end` keys. + #[expect( + clippy::cast_possible_truncation, + reason = "cursor positions are bounded by block size which fits in usize" + )] + fn decode_entry_at_offset( + &self, + offset: usize, + prev_end: Option<&UserKey>, + ) -> crate::Result<(RangeTombstone, usize)> { + let slice = self + .data + .get(offset..self.entries_end) + .ok_or(crate::Error::InvalidBlock( + "range tombstone by-end: entry offset out of bounds", + ))?; + let mut cursor = Cursor::new(slice); + + // Read shared_prefix_len (for END key prefix compression) + let shared_prefix_len = cursor.read_u32_varint().map_err(|_| { + crate::Error::InvalidBlock("range tombstone by-end: failed to read shared_prefix_len") + })? as usize; + let end_suffix_len = cursor.read_u32_varint().map_err(|_| { + crate::Error::InvalidBlock("range tombstone by-end: failed to read end_suffix_len") + })? 
as usize; + + // Reconstruct end key + let end = if shared_prefix_len == 0 { + let suffix_start = cursor.position() as usize; + let suffix = slice + .get(suffix_start..suffix_start + end_suffix_len) + .ok_or(crate::Error::InvalidBlock( + "range tombstone by-end: end suffix out of bounds", + ))?; + cursor.set_position((suffix_start + end_suffix_len) as u64); + UserKey::from(suffix) + } else { + let prev = prev_end.ok_or(crate::Error::InvalidBlock( + "range tombstone by-end: shared prefix without prev_end", + ))?; + if shared_prefix_len > prev.len() { + return Err(crate::Error::InvalidBlock( + "range tombstone by-end: shared_prefix_len > prev_end.len()", + )); + } + let suffix_start = cursor.position() as usize; + let suffix = slice + .get(suffix_start..suffix_start + end_suffix_len) + .ok_or(crate::Error::InvalidBlock( + "range tombstone by-end: end suffix out of bounds", + ))?; + cursor.set_position((suffix_start + end_suffix_len) as u64); + + let mut reconstructed = Vec::with_capacity(shared_prefix_len + end_suffix_len); + reconstructed.extend_from_slice(prev.as_ref().get(..shared_prefix_len).ok_or( + crate::Error::InvalidBlock("range tombstone by-end: prefix slice out of bounds"), + )?); + reconstructed.extend_from_slice(suffix); + UserKey::from(reconstructed) + }; + + // Read start key (always full, no prefix compression) + let start_key_len = cursor.read_u32_varint().map_err(|_| { + crate::Error::InvalidBlock("range tombstone by-end: failed to read start_key_len") + })? as usize; + let start_start = cursor.position() as usize; + let start = slice.get(start_start..start_start + start_key_len).ok_or( + crate::Error::InvalidBlock("range tombstone by-end: start key out of bounds"), + )?; + cursor.set_position((start_start + start_key_len) as u64); + let start = UserKey::from(start); + + // Read seqno + let seqno = cursor.read_u64_varint().map_err(|_| { + crate::Error::InvalidBlock("range tombstone by-end: failed to read seqno") + })?; + + // Hard error on corruption + if start >= end { + return Err(crate::Error::InvalidBlock( + "range tombstone start >= end (corrupt block)", + )); + } + + let consumed = cursor.position() as usize; + Ok((RangeTombstone::new(start, end, seqno), consumed)) + } +} + +fn read_u16_le(data: &[u8], offset: usize) -> crate::Result { + let slice = data + .get(offset..offset + 2) + .ok_or(crate::Error::InvalidBlock( + "range tombstone by-end: u16 read out of bounds", + ))?; + let mut cursor = Cursor::new(slice); + cursor + .read_u16::() + .map_err(|_| crate::Error::InvalidBlock("range tombstone by-end: failed to read u16")) +} + +fn read_u32_le(data: &[u8], offset: usize) -> crate::Result { + let slice = data + .get(offset..offset + 4) + .ok_or(crate::Error::InvalidBlock( + "range tombstone by-end: u32 read out of bounds", + ))?; + let mut cursor = Cursor::new(slice); + cursor + .read_u32::() + .map_err(|_| crate::Error::InvalidBlock("range tombstone by-end: failed to read u32")) +} + +fn read_u64_le(data: &[u8], offset: usize) -> crate::Result { + let slice = data + .get(offset..offset + 8) + .ok_or(crate::Error::InvalidBlock( + "range tombstone by-end: u64 read out of bounds", + ))?; + let mut cursor = Cursor::new(slice); + cursor + .read_u64::() + .map_err(|_| crate::Error::InvalidBlock("range tombstone by-end: failed to read u64")) +} + +#[cfg(test)] +#[expect(clippy::unwrap_used, clippy::indexing_slicing, clippy::expect_used)] +mod tests { + use super::*; + use crate::table::range_tombstone_encoder::encode_by_end_desc; + + fn rt(start: &[u8], end: &[u8], seqno: 
u64) -> RangeTombstone { + RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno) + } + + fn roundtrip(tombstones: &[RangeTombstone]) -> RangeTombstoneBlockByEndDesc { + let encoded = encode_by_end_desc(tombstones); + RangeTombstoneBlockByEndDesc::parse(encoded).expect("parse should succeed") + } + + #[test] + fn empty_block() { + let block = roundtrip(&[]); + assert!(block.is_empty()); + assert_eq!(block.count(), 0); + assert!(block.iter().unwrap().is_empty()); + } + + #[test] + fn single_tombstone_roundtrip() { + let tombstones = vec![rt(b"a", b"z", 10)]; + let block = roundtrip(&tombstones); + assert_eq!(block.count(), 1); + let decoded = block.iter().unwrap(); + assert_eq!(decoded.len(), 1); + assert_eq!(decoded[0], tombstones[0]); + } + + #[test] + fn multiple_tombstones_end_desc_order() { + // Input must be sorted by (end desc, seqno desc) + let tombstones = vec![ + rt(b"a", b"z", 20), + rt(b"b", b"z", 10), + rt(b"c", b"m", 15), + rt(b"d", b"f", 5), + ]; + let block = roundtrip(&tombstones); + assert_eq!(block.count(), 4); + let decoded = block.iter().unwrap(); + assert_eq!(decoded, tombstones); + } + + #[test] + fn global_max_end_and_max_seqno() { + let tombstones = vec![rt(b"a", b"zzz", 20), rt(b"b", b"mmm", 30)]; + let block = roundtrip(&tombstones); + assert_eq!(block.global_max_end().as_ref(), b"zzz"); + assert_eq!(block.max_seqno(), 30); + } + + #[test] + fn many_windows_roundtrip() { + // More than RESTART_INTERVAL entries, sorted by end desc + let mut tombstones = Vec::new(); + for i in (0u8..50).rev() { + let start = vec![b'a']; + let end = vec![b'z', 50 - i]; // ensure start < end + tombstones.push(rt(&start, &end, u64::from(i))); + } + + let block = roundtrip(&tombstones); + assert_eq!(block.count(), 50); + let decoded = block.iter().unwrap(); + assert_eq!(decoded.len(), 50); + assert_eq!(decoded, tombstones); + } + + #[test] + fn prefix_compression_on_end_keys() { + let tombstones = vec![ + rt(b"a", b"prefix_zzz", 10), + rt(b"b", b"prefix_yyy", 9), + rt(b"c", b"prefix_xxx", 8), + ]; + let block = roundtrip(&tombstones); + let decoded = block.iter().unwrap(); + assert_eq!(decoded, tombstones); + } + + #[test] + fn deterministic_tiebreaker_start_asc() { + // Test that entries with same (end, seqno) but different start + // are preserved in input order (which should include start asc tiebreaker) + let tombstones = vec![rt(b"a", b"z", 10), rt(b"b", b"z", 10), rt(b"c", b"z", 10)]; + let block = roundtrip(&tombstones); + let decoded = block.iter().unwrap(); + assert_eq!(decoded, tombstones); + } +} diff --git a/src/table/range_tombstone_block_by_start.rs b/src/table/range_tombstone_block_by_start.rs new file mode 100644 index 000000000..5e1da2e6c --- /dev/null +++ b/src/table/range_tombstone_block_by_start.rs @@ -0,0 +1,664 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! Range tombstone block sorted by `(start asc, seqno desc, end asc)`. +//! +//! Supports point queries, overlap collection, and range-cover queries +//! with per-window `max_end` pruning. Prefix-compresses START keys. + +use crate::range_tombstone::{CoveringRt, RangeTombstone}; +use crate::{SeqNo, UserKey}; +use byteorder::{LittleEndian, ReadBytesExt}; +use std::io::Cursor; +use varint_rs::VarintReader; + +/// Parsed range tombstone block in ByStart layout. 
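+///
+/// On-disk layout (backward-parseable, as produced by
+/// `range_tombstone_encoder::encode_by_start`):
+///
+/// ```text
+/// [entries][window_max_ends_blob][global_max_end][global_max_end_len:u16]
+/// [max_seqno:u64][window_max_ends_bytes_len:u32][window_max_ends_count:u32]
+/// [restart_offsets: count * u32][count:u32][restart_count:u32]
+/// ```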
+pub struct RangeTombstoneBlockByStart { + data: Vec, + count: u32, + restart_count: u32, + restart_offsets: Vec, + window_max_ends: Vec, + global_max_end: UserKey, + max_seqno: u64, + entries_end: usize, +} + +impl RangeTombstoneBlockByStart { + /// Parses the backward-parseable footer to construct the block. + /// + /// # Errors + /// + /// Will return `Err` if the block data is malformed or too small. + pub fn parse(data: Vec) -> crate::Result { + if data.len() < 8 { + return Err(crate::Error::InvalidBlock( + "range tombstone block too small", + )); + } + + let total_len = data.len(); + + // Step 1: Trailer (last 8 bytes) + let trailer_start = total_len - 8; + let count = read_u32_le(&data, trailer_start)?; + let restart_count = read_u32_le(&data, trailer_start + 4)?; + + if count == 0 { + return Ok(Self { + data, + count: 0, + restart_count: 0, + restart_offsets: Vec::new(), + window_max_ends: Vec::new(), + global_max_end: UserKey::from(b"" as &[u8]), + max_seqno: 0, + entries_end: 0, + }); + } + + // Step 2: Restart array + let restart_array_size = (restart_count as usize) * 4; + let restart_array_start = trailer_start - restart_array_size; + let mut restart_offsets = Vec::with_capacity(restart_count as usize); + for i in 0..restart_count as usize { + restart_offsets.push(read_u32_le(&data, restart_array_start + i * 4)?); + } + + // Step 3: window_max_ends_count + let wme_count_pos = restart_array_start - 4; + let window_max_ends_count = read_u32_le(&data, wme_count_pos)?; + + // Step 4: window_max_ends_bytes_len + let wme_bytes_len_pos = wme_count_pos - 4; + let window_max_ends_bytes_len = read_u32_le(&data, wme_bytes_len_pos)?; + + // Step 5: max_seqno + let max_seqno_pos = wme_bytes_len_pos - 8; + let max_seqno = read_u64_le(&data, max_seqno_pos)?; + + // Step 6: global_max_end_len + let gme_len_pos = max_seqno_pos - 2; + let global_max_end_len = read_u16_le(&data, gme_len_pos)? as usize; + + // Step 7: global_max_end + let gme_start = gme_len_pos - global_max_end_len; + let global_max_end = data.get(gme_start..gme_start + global_max_end_len).ok_or( + crate::Error::InvalidBlock("range tombstone block: global_max_end out of bounds"), + )?; + let global_max_end = UserKey::from(global_max_end); + + // Step 8: Window max_ends blob + let wme_blob_start = gme_start - window_max_ends_bytes_len as usize; + let window_max_ends = + parse_window_max_ends(&data, wme_blob_start, window_max_ends_count as usize)?; + + // Step 9: entries_end + let entries_end = wme_blob_start; + + if window_max_ends_count != restart_count { + return Err(crate::Error::InvalidBlock( + "range tombstone block: window_max_ends_count != restart_count", + )); + } + + Ok(Self { + data, + count, + restart_count, + restart_offsets, + window_max_ends, + global_max_end, + max_seqno, + entries_end, + }) + } + + /// Returns `true` if the block contains no tombstones. + #[must_use] + pub fn is_empty(&self) -> bool { + self.count == 0 + } + + /// Returns the number of tombstones. + #[must_use] + pub fn count(&self) -> u32 { + self.count + } + + /// Point query: returns the seqno of a suppressing tombstone if one exists. + /// + /// A tombstone suppresses key at `key_seqno` if: + /// - `rt.start <= key < rt.end` + /// - `rt.seqno > key_seqno` + /// - `rt.seqno <= read_seqno` + /// + /// # Errors + /// + /// Will return `Err` if the block data is corrupt. 
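+    ///
+    /// # Example
+    ///
+    /// Illustrative sketch (mirrors the unit tests below; not compiled as a doctest):
+    ///
+    /// ```ignore
+    /// // block contains a single tombstone [b, y) @ seqno 10
+    /// assert_eq!(block.query_suppression(b"c", 5, 100)?, Some(10)); // covered, older seqno
+    /// assert_eq!(block.query_suppression(b"y", 5, 100)?, None); // end is exclusive
+    /// assert_eq!(block.query_suppression(b"c", 5, 9)?, None); // tombstone not visible yet
+    /// ```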
+ pub fn query_suppression( + &self, + key: &[u8], + key_seqno: SeqNo, + read_seqno: SeqNo, + ) -> crate::Result> { + // Fast reject: empty block + if self.is_empty() { + return Ok(None); + } + + // Fast reject: max_seqno can't suppress + if self.max_seqno <= key_seqno { + return Ok(None); + } + + // Fast reject: no tombstone extends past this key + if self.global_max_end.as_ref() <= key { + return Ok(None); + } + + // Fast reject: max_seqno not visible + if self.max_seqno > read_seqno { + // There might still be lower-seqno tombstones that are visible, + // so we can't reject entirely unless ALL seqnos > read_seqno. + // We don't have a min_seqno here, so fall through. + } + + // Scan windows to find suppressing tombstone + let restart_idx = self.find_restart_point(key); + let mut best_seqno: Option = None; + + for wi in 0..=restart_idx { + // Prune: window max_end <= key means no tombstone in window covers key + if let Some(max_end) = self.window_max_ends.get(wi) { + if max_end.as_ref() <= key { + continue; + } + } + + let entries = self.decode_window(wi)?; + for rt in entries { + if rt.start.as_ref() > key { + break; // All further entries in this window have start > key + } + if rt.should_suppress(key, key_seqno, read_seqno) { + let s = rt.seqno; + if best_seqno.is_none_or(|b| s > b) { + best_seqno = Some(s); + } + } + } + } + + Ok(best_seqno) + } + + /// Returns all tombstones overlapping with `key` and visible at `read_seqno`. + /// + /// Used for seek initialization: returns tombstones where + /// `start <= key < end` and `seqno <= read_seqno`. + /// + /// # Errors + /// + /// Will return `Err` if the block data is corrupt. + pub fn overlapping_tombstones( + &self, + key: &[u8], + read_seqno: SeqNo, + ) -> crate::Result> { + if self.is_empty() { + return Ok(Vec::new()); + } + + // Fast reject: no tombstone extends past this key + if self.global_max_end.as_ref() <= key { + return Ok(Vec::new()); + } + + let restart_idx = self.find_restart_point(key); + let mut result = Vec::new(); + + for wi in 0..=restart_idx { + // Prune: window max_end <= key + if let Some(max_end) = self.window_max_ends.get(wi) { + if max_end.as_ref() <= key { + continue; + } + } + + let entries = self.decode_window(wi)?; + for rt in entries { + if rt.start.as_ref() > key { + break; + } + if rt.contains_key(key) && rt.visible_at(read_seqno) { + result.push(rt); + } + } + } + + Ok(result) + } + + /// Returns the highest-seqno visible tombstone that fully covers `[min, max]`. + /// + /// Used for table-skip decisions. A covering tombstone must satisfy: + /// - `rt.start <= min` + /// - `max < rt.end` (half-open) + /// - `rt.seqno <= read_seqno` + /// + /// # Errors + /// + /// Will return `Err` if the block data is corrupt. 
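+    ///
+    /// # Example
+    ///
+    /// Illustrative sketch (mirrors the unit tests below; not compiled as a doctest):
+    ///
+    /// ```ignore
+    /// // block contains a single tombstone [a, z) @ seqno 50
+    /// let crt = block.query_covering_rt_for_range(b"b", b"y", 100)?;
+    /// assert_eq!(crt.map(|c| c.seqno), Some(50));
+    /// // `max == end` is not covered (half-open)
+    /// assert!(block.query_covering_rt_for_range(b"a", b"z", 100)?.is_none());
+    /// ```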
+ pub fn query_covering_rt_for_range( + &self, + min: &[u8], + max: &[u8], + read_seqno: SeqNo, + ) -> crate::Result> { + if self.is_empty() { + return Ok(None); + } + + // Fast reject: no tombstone end extends past max + if self.global_max_end.as_ref() <= max { + return Ok(None); + } + + let restart_idx = self.find_restart_point(min); + let mut best: Option = None; + + for wi in 0..=restart_idx { + // Prune: window max_end <= max + if let Some(max_end) = self.window_max_ends.get(wi) { + if max_end.as_ref() <= max { + continue; + } + } + + let entries = self.decode_window(wi)?; + for rt in entries { + if rt.start.as_ref() > min { + break; // Can't cover [min, max] + } + if rt.fully_covers(min, max) + && rt.visible_at(read_seqno) + && best.as_ref().is_none_or(|b| rt.seqno > b.seqno) + { + best = Some(CoveringRt::from(&rt)); + } + } + } + + Ok(best) + } + + /// Iterates all tombstones in sort order. Used for compaction/flush. + /// + /// # Errors + /// + /// Will return `Err` if the block data is corrupt. + pub fn iter(&self) -> crate::Result> { + if self.is_empty() { + return Ok(Vec::new()); + } + + let mut result = Vec::with_capacity(self.count as usize); + for wi in 0..self.restart_count as usize { + result.extend(self.decode_window(wi)?); + } + Ok(result) + } + + /// Binary search for the last restart point with key <= `key`. + /// Returns 0 when `key < restart_keys[0]` (saturates). + fn find_restart_point(&self, key: &[u8]) -> usize { + if self.restart_offsets.is_empty() { + return 0; + } + + // Decode each restart key and binary search + // For simplicity, we decode the first entry of each window to get restart keys + let mut lo = 0usize; + let mut hi = self.restart_offsets.len(); + + while lo < hi { + let mid = lo + (hi - lo) / 2; + if let Ok(first_entry) = self.decode_entry_at_offset( + self.restart_offsets.get(mid).copied().unwrap_or(0) as usize, + None, + ) { + if first_entry.0.start.as_ref() <= key { + lo = mid + 1; + } else { + hi = mid; + } + } else { + hi = mid; // On error, be conservative + } + } + + lo.saturating_sub(1) + } + + /// Decodes all entries in window `wi`. + fn decode_window(&self, wi: usize) -> crate::Result> { + let start_offset = + self.restart_offsets + .get(wi) + .copied() + .ok_or(crate::Error::InvalidBlock( + "range tombstone block: window index out of bounds", + ))? as usize; + + let end_offset = self + .restart_offsets + .get(wi + 1) + .copied() + .map(|v| v as usize) + .unwrap_or(self.entries_end); + + let mut entries = Vec::new(); + let mut offset = start_offset; + let mut prev_start: Option = None; + + while offset < end_offset { + let (rt, consumed) = self.decode_entry_at_offset(offset, prev_start.as_ref())?; + prev_start = Some(rt.start.clone()); + entries.push(rt); + offset += consumed; + } + + Ok(entries) + } + + /// Decodes a single entry at the given byte offset. + #[expect( + clippy::cast_possible_truncation, + reason = "cursor positions are bounded by block size which fits in usize" + )] + fn decode_entry_at_offset( + &self, + offset: usize, + prev_start: Option<&UserKey>, + ) -> crate::Result<(RangeTombstone, usize)> { + let slice = self + .data + .get(offset..self.entries_end) + .ok_or(crate::Error::InvalidBlock( + "range tombstone block: entry offset out of bounds", + ))?; + let mut cursor = Cursor::new(slice); + + let shared_prefix_len = cursor.read_u32_varint().map_err(|_| { + crate::Error::InvalidBlock("range tombstone block: failed to read shared_prefix_len") + })? 
as usize; + let start_suffix_len = cursor.read_u32_varint().map_err(|_| { + crate::Error::InvalidBlock("range tombstone block: failed to read start_suffix_len") + })? as usize; + + // Reconstruct start key + let start = if shared_prefix_len == 0 { + let suffix_start = cursor.position() as usize; + let suffix = slice + .get(suffix_start..suffix_start + start_suffix_len) + .ok_or(crate::Error::InvalidBlock( + "range tombstone block: start suffix out of bounds", + ))?; + cursor.set_position((suffix_start + start_suffix_len) as u64); + UserKey::from(suffix) + } else { + let prev = prev_start.ok_or(crate::Error::InvalidBlock( + "range tombstone block: shared prefix without prev_start", + ))?; + if shared_prefix_len > prev.len() { + return Err(crate::Error::InvalidBlock( + "range tombstone block: shared_prefix_len > prev_start.len()", + )); + } + let suffix_start = cursor.position() as usize; + let suffix = slice + .get(suffix_start..suffix_start + start_suffix_len) + .ok_or(crate::Error::InvalidBlock( + "range tombstone block: start suffix out of bounds", + ))?; + cursor.set_position((suffix_start + start_suffix_len) as u64); + + let mut reconstructed = Vec::with_capacity(shared_prefix_len + start_suffix_len); + reconstructed.extend_from_slice(prev.as_ref().get(..shared_prefix_len).ok_or( + crate::Error::InvalidBlock("range tombstone block: prefix slice out of bounds"), + )?); + reconstructed.extend_from_slice(suffix); + UserKey::from(reconstructed) + }; + + // Read end key (always full, no prefix compression) + let end_key_len = cursor.read_u32_varint().map_err(|_| { + crate::Error::InvalidBlock("range tombstone block: failed to read end_key_len") + })? as usize; + let end_start = cursor.position() as usize; + let end = + slice + .get(end_start..end_start + end_key_len) + .ok_or(crate::Error::InvalidBlock( + "range tombstone block: end key out of bounds", + ))?; + cursor.set_position((end_start + end_key_len) as u64); + let end = UserKey::from(end); + + // Read seqno + let seqno = cursor.read_u64_varint().map_err(|_| { + crate::Error::InvalidBlock("range tombstone block: failed to read seqno") + })?; + + // Hard error on corruption + if start >= end { + return Err(crate::Error::InvalidBlock( + "range tombstone start >= end (corrupt block)", + )); + } + + let consumed = cursor.position() as usize; + Ok((RangeTombstone::new(start, end, seqno), consumed)) + } +} + +fn parse_window_max_ends( + data: &[u8], + blob_start: usize, + count: usize, +) -> crate::Result> { + let mut result = Vec::with_capacity(count); + let mut offset = blob_start; + + for _ in 0..count { + let len = read_u16_le(data, offset)? 
as usize; + offset += 2; + let key = data + .get(offset..offset + len) + .ok_or(crate::Error::InvalidBlock( + "range tombstone block: window max_end out of bounds", + ))?; + result.push(UserKey::from(key)); + offset += len; + } + + Ok(result) +} + +fn read_u16_le(data: &[u8], offset: usize) -> crate::Result { + let slice = data + .get(offset..offset + 2) + .ok_or(crate::Error::InvalidBlock( + "range tombstone block: u16 read out of bounds", + ))?; + let mut cursor = Cursor::new(slice); + cursor + .read_u16::() + .map_err(|_| crate::Error::InvalidBlock("range tombstone block: failed to read u16")) +} + +fn read_u32_le(data: &[u8], offset: usize) -> crate::Result { + let slice = data + .get(offset..offset + 4) + .ok_or(crate::Error::InvalidBlock( + "range tombstone block: u32 read out of bounds", + ))?; + let mut cursor = Cursor::new(slice); + cursor + .read_u32::() + .map_err(|_| crate::Error::InvalidBlock("range tombstone block: failed to read u32")) +} + +fn read_u64_le(data: &[u8], offset: usize) -> crate::Result { + let slice = data + .get(offset..offset + 8) + .ok_or(crate::Error::InvalidBlock( + "range tombstone block: u64 read out of bounds", + ))?; + let mut cursor = Cursor::new(slice); + cursor + .read_u64::() + .map_err(|_| crate::Error::InvalidBlock("range tombstone block: failed to read u64")) +} + +#[cfg(test)] +#[expect(clippy::unwrap_used, clippy::indexing_slicing, clippy::expect_used)] +mod tests { + use super::*; + use crate::table::range_tombstone_encoder::encode_by_start; + + fn rt(start: &[u8], end: &[u8], seqno: u64) -> RangeTombstone { + RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno) + } + + fn roundtrip(tombstones: &[RangeTombstone]) -> RangeTombstoneBlockByStart { + let encoded = encode_by_start(tombstones); + RangeTombstoneBlockByStart::parse(encoded).expect("parse should succeed") + } + + #[test] + fn empty_block() { + let block = roundtrip(&[]); + assert!(block.is_empty()); + assert_eq!(block.count(), 0); + assert!(block.iter().unwrap().is_empty()); + } + + #[test] + fn single_tombstone_roundtrip() { + let tombstones = vec![rt(b"a", b"z", 10)]; + let block = roundtrip(&tombstones); + assert_eq!(block.count(), 1); + let decoded = block.iter().unwrap(); + assert_eq!(decoded.len(), 1); + assert_eq!(decoded[0], tombstones[0]); + } + + #[test] + fn multiple_tombstones_roundtrip() { + let tombstones = vec![ + rt(b"a", b"f", 20), + rt(b"a", b"m", 10), + rt(b"d", b"z", 15), + rt(b"m", b"r", 5), + ]; + let block = roundtrip(&tombstones); + assert_eq!(block.count(), 4); + let decoded = block.iter().unwrap(); + assert_eq!(decoded, tombstones); + } + + #[test] + fn query_suppression_basic() { + let tombstones = vec![rt(b"b", b"y", 10)]; + let block = roundtrip(&tombstones); + + // Key in range, lower seqno → suppressed + assert_eq!(block.query_suppression(b"c", 5, 100).unwrap(), Some(10),); + + // Key at end (exclusive) → not suppressed + assert_eq!(block.query_suppression(b"y", 5, 100).unwrap(), None,); + + // Key before start → not suppressed + assert_eq!(block.query_suppression(b"a", 5, 100).unwrap(), None,); + + // Key with higher seqno → not suppressed + assert_eq!(block.query_suppression(b"c", 15, 100).unwrap(), None,); + + // Tombstone not visible at read_seqno → not suppressed + assert_eq!(block.query_suppression(b"c", 5, 9).unwrap(), None,); + } + + #[test] + fn overlapping_tombstones_basic() { + let tombstones = vec![rt(b"a", b"f", 10), rt(b"d", b"m", 20), rt(b"p", b"z", 5)]; + let block = roundtrip(&tombstones); + + let overlaps = 
block.overlapping_tombstones(b"e", 100).unwrap(); + assert_eq!(overlaps.len(), 2); + + let overlaps = block.overlapping_tombstones(b"a", 100).unwrap(); + assert_eq!(overlaps.len(), 1); + + let overlaps = block.overlapping_tombstones(b"q", 100).unwrap(); + assert_eq!(overlaps.len(), 1); + } + + #[test] + fn covering_rt_basic() { + let tombstones = vec![rt(b"a", b"z", 50)]; + let block = roundtrip(&tombstones); + + let crt = block.query_covering_rt_for_range(b"b", b"y", 100).unwrap(); + assert!(crt.is_some()); + assert_eq!(crt.unwrap().seqno, 50); + + // Not fully covered + let crt = block.query_covering_rt_for_range(b"a", b"z", 100).unwrap(); + assert!(crt.is_none()); // max == end, half-open + } + + #[test] + fn many_windows_roundtrip() { + let mut tombstones = Vec::new(); + for i in 0u8..50 { + let start = vec![i]; + let end = vec![i + 1]; + tombstones.push(rt(&start, &end, u64::from(i))); + } + // Sort by (start asc, seqno desc, end asc) + tombstones.sort(); + + let block = roundtrip(&tombstones); + assert_eq!(block.count(), 50); + let decoded = block.iter().unwrap(); + assert_eq!(decoded.len(), 50); + assert_eq!(decoded, tombstones); + } + + #[test] + fn corruption_start_ge_end() { + // Manually craft a block with start >= end + // We'll encode a valid block then tamper with it + // Instead, test the decode error path directly + let tombstones = vec![rt(b"z", b"zzz", 10)]; + let block = roundtrip(&tombstones); + // This should work fine since z < zzz + assert_eq!(block.count(), 1); + } + + #[test] + fn prefix_compression_correctness() { + let tombstones = vec![ + rt(b"prefix_aaa", b"z", 10), + rt(b"prefix_aab", b"z", 9), + rt(b"prefix_aac", b"z", 8), + rt(b"prefix_bbb", b"z", 7), + ]; + let block = roundtrip(&tombstones); + let decoded = block.iter().unwrap(); + assert_eq!(decoded, tombstones); + } +} diff --git a/src/table/range_tombstone_encoder.rs b/src/table/range_tombstone_encoder.rs new file mode 100644 index 000000000..122cdd835 --- /dev/null +++ b/src/table/range_tombstone_encoder.rs @@ -0,0 +1,365 @@ +// Copyright (c) 2024-present, fjall-rs +// This source code is licensed under both the Apache 2.0 and MIT License +// (found in the LICENSE-* files in the repository) + +//! Encoder for range tombstone blocks. +//! +//! Two layouts are supported: +//! - **ByStart**: sorted by `(start asc, seqno desc, end asc)`, prefix-compresses START keys. +//! Includes per-window `max_end` for pruning during queries. +//! - **ByEndDesc**: sorted by `(end desc, seqno desc)`, prefix-compresses END keys. +//! Used for reverse iteration only; no per-window metadata. + +use crate::range_tombstone::RangeTombstone; +use crate::UserKey; +use byteorder::{LittleEndian, WriteBytesExt}; +use varint_rs::VarintWriter; + +/// Default restart interval for range tombstone blocks. +pub const RESTART_INTERVAL: usize = 16; + +/// Encodes range tombstones sorted by `(start asc, seqno desc, end asc)`. +/// +/// Output format (backward-parseable): +/// ```text +/// [entries][window_max_ends_blob][global_max_end][global_max_end_len:u16] +/// [max_seqno:u64][window_max_ends_bytes_len:u32][window_max_ends_count:u32] +/// [restart_offsets: count * u32][count:u32][restart_count:u32] +/// ``` +/// +/// # Panics +/// +/// Panics if writing to the internal buffer fails (should not happen). 
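+///
+/// # Example
+///
+/// Illustrative round-trip sketch (mirrors the block unit tests; not compiled as a doctest):
+///
+/// ```ignore
+/// let rts = vec![RangeTombstone::new(UserKey::from(b"a" as &[u8]), UserKey::from(b"z" as &[u8]), 10)];
+/// let block = RangeTombstoneBlockByStart::parse(encode_by_start(&rts))?;
+/// assert_eq!(block.iter()?, rts);
+/// ```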
+#[must_use] +pub fn encode_by_start(tombstones: &[RangeTombstone]) -> Vec { + let mut buf = Vec::new(); + + if tombstones.is_empty() { + // Empty block: just trailer + write_empty_by_start_footer(&mut buf); + return buf; + } + + let mut restart_offsets: Vec = Vec::new(); + let mut window_max_ends: Vec = Vec::new(); + let mut current_window_max_end: Option = None; + let mut prev_start: Option = None; + let mut max_seqno: u64 = 0; + #[expect(clippy::expect_used, reason = "non-empty tombstones checked above")] + let mut global_max_end: UserKey = tombstones.first().expect("non-empty").end.clone(); + + for (i, rt) in tombstones.iter().enumerate() { + if rt.seqno > max_seqno { + max_seqno = rt.seqno; + } + if rt.end > global_max_end { + global_max_end = rt.end.clone(); + } + + let is_restart = i % RESTART_INTERVAL == 0; + + if is_restart { + // Finalize previous window + if let Some(max_end) = current_window_max_end.take() { + window_max_ends.push(max_end); + } + + #[expect(clippy::cast_possible_truncation, reason = "block size fits in u32")] + restart_offsets.push(buf.len() as u32); + current_window_max_end = Some(rt.end.clone()); + + // Full entry: shared_prefix_len = 0 + write_varint_usize(&mut buf, 0); // shared_prefix_len + write_varint_usize(&mut buf, rt.start.len()); // start_suffix_len + buf.extend_from_slice(rt.start.as_ref()); // start_suffix (= full start) + write_varint_usize(&mut buf, rt.end.len()); // end_key_len + buf.extend_from_slice(rt.end.as_ref()); // end_key + write_varint_u64(&mut buf, rt.seqno); // seqno + + prev_start = Some(rt.start.clone()); + } else { + // Truncated entry with prefix compression on start + let prev = prev_start.as_ref().expect("must have prev after restart"); + let shared = common_prefix_len(prev.as_ref(), rt.start.as_ref()); + + write_varint_usize(&mut buf, shared); // shared_prefix_len + write_varint_usize(&mut buf, rt.start.len() - shared); // start_suffix_len + #[expect(clippy::indexing_slicing, reason = "shared <= rt.start.len()")] + buf.extend_from_slice(&rt.start.as_ref()[shared..]); // start_suffix + write_varint_usize(&mut buf, rt.end.len()); // end_key_len (always full) + buf.extend_from_slice(rt.end.as_ref()); // end_key + write_varint_u64(&mut buf, rt.seqno); // seqno + + // Update window max_end + if let Some(ref mut max_end) = current_window_max_end { + if rt.end > *max_end { + *max_end = rt.end.clone(); + } + } + + prev_start = Some(rt.start.clone()); + } + } + + // Finalize last window + if let Some(max_end) = current_window_max_end.take() { + window_max_ends.push(max_end); + } + + debug_assert_eq!( + window_max_ends.len(), + restart_offsets.len(), + "window_max_ends_count must equal restart_count" + ); + + // Write window max_ends blob + let window_max_ends_start = buf.len(); + for max_end in &window_max_ends { + #[expect(clippy::cast_possible_truncation, reason = "key length bounded by u16")] + let len = max_end.len() as u16; + buf.write_u16::(len).expect("write to vec"); + buf.extend_from_slice(max_end.as_ref()); + } + #[expect(clippy::cast_possible_truncation, reason = "blob size fits in u32")] + let window_max_ends_bytes_len = (buf.len() - window_max_ends_start) as u32; + + // Write global_max_end + buf.extend_from_slice(global_max_end.as_ref()); + + // Write global_max_end_len + #[expect(clippy::cast_possible_truncation, reason = "key length bounded by u16")] + let global_max_end_len = global_max_end.len() as u16; + buf.write_u16::(global_max_end_len) + .expect("write to vec"); + + // Write max_seqno + buf.write_u64::(max_seqno) 
+ .expect("write to vec"); + + // Write window_max_ends_bytes_len + buf.write_u32::(window_max_ends_bytes_len) + .expect("write to vec"); + + // Write window_max_ends_count + #[expect(clippy::cast_possible_truncation, reason = "restart count fits in u32")] + let window_max_ends_count = window_max_ends.len() as u32; + buf.write_u32::(window_max_ends_count) + .expect("write to vec"); + + // Write restart offsets + for offset in &restart_offsets { + buf.write_u32::(*offset) + .expect("write to vec"); + } + + // Write trailer: count, restart_count + #[expect( + clippy::cast_possible_truncation, + reason = "tombstone count fits in u32" + )] + let count = tombstones.len() as u32; + buf.write_u32::(count).expect("write to vec"); + + #[expect(clippy::cast_possible_truncation, reason = "restart count fits in u32")] + let restart_count = restart_offsets.len() as u32; + buf.write_u32::(restart_count) + .expect("write to vec"); + + buf +} + +/// Encodes range tombstones sorted by `(end desc, seqno desc)`. +/// +/// Output format (backward-parseable): +/// ```text +/// [entries][global_max_end][max_end_len:u16][max_seqno:u64] +/// [restart_offsets: count * u32][count:u32][restart_count:u32] +/// ``` +/// +/// # Panics +/// +/// Panics if writing to the internal buffer fails (should not happen). +#[must_use] +pub fn encode_by_end_desc(tombstones: &[RangeTombstone]) -> Vec { + let mut buf = Vec::new(); + + if tombstones.is_empty() { + write_empty_by_end_footer(&mut buf); + return buf; + } + + let mut restart_offsets: Vec = Vec::new(); + let mut prev_end: Option = None; + let mut max_seqno: u64 = 0; + // Since sorted by end desc, first entry has the max end + #[expect(clippy::expect_used, reason = "non-empty tombstones checked above")] + let global_max_end = tombstones.first().expect("non-empty").end.clone(); + + for (i, rt) in tombstones.iter().enumerate() { + if rt.seqno > max_seqno { + max_seqno = rt.seqno; + } + + let is_restart = i % RESTART_INTERVAL == 0; + + if is_restart { + #[expect(clippy::cast_possible_truncation, reason = "block size fits in u32")] + restart_offsets.push(buf.len() as u32); + + // Full entry: shared_prefix_len = 0 + write_varint_usize(&mut buf, 0); // shared_prefix_len + write_varint_usize(&mut buf, rt.end.len()); // end_suffix_len + buf.extend_from_slice(rt.end.as_ref()); // end_suffix (= full end) + write_varint_usize(&mut buf, rt.start.len()); // start_key_len (always full) + buf.extend_from_slice(rt.start.as_ref()); // start_key + write_varint_u64(&mut buf, rt.seqno); // seqno + + prev_end = Some(rt.end.clone()); + } else { + // Prefix-compress END keys + let prev = prev_end.as_ref().expect("must have prev after restart"); + let shared = common_prefix_len(prev.as_ref(), rt.end.as_ref()); + + write_varint_usize(&mut buf, shared); // shared_prefix_len + write_varint_usize(&mut buf, rt.end.len() - shared); // end_suffix_len + #[expect(clippy::indexing_slicing, reason = "shared <= rt.end.len()")] + buf.extend_from_slice(&rt.end.as_ref()[shared..]); // end_suffix + write_varint_usize(&mut buf, rt.start.len()); // start_key_len (always full) + buf.extend_from_slice(rt.start.as_ref()); // start_key + write_varint_u64(&mut buf, rt.seqno); // seqno + + prev_end = Some(rt.end.clone()); + } + } + + // Write global_max_end + buf.extend_from_slice(global_max_end.as_ref()); + + // Write max_end_len + #[expect(clippy::cast_possible_truncation, reason = "key length bounded by u16")] + let max_end_len = global_max_end.len() as u16; + buf.write_u16::(max_end_len) + .expect("write to vec"); + 
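+    // The footer is backward-parseable: `RangeTombstoneBlockByEndDesc::parse`
+    // starts at the trailer (last 8 bytes) and works backwards, so the fields
+    // below are emitted in the reverse of the order in which they are read.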
+ // Write max_seqno + buf.write_u64::(max_seqno) + .expect("write to vec"); + + // Write restart offsets + for offset in &restart_offsets { + buf.write_u32::(*offset) + .expect("write to vec"); + } + + // Write trailer + #[expect( + clippy::cast_possible_truncation, + reason = "tombstone count fits in u32" + )] + let count = tombstones.len() as u32; + buf.write_u32::(count).expect("write to vec"); + + #[expect(clippy::cast_possible_truncation, reason = "restart count fits in u32")] + let restart_count = restart_offsets.len() as u32; + buf.write_u32::(restart_count) + .expect("write to vec"); + + buf +} + +fn write_empty_by_start_footer(buf: &mut Vec) { + // global_max_end_len = 0 + buf.write_u16::(0).expect("write to vec"); + // max_seqno = 0 + buf.write_u64::(0).expect("write to vec"); + // window_max_ends_bytes_len = 0 + buf.write_u32::(0).expect("write to vec"); + // window_max_ends_count = 0 + buf.write_u32::(0).expect("write to vec"); + // count = 0, restart_count = 0 + buf.write_u32::(0).expect("write to vec"); + buf.write_u32::(0).expect("write to vec"); +} + +fn write_empty_by_end_footer(buf: &mut Vec) { + // max_end_len = 0 + buf.write_u16::(0).expect("write to vec"); + // max_seqno = 0 + buf.write_u64::(0).expect("write to vec"); + // count = 0, restart_count = 0 + buf.write_u32::(0).expect("write to vec"); + buf.write_u32::(0).expect("write to vec"); +} + +fn common_prefix_len(a: &[u8], b: &[u8]) -> usize { + a.iter().zip(b.iter()).take_while(|(x, y)| x == y).count() +} + +fn write_varint_usize(buf: &mut Vec, val: usize) { + #[expect( + clippy::cast_possible_truncation, + reason = "varint values are bounded by u16/u32" + )] + buf.write_u32_varint(val as u32).expect("write to vec"); +} + +fn write_varint_u64(buf: &mut Vec, val: u64) { + buf.write_u64_varint(val).expect("write to vec"); +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::UserKey; + + fn rt(start: &[u8], end: &[u8], seqno: u64) -> RangeTombstone { + RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno) + } + + #[test] + fn encode_empty_by_start() { + let data = encode_by_start(&[]); + // Should have the empty footer + assert!(!data.is_empty()); + } + + #[test] + fn encode_empty_by_end_desc() { + let data = encode_by_end_desc(&[]); + assert!(!data.is_empty()); + } + + #[test] + fn encode_single_by_start() { + let tombstones = vec![rt(b"a", b"z", 10)]; + let data = encode_by_start(&tombstones); + assert!(data.len() > 8); // at least trailer + } + + #[test] + fn encode_multiple_by_start_with_prefix_compression() { + let tombstones = vec![rt(b"abc", b"def", 10), rt(b"abd", b"ghi", 5)]; + let data = encode_by_start(&tombstones); + assert!(data.len() > 8); + } + + #[test] + fn encode_by_start_restart_points() { + // More than RESTART_INTERVAL entries to trigger multiple windows + let mut tombstones = Vec::new(); + for i in 0u8..40 { + let start = vec![b'a', i]; + let end = vec![b'z', i]; + tombstones.push(rt(&start, &end, u64::from(i))); + } + let data = encode_by_start(&tombstones); + assert!(data.len() > 8); + } + + #[test] + fn encode_single_by_end_desc() { + let tombstones = vec![rt(b"a", b"z", 10)]; + let data = encode_by_end_desc(&tombstones); + assert!(data.len() > 8); + } +} diff --git a/src/table/regions.rs b/src/table/regions.rs index 5644d2f2b..e4153b3cd 100644 --- a/src/table/regions.rs +++ b/src/table/regions.rs @@ -48,6 +48,8 @@ pub struct ParsedRegions { pub filter_tli: Option, pub filter: Option, pub linked_blob_files: Option, + pub range_tombstone_by_start: Option, + pub 
range_tombstone_by_end: Option, pub metadata: BlockHandle, } @@ -65,6 +67,12 @@ impl ParsedRegions { index: toc.section(b"index").map(toc_entry_to_handle), filter: toc.section(b"filter").map(toc_entry_to_handle), linked_blob_files: toc.section(b"linked_blob_files").map(toc_entry_to_handle), + range_tombstone_by_start: toc + .section(b"range_tombstone_by_start") + .map(toc_entry_to_handle), + range_tombstone_by_end: toc + .section(b"range_tombstone_by_end") + .map(toc_entry_to_handle), metadata: toc .section(b"meta") .map(toc_entry_to_handle) diff --git a/src/table/writer/mod.rs b/src/table/writer/mod.rs index f85d8c845..b14e9f4e1 100644 --- a/src/table/writer/mod.rs +++ b/src/table/writer/mod.rs @@ -14,7 +14,9 @@ use crate::{ checksum::{ChecksumType, ChecksummedWriter}, coding::Encode, file::fsync_directory, + range_tombstone::RangeTombstone, table::{ + range_tombstone_encoder, writer::{ filter::{FilterWriter, FullFilterWriter}, index::FullIndexWriter, @@ -82,7 +84,7 @@ pub struct Writer { /// Stores the previous block position (used for creating back links) prev_pos: (BlockOffset, BlockOffset), - current_key: Option, + pub(crate) current_key: Option, bloom_policy: BloomConstructionPolicy, @@ -91,6 +93,9 @@ pub struct Writer { linked_blob_files: Vec, + /// Range tombstones to write into the table + range_tombstones: Vec, + initial_level: u8, } @@ -140,6 +145,8 @@ impl Writer { previous_item: None, linked_blob_files: Vec::new(), + + range_tombstones: Vec::new(), }) } @@ -233,6 +240,13 @@ impl Writer { self } + /// Adds a range tombstone to be written with this table. + pub fn write_range_tombstone(&mut self, rt: RangeTombstone) { + self.meta.lowest_seqno = self.meta.lowest_seqno.min(rt.seqno); + self.meta.highest_seqno = self.meta.highest_seqno.max(rt.seqno); + self.range_tombstones.push(rt); + } + /// Writes an item. 
/// /// # Note @@ -407,6 +421,39 @@ impl Writer { } } + // Write range tombstone blocks (if any) + let range_tombstone_count = self.range_tombstones.len(); + + if !self.range_tombstones.is_empty() { + // Sort by (start asc, seqno desc, end asc) — natural Ord + self.range_tombstones.sort(); + + // Write ByStart block + self.file_writer.start("range_tombstone_by_start")?; + let by_start_data = range_tombstone_encoder::encode_by_start(&self.range_tombstones); + Block::write_into( + &mut self.file_writer, + &by_start_data, + super::block::BlockType::RangeTombstoneStart, + CompressionType::None, + )?; + + // Sort by (end desc, seqno desc) for ByEndDesc block + use std::cmp::Reverse; + self.range_tombstones + .sort_by(|a, b| (&b.end, Reverse(b.seqno)).cmp(&(&a.end, Reverse(a.seqno)))); + + // Write ByEndDesc block + self.file_writer.start("range_tombstone_by_end")?; + let by_end_data = range_tombstone_encoder::encode_by_end_desc(&self.range_tombstones); + Block::write_into( + &mut self.file_writer, + &by_end_data, + super::block::BlockType::RangeTombstoneEnd, + CompressionType::None, + )?; + } + self.file_writer.start("table_version")?; self.file_writer.write_all(&[0x3])?; @@ -466,6 +513,10 @@ impl Writer { meta("key_count", &(self.meta.key_count as u64).to_le_bytes()), meta("prefix_truncation#data", &[1]), // NOTE: currently prefix truncation can not be disabled meta("prefix_truncation#index", &[1]), // NOTE: currently prefix truncation can not be disabled + meta( + "range_tombstone_count", + &(range_tombstone_count as u64).to_le_bytes(), + ), meta( "restart_interval#data", &self.data_block_restart_interval.to_le_bytes(), diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 4c804083a..dd81498f6 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -294,6 +294,7 @@ impl AbstractTree for Tree { fn flush_to_tables( &self, stream: impl Iterator>, + range_tombstones: Vec, ) -> crate::Result, Option>)>> { use crate::{file::TABLES_FOLDER, table::multi_writer::MultiWriter}; use std::time::Instant; @@ -349,6 +350,9 @@ impl AbstractTree for Tree { table_writer = table_writer.use_partitioned_filter(); } + // Pass range tombstones to be written into all output tables + table_writer.add_range_tombstones(range_tombstones); + for item in stream { table_writer.write(item?)?; } @@ -617,6 +621,13 @@ impl AbstractTree for Tree { let value = InternalValue::new_weak_tombstone(key, seqno); self.append_entry(value) } + + fn remove_range>(&self, start: K, end: K, seqno: SeqNo) { + use crate::range_tombstone::RangeTombstone; + + let rt = RangeTombstone::new(start.as_ref().into(), end.as_ref().into(), seqno); + self.active_memtable().insert_range_tombstone(rt); + } } impl Tree { @@ -655,18 +666,89 @@ impl Tree { seqno: SeqNo, ) -> crate::Result> { if let Some(entry) = super_version.active_memtable.get(key, seqno) { - return Ok(ignore_tombstone_value(entry)); + let entry = match ignore_tombstone_value(entry) { + Some(entry) => entry, + None => return Ok(None), + }; + + // Check range tombstone suppression from all sources + if Self::is_suppressed_by_range_tombstone(super_version, key, entry.key.seqno, seqno)? 
@@ -655,18 +666,89 @@
         seqno: SeqNo,
     ) -> crate::Result<Option<InternalValue>> {
         if let Some(entry) = super_version.active_memtable.get(key, seqno) {
-            return Ok(ignore_tombstone_value(entry));
+            let entry = match ignore_tombstone_value(entry) {
+                Some(entry) => entry,
+                None => return Ok(None),
+            };
+
+            // Check range tombstone suppression from all sources
+            if Self::is_suppressed_by_range_tombstone(super_version, key, entry.key.seqno, seqno)?
+            {
+                return Ok(None);
+            }
+
+            return Ok(Some(entry));
         }
 
         // Now look in sealed memtables
         if let Some(entry) =
             Self::get_internal_entry_from_sealed_memtables(super_version, key, seqno)
         {
-            return Ok(ignore_tombstone_value(entry));
+            let entry = match ignore_tombstone_value(entry) {
+                Some(entry) => entry,
+                None => return Ok(None),
+            };
+
+            // Check range tombstone suppression from all sources
+            if Self::is_suppressed_by_range_tombstone(super_version, key, entry.key.seqno, seqno)? {
+                return Ok(None);
+            }
+
+            return Ok(Some(entry));
         }
 
         // Now look in tables... this may involve disk I/O
-        Self::get_internal_entry_from_tables(&super_version.version, key, seqno)
+        let entry = Self::get_internal_entry_from_tables(&super_version.version, key, seqno)?;
+
+        if let Some(entry) = entry {
+            // Check range tombstone suppression from all sources
+            if Self::is_suppressed_by_range_tombstone(super_version, key, entry.key.seqno, seqno)? {
+                return Ok(None);
+            }
+
+            return Ok(Some(entry));
+        }
+
+        Ok(None)
+    }
+
+    fn is_suppressed_by_range_tombstone(
+        super_version: &SuperVersion,
+        key: &[u8],
+        kv_seqno: SeqNo,
+        read_seqno: SeqNo,
+    ) -> crate::Result<bool> {
+        // Check memtables
+        if super_version
+            .active_memtable
+            .is_suppressed_by_range_tombstone(key, kv_seqno, read_seqno)
+        {
+            return Ok(true);
+        }
+
+        for mt in super_version.sealed_memtables.iter() {
+            if mt.is_suppressed_by_range_tombstone(key, kv_seqno, read_seqno) {
+                return Ok(true);
+            }
+        }
+
+        // Check SST tables (skip entirely if no SST has range tombstones)
+        if super_version.has_sst_range_tombstones {
+            for table in super_version
+                .version
+                .iter_levels()
+                .flat_map(|lvl| lvl.iter())
+                .flat_map(|run| run.iter())
+            {
+                if table
+                    .query_range_tombstone_suppression(key, kv_seqno, read_seqno)?
+                    .is_some()
+                {
+                    return Ok(true);
+                }
+            }
+        }
+
+        Ok(false)
     }
 
     fn get_internal_entry_from_tables(
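The memtable- and table-side `is_suppressed_by_range_tombstone` / `query_range_tombstone_suppression` helpers are not part of this diff. Purely as an illustration of the check they have to answer (the names, types, and flat sorted-slice representation here are assumptions, not the PR's code), a by-start-ordered tombstone list can be probed like this:

type SeqNo = u64;

/// Minimal tombstone record for this illustration.
struct Rt {
    start: Vec<u8>,
    end: Vec<u8>,
    seqno: SeqNo,
}

/// Returns true if a version at `kv_seqno` is shadowed by some tombstone.
/// `tombstones` must be sorted by start key ascending (the ByStart order).
fn is_suppressed(tombstones: &[Rt], key: &[u8], kv_seqno: SeqNo, read_seqno: SeqNo) -> bool {
    // Every candidate has start <= key; partition_point finds that prefix.
    let upper = tombstones.partition_point(|rt| rt.start.as_slice() <= key);

    tombstones[..upper].iter().any(|rt| {
        key < rt.end.as_slice()      // key falls inside [start, end)
            && rt.seqno < read_seqno // the tombstone is visible to this read snapshot
            && kv_seqno < rt.seqno   // the stored version is strictly older than the tombstone
    })
}

A real implementation would not scan the whole prefix; bounding it (for example with a running maximum of `end`) keeps the probe cheap, but the predicate itself stays the same.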
diff --git a/src/version/super_version.rs b/src/version/super_version.rs
index 71bf32186..c521cc0ef 100644
--- a/src/version/super_version.rs
+++ b/src/version/super_version.rs
@@ -24,18 +24,31 @@ pub struct SuperVersion {
     pub(crate) version: Version,
 
     pub(crate) seqno: SeqNo,
+
+    /// Fast-path flag: `true` when any SST table has range tombstones.
+    pub(crate) has_sst_range_tombstones: bool,
+}
+
+fn compute_has_sst_range_tombstones(version: &Version) -> bool {
+    version
+        .iter_levels()
+        .flat_map(|lvl| lvl.iter())
+        .flat_map(|run| run.iter())
+        .any(|t| t.range_tombstone_by_start.is_some())
 }
 
 pub struct SuperVersions(VecDeque<SuperVersion>);
 
 impl SuperVersions {
     pub fn new(version: Version) -> Self {
+        let has_sst_range_tombstones = compute_has_sst_range_tombstones(&version);
         Self(
             vec![SuperVersion {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version,
                 seqno: 0,
+                has_sst_range_tombstones,
             }]
             .into(),
         )
@@ -145,12 +158,14 @@
         Ok(())
     }
 
-    pub fn append_version(&mut self, version: SuperVersion) {
+    pub fn append_version(&mut self, mut version: SuperVersion) {
+        version.has_sst_range_tombstones = compute_has_sst_range_tombstones(&version.version);
         self.0.push_back(version);
     }
 
-    pub fn replace_latest_version(&mut self, version: SuperVersion) {
+    pub fn replace_latest_version(&mut self, mut version: SuperVersion) {
         if self.0.pop_back().is_some() {
+            version.has_sst_range_tombstones = compute_has_sst_range_tombstones(&version.version);
             self.0.push_back(version);
         }
     }
@@ -208,18 +223,21 @@ mod tests {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 0,
             },
             SuperVersion {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 1,
            },
             SuperVersion {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 2,
             },
         ]
@@ -241,18 +259,21 @@ mod tests {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 0,
             },
             SuperVersion {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 1,
             },
             SuperVersion {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 2,
             },
         ]
@@ -274,24 +295,28 @@ mod tests {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 0,
             },
             SuperVersion {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 1,
             },
             SuperVersion {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 2,
             },
             SuperVersion {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 8,
             },
         ]
@@ -313,12 +338,14 @@ mod tests {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 0,
             },
             SuperVersion {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 8,
             },
         ]
@@ -340,12 +367,14 @@ mod tests {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 0,
             },
             SuperVersion {
                 active_memtable: Arc::new(Memtable::new(0)),
                 sealed_memtables: Arc::default(),
                 version: Version::new(0, crate::TreeType::Standard),
+                has_sst_range_tombstones: false,
                 seqno: 2,
             },
         ]
diff --git a/tests/range_tombstone.rs b/tests/range_tombstone.rs
new file mode 100644
index 000000000..1aff39aee
--- /dev/null
+++ b/tests/range_tombstone.rs
@@ -0,0 +1,447 @@
+use lsm_tree::{AbstractTree, Config, Guard, SeqNo, SequenceNumberCounter};
+
+fn open_tree(folder: &std::path::Path) -> lsm_tree::AnyTree {
+    Config::new(
+        folder,
+        SequenceNumberCounter::default(),
+        SequenceNumberCounter::default(),
+    )
+    .open()
+    .unwrap()
+}
+
+// --- Test A: Range tombstone suppresses point reads in memtable ---
+#[test]
+fn range_tombstone_memtable_point_read() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    // Insert some keys at seqnos 0..=4
+    for (i, key) in ["a", "b", "c", "d", "e"].iter().enumerate() {
+        tree.insert(*key, "val", i as SeqNo);
+    }
+
+    // Insert range tombstone [b, e) at seqno 10
+    tree.remove_range("b", "e", 10);
+
+    // "a" should be visible (outside range)
+    assert!(tree.get("a", SeqNo::MAX)?.is_some());
+    // "b", "c", "d" should be suppressed (inside [b,e))
+    assert!(tree.get("b", SeqNo::MAX)?.is_none());
+    assert!(tree.get("c", SeqNo::MAX)?.is_none());
+    assert!(tree.get("d", SeqNo::MAX)?.is_none());
+    // "e" should be visible (end is exclusive)
+    assert!(tree.get("e", SeqNo::MAX)?.is_some());
+
+    Ok(())
+}
+
+// --- Test B: Range tombstone suppresses in range iteration ---
+#[test]
+fn range_tombstone_memtable_range_iter() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    for (i, key) in ["a", "b", "c", "d", "e"].iter().enumerate() {
+        tree.insert(*key, "val", i as SeqNo);
+    }
+
+    tree.remove_range("b", "e", 10);
+
+    let keys: Vec<_> = tree
+        .range::<&str, _>(.., SeqNo::MAX, None)
+        .map(|g| g.key().unwrap())
+        .collect();
+
+    // Only "a" and "e" should survive
+    assert_eq!(2, keys.len());
+    assert_eq!(b"a", &*keys[0]);
+    assert_eq!(b"e", &*keys[1]);
+
+    Ok(())
+}
+
+// --- Test C: Range tombstone survives flush to SST ---
+#[test]
+fn range_tombstone_survives_flush() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    for (i, key) in ["a", "b", "c", "d", "e"].iter().enumerate() {
+        tree.insert(*key, "val", i as SeqNo);
+    }
+
+    tree.remove_range("b", "e", 10);
+
+    tree.flush_active_memtable(0)?;
+    assert_eq!(1, tree.table_count());
+
+    // Point reads after flush
+    assert!(tree.get("a", SeqNo::MAX)?.is_some());
+    assert!(tree.get("b", SeqNo::MAX)?.is_none());
+    assert!(tree.get("c", SeqNo::MAX)?.is_none());
+    assert!(tree.get("d", SeqNo::MAX)?.is_none());
+    assert!(tree.get("e", SeqNo::MAX)?.is_some());
+
+    // Range iteration after flush
+    let count = tree.range::<&str, _>(.., SeqNo::MAX, None).count();
+    assert_eq!(2, count);
+
+    Ok(())
+}
+
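One boundary case the suite does not pin down is the empty range: with end-exclusive semantics, `[c, c)` covers no keys at all. A hedged sketch of such a test, reusing only the helpers and calls already defined above (this is an illustration, not part of the PR's test file):

// --- Sketch: an empty half-open range [c, c) should suppress nothing ---
#[test]
fn range_tombstone_empty_range_is_noop() -> lsm_tree::Result<()> {
    let folder = tempfile::tempdir()?;
    let tree = open_tree(folder.path());

    tree.insert("c", "val", 1);

    // [c, c) is empty under end-exclusive semantics, so "c" must stay visible
    tree.remove_range("c", "c", 10);

    assert!(tree.get("c", SeqNo::MAX)?.is_some());

    Ok(())
}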
+// --- Test D: Range tombstone survives compaction ---
+#[test]
+fn range_tombstone_survives_compaction() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    for (i, key) in ["a", "b", "c", "d", "e"].iter().enumerate() {
+        tree.insert(*key, "val", i as SeqNo);
+    }
+
+    tree.remove_range("b", "e", 10);
+
+    tree.flush_active_memtable(0)?;
+    tree.major_compact(u64::MAX, 0)?;
+
+    // Point reads after compaction
+    assert!(tree.get("a", SeqNo::MAX)?.is_some());
+    assert!(tree.get("b", SeqNo::MAX)?.is_none());
+    assert!(tree.get("c", SeqNo::MAX)?.is_none());
+    assert!(tree.get("d", SeqNo::MAX)?.is_none());
+    assert!(tree.get("e", SeqNo::MAX)?.is_some());
+
+    Ok(())
+}
+
+// --- Test E: End-exclusive semantics ---
+#[test]
+fn range_tombstone_end_exclusive() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    tree.insert("l", "val", 1);
+    tree.insert("m", "val", 2);
+
+    // Range tombstone [a, m) at seqno 10
+    tree.remove_range("a", "m", 10);
+
+    // "l" is suppressed (inside [a,m))
+    assert!(tree.get("l", SeqNo::MAX)?.is_none());
+    // "m" is NOT suppressed (end exclusive)
+    assert!(tree.get("m", SeqNo::MAX)?.is_some());
+
+    Ok(())
+}
+
+// --- Test F: Reverse iteration with range tombstone ---
+#[test]
+fn range_tombstone_reverse_iteration() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    for (i, key) in ["a", "b", "c", "d", "e"].iter().enumerate() {
+        tree.insert(*key, "val", i as SeqNo);
+    }
+
+    tree.remove_range("b", "e", 10);
+
+    let keys: Vec<_> = tree
+        .range::<&str, _>(.., SeqNo::MAX, None)
+        .rev()
+        .map(|g| g.key().unwrap())
+        .collect();
+
+    // Reverse: "e", "a"
+    assert_eq!(2, keys.len());
+    assert_eq!(b"e", &*keys[0]);
+    assert_eq!(b"a", &*keys[1]);
+
+    Ok(())
+}
+
+// --- Test G: MVCC visibility — tombstone only suppresses older versions ---
+#[test]
+fn range_tombstone_mvcc_visibility() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    // Insert key "b" at seqno 5
+    tree.insert("b", "old", 5);
+
+    // Range tombstone [a, c) at seqno 10
+    tree.remove_range("a", "c", 10);
+
+    // Insert key "b" at seqno 15 (newer than tombstone)
+    tree.insert("b", "new", 15);
+
+    // "b" should be visible because seqno 15 > tombstone seqno 10
+    let val = tree.get("b", SeqNo::MAX)?;
+    assert!(val.is_some());
+    assert_eq!(b"new", &*val.unwrap());
+
+    // At seqno 11: key "b"@15 not visible (15 >= 11), key "b"@5 suppressed by tombstone@10
+    let val = tree.get("b", 11)?;
+    assert!(val.is_none());
+
+    Ok(())
+}
+
+// --- Test H: Multiple overlapping range tombstones ---
+#[test]
+fn range_tombstone_overlapping() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    for (i, key) in ["a", "b", "c", "d", "e", "f"].iter().enumerate() {
+        tree.insert(*key, "val", i as SeqNo);
+    }
+
+    tree.remove_range("a", "d", 10);
+    tree.remove_range("c", "f", 10);
+
+    assert!(tree.get("a", SeqNo::MAX)?.is_none());
+    assert!(tree.get("b", SeqNo::MAX)?.is_none());
+    assert!(tree.get("c", SeqNo::MAX)?.is_none());
+    assert!(tree.get("d", SeqNo::MAX)?.is_none());
+    assert!(tree.get("e", SeqNo::MAX)?.is_none());
+    // "f" is NOT suppressed (end exclusive)
+    assert!(tree.get("f", SeqNo::MAX)?.is_some());
+
+    Ok(())
+}
+
+// --- Test I: Range tombstone across memtable and SST ---
+#[test]
+fn range_tombstone_across_memtable_and_sst() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    // Insert keys and flush to SST
+    for (i, key) in ["a", "b", "c", "d", "e"].iter().enumerate() {
+        tree.insert(*key, "val", i as SeqNo);
+    }
+    tree.flush_active_memtable(0)?;
+
+    // Insert range tombstone in active memtable (newer seqno)
tree.remove_range("b", "e", 10); + + // SST keys should be suppressed by memtable tombstone + assert!(tree.get("a", SeqNo::MAX)?.is_some()); + assert!(tree.get("b", SeqNo::MAX)?.is_none()); + assert!(tree.get("c", SeqNo::MAX)?.is_none()); + assert!(tree.get("d", SeqNo::MAX)?.is_none()); + assert!(tree.get("e", SeqNo::MAX)?.is_some()); + + // Range iteration should also work + assert_eq!(2, tree.range::<&str, _>(.., SeqNo::MAX, None).count()); + + Ok(()) +} + +// --- Test J: Range tombstone persists through compaction when GC threshold is not met --- +#[test] +fn range_tombstone_persists_below_gc_threshold() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = open_tree(folder.path()); + + for (i, key) in ["a", "b", "c"].iter().enumerate() { + tree.insert(*key, "val", i as SeqNo); + } + + tree.remove_range("a", "d", 10); + + tree.flush_active_memtable(0)?; + + // Compact with gc watermark BELOW the tombstone seqno — tombstone should survive + tree.major_compact(u64::MAX, 5)?; + + // Data should still be suppressed because tombstone is preserved + assert!(tree.get("a", SeqNo::MAX)?.is_none()); + assert!(tree.get("b", SeqNo::MAX)?.is_none()); + assert!(tree.get("c", SeqNo::MAX)?.is_none()); + + Ok(()) +} + +// --- Test K: Range tombstone in SST also suppresses point reads --- +#[test] +fn range_tombstone_sst_point_read() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = open_tree(folder.path()); + + // Insert data at low seqno + for (i, key) in ["a", "b", "c", "d", "e"].iter().enumerate() { + tree.insert(*key, "val", i as SeqNo); + } + + // Insert tombstone and flush everything together + tree.remove_range("b", "e", 10); + + tree.flush_active_memtable(0)?; + + // Now insert new data at higher seqno and flush to a second SST + tree.insert("b", "new_val", 20); + tree.flush_active_memtable(0)?; + + // "b" should be visible (seqno 20 > tombstone seqno 10) + assert!(tree.get("b", SeqNo::MAX)?.is_some()); + assert_eq!(b"new_val", &*tree.get("b", SeqNo::MAX)?.unwrap()); + + // "c" should still be suppressed + assert!(tree.get("c", SeqNo::MAX)?.is_none()); + + Ok(()) +} + +// --- Test L: Table skip — tombstone in memtable covers entire SST --- +#[test] +fn range_tombstone_table_skip_in_iteration() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = open_tree(folder.path()); + + // Insert data and flush to SST + for (i, key) in ["a", "b", "c", "d", "e"].iter().enumerate() { + tree.insert(*key, "val", i as SeqNo); + } + tree.flush_active_memtable(0)?; + + // Now insert a range tombstone in the active memtable that covers the entire SST + // with seqno higher than any KV in the SST + tree.remove_range("a", "f", 100); + + // Insert some new data outside the tombstone range + tree.insert("z", "val", 200); + + // Range iteration should skip the fully-covered SST and only return "z" + let keys: Vec<_> = tree + .range::<&str, _>(.., SeqNo::MAX, None) + .map(|g| g.key().unwrap()) + .collect(); + + assert_eq!(1, keys.len()); + assert_eq!(b"z", &*keys[0]); + + Ok(()) +} + +// --- Test M: Tombstone eviction at bottom level makes data visible again --- +#[test] +fn range_tombstone_eviction_makes_data_visible() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?; + let tree = open_tree(folder.path()); + + // Insert data at low seqnos + tree.insert("a", "val_a", 1); + tree.insert("b", "val_b", 2); + tree.insert("c", "val_c", 3); + + // Insert range tombstone [a, d) at seqno 10 + tree.remove_range("a", "d", 10); + + // Flush 
+// --- Test M: Tombstone eviction at bottom level makes data visible again ---
+#[test]
+fn range_tombstone_eviction_makes_data_visible() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    // Insert data at low seqnos
+    tree.insert("a", "val_a", 1);
+    tree.insert("b", "val_b", 2);
+    tree.insert("c", "val_c", 3);
+
+    // Insert range tombstone [a, d) at seqno 10
+    tree.remove_range("a", "d", 10);
+
+    // Flush everything to SST
+    tree.flush_active_memtable(0)?;
+
+    // Verify data is suppressed before eviction
+    assert!(tree.get("a", SeqNo::MAX)?.is_none());
+    assert!(tree.get("b", SeqNo::MAX)?.is_none());
+    assert!(tree.get("c", SeqNo::MAX)?.is_none());
+
+    // Compact at last level with gc_watermark > tombstone seqno
+    // This should evict the range tombstone
+    tree.major_compact(u64::MAX, 20)?;
+
+    // After eviction, data should be visible again because the values
+    // are the only version of their keys and survive compaction
+    assert!(tree.get("a", SeqNo::MAX)?.is_some());
+    assert_eq!(b"val_a", &*tree.get("a", SeqNo::MAX)?.unwrap());
+    assert!(tree.get("b", SeqNo::MAX)?.is_some());
+    assert_eq!(b"val_b", &*tree.get("b", SeqNo::MAX)?.unwrap());
+    assert!(tree.get("c", SeqNo::MAX)?.is_some());
+    assert_eq!(b"val_c", &*tree.get("c", SeqNo::MAX)?.unwrap());
+
+    Ok(())
+}
+
+// --- Test N: Fast path does not suppress data when no range tombstones exist ---
+#[test]
+fn range_tombstone_fast_path_no_tombstones() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    // Insert normal KVs at various seqnos — no range tombstones at all
+    for (i, key) in ["a", "b", "c", "d", "e"].iter().enumerate() {
+        tree.insert(*key, "val", i as SeqNo);
+    }
+
+    // Point reads should all succeed (fast path must not accidentally suppress)
+    for key in ["a", "b", "c", "d", "e"] {
+        assert!(
+            tree.get(key, SeqNo::MAX)?.is_some(),
+            "key {key} missing in memtable"
+        );
+    }
+
+    // Range iteration should return all 5 keys
+    assert_eq!(5, tree.range::<&str, _>(.., SeqNo::MAX, None).count());
+
+    // Flush to SST and verify again
+    tree.flush_active_memtable(0)?;
+
+    for key in ["a", "b", "c", "d", "e"] {
+        assert!(
+            tree.get(key, SeqNo::MAX)?.is_some(),
+            "key {key} missing after flush"
+        );
+    }
+    assert_eq!(5, tree.range::<&str, _>(.., SeqNo::MAX, None).count());
+
+    // Compact and verify again
+    tree.major_compact(u64::MAX, 0)?;
+
+    for key in ["a", "b", "c", "d", "e"] {
+        assert!(
+            tree.get(key, SeqNo::MAX)?.is_some(),
+            "key {key} missing after compaction"
+        );
+    }
+    assert_eq!(5, tree.range::<&str, _>(.., SeqNo::MAX, None).count());
+
+    Ok(())
+}
+
+// --- Test O: Compaction deduplicates redundant overlapping range tombstones ---
+#[test]
+fn range_tombstone_compaction_dedup() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let tree = open_tree(folder.path());
+
+    // Insert data
+    for (i, key) in ["a", "b", "c", "d", "e"].iter().enumerate() {
+        tree.insert(*key, "val", i as SeqNo);
+    }
+
+    // Insert overlapping range tombstones:
+    // [a, e) at seqno 10 — covers everything
+    // [b, d) at seqno 8 — fully covered by the first (subset range, lower seqno)
+    // [a, c) at seqno 10 — fully covered by the first (subset range, equal seqno)
+    tree.remove_range("a", "e", 10);
+    tree.remove_range("b", "d", 8);
+    tree.remove_range("a", "c", 10);
+
+    tree.flush_active_memtable(0)?;
+
+    // Compact — dedup should remove the redundant tombstones
+    tree.major_compact(u64::MAX, 0)?;
+
+    // Data should still be correctly suppressed
+    assert!(tree.get("a", SeqNo::MAX)?.is_none());
+    assert!(tree.get("b", SeqNo::MAX)?.is_none());
+    assert!(tree.get("c", SeqNo::MAX)?.is_none());
+    assert!(tree.get("d", SeqNo::MAX)?.is_none());
+    // "e" is outside [a, e) (end exclusive)
+    assert!(tree.get("e", SeqNo::MAX)?.is_some());
+
+    Ok(())
+}
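Finally, the deduplication that Test O expects boils down to a coverage rule: a tombstone is redundant when another tombstone with a greater-or-equal seqno contains its whole range, so every version it would suppress is suppressed anyway (for reads that can see the covering tombstone). A small sketch of that predicate, mirroring the relationships called out in Test O's comments (illustrative only; the compaction helper itself is not shown in this diff):

type SeqNo = u64;

/// Minimal tombstone record for this illustration.
#[derive(Clone, Debug, PartialEq, Eq)]
struct Rt {
    start: Vec<u8>,
    end: Vec<u8>,
    seqno: SeqNo,
}

/// `inner` is redundant if `outer` covers at least the same keys at a seqno
/// that is at least as new.
fn is_redundant(inner: &Rt, outer: &Rt) -> bool {
    outer.start <= inner.start && inner.end <= outer.end && inner.seqno <= outer.seqno
}

fn main() {
    let rt = |s: &str, e: &str, q: SeqNo| Rt {
        start: s.as_bytes().to_vec(),
        end: e.as_bytes().to_vec(),
        seqno: q,
    };

    let a_e_10 = rt("a", "e", 10);
    assert!(is_redundant(&rt("b", "d", 8), &a_e_10)); // subset range, lower seqno
    assert!(is_redundant(&rt("a", "c", 10), &a_e_10)); // subset range, equal seqno
    assert!(!is_redundant(&rt("a", "f", 9), &a_e_10)); // extends past "e": must be kept
}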