Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 87 additions & 64 deletions src/active_tombstone_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,29 @@ use std::collections::{BTreeMap, BinaryHeap};
pub struct ActiveTombstoneSet {
seqno_counts: BTreeMap<SeqNo, u32>,
pending_expiry: BinaryHeap<Reverse<(UserKey, u64, SeqNo)>>,
cutoff_seqno: SeqNo,
next_id: u64,
}

impl ActiveTombstoneSet {
/// Creates a new forward active tombstone set.
///
/// Only tombstones with `seqno < cutoff_seqno` will be activated.
#[must_use]
pub fn new(cutoff_seqno: SeqNo) -> Self {
pub fn new() -> Self {
Self {
Comment thread
polaz marked this conversation as resolved.
seqno_counts: BTreeMap::new(),
pending_expiry: BinaryHeap::new(),
cutoff_seqno,
next_id: 0,
}
}

/// Activates a range tombstone, adding it to the active set.
///
/// The tombstone is only activated if it is visible at the cutoff seqno
/// (i.e., `rt.seqno < cutoff_seqno`). Duplicate activations (same seqno
/// from different sources) are handled correctly via multiset accounting.
pub fn activate(&mut self, rt: &RangeTombstone) {
if !rt.visible_at(self.cutoff_seqno) {
/// The tombstone is only activated if it is visible at `cutoff_seqno`
/// (i.e., `rt.seqno < cutoff_seqno`). Each source may supply a different
/// cutoff (e.g., ephemeral memtable uses its own `index_seqno`).
/// Duplicate activations (same seqno from different sources) are handled
/// correctly via multiset accounting.
pub fn activate(&mut self, rt: &RangeTombstone, cutoff_seqno: SeqNo) {
if !rt.visible_at(cutoff_seqno) {
return;
}
let id = self.next_id;
Expand Down Expand Up @@ -112,17 +110,20 @@ impl ActiveTombstoneSet {
/// # Invariant
///
/// At any iterator position, the active set contains only tombstones
/// where `start <= current_key < end` (and visible at `cutoff_seqno`).
/// Seek prefill must collect truly overlapping tombstones
/// where `start <= current_key < end` (and visible at their respective
/// `cutoff_seqno`). Seek prefill must collect truly overlapping tombstones
/// (`start <= key < end`); `expire_until` immediately enforces the
/// `end` bound.
#[cfg_attr(
not(test),
expect(dead_code, reason = "used by iterator initialization logic")
)]
pub fn initialize_from(&mut self, tombstones: impl IntoIterator<Item = RangeTombstone>) {
for rt in tombstones {
self.activate(&rt);
pub fn initialize_from(
&mut self,
tombstones: impl IntoIterator<Item = (RangeTombstone, SeqNo)>,
) {
for (rt, cutoff) in tombstones {
self.activate(&rt, cutoff);
}
}

Expand All @@ -148,35 +149,33 @@ impl ActiveTombstoneSet {
pub struct ActiveTombstoneSetReverse {
seqno_counts: BTreeMap<SeqNo, u32>,
pending_expiry: BinaryHeap<(UserKey, u64, SeqNo)>,
cutoff_seqno: SeqNo,
next_id: u64,
}

impl ActiveTombstoneSetReverse {
/// Creates a new reverse active tombstone set.
///
/// Only tombstones with `seqno < cutoff_seqno` will be activated.
#[must_use]
pub fn new(cutoff_seqno: SeqNo) -> Self {
pub fn new() -> Self {
Self {
seqno_counts: BTreeMap::new(),
pending_expiry: BinaryHeap::new(),
cutoff_seqno,
next_id: 0,
}
}

/// Activates a range tombstone, adding it to the active set.
///
/// The tombstone is only activated if it is visible at the cutoff seqno
/// (i.e., `rt.seqno < cutoff_seqno`). Duplicate activations (same seqno
/// from different sources) are handled correctly via multiset accounting.
/// The tombstone is only activated if it is visible at `cutoff_seqno`
/// (i.e., `rt.seqno < cutoff_seqno`). Each source may supply a different
/// cutoff (e.g., ephemeral memtable uses its own `index_seqno`).
/// Duplicate activations (same seqno from different sources) are handled
/// correctly via multiset accounting.
///
/// For reverse iteration, activation uses strict `>`: tombstones with
/// `rt.end > current_key` are activated. `key == end` is NOT covered
/// (half-open).
pub fn activate(&mut self, rt: &RangeTombstone) {
if !rt.visible_at(self.cutoff_seqno) {
pub fn activate(&mut self, rt: &RangeTombstone, cutoff_seqno: SeqNo) {
if !rt.visible_at(cutoff_seqno) {
return;
}
let id = self.next_id;
Expand Down Expand Up @@ -236,9 +235,12 @@ impl ActiveTombstoneSetReverse {
not(test),
expect(dead_code, reason = "used by iterator initialization logic")
)]
pub fn initialize_from(&mut self, tombstones: impl IntoIterator<Item = RangeTombstone>) {
for rt in tombstones {
self.activate(&rt);
pub fn initialize_from(
&mut self,
tombstones: impl IntoIterator<Item = (RangeTombstone, SeqNo)>,
) {
for (rt, cutoff) in tombstones {
self.activate(&rt, cutoff);
}
}

Expand Down Expand Up @@ -266,60 +268,60 @@ mod tests {

#[test]
fn forward_activate_and_suppress() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
assert!(set.is_suppressed(5));
assert!(!set.is_suppressed(10));
assert!(!set.is_suppressed(15));
}

#[test]
fn forward_expire_at_end() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
assert!(set.is_suppressed(5));
set.expire_until(b"m"); // key == end, tombstone expires
assert!(!set.is_suppressed(5));
}

#[test]
fn forward_expire_past_end() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
set.expire_until(b"z");
assert!(set.is_empty());
}

#[test]
fn forward_not_expired_before_end() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
set.expire_until(b"l");
assert!(set.is_suppressed(5)); // still active
}

#[test]
fn forward_invisible_tombstone_not_activated() {
let mut set = ActiveTombstoneSet::new(5);
set.activate(&rt(b"a", b"m", 10)); // seqno 10 > cutoff 5
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 5); // seqno 10 > cutoff 5
assert!(!set.is_suppressed(1));
assert!(set.is_empty());
}

#[test]
fn forward_multiple_tombstones_max_seqno() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
set.activate(&rt(b"b", b"n", 20));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
set.activate(&rt(b"b", b"n", 20), 100);
assert_eq!(set.max_active_seqno(), Some(20));
assert!(set.is_suppressed(15)); // 15 < 20
}

#[test]
fn forward_duplicate_end_seqno_accounting() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
set.activate(&rt(b"b", b"m", 10));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
set.activate(&rt(b"b", b"m", 10), 100);
assert_eq!(set.max_active_seqno(), Some(10));

set.expire_until(b"m");
Expand All @@ -329,68 +331,79 @@ mod tests {

#[test]
fn forward_initialize_from() {
let mut set = ActiveTombstoneSet::new(100);
set.initialize_from(vec![rt(b"a", b"m", 10), rt(b"b", b"z", 20)]);
let mut set = ActiveTombstoneSet::new();
set.initialize_from(vec![(rt(b"a", b"m", 10), 100), (rt(b"b", b"z", 20), 100)]);
assert_eq!(set.max_active_seqno(), Some(20));
}

#[test]
fn forward_initialize_and_expire() {
let mut set = ActiveTombstoneSet::new(100);
set.initialize_from(vec![rt(b"a", b"d", 10), rt(b"b", b"f", 20)]);
let mut set = ActiveTombstoneSet::new();
set.initialize_from(vec![(rt(b"a", b"d", 10), 100), (rt(b"b", b"f", 20), 100)]);
set.expire_until(b"e"); // expires [a,d) but not [b,f)
assert_eq!(set.max_active_seqno(), Some(20));
set.expire_until(b"f"); // expires [b,f)
assert!(set.is_empty());
}

#[test]
fn forward_per_source_cutoff_mixed() {
let mut set = ActiveTombstoneSet::new();
// RT from source with cutoff 15 — visible (10 < 15)
set.activate(&rt(b"a", b"m", 10), 15);
// RT from source with cutoff 5 — NOT visible (10 >= 5)
set.activate(&rt(b"a", b"z", 10), 5);
assert_eq!(set.max_active_seqno(), Some(10));
// Only one RT activated, not two
Comment thread
polaz marked this conversation as resolved.
Outdated
}

// ──── Reverse tests ────

#[test]
fn reverse_activate_and_suppress() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"a", b"m", 10), 100);
assert!(set.is_suppressed(5));
assert!(!set.is_suppressed(10));
}

#[test]
fn reverse_expire_before_start() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.activate(&rt(b"d", b"m", 10));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"d", b"m", 10), 100);

set.expire_until(b"c");
assert!(set.is_empty());
}

#[test]
fn reverse_initialize_from() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.initialize_from(vec![rt(b"a", b"m", 10), rt(b"b", b"z", 20)]);
let mut set = ActiveTombstoneSetReverse::new();
set.initialize_from(vec![(rt(b"a", b"m", 10), 100), (rt(b"b", b"z", 20), 100)]);
assert_eq!(set.max_active_seqno(), Some(20));
}

#[test]
fn reverse_not_expired_at_start() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.activate(&rt(b"d", b"m", 10));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"d", b"m", 10), 100);

set.expire_until(b"d");
assert!(set.is_suppressed(5));
}

#[test]
fn reverse_invisible_tombstone_not_activated() {
let mut set = ActiveTombstoneSetReverse::new(5);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"a", b"m", 10), 5);
assert!(set.is_empty());
}

#[test]
fn reverse_duplicate_end_seqno_accounting() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.activate(&rt(b"d", b"m", 10));
set.activate(&rt(b"d", b"n", 10));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"d", b"m", 10), 100);
set.activate(&rt(b"d", b"n", 10), 100);
assert_eq!(set.max_active_seqno(), Some(10));

set.expire_until(b"c");
Expand All @@ -400,12 +413,22 @@ mod tests {

#[test]
fn reverse_multiple_tombstones() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.activate(&rt(b"a", b"m", 10));
set.activate(&rt(b"f", b"z", 20));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"a", b"m", 10), 100);
set.activate(&rt(b"f", b"z", 20), 100);
assert_eq!(set.max_active_seqno(), Some(20));

set.expire_until(b"e");
assert_eq!(set.max_active_seqno(), Some(10));
}

#[test]
fn reverse_per_source_cutoff_mixed() {
let mut set = ActiveTombstoneSetReverse::new();
// RT from source with cutoff 15 — visible (10 < 15)
set.activate(&rt(b"a", b"m", 10), 15);
// RT from source with cutoff 5 — NOT visible (10 >= 5)
set.activate(&rt(b"a", b"z", 10), 5);
assert_eq!(set.max_active_seqno(), Some(10));
Comment thread
polaz marked this conversation as resolved.
Outdated
}
}
Loading
Loading