diff --git a/src/active_tombstone_set.rs b/src/active_tombstone_set.rs index 245a1fe1b..26c5734a0 100644 --- a/src/active_tombstone_set.rs +++ b/src/active_tombstone_set.rs @@ -26,31 +26,29 @@ use std::collections::{BTreeMap, BinaryHeap}; pub struct ActiveTombstoneSet { seqno_counts: BTreeMap, pending_expiry: BinaryHeap>, - cutoff_seqno: SeqNo, next_id: u64, } impl ActiveTombstoneSet { /// Creates a new forward active tombstone set. - /// - /// Only tombstones with `seqno < cutoff_seqno` will be activated. #[must_use] - pub fn new(cutoff_seqno: SeqNo) -> Self { + pub fn new() -> Self { Self { seqno_counts: BTreeMap::new(), pending_expiry: BinaryHeap::new(), - cutoff_seqno, next_id: 0, } } /// Activates a range tombstone, adding it to the active set. /// - /// The tombstone is only activated if it is visible at the cutoff seqno - /// (i.e., `rt.seqno < cutoff_seqno`). Duplicate activations (same seqno - /// from different sources) are handled correctly via multiset accounting. - pub fn activate(&mut self, rt: &RangeTombstone) { - if !rt.visible_at(self.cutoff_seqno) { + /// The tombstone is only activated if it is visible at `cutoff_seqno` + /// (i.e., `rt.seqno < cutoff_seqno`). Each source may supply a different + /// cutoff (e.g., ephemeral memtable uses its own `index_seqno`). + /// Duplicate activations (same seqno from different sources) are handled + /// correctly via multiset accounting. + pub fn activate(&mut self, rt: &RangeTombstone, cutoff_seqno: SeqNo) { + if !rt.visible_at(cutoff_seqno) { return; } let id = self.next_id; @@ -112,17 +110,20 @@ impl ActiveTombstoneSet { /// # Invariant /// /// At any iterator position, the active set contains only tombstones - /// where `start <= current_key < end` (and visible at `cutoff_seqno`). - /// Seek prefill must collect truly overlapping tombstones + /// where `start <= current_key < end` (and visible at their respective + /// `cutoff_seqno`). Seek prefill must collect truly overlapping tombstones /// (`start <= key < end`); `expire_until` immediately enforces the /// `end` bound. #[cfg_attr( not(test), expect(dead_code, reason = "used by iterator initialization logic") )] - pub fn initialize_from(&mut self, tombstones: impl IntoIterator) { - for rt in tombstones { - self.activate(&rt); + pub fn initialize_from( + &mut self, + tombstones: impl IntoIterator, + ) { + for (rt, cutoff) in tombstones { + self.activate(&rt, cutoff); } } @@ -148,35 +149,33 @@ impl ActiveTombstoneSet { pub struct ActiveTombstoneSetReverse { seqno_counts: BTreeMap, pending_expiry: BinaryHeap<(UserKey, u64, SeqNo)>, - cutoff_seqno: SeqNo, next_id: u64, } impl ActiveTombstoneSetReverse { /// Creates a new reverse active tombstone set. - /// - /// Only tombstones with `seqno < cutoff_seqno` will be activated. #[must_use] - pub fn new(cutoff_seqno: SeqNo) -> Self { + pub fn new() -> Self { Self { seqno_counts: BTreeMap::new(), pending_expiry: BinaryHeap::new(), - cutoff_seqno, next_id: 0, } } /// Activates a range tombstone, adding it to the active set. /// - /// The tombstone is only activated if it is visible at the cutoff seqno - /// (i.e., `rt.seqno < cutoff_seqno`). Duplicate activations (same seqno - /// from different sources) are handled correctly via multiset accounting. + /// The tombstone is only activated if it is visible at `cutoff_seqno` + /// (i.e., `rt.seqno < cutoff_seqno`). Each source may supply a different + /// cutoff (e.g., ephemeral memtable uses its own `index_seqno`). + /// Duplicate activations (same seqno from different sources) are handled + /// correctly via multiset accounting. /// /// For reverse iteration, activation uses strict `>`: tombstones with /// `rt.end > current_key` are activated. `key == end` is NOT covered /// (half-open). - pub fn activate(&mut self, rt: &RangeTombstone) { - if !rt.visible_at(self.cutoff_seqno) { + pub fn activate(&mut self, rt: &RangeTombstone, cutoff_seqno: SeqNo) { + if !rt.visible_at(cutoff_seqno) { return; } let id = self.next_id; @@ -236,9 +235,12 @@ impl ActiveTombstoneSetReverse { not(test), expect(dead_code, reason = "used by iterator initialization logic") )] - pub fn initialize_from(&mut self, tombstones: impl IntoIterator) { - for rt in tombstones { - self.activate(&rt); + pub fn initialize_from( + &mut self, + tombstones: impl IntoIterator, + ) { + for (rt, cutoff) in tombstones { + self.activate(&rt, cutoff); } } @@ -266,8 +268,8 @@ mod tests { #[test] fn forward_activate_and_suppress() { - let mut set = ActiveTombstoneSet::new(100); - set.activate(&rt(b"a", b"m", 10)); + let mut set = ActiveTombstoneSet::new(); + set.activate(&rt(b"a", b"m", 10), 100); assert!(set.is_suppressed(5)); assert!(!set.is_suppressed(10)); assert!(!set.is_suppressed(15)); @@ -275,8 +277,8 @@ mod tests { #[test] fn forward_expire_at_end() { - let mut set = ActiveTombstoneSet::new(100); - set.activate(&rt(b"a", b"m", 10)); + let mut set = ActiveTombstoneSet::new(); + set.activate(&rt(b"a", b"m", 10), 100); assert!(set.is_suppressed(5)); set.expire_until(b"m"); // key == end, tombstone expires assert!(!set.is_suppressed(5)); @@ -284,42 +286,42 @@ mod tests { #[test] fn forward_expire_past_end() { - let mut set = ActiveTombstoneSet::new(100); - set.activate(&rt(b"a", b"m", 10)); + let mut set = ActiveTombstoneSet::new(); + set.activate(&rt(b"a", b"m", 10), 100); set.expire_until(b"z"); assert!(set.is_empty()); } #[test] fn forward_not_expired_before_end() { - let mut set = ActiveTombstoneSet::new(100); - set.activate(&rt(b"a", b"m", 10)); + let mut set = ActiveTombstoneSet::new(); + set.activate(&rt(b"a", b"m", 10), 100); set.expire_until(b"l"); assert!(set.is_suppressed(5)); // still active } #[test] fn forward_invisible_tombstone_not_activated() { - let mut set = ActiveTombstoneSet::new(5); - set.activate(&rt(b"a", b"m", 10)); // seqno 10 > cutoff 5 + let mut set = ActiveTombstoneSet::new(); + set.activate(&rt(b"a", b"m", 10), 5); // seqno 10 > cutoff 5 assert!(!set.is_suppressed(1)); assert!(set.is_empty()); } #[test] fn forward_multiple_tombstones_max_seqno() { - let mut set = ActiveTombstoneSet::new(100); - set.activate(&rt(b"a", b"m", 10)); - set.activate(&rt(b"b", b"n", 20)); + let mut set = ActiveTombstoneSet::new(); + set.activate(&rt(b"a", b"m", 10), 100); + set.activate(&rt(b"b", b"n", 20), 100); assert_eq!(set.max_active_seqno(), Some(20)); assert!(set.is_suppressed(15)); // 15 < 20 } #[test] fn forward_duplicate_end_seqno_accounting() { - let mut set = ActiveTombstoneSet::new(100); - set.activate(&rt(b"a", b"m", 10)); - set.activate(&rt(b"b", b"m", 10)); + let mut set = ActiveTombstoneSet::new(); + set.activate(&rt(b"a", b"m", 10), 100); + set.activate(&rt(b"b", b"m", 10), 100); assert_eq!(set.max_active_seqno(), Some(10)); set.expire_until(b"m"); @@ -329,35 +331,51 @@ mod tests { #[test] fn forward_initialize_from() { - let mut set = ActiveTombstoneSet::new(100); - set.initialize_from(vec![rt(b"a", b"m", 10), rt(b"b", b"z", 20)]); + let mut set = ActiveTombstoneSet::new(); + set.initialize_from(vec![(rt(b"a", b"m", 10), 100), (rt(b"b", b"z", 20), 100)]); assert_eq!(set.max_active_seqno(), Some(20)); } #[test] fn forward_initialize_and_expire() { - let mut set = ActiveTombstoneSet::new(100); - set.initialize_from(vec![rt(b"a", b"d", 10), rt(b"b", b"f", 20)]); + let mut set = ActiveTombstoneSet::new(); + set.initialize_from(vec![(rt(b"a", b"d", 10), 100), (rt(b"b", b"f", 20), 100)]); set.expire_until(b"e"); // expires [a,d) but not [b,f) assert_eq!(set.max_active_seqno(), Some(20)); set.expire_until(b"f"); // expires [b,f) assert!(set.is_empty()); } + #[test] + fn forward_mixed_cutoffs_activates_only_visible_rt() { + let mut set = ActiveTombstoneSet::new(); + // RT from source with cutoff 15 — visible (10 < 15) + set.activate(&rt(b"a", b"m", 10), 15); + // RT from source with cutoff 5 — NOT visible (10 >= 5) + set.activate(&rt(b"a", b"z", 10), 5); + assert_eq!(set.max_active_seqno(), Some(10)); + assert!(!set.is_empty()); + + // Expire past the first RT's end; the set should now be empty if the + // second RT was never incorrectly activated. + set.expire_until(b"m"); + assert!(set.is_empty()); + } + // ──── Reverse tests ──── #[test] fn reverse_activate_and_suppress() { - let mut set = ActiveTombstoneSetReverse::new(100); - set.activate(&rt(b"a", b"m", 10)); + let mut set = ActiveTombstoneSetReverse::new(); + set.activate(&rt(b"a", b"m", 10), 100); assert!(set.is_suppressed(5)); assert!(!set.is_suppressed(10)); } #[test] fn reverse_expire_before_start() { - let mut set = ActiveTombstoneSetReverse::new(100); - set.activate(&rt(b"d", b"m", 10)); + let mut set = ActiveTombstoneSetReverse::new(); + set.activate(&rt(b"d", b"m", 10), 100); set.expire_until(b"c"); assert!(set.is_empty()); @@ -365,15 +383,15 @@ mod tests { #[test] fn reverse_initialize_from() { - let mut set = ActiveTombstoneSetReverse::new(100); - set.initialize_from(vec![rt(b"a", b"m", 10), rt(b"b", b"z", 20)]); + let mut set = ActiveTombstoneSetReverse::new(); + set.initialize_from(vec![(rt(b"a", b"m", 10), 100), (rt(b"b", b"z", 20), 100)]); assert_eq!(set.max_active_seqno(), Some(20)); } #[test] fn reverse_not_expired_at_start() { - let mut set = ActiveTombstoneSetReverse::new(100); - set.activate(&rt(b"d", b"m", 10)); + let mut set = ActiveTombstoneSetReverse::new(); + set.activate(&rt(b"d", b"m", 10), 100); set.expire_until(b"d"); assert!(set.is_suppressed(5)); @@ -381,16 +399,16 @@ mod tests { #[test] fn reverse_invisible_tombstone_not_activated() { - let mut set = ActiveTombstoneSetReverse::new(5); - set.activate(&rt(b"a", b"m", 10)); + let mut set = ActiveTombstoneSetReverse::new(); + set.activate(&rt(b"a", b"m", 10), 5); assert!(set.is_empty()); } #[test] fn reverse_duplicate_end_seqno_accounting() { - let mut set = ActiveTombstoneSetReverse::new(100); - set.activate(&rt(b"d", b"m", 10)); - set.activate(&rt(b"d", b"n", 10)); + let mut set = ActiveTombstoneSetReverse::new(); + set.activate(&rt(b"d", b"m", 10), 100); + set.activate(&rt(b"d", b"n", 10), 100); assert_eq!(set.max_active_seqno(), Some(10)); set.expire_until(b"c"); @@ -400,12 +418,29 @@ mod tests { #[test] fn reverse_multiple_tombstones() { - let mut set = ActiveTombstoneSetReverse::new(100); - set.activate(&rt(b"a", b"m", 10)); - set.activate(&rt(b"f", b"z", 20)); + let mut set = ActiveTombstoneSetReverse::new(); + set.activate(&rt(b"a", b"m", 10), 100); + set.activate(&rt(b"f", b"z", 20), 100); assert_eq!(set.max_active_seqno(), Some(20)); set.expire_until(b"e"); assert_eq!(set.max_active_seqno(), Some(10)); } + + #[test] + fn reverse_mixed_cutoffs_activates_only_visible_rt() { + let mut set = ActiveTombstoneSetReverse::new(); + // RT from source with cutoff 15 — visible (10 < 15) + set.activate(&rt(b"n", b"z", 10), 15); + // RT from source with cutoff 5 — NOT visible (10 >= 5) + set.activate(&rt(b"a", b"m", 10), 5); + assert_eq!(set.max_active_seqno(), Some(10)); + + // Advance expiry past the visible tombstone's start but not the + // invisible one's. If only the visible RT was activated, the set + // should become empty. + set.expire_until(b"l"); + assert_eq!(set.max_active_seqno(), None); + assert!(set.is_empty()); + } } diff --git a/src/range.rs b/src/range.rs index 4cac370d7..42403536e 100644 --- a/src/range.rs +++ b/src/range.rs @@ -182,7 +182,10 @@ impl TreeIter { ); let mut iters: Vec> = Vec::with_capacity(5); - let mut all_range_tombstones: Vec = Vec::new(); + // Each RT is paired with the per-source visibility cutoff so that + // ephemeral memtable RTs use their own index_seqno instead of the + // outer scan seqno (see issue #33). + let mut all_range_tombstones: Vec<(RangeTombstone, SeqNo)> = Vec::new(); let mut single_tables = Vec::new(); let mut multi_runs = Vec::new(); @@ -205,7 +208,7 @@ impl TreeIter { .range_tombstones() .iter() .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)) - .cloned(), + .map(|rt| (rt.clone(), seqno)), ); // Check key range overlap first (cheap metadata check) before @@ -224,7 +227,7 @@ impl TreeIter { .range_tombstones() .iter() .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)) - .cloned(), + .map(|rt| (rt.clone(), seqno)), ); } @@ -240,8 +243,8 @@ impl TreeIter { // RT stored in the same table won't trigger skip (conservative // but correct). Separate KV/RT seqno bounds would improve this. // key_range.max() is inclusive, fully_covers uses half-open: max < rt.end - let is_covered = all_range_tombstones.iter().any(|rt| { - rt.visible_at(seqno) + let is_covered = all_range_tombstones.iter().any(|(rt, cutoff)| { + rt.visible_at(*cutoff) && rt.fully_covers( table.metadata.key_range.min(), table.metadata.key_range.max(), @@ -276,7 +279,8 @@ impl TreeIter { memtable .range_tombstones_sorted() .into_iter() - .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)), + .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)) + .map(|rt| (rt, seqno)), ); let iter = memtable.range(range.clone()); @@ -294,7 +298,8 @@ impl TreeIter { .active_memtable .range_tombstones_sorted() .into_iter() - .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)), + .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)) + .map(|rt| (rt, seqno)), ); let iter = lock.version.active_memtable.range(range.clone()); @@ -305,16 +310,17 @@ impl TreeIter { )); } - if let Some((mt, seqno)) = &lock.ephemeral { + if let Some((mt, eph_seqno)) = &lock.ephemeral { all_range_tombstones.extend( mt.range_tombstones_sorted() .into_iter() - .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)), + .filter(|rt| range_tombstone_overlaps_bounds(rt, &user_range)) + .map(|rt| (rt, *eph_seqno)), ); let iter = Box::new( mt.range(range) - .filter(move |item| seqno_filter(item.key.seqno, *seqno)) + .filter(move |item| seqno_filter(item.key.seqno, *eph_seqno)) .map(Ok), ); iters.push(iter); @@ -330,16 +336,31 @@ impl TreeIter { // Deduplicate: MultiWriter rotation copies the same RTs into each // output table, so collected tombstones can contain duplicates. - all_range_tombstones.sort(); - all_range_tombstones.dedup(); + // When the same RT appears from different sources with different + // cutoffs (e.g., persisted SST + ephemeral), keep the max cutoff + // so the RT stays visible if ANY source's snapshot includes it. + all_range_tombstones.sort_by(|a, b| a.0.cmp(&b.0)); + all_range_tombstones.dedup_by(|a, b| { + if a.0 == b.0 { + // dedup_by passes (a=later, b=earlier); b survives, a is + // removed. Merge a's cutoff into the surviving b. + b.1 = b.1.max(a.1); + true + } else { + false + } + }); - // Fast path: skip filter wrapping when no tombstone is visible at this - // read seqno. We collect RTs while building the iterator inputs to - // avoid a separate pre-scan over every memtable and SST. - if all_range_tombstones.iter().all(|rt| !rt.visible_at(seqno)) { + // Fast path: skip filter wrapping when no tombstone is visible at + // its per-source cutoff. Each RT carries the seqno of its originating + // source, so the check is per-RT rather than global. + if all_range_tombstones + .iter() + .all(|(rt, cutoff)| !rt.visible_at(*cutoff)) + { Box::new(iter) } else { - Box::new(RangeTombstoneFilter::new(iter, all_range_tombstones, seqno)) + Box::new(RangeTombstoneFilter::new(iter, all_range_tombstones)) } }) } diff --git a/src/range_tombstone_filter.rs b/src/range_tombstone_filter.rs index 41e0b5d22..d812b8cb4 100644 --- a/src/range_tombstone_filter.rs +++ b/src/range_tombstone_filter.rs @@ -15,16 +15,20 @@ use crate::range_tombstone::RangeTombstone; use crate::{InternalValue, SeqNo}; /// Wraps a bidirectional KV stream and suppresses entries covered by range tombstones. +/// +/// Each tombstone is paired with a per-source visibility cutoff (`SeqNo`). +/// Different sources may use different cutoffs — e.g., an ephemeral memtable +/// uses its own `index_seqno` while disk segments use the outer scan seqno. pub struct RangeTombstoneFilter { inner: I, - // Forward state - fwd_tombstones: Vec, + // Forward state: (tombstone, per-source cutoff) + fwd_tombstones: Vec<(RangeTombstone, SeqNo)>, fwd_idx: usize, fwd_active: ActiveTombstoneSet, - // Reverse state - rev_tombstones: Vec, + // Reverse state: (tombstone, per-source cutoff) + rev_tombstones: Vec<(RangeTombstone, SeqNo)>, rev_idx: usize, rev_active: ActiveTombstoneSetReverse, } @@ -32,33 +36,36 @@ pub struct RangeTombstoneFilter { impl RangeTombstoneFilter { /// Creates a new bidirectional filter. /// - /// `fwd_tombstones` need not be pre-sorted — the constructor sorts internally by natural Ord. - /// Internally, a second copy sorted by `(end desc, seqno desc)` is created for reverse. + /// Each tombstone is paired with its per-source visibility cutoff. + /// Forward tombstones need not be pre-sorted — the constructor sorts + /// internally. A second copy sorted by `(end desc, seqno desc)` is + /// created for reverse iteration. #[must_use] - pub fn new(inner: I, mut fwd_tombstones: Vec, read_seqno: SeqNo) -> Self { - // Ensure forward tombstones are sorted by natural order (start asc, seqno desc, end asc) - fwd_tombstones.sort(); + pub fn new(inner: I, mut fwd_tombstones: Vec<(RangeTombstone, SeqNo)>) -> Self { + // Sort by RT natural order (start asc, seqno desc, end asc). + // Callers may pre-sort for dedup; re-sorting is O(n) on sorted input. + fwd_tombstones.sort_by(|a, b| a.0.cmp(&b.0)); // Build reverse-sorted copy: (end desc, seqno desc) let mut rev_tombstones = fwd_tombstones.clone(); - rev_tombstones.sort_by(|a, b| (&b.end, &b.seqno).cmp(&(&a.end, &a.seqno))); + rev_tombstones.sort_by(|a, b| (&b.0.end, &b.0.seqno).cmp(&(&a.0.end, &a.0.seqno))); Self { inner, fwd_tombstones, fwd_idx: 0, - fwd_active: ActiveTombstoneSet::new(read_seqno), + fwd_active: ActiveTombstoneSet::new(), rev_tombstones, rev_idx: 0, - rev_active: ActiveTombstoneSetReverse::new(read_seqno), + rev_active: ActiveTombstoneSetReverse::new(), } } /// Activates forward tombstones whose start <= `current_key`. fn fwd_activate_up_to(&mut self, key: &[u8]) { - while let Some(rt) = self.fwd_tombstones.get(self.fwd_idx) { + while let Some((rt, cutoff)) = self.fwd_tombstones.get(self.fwd_idx) { if rt.start.as_ref() <= key { - self.fwd_active.activate(rt); + self.fwd_active.activate(rt, *cutoff); self.fwd_idx += 1; } else { break; @@ -68,9 +75,9 @@ impl RangeTombstoneFilter { /// Activates reverse tombstones whose end > `current_key`. fn rev_activate_up_to(&mut self, key: &[u8]) { - while let Some(rt) = self.rev_tombstones.get(self.rev_idx) { + while let Some((rt, cutoff)) = self.rev_tombstones.get(self.rev_idx) { if rt.end.as_ref() > key { - self.rev_active.activate(rt); + self.rev_active.activate(rt, *cutoff); self.rev_idx += 1; } else { break; @@ -148,12 +155,17 @@ mod tests { RangeTombstone::new(UserKey::from(start), UserKey::from(end), seqno) } + /// Helper: tag all tombstones with the same cutoff seqno. + fn tagged(tombstones: Vec, cutoff: SeqNo) -> Vec<(RangeTombstone, SeqNo)> { + tombstones.into_iter().map(|rt| (rt, cutoff)).collect() + } + #[test] fn items_no_tombstones_return_all() { let items: Vec> = vec![Ok(kv(b"a", 1)), Ok(kv(b"b", 2)), Ok(kv(b"c", 3))]; - let filter = RangeTombstoneFilter::new(items.into_iter(), vec![], SeqNo::MAX); + let filter = RangeTombstoneFilter::new(items.into_iter(), vec![]); let results: Vec<_> = filter.flatten().collect(); assert_eq!(results.len(), 3); } @@ -168,8 +180,8 @@ mod tests { Ok(kv(b"e", 5)), ]; - let tombstones = vec![rt(b"b", b"d", 10)]; - let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let tombstones = tagged(vec![rt(b"b", b"d", 10)], SeqNo::MAX); + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones); let results: Vec<_> = filter.flatten().collect(); let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); @@ -180,8 +192,8 @@ mod tests { fn items_newer_than_tombstone_survive() { let items: Vec> = vec![Ok(kv(b"b", 10)), Ok(kv(b"c", 3))]; - let tombstones = vec![rt(b"a", b"z", 5)]; - let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let tombstones = tagged(vec![rt(b"a", b"z", 5)], SeqNo::MAX); + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones); let results: Vec<_> = filter.flatten().collect(); let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); @@ -193,8 +205,8 @@ mod tests { let items: Vec> = vec![Ok(kv(b"b", 5)), Ok(kv(b"c", 5)), Ok(kv(b"d", 5))]; - let tombstones = vec![rt(b"b", b"d", 10)]; - let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let tombstones = tagged(vec![rt(b"b", b"d", 10)], SeqNo::MAX); + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones); let results: Vec<_> = filter.flatten().collect(); let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); @@ -210,8 +222,8 @@ mod tests { Ok(kv(b"d", 1)), ]; - let tombstones = vec![rt(b"a", b"c", 5), rt(b"b", b"e", 4)]; - let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let tombstones = tagged(vec![rt(b"a", b"c", 5), rt(b"b", b"e", 4)], SeqNo::MAX); + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones); let results: Vec<_> = filter.flatten().collect(); let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); @@ -222,13 +234,33 @@ mod tests { fn tombstone_newer_than_read_seqno_not_visible() { let items: Vec> = vec![Ok(kv(b"b", 3))]; - let tombstones = vec![rt(b"a", b"z", 10)]; - let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, 5); + // RT at seqno 10 with cutoff 5 — not visible + let tombstones = tagged(vec![rt(b"a", b"z", 10)], 5); + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones); let results: Vec<_> = filter.flatten().collect(); assert_eq!(results.len(), 1); } + #[test] + fn mixed_cutoffs_suppress_only_visible_source() { + // Two RTs with same seqno but different per-source cutoffs: + // RT from source A (cutoff 15) — visible (10 < 15), suppresses kv at seqno 5 + // RT from source B (cutoff 5) — NOT visible (10 >= 5), does not suppress + let items: Vec> = vec![Ok(kv(b"b", 5)), Ok(kv(b"x", 5))]; + + let tombstones = vec![ + (rt(b"a", b"d", 10), 15), // source A: visible + (rt(b"w", b"z", 10), 5), // source B: not visible + ]; + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones); + let results: Vec<_> = filter.flatten().collect(); + + let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); + // "b" suppressed by source-A RT, "x" survives (source-B RT invisible) + assert_eq!(keys, vec![b"x".as_ref()]); + } + #[test] fn rev_items_with_range_tombstone_suppress_covered_keys() { let items: Vec> = vec![ @@ -239,8 +271,8 @@ mod tests { Ok(kv(b"e", 5)), ]; - let tombstones = vec![rt(b"b", b"d", 10)]; - let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let tombstones = tagged(vec![rt(b"b", b"d", 10)], SeqNo::MAX); + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones); let results: Vec<_> = filter.rev().flatten().collect(); let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); @@ -252,8 +284,8 @@ mod tests { let items: Vec> = vec![Ok(kv(b"a", 5)), Ok(kv(b"l", 5)), Ok(kv(b"m", 5))]; - let tombstones = vec![rt(b"a", b"m", 10)]; - let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones, SeqNo::MAX); + let tombstones = tagged(vec![rt(b"a", b"m", 10)], SeqNo::MAX); + let filter = RangeTombstoneFilter::new(items.into_iter(), tombstones); let results: Vec<_> = filter.rev().flatten().collect(); let keys: Vec<&[u8]> = results.iter().map(|v| v.key.user_key.as_ref()).collect(); diff --git a/tests/range_tombstone_ephemeral.rs b/tests/range_tombstone_ephemeral.rs new file mode 100644 index 000000000..c7c14c78d --- /dev/null +++ b/tests/range_tombstone_ephemeral.rs @@ -0,0 +1,305 @@ +// Tests for per-source RT visibility when ephemeral memtable index_seqno +// differs from the outer scan seqno (issue #33). +// +// The ephemeral memtable is an overlay whose KV stream is gated at its own +// `index_seqno`. Range tombstones from the ephemeral source must use that +// same cutoff — not the outer scan seqno — so that: +// • Over-suppress is prevented (RT visible at outer_seqno but not at +// eph_seqno must NOT suppress keys from other sources). +// • Leak is prevented (RT visible at eph_seqno but not at outer_seqno +// must still suppress keys that entered the merged stream through the +// ephemeral source). + +// Guard: trait import required for .key() method on iterator items (IterGuard trait) +use lsm_tree::{ + get_tmp_folder, AbstractTree, AnyTree, Config, Guard, Memtable, SequenceNumberCounter, UserKey, +}; +use std::sync::Arc; +use test_log::test; + +fn open_tree(path: &std::path::Path) -> AnyTree { + Config::new( + path, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .open() + .expect("should open") +} + +/// Arbitrary ID for ephemeral memtables in tests (distinct from tree-managed IDs). +const EPHEMERAL_MT_ID: lsm_tree::MemtableId = 999; + +/// Build an ephemeral memtable with the given KVs and range tombstones. +fn build_ephemeral(kvs: &[(&[u8], &[u8], u64)], rts: &[(&[u8], &[u8], u64)]) -> Arc { + let mt = Arc::new(Memtable::new(EPHEMERAL_MT_ID)); + for &(key, val, seqno) in kvs { + mt.insert(lsm_tree::InternalValue::from_components( + key, + val, + seqno, + lsm_tree::ValueType::Value, + )); + } + for &(start, end, seqno) in rts { + assert!( + mt.insert_range_tombstone(UserKey::from(start), UserKey::from(end), seqno) > 0, + "insert_range_tombstone returned 0 (rejected)", + ); + } + mt +} + +/// Collect keys from a forward iterator. +/// Returns `Vec>` which compares correctly with `vec![b"a", b"b"]` +/// via Rust's `PartialEq` blanket impl for `Vec` where `T: PartialEq`. +fn collect_keys( + tree: &AnyTree, + seqno: u64, + eph: Option<(Arc, u64)>, +) -> lsm_tree::Result>> { + let mut keys = Vec::new(); + for item in tree.iter(seqno, eph) { + keys.push(item.key()?.to_vec()); + } + Ok(keys) +} + +/// Collect keys from a reverse iterator. +fn collect_keys_rev( + tree: &AnyTree, + seqno: u64, + eph: Option<(Arc, u64)>, +) -> lsm_tree::Result>> { + let mut keys = Vec::new(); + for item in tree.iter(seqno, eph).rev() { + keys.push(item.key()?.to_vec()); + } + Ok(keys) +} + +/// Collect keys from a range iterator. +fn collect_range_keys( + tree: &AnyTree, + range: R, + seqno: u64, + eph: Option<(Arc, u64)>, +) -> lsm_tree::Result>> +where + R: std::ops::RangeBounds<&'static str>, +{ + let mut keys = Vec::new(); + for item in tree.range(range, seqno, eph) { + keys.push(item.key()?.to_vec()); + } + Ok(keys) +} + +/// Collect keys from a prefix iterator. +fn collect_prefix_keys( + tree: &AnyTree, + prefix: &str, + seqno: u64, + eph: Option<(Arc, u64)>, +) -> lsm_tree::Result>> { + let mut keys = Vec::new(); + for item in tree.prefix(prefix, seqno, eph) { + keys.push(item.key()?.to_vec()); + } + Ok(keys) +} + +// ───────────────────────────────────────────────────────────────────────── +// Over-suppress: eph_seqno < outer_seqno +// An ephemeral RT at seqno X where eph_seqno <= X < outer_seqno would be +// visible at outer_seqno but NOT at eph_seqno. Without per-source cutoff, +// it would incorrectly suppress base-tree keys. +// ───────────────────────────────────────────────────────────────────────── + +#[test] +fn ephemeral_rt_not_visible_at_eph_seqno_does_not_suppress_base_keys() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Base tree: keys a..d at seqno 1 + tree.insert("a", "v", 1); + tree.insert("b", "v", 1); + tree.insert("c", "v", 1); + tree.insert("d", "v", 1); + + // Ephemeral: RT [b, d) at seqno 15. + // eph_seqno = 10 → RT NOT visible (15 >= 10). + // outer_seqno = 20 → RT IS visible (15 < 20). + // Without fix: RT would suppress b,c from base tree. + // With fix: RT uses eph_seqno=10 as cutoff → invisible → no suppression. + let eph = build_ephemeral(&[], &[(b"b", b"d", 15)]); + + let keys = collect_keys(&tree, 20, Some((eph.clone(), 10)))?; + assert_eq!(keys, vec![b"a", b"b", b"c", b"d"]); + + // Same check in reverse + let keys_rev = collect_keys_rev(&tree, 20, Some((eph, 10)))?; + assert_eq!(keys_rev, vec![b"d", b"c", b"b", b"a"]); + + Ok(()) +} + +#[test] +fn ephemeral_rt_not_visible_at_eph_seqno_range_query() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("a", "v", 1); + tree.insert("b", "v", 1); + tree.insert("c", "v", 1); + + // Ephemeral RT [a, d) at seqno 15, eph_seqno=10, outer_seqno=20. + let eph = build_ephemeral(&[], &[(b"a", b"d", 15)]); + + let keys = collect_range_keys(&tree, "a"..="c", 20, Some((eph, 10)))?; + assert_eq!(keys, vec![b"a", b"b", b"c"]); + + Ok(()) +} + +#[test] +fn ephemeral_rt_not_visible_at_eph_seqno_prefix_query() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("pre:a", "v", 1); + tree.insert("pre:b", "v", 1); + tree.insert("pre:c", "v", 1); + + // Ephemeral RT [pre:a, pre:d) at seqno 15, eph_seqno=10, outer_seqno=20. + let eph = build_ephemeral(&[], &[(b"pre:a", b"pre:d", 15)]); + + let keys = collect_prefix_keys(&tree, "pre:", 20, Some((eph, 10)))?; + assert_eq!(keys, vec![b"pre:a", b"pre:b", b"pre:c"]); + + Ok(()) +} + +// ───────────────────────────────────────────────────────────────────────── +// Leak: eph_seqno > outer_seqno +// An ephemeral RT at seqno X where outer_seqno <= X < eph_seqno is visible +// at eph_seqno but NOT at outer_seqno. The RT should still suppress +// ephemeral KVs that entered the merged stream. +// ───────────────────────────────────────────────────────────────────────── + +#[test] +fn ephemeral_rt_visible_at_eph_seqno_suppresses_ephemeral_kvs() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Base tree: key "a" at seqno 1 + tree.insert("a", "base", 1); + + // Ephemeral: KVs b,c at seqno 5; RT [b, d) at seqno 8. + // eph_seqno = 10 → RT visible (8 < 10). KVs visible (5 < 10). + // outer_seqno = 6 → RT NOT visible with outer cutoff (8 >= 6). + // Without fix: RT uses outer_seqno=6 as cutoff → invisible → b,c leak. + // With fix: RT uses eph_seqno=10 as cutoff → visible → b,c suppressed. + let eph = build_ephemeral(&[(b"b", b"vb", 5), (b"c", b"vc", 5)], &[(b"b", b"d", 8)]); + + let keys = collect_keys(&tree, 6, Some((eph.clone(), 10)))?; + // "a" from base (seqno 1 < outer_seqno 6), b and c suppressed by eph RT + assert_eq!(keys, vec![b"a"]); + + // Reverse + let keys_rev = collect_keys_rev(&tree, 6, Some((eph, 10)))?; + assert_eq!(keys_rev, vec![b"a"]); + + Ok(()) +} + +// ───────────────────────────────────────────────────────────────────────── +// Normal case: eph_seqno == outer_seqno (no divergence) +// Sanity check that the per-source cutoff doesn't break the common case. +// ───────────────────────────────────────────────────────────────────────── + +#[test] +fn ephemeral_rt_same_seqno_still_suppresses() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("a", "v", 1); + tree.insert("b", "v", 1); + tree.insert("c", "v", 1); + + // Ephemeral RT [a, c) at seqno 5, both seqnos = 10. + let eph = build_ephemeral(&[], &[(b"a", b"c", 5)]); + + let keys = collect_keys(&tree, 10, Some((eph.clone(), 10)))?; + assert_eq!(keys, vec![b"c"]); + + let keys_rev = collect_keys_rev(&tree, 10, Some((eph, 10)))?; + assert_eq!(keys_rev, vec![b"c"]); + + Ok(()) +} + +// ───────────────────────────────────────────────────────────────────────── +// Duplicate RT from two sources with different cutoffs +// The same RT may exist in both a persisted SST and the ephemeral memtable. +// Dedup must preserve the higher cutoff so the RT remains visible when at +// least one source's snapshot includes it. +// ───────────────────────────────────────────────────────────────────────── + +#[test] +fn duplicate_rt_from_two_sources_keeps_max_cutoff() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + // Base tree: keys a,b,c at seqno 1; RT [a, d) at seqno 5 in memtable + tree.insert("a", "v", 1); + tree.insert("b", "v", 1); + tree.insert("c", "v", 1); + tree.remove_range("a", "d", 5); + + // Flush to SST so the RT is persisted (cutoff will be outer_seqno=4) + tree.flush_active_memtable(0)?; + + // Ephemeral: same RT [a, d) at seqno 5 (cutoff will be eph_seqno=10) + // outer_seqno = 4 → persisted RT NOT visible (5 >= 4) + // eph_seqno = 10 → ephemeral RT IS visible (5 < 10) + // If dedup drops the ephemeral copy, the RT becomes invisible and a,b,c leak. + let eph = build_ephemeral(&[], &[(b"a", b"d", 5)]); + + let keys = collect_keys(&tree, 4, Some((eph.clone(), 10)))?; + // a,b,c must be suppressed — the ephemeral copy's cutoff=10 makes the RT visible + assert_eq!(keys, Vec::>::new()); + + // Reverse + let keys_rev = collect_keys_rev(&tree, 4, Some((eph, 10)))?; + assert_eq!(keys_rev, Vec::>::new()); + + Ok(()) +} + +// ───────────────────────────────────────────────────────────────────────── +// Base-tree RT should not be affected by ephemeral seqno +// ───────────────────────────────────────────────────────────────────────── + +#[test] +fn base_rt_uses_outer_seqno_not_ephemeral() -> lsm_tree::Result<()> { + let folder = get_tmp_folder(); + let tree = open_tree(folder.path()); + + tree.insert("a", "v", 1); + tree.insert("b", "v", 1); + tree.insert("c", "v", 1); + + // Base-tree RT [a, c) at seqno 5 + tree.remove_range("a", "c", 5); + + // Ephemeral: just KV "x" — no RTs. + // eph_seqno = 3, outer_seqno = 10. + // Base RT should use outer_seqno=10 → visible (5 < 10) → suppresses a,b. + let eph = build_ephemeral(&[(b"x", b"vx", 1)], &[]); + + let keys = collect_keys(&tree, 10, Some((eph, 3)))?; + assert_eq!(keys, vec![b"c", b"x"]); + + Ok(()) +}