Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 99 additions & 64 deletions src/active_tombstone_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,31 +26,29 @@ use std::collections::{BTreeMap, BinaryHeap};
pub struct ActiveTombstoneSet {
seqno_counts: BTreeMap<SeqNo, u32>,
pending_expiry: BinaryHeap<Reverse<(UserKey, u64, SeqNo)>>,
cutoff_seqno: SeqNo,
next_id: u64,
}

impl ActiveTombstoneSet {
/// Creates a new forward active tombstone set.
///
/// Only tombstones with `seqno < cutoff_seqno` will be activated.
#[must_use]
pub fn new(cutoff_seqno: SeqNo) -> Self {
pub fn new() -> Self {
Self {
Comment thread
polaz marked this conversation as resolved.
seqno_counts: BTreeMap::new(),
pending_expiry: BinaryHeap::new(),
cutoff_seqno,
next_id: 0,
}
}

/// Activates a range tombstone, adding it to the active set.
///
/// The tombstone is only activated if it is visible at the cutoff seqno
/// (i.e., `rt.seqno < cutoff_seqno`). Duplicate activations (same seqno
/// from different sources) are handled correctly via multiset accounting.
pub fn activate(&mut self, rt: &RangeTombstone) {
if !rt.visible_at(self.cutoff_seqno) {
/// The tombstone is only activated if it is visible at `cutoff_seqno`
/// (i.e., `rt.seqno < cutoff_seqno`). Each source may supply a different
/// cutoff (e.g., ephemeral memtable uses its own `index_seqno`).
/// Duplicate activations (same seqno from different sources) are handled
/// correctly via multiset accounting.
pub fn activate(&mut self, rt: &RangeTombstone, cutoff_seqno: SeqNo) {
if !rt.visible_at(cutoff_seqno) {
return;
}
let id = self.next_id;
Expand Down Expand Up @@ -112,17 +110,20 @@ impl ActiveTombstoneSet {
/// # Invariant
///
/// At any iterator position, the active set contains only tombstones
/// where `start <= current_key < end` (and visible at `cutoff_seqno`).
/// Seek prefill must collect truly overlapping tombstones
/// where `start <= current_key < end` (and visible at their respective
/// `cutoff_seqno`). Seek prefill must collect truly overlapping tombstones
/// (`start <= key < end`); `expire_until` immediately enforces the
/// `end` bound.
#[cfg_attr(
not(test),
expect(dead_code, reason = "used by iterator initialization logic")
)]
pub fn initialize_from(&mut self, tombstones: impl IntoIterator<Item = RangeTombstone>) {
for rt in tombstones {
self.activate(&rt);
pub fn initialize_from(
    &mut self,
    tombstones: impl IntoIterator<Item = (RangeTombstone, SeqNo)>,
) {
    // Every entry carries the cutoff of the source it came from, so the
    // visibility check in `activate` is applied per tombstone.
    tombstones
        .into_iter()
        .for_each(|(tombstone, source_cutoff)| self.activate(&tombstone, source_cutoff));
}

Expand All @@ -148,35 +149,33 @@ impl ActiveTombstoneSet {
pub struct ActiveTombstoneSetReverse {
seqno_counts: BTreeMap<SeqNo, u32>,
pending_expiry: BinaryHeap<(UserKey, u64, SeqNo)>,
cutoff_seqno: SeqNo,
next_id: u64,
}

impl ActiveTombstoneSetReverse {
/// Creates a new reverse active tombstone set.
///
/// Only tombstones with `seqno < cutoff_seqno` will be activated.
#[must_use]
pub fn new(cutoff_seqno: SeqNo) -> Self {
pub fn new() -> Self {
Self {
seqno_counts: BTreeMap::new(),
pending_expiry: BinaryHeap::new(),
cutoff_seqno,
next_id: 0,
}
}

/// Activates a range tombstone, adding it to the active set.
///
/// The tombstone is only activated if it is visible at the cutoff seqno
/// (i.e., `rt.seqno < cutoff_seqno`). Duplicate activations (same seqno
/// from different sources) are handled correctly via multiset accounting.
/// The tombstone is only activated if it is visible at `cutoff_seqno`
/// (i.e., `rt.seqno < cutoff_seqno`). Each source may supply a different
/// cutoff (e.g., ephemeral memtable uses its own `index_seqno`).
/// Duplicate activations (same seqno from different sources) are handled
/// correctly via multiset accounting.
///
/// For reverse iteration, activation uses strict `>`: tombstones with
/// `rt.end > current_key` are activated. `key == end` is NOT covered
/// (half-open).
pub fn activate(&mut self, rt: &RangeTombstone) {
if !rt.visible_at(self.cutoff_seqno) {
pub fn activate(&mut self, rt: &RangeTombstone, cutoff_seqno: SeqNo) {
if !rt.visible_at(cutoff_seqno) {
return;
}
let id = self.next_id;
Expand Down Expand Up @@ -236,9 +235,12 @@ impl ActiveTombstoneSetReverse {
not(test),
expect(dead_code, reason = "used by iterator initialization logic")
)]
pub fn initialize_from(&mut self, tombstones: impl IntoIterator<Item = RangeTombstone>) {
for rt in tombstones {
self.activate(&rt);
pub fn initialize_from(
    &mut self,
    tombstones: impl IntoIterator<Item = (RangeTombstone, SeqNo)>,
) {
    for entry in tombstones {
        // Each entry pairs a tombstone with the cutoff of its own source.
        let (tombstone, source_cutoff) = entry;
        self.activate(&tombstone, source_cutoff);
    }
}

Expand Down Expand Up @@ -266,60 +268,60 @@ mod tests {

#[test]
fn forward_activate_and_suppress() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
assert!(set.is_suppressed(5));
assert!(!set.is_suppressed(10));
assert!(!set.is_suppressed(15));
}

#[test]
fn forward_expire_at_end() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
assert!(set.is_suppressed(5));
set.expire_until(b"m"); // key == end, tombstone expires
assert!(!set.is_suppressed(5));
}

#[test]
fn forward_expire_past_end() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
set.expire_until(b"z");
assert!(set.is_empty());
}

#[test]
fn forward_not_expired_before_end() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
set.expire_until(b"l");
assert!(set.is_suppressed(5)); // still active
}

#[test]
fn forward_invisible_tombstone_not_activated() {
let mut set = ActiveTombstoneSet::new(5);
set.activate(&rt(b"a", b"m", 10)); // seqno 10 > cutoff 5
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 5); // seqno 10 > cutoff 5
assert!(!set.is_suppressed(1));
assert!(set.is_empty());
}

#[test]
fn forward_multiple_tombstones_max_seqno() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
set.activate(&rt(b"b", b"n", 20));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
set.activate(&rt(b"b", b"n", 20), 100);
assert_eq!(set.max_active_seqno(), Some(20));
assert!(set.is_suppressed(15)); // 15 < 20
}

#[test]
fn forward_duplicate_end_seqno_accounting() {
let mut set = ActiveTombstoneSet::new(100);
set.activate(&rt(b"a", b"m", 10));
set.activate(&rt(b"b", b"m", 10));
let mut set = ActiveTombstoneSet::new();
set.activate(&rt(b"a", b"m", 10), 100);
set.activate(&rt(b"b", b"m", 10), 100);
assert_eq!(set.max_active_seqno(), Some(10));

set.expire_until(b"m");
Expand All @@ -329,68 +331,84 @@ mod tests {

#[test]
fn forward_initialize_from() {
let mut set = ActiveTombstoneSet::new(100);
set.initialize_from(vec![rt(b"a", b"m", 10), rt(b"b", b"z", 20)]);
let mut set = ActiveTombstoneSet::new();
set.initialize_from(vec![(rt(b"a", b"m", 10), 100), (rt(b"b", b"z", 20), 100)]);
assert_eq!(set.max_active_seqno(), Some(20));
}

#[test]
fn forward_initialize_and_expire() {
let mut set = ActiveTombstoneSet::new(100);
set.initialize_from(vec![rt(b"a", b"d", 10), rt(b"b", b"f", 20)]);
let mut set = ActiveTombstoneSet::new();
set.initialize_from(vec![(rt(b"a", b"d", 10), 100), (rt(b"b", b"f", 20), 100)]);
set.expire_until(b"e"); // expires [a,d) but not [b,f)
assert_eq!(set.max_active_seqno(), Some(20));
set.expire_until(b"f"); // expires [b,f)
assert!(set.is_empty());
}

#[test]
fn forward_mixed_cutoffs_activates_only_visible_rt() {
    let mut active = ActiveTombstoneSet::new();

    // Both tombstones carry seqno 10; only the per-source cutoff differs.
    let admitted = rt(b"a", b"m", 10);
    active.activate(&admitted, 15); // 10 < 15: visible, must be tracked
    let rejected = rt(b"a", b"z", 10);
    active.activate(&rejected, 5); // 10 >= 5: not visible, must be ignored
    assert_eq!(active.max_active_seqno(), Some(10));
    assert!(!active.is_empty());

    // Advance to `m`, the end key of the admitted tombstone. Had the
    // rejected tombstone (ending at `z`) been activated by mistake, it
    // would still be present and the set would not be empty.
    active.expire_until(b"m");
    assert!(active.is_empty());
}

// ──── Reverse tests ────

#[test]
fn reverse_activate_and_suppress() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"a", b"m", 10), 100);
assert!(set.is_suppressed(5));
assert!(!set.is_suppressed(10));
}

#[test]
fn reverse_expire_before_start() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.activate(&rt(b"d", b"m", 10));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"d", b"m", 10), 100);

set.expire_until(b"c");
assert!(set.is_empty());
}

#[test]
fn reverse_initialize_from() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.initialize_from(vec![rt(b"a", b"m", 10), rt(b"b", b"z", 20)]);
let mut set = ActiveTombstoneSetReverse::new();
set.initialize_from(vec![(rt(b"a", b"m", 10), 100), (rt(b"b", b"z", 20), 100)]);
assert_eq!(set.max_active_seqno(), Some(20));
}

#[test]
fn reverse_not_expired_at_start() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.activate(&rt(b"d", b"m", 10));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"d", b"m", 10), 100);

set.expire_until(b"d");
assert!(set.is_suppressed(5));
}

#[test]
fn reverse_invisible_tombstone_not_activated() {
let mut set = ActiveTombstoneSetReverse::new(5);
set.activate(&rt(b"a", b"m", 10));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"a", b"m", 10), 5);
assert!(set.is_empty());
}

#[test]
fn reverse_duplicate_end_seqno_accounting() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.activate(&rt(b"d", b"m", 10));
set.activate(&rt(b"d", b"n", 10));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"d", b"m", 10), 100);
set.activate(&rt(b"d", b"n", 10), 100);
assert_eq!(set.max_active_seqno(), Some(10));

set.expire_until(b"c");
Expand All @@ -400,12 +418,29 @@ mod tests {

#[test]
fn reverse_multiple_tombstones() {
let mut set = ActiveTombstoneSetReverse::new(100);
set.activate(&rt(b"a", b"m", 10));
set.activate(&rt(b"f", b"z", 20));
let mut set = ActiveTombstoneSetReverse::new();
set.activate(&rt(b"a", b"m", 10), 100);
set.activate(&rt(b"f", b"z", 20), 100);
assert_eq!(set.max_active_seqno(), Some(20));

set.expire_until(b"e");
assert_eq!(set.max_active_seqno(), Some(10));
}

#[test]
fn reverse_mixed_cutoffs_activates_only_visible_rt() {
    let mut active = ActiveTombstoneSetReverse::new();

    // Both tombstones carry seqno 10; only the per-source cutoff differs.
    let admitted = rt(b"n", b"z", 10);
    active.activate(&admitted, 15); // 10 < 15: visible, must be tracked
    let rejected = rt(b"a", b"m", 10);
    active.activate(&rejected, 5); // 10 >= 5: not visible, must be ignored
    assert_eq!(active.max_active_seqno(), Some(10));

    // `l` sorts before the admitted tombstone's start (`n`) but after the
    // rejected one's start (`a`). The admitted tombstone therefore expires,
    // and the set ends up empty only if the rejected one was never added.
    active.expire_until(b"l");
    assert_eq!(active.max_active_seqno(), None);
    assert!(active.is_empty());
}
}
Loading
Loading