Merged
24 commits
5ac4925
fix: thread UserComparator through Run, KeyRange, and Version methods
polaz Mar 22, 2026
6d97a83
fix: use comparator-aware RunReader in point-read merge path
polaz Mar 22, 2026
735cfbf
fix: use monotonic seqnos in leveled compaction test
polaz Mar 22, 2026
1827f0e
fix: resolve CI lint errors
polaz Mar 22, 2026
a70a468
fix: use comparator-aware containment in drop_range strategy
polaz Mar 22, 2026
2fa21bf
fix(compaction): allow multi-run levels in L1+ debug assert
polaz Mar 23, 2026
1d0c672
fix(run_reader): keep RunReader::new in public API, delegate to new_cmp
polaz Mar 23, 2026
9f55dfe
fix(compaction): skip L1+ pick when levels are multi-run
polaz Mar 23, 2026
f13e771
fix(compaction): restore relaxed debug_assert for multi-run levels
polaz Mar 23, 2026
7d55651
fix(run_reader): gate dead_code expect behind cfg(not(test))
polaz Mar 23, 2026
fa8110e
fix(run_reader): qualify doc link and correct dead_code reason
polaz Mar 23, 2026
27694a1
refactor(run): extract shared trim_slice helper from get_contained
polaz Mar 23, 2026
c9d440b
docs(key_range,leveled): correct contains_range_cmp reference and exp…
polaz Mar 23, 2026
e9d4c74
refactor(compaction): replace debug_assert with log::debug for multi-…
polaz Mar 23, 2026
05e1efb
docs(version): clarify recovery preserves comparator-sorted run order
polaz Mar 23, 2026
7901dad
docs(test): clarify ignore reason for reverse comparator range scans
polaz Mar 23, 2026
b01a22c
test(run): add unit test for get_contained_cmp with reverse comparator
polaz Mar 23, 2026
d3c9868
docs(compaction): link multi-run picker concern to #122 Part 3
polaz Mar 23, 2026
2f25a1c
perf(compaction): make pick_minimal_compaction multi-run aware
polaz Mar 23, 2026
8cef990
fix(test): use comparator-ordered key ranges in reverse cmp fixture
polaz Mar 23, 2026
1766f1d
fix(compaction): scope take_while per-run in multi-run picker
polaz Mar 23, 2026
2c1feb9
test(comparator): add multi-table run range scan with u64 comparator
polaz Mar 23, 2026
9dfceab
docs(run): clarify trim_slice returns span between first and last match
polaz Mar 23, 2026
9eb1234
docs(table): clarify aggregate_run_key_range is comparator-agnostic
polaz Mar 23, 2026
22 changes: 14 additions & 8 deletions src/compaction/drop_range.rs
@@ -34,12 +34,16 @@ impl RangeBounds<Slice> for OwnedBounds {
}

impl OwnedBounds {
/// Returns `true` if the key range is fully contained in these bounds,
/// using the given comparator for key ordering.
#[must_use]
pub fn contains(&self, range: &KeyRange) -> bool {
pub fn contains(&self, range: &KeyRange, cmp: &dyn crate::comparator::UserComparator) -> bool {
use std::cmp::Ordering;

let lower_ok = match &self.start {
Bound::Unbounded => true,
Bound::Included(key) => key.as_ref() <= range.min().as_ref(),
Bound::Excluded(key) => key.as_ref() < range.min().as_ref(),
Bound::Included(key) => cmp.compare(key.as_ref(), range.min()) != Ordering::Greater,
Bound::Excluded(key) => cmp.compare(key.as_ref(), range.min()) == Ordering::Less,
};

if !lower_ok {
@@ -48,8 +52,8 @@ impl OwnedBounds {

match &self.end {
Bound::Unbounded => true,
Bound::Included(key) => key.as_ref() >= range.max().as_ref(),
Bound::Excluded(key) => key.as_ref() > range.max().as_ref(),
Bound::Included(key) => cmp.compare(key.as_ref(), range.max()) != Ordering::Less,
Bound::Excluded(key) => cmp.compare(key.as_ref(), range.max()) == Ordering::Greater,
}
}
}
@@ -72,16 +76,18 @@ impl CompactionStrategy for Strategy {
"DropRangeCompaction"
}

fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice {
let cmp = config.comparator.as_ref();

let table_ids: HashSet<_> = version
.iter_levels()
.flat_map(|lvl| lvl.iter())
.flat_map(|run| {
run.range_overlap_indexes(&self.bounds)
run.range_overlap_indexes_cmp(&self.bounds, cmp)
.and_then(|(lo, hi)| run.get(lo..=hi))
.unwrap_or_default()
.iter()
.filter(|x| self.bounds.contains(x.key_range()))
.filter(|x| self.bounds.contains(x.key_range(), cmp))
})
.map(Table::id)
.collect();
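For context on why the raw byte comparisons had to go: once a custom comparator defines key order, a lexicographic bound check can reject keys the comparator actually admits. Below is a minimal standalone sketch using a hypothetical reverse-ordering comparator and a local stand-in trait that mirrors only the compare(&self, a: &[u8], b: &[u8]) -> Ordering method relied on in the diff above; it is not the crate's actual types.

use std::cmp::Ordering;

// Stand-in for the crate's UserComparator trait; only the compare method
// used by OwnedBounds::contains is modeled here.
trait UserComparator {
    fn compare(&self, a: &[u8], b: &[u8]) -> Ordering;
}

// Hypothetical comparator that orders keys in descending byte order.
struct ReverseComparator;

impl UserComparator for ReverseComparator {
    fn compare(&self, a: &[u8], b: &[u8]) -> Ordering {
        b.cmp(a)
    }
}

fn main() {
    let cmp = ReverseComparator;
    // Under reverse ordering, "z" sorts before "m" ...
    assert_eq!(cmp.compare(b"z", b"m"), Ordering::Less);
    // ... so an Included lower bound of "z" admits "m" via the comparator,
    // matching the `!= Ordering::Greater` check in the new contains().
    assert_ne!(cmp.compare(b"z", b"m"), Ordering::Greater);
    // The old raw byte check (key <= range.min()) would have rejected it.
    assert!(!(b"z".as_slice() <= b"m".as_slice()));
}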
175 changes: 87 additions & 88 deletions src/compaction/leveled/mod.rs
@@ -11,100 +11,102 @@ use crate::{
config::Config,
slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
table::{util::aggregate_run_key_range, Table},
version::{run::Ranged, Run, Version},
version::{run::Ranged, Level, Version},
HashSet, TableId,
};

/// Tries to find the most optimal compaction set from one level into the other.
///
/// Scans all runs in both levels to handle transient multi-run states from
/// multi-level compaction (#108). See #122 Part 3.
fn pick_minimal_compaction(
curr_run: &Run<Table>,
next_run: Option<&Run<Table>>,
curr_level: &Level,
next_level: &Level,
hidden_set: &HiddenSet,
_overshoot: u64,
table_base_size: u64,
cmp: &dyn crate::comparator::UserComparator,
) -> Option<(HashSet<TableId>, bool)> {
// NOTE: Find largest trivial move (if it exists)
if let Some(window) = curr_run.shrinking_windows().find(|window| {
if hidden_set.is_blocked(window.iter().map(Table::id)) {
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
return false;
}
// Check all runs in curr_level for a window that doesn't overlap ANY run
// in next_level.
for curr_run in curr_level.iter() {
if let Some(window) = curr_run.shrinking_windows().find(|window| {
if hidden_set.is_blocked(window.iter().map(Table::id)) {
return false;
}

let Some(next_run) = &next_run else {
// No run in next level, so we can trivially move
return true;
};
if next_level.is_empty() {
return true;
}

let key_range = aggregate_run_key_range(window);
let key_range = aggregate_run_key_range(window);

next_run.get_overlapping(&key_range).is_empty()
}) {
let ids = window.iter().map(Table::id).collect();
return Some((ids, true));
// Must not overlap ANY run in the next level
next_level
.iter()
.all(|run| run.get_overlapping_cmp(&key_range, cmp).is_empty())
}) {
let ids = window.iter().map(Table::id).collect();
return Some((ids, true));
}
}

// NOTE: Look for merges
if let Some(next_run) = &next_run {
next_run
.growing_windows()
.take_while(|window| {
// Cap at 50x tables per compaction for now
//
// At this point, all compactions are too large anyway
// so we can escape early
let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
next_level_size <= (50 * table_base_size)
})
.filter_map(|window| {
if hidden_set.is_blocked(window.iter().map(Table::id)) {
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
return None;
}

let key_range = aggregate_run_key_range(window);

// Pull in all contained tables in current level into compaction
let curr_level_pull_in = curr_run.get_contained(&key_range);

let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();
// Iterate windows across all runs in next_level, pull in from all runs
// in curr_level.
if next_level.is_empty() {
return None;
}

if curr_level_size == 0 {
return None;
}
next_level
.iter()
.flat_map(|run| {
// Cap per-run windows at 50x table_base_size. take_while is safe
// here because growing_windows within a single run are monotonically
// increasing in size — once one exceeds the cap, all subsequent will too.
run.growing_windows().take_while(|window| {
let size = window.iter().map(Table::file_size).sum::<u64>();
size <= (50 * table_base_size)
})
})
.filter_map(|window| {
if hidden_set.is_blocked(window.iter().map(Table::id)) {
return None;
}

// TODO: toggling this statement can deadlock compactions because if there are only larger-than-overshoot
// compactions, they would not be chosen
// if curr_level_size < overshoot {
// return None;
// }
let key_range = aggregate_run_key_range(window);

if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
// IMPORTANT: Compaction is blocked because of other
// on-going compaction
return None;
}
// Pull in contained tables from ALL runs in curr_level
let curr_level_pull_in: Vec<&Table> = curr_level
.iter()
.flat_map(|run| run.get_contained_cmp(&key_range, cmp))
.collect();

let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
let curr_level_size = curr_level_pull_in
.iter()
.map(|t| Table::file_size(t))
.sum::<u64>();

let compaction_bytes = curr_level_size + next_level_size;
if curr_level_size == 0 {
return None;
}

#[expect(clippy::cast_precision_loss)]
let write_amp = (next_level_size as f32) / (curr_level_size as f32);
if hidden_set.is_blocked(curr_level_pull_in.iter().map(|t| Table::id(t))) {
return None;
}

Some((window, curr_level_pull_in, write_amp, compaction_bytes))
})
// Find the compaction with the smallest write set
.min_by_key(|(_, _, _waf, bytes)| *bytes)
.map(|(window, curr_level_pull_in, _, _)| {
let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
ids.extend(curr_level_pull_in.iter().map(Table::id));
(ids, false)
})
} else {
None
}
let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
let compaction_bytes = curr_level_size + next_level_size;

Some((window, curr_level_pull_in, compaction_bytes))
})
.min_by_key(|(_, _, bytes)| *bytes)
.map(|(window, curr_level_pull_in, _)| {
let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
ids.extend(curr_level_pull_in.iter().map(|t| Table::id(t)));
(ids, false)
})
}

#[doc(hidden)]
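The per-run scoping of take_while noted in the comment above matters because window sizes only grow monotonically within a single run; a single take_while over the flattened stream would stop at the first oversized window and silently skip every later run. A self-contained sketch with made-up window sizes (plain u64s, not the crate's Table/Run types) showing the difference:

fn main() {
    // Hypothetical window sizes for two runs in the next level.
    let runs: Vec<Vec<u64>> = vec![vec![10, 60, 120], vec![5, 20]];
    let cap = 50;

    // Per-run take_while (as in pick_minimal_compaction): each run is cut off
    // independently once its windows exceed the cap.
    let per_run: Vec<u64> = runs
        .iter()
        .flat_map(|run| run.iter().copied().take_while(|&size| size <= cap))
        .collect();
    assert_eq!(per_run, vec![10, 5, 20]);

    // A global take_while over the flattened stream stops at 60 and never
    // even looks at the second run.
    let global: Vec<u64> = runs
        .iter()
        .flatten()
        .copied()
        .take_while(|&size| size <= cap)
        .collect();
    assert_eq!(global, vec![10]);
}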
@@ -456,8 +458,9 @@ impl CompactionStrategy for Strategy {
}

#[expect(clippy::too_many_lines)]
fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice {
assert!(version.level_count() == 7, "should have exactly 7 levels");
let cmp = config.comparator.as_ref();

// Trivial move into Lmax
'trivial_lmax: {
@@ -489,8 +492,8 @@ impl CompactionStrategy for Strategy {
let lmax = version.level(lmax_index).expect("last level should exist");

if !lmax
.aggregate_key_range()
.overlaps_with_key_range(&l0.aggregate_key_range())
.aggregate_key_range_cmp(cmp)
.overlaps_with_key_range_cmp(&l0.aggregate_key_range_cmp(cmp), cmp)
{
return Choice::Move(CompactionInput {
table_ids: l0.list_ids(),
@@ -573,12 +576,12 @@ impl CompactionStrategy for Strategy {
break 'trivial;
}

let key_range = first_level.aggregate_key_range();
let key_range = first_level.aggregate_key_range_cmp(cmp);

// Get overlapping tables in next level
let get_overlapping = target_level
.iter()
.flat_map(|run| run.get_overlapping(&key_range))
.flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
.map(Table::id)
.next();

@@ -719,12 +722,12 @@ impl CompactionStrategy for Strategy {

let mut table_ids = first_level.list_ids();

let key_range = first_level.aggregate_key_range();
let key_range = first_level.aggregate_key_range_cmp(cmp);

// Get overlapping tables in next level
let target_level_overlapping_table_ids: Vec<_> = target_level
.iter()
.flat_map(|run| run.get_overlapping(&key_range))
.flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
.map(Table::id)
.collect();

@@ -762,7 +765,7 @@ impl CompactionStrategy for Strategy {
for run in l2.iter() {
for input_run in target_level.iter().chain(first_level.iter()) {
for t in input_run.iter() {
for l2t in run.get_overlapping(t.key_range()) {
for l2t in run.get_overlapping_cmp(t.key_range(), cmp) {
table_ids.insert(Table::id(l2t));
}
}
@@ -816,19 +819,15 @@ impl CompactionStrategy for Strategy {
return Choice::DoNothing;
};

debug_assert!(level.is_disjoint(), "level should be disjoint");
debug_assert!(next_level.is_disjoint(), "next level should be disjoint");

#[expect(
clippy::expect_used,
reason = "first run should exist because score is >0.0"
)]
// pick_minimal_compaction scans all runs in both levels, handling
// transient multi-run states from multi-level compaction (#108, #122).
let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
level.first_run().expect("should have exactly one run"),
next_level.first_run().map(std::ops::Deref::deref),
level,
next_level,
state.hidden_set(),
overshoot_bytes,
self.target_size,
cmp,
) else {
return Choice::DoNothing;
};
46 changes: 43 additions & 3 deletions src/key_range.rs
@@ -54,9 +54,9 @@ impl KeyRange {

/// Returns `true` if the key falls within this key range.
///
/// Uses lexicographic ordering. See [`overlaps_with_key_range_cmp`] for
/// custom comparator support; comparable `_cmp` variants for `contains_key`
/// and `contains_range` can be added when needed (#116).
/// Uses lexicographic ordering. See [`overlaps_with_key_range_cmp`] and
/// [`contains_range_cmp`] for custom comparator support; a `contains_key_cmp`
/// variant can be added when needed (#116).
#[must_use]
pub fn contains_key(&self, key: &[u8]) -> bool {
let (start, end) = self.as_tuple();
@@ -71,6 +71,19 @@ impl KeyRange {
start1 <= start2 && end1 >= end2
}

/// Like [`contains_range`], but uses a custom comparator for key ordering.
#[must_use]
pub fn contains_range_cmp(
&self,
other: &Self,
cmp: &dyn crate::comparator::UserComparator,
) -> bool {
let (start1, end1) = self.as_tuple();
let (start2, end2) = other.as_tuple();
cmp.compare(start1, start2) != std::cmp::Ordering::Greater
&& cmp.compare(end1, end2) != std::cmp::Ordering::Less
}

/// Returns `true` if the `other` overlaps at least partially with this range.
#[must_use]
pub fn overlaps_with_key_range(&self, other: &Self) -> bool {
@@ -156,6 +169,33 @@ impl KeyRange {

Self(min.clone(), max.clone())
}

/// Like [`aggregate`], but uses a custom comparator for key ordering.
pub fn aggregate_cmp<'a>(
mut iter: impl Iterator<Item = &'a Self>,
cmp: &dyn crate::comparator::UserComparator,
) -> Self {
let Some(first) = iter.next() else {
return Self::empty();
};

let mut min = first.min();
let mut max = first.max();

for other in iter {
let x = other.min();
if cmp.compare(x, min) == std::cmp::Ordering::Less {
min = x;
}

let x = other.max();
if cmp.compare(x, max) == std::cmp::Ordering::Greater {
max = x;
}
}

Self(min.clone(), max.clone())
}
}

#[cfg(test)]
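As a usage illustration for the new _cmp variants, here is the contains_range_cmp predicate restated over raw slices with a hypothetical comparator that orders 8-byte keys by their little-endian u64 value, the kind of ordering where byte order and numeric order diverge. The names and encoding are assumptions for the sketch, not the crate's API.

use std::cmp::Ordering;

// Hypothetical comparator: order 8-byte keys by their u64 value, assuming a
// little-endian encoding.
fn compare_u64_le(a: &[u8], b: &[u8]) -> Ordering {
    let num = |s: &[u8]| u64::from_le_bytes(s.try_into().expect("8-byte key"));
    num(a).cmp(&num(b))
}

// The containment predicate from contains_range_cmp, restated over raw slices:
// outer.min <= inner.min and outer.max >= inner.max under the comparator.
fn contains_range(outer: (&[u8], &[u8]), inner: (&[u8], &[u8])) -> bool {
    compare_u64_le(outer.0, inner.0) != Ordering::Greater
        && compare_u64_le(outer.1, inner.1) != Ordering::Less
}

fn main() {
    let key = |n: u64| n.to_le_bytes();
    let (a, b, c, d) = (key(1), key(2), key(255), key(300));

    // Numerically, [1, 300] contains [2, 255] ...
    assert!(contains_range(
        (a.as_slice(), d.as_slice()),
        (b.as_slice(), c.as_slice()),
    ));
    // ... but a lexicographic check over the little-endian bytes disagrees,
    // because 300 encodes as [0x2C, 0x01, ..], which sorts below 255's
    // [0xFF, 0x00, ..].
    assert!(!(a.as_slice() <= b.as_slice() && d.as_slice() >= c.as_slice()));
}

The same reasoning is why aggregate_cmp above selects its running min and max with cmp.compare instead of relying on Ord for the raw key slices.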