diff --git a/src/compaction/drop_range.rs b/src/compaction/drop_range.rs
index 4134a195b..2ebe32f7a 100644
--- a/src/compaction/drop_range.rs
+++ b/src/compaction/drop_range.rs
@@ -34,12 +34,16 @@ impl RangeBounds<Slice> for OwnedBounds {
 }
 
 impl OwnedBounds {
+    /// Returns `true` if the key range is fully contained in these bounds,
+    /// using the given comparator for key ordering.
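+    ///
+    /// Sketch, assuming a reverse comparator like the `ReverseCmp` defined in
+    /// the `run.rs` tests below (so `"z"` orders before `"a"`):
+    ///
+    /// ```ignore
+    /// let bounds = OwnedBounds {
+    ///     start: Bound::Included(b"z".into()),
+    ///     end: Bound::Included(b"a".into()),
+    /// };
+    /// let range = KeyRange::new((b"y".into(), b"b".into()));
+    /// // "y".."b" lies inside "z".."a" under reverse ordering.
+    /// assert!(bounds.contains(&range, &ReverseCmp));
+    /// ```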
     #[must_use]
-    pub fn contains(&self, range: &KeyRange) -> bool {
+    pub fn contains(&self, range: &KeyRange, cmp: &dyn crate::comparator::UserComparator) -> bool {
+        use std::cmp::Ordering;
+
         let lower_ok = match &self.start {
             Bound::Unbounded => true,
-            Bound::Included(key) => key.as_ref() <= range.min().as_ref(),
-            Bound::Excluded(key) => key.as_ref() < range.min().as_ref(),
+            Bound::Included(key) => cmp.compare(key.as_ref(), range.min()) != Ordering::Greater,
+            Bound::Excluded(key) => cmp.compare(key.as_ref(), range.min()) == Ordering::Less,
         };
 
         if !lower_ok {
@@ -48,8 +52,8 @@ impl OwnedBounds {
 
         match &self.end {
             Bound::Unbounded => true,
-            Bound::Included(key) => key.as_ref() >= range.max().as_ref(),
-            Bound::Excluded(key) => key.as_ref() > range.max().as_ref(),
+            Bound::Included(key) => cmp.compare(key.as_ref(), range.max()) != Ordering::Less,
+            Bound::Excluded(key) => cmp.compare(key.as_ref(), range.max()) == Ordering::Greater,
         }
     }
 }
@@ -72,16 +76,18 @@ impl CompactionStrategy for Strategy {
         "DropRangeCompaction"
     }
 
-    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
+    fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice {
+        let cmp = config.comparator.as_ref();
+
         let table_ids: HashSet<_> = version
             .iter_levels()
             .flat_map(|lvl| lvl.iter())
             .flat_map(|run| {
-                run.range_overlap_indexes(&self.bounds)
+                run.range_overlap_indexes_cmp(&self.bounds, cmp)
                     .and_then(|(lo, hi)| run.get(lo..=hi))
                     .unwrap_or_default()
                     .iter()
-                    .filter(|x| self.bounds.contains(x.key_range()))
+                    .filter(|x| self.bounds.contains(x.key_range(), cmp))
             })
             .map(Table::id)
             .collect();
diff --git a/src/compaction/leveled/mod.rs b/src/compaction/leveled/mod.rs
index 8ab5f1e94..fcf1fac10 100644
--- a/src/compaction/leveled/mod.rs
+++ b/src/compaction/leveled/mod.rs
@@ -11,100 +11,102 @@ use crate::{
     config::Config,
     slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
     table::{util::aggregate_run_key_range, Table},
-    version::{run::Ranged, Run, Version},
+    version::{run::Ranged, Level, Version},
     HashSet, TableId,
 };
 
 /// Tries to find the most optimal compaction set from one level into the other.
+///
+/// Scans all runs in both levels to handle transient multi-run states from
+/// multi-level compaction (#108). See #122 Part 3.
 fn pick_minimal_compaction(
-    curr_run: &Run<Table>,
-    next_run: Option<&Run<Table>>,
+    curr_level: &Level,
+    next_level: &Level,
     hidden_set: &HiddenSet,
     _overshoot: u64,
     table_base_size: u64,
+    cmp: &dyn crate::comparator::UserComparator,
 ) -> Option<(HashSet<TableId>, bool)> {
     // NOTE: Find largest trivial move (if it exists)
-    if let Some(window) = curr_run.shrinking_windows().find(|window| {
-        if hidden_set.is_blocked(window.iter().map(Table::id)) {
-            // IMPORTANT: Compaction is blocked because of other
-            // on-going compaction
-            return false;
-        }
+    // Check all runs in curr_level for a window that doesn't overlap ANY run
+    // in next_level.
+    for curr_run in curr_level.iter() {
+        if let Some(window) = curr_run.shrinking_windows().find(|window| {
+            if hidden_set.is_blocked(window.iter().map(Table::id)) {
+                return false;
+            }
 
-        let Some(next_run) = &next_run else {
-            // No run in next level, so we can trivially move
-            return true;
-        };
+            if next_level.is_empty() {
+                return true;
+            }
 
-        let key_range = aggregate_run_key_range(window);
+            let key_range = aggregate_run_key_range(window);
 
-        next_run.get_overlapping(&key_range).is_empty()
-    }) {
-        let ids = window.iter().map(Table::id).collect();
-        return Some((ids, true));
+            // Must not overlap ANY run in the next level
+            next_level
+                .iter()
+                .all(|run| run.get_overlapping_cmp(&key_range, cmp).is_empty())
+        }) {
+            let ids = window.iter().map(Table::id).collect();
+            return Some((ids, true));
+        }
     }
 
     // NOTE: Look for merges
-    if let Some(next_run) = &next_run {
-        next_run
-            .growing_windows()
-            .take_while(|window| {
-                // Cap at 50x tables per compaction for now
-                //
-                // At this point, all compactions are too large anyway
-                // so we can escape early
-                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
-                next_level_size <= (50 * table_base_size)
-            })
-            .filter_map(|window| {
-                if hidden_set.is_blocked(window.iter().map(Table::id)) {
-                    // IMPORTANT: Compaction is blocked because of other
-                    // on-going compaction
-                    return None;
-                }
-
-                let key_range = aggregate_run_key_range(window);
-
-                // Pull in all contained tables in current level into compaction
-                let curr_level_pull_in = curr_run.get_contained(&key_range);
-
-                let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();
+    // Iterate windows across all runs in next_level, pull in from all runs
+    // in curr_level.
+    if next_level.is_empty() {
+        return None;
+    }
 
-                if curr_level_size == 0 {
-                    return None;
-                }
+    next_level
+        .iter()
+        .flat_map(|run| {
+            // Cap per-run windows at 50x table_base_size. take_while is safe
+            // here because growing_windows within a single run are monotonically
+            // increasing in size — once one exceeds the cap, all subsequent will too.
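+            // For example (illustrative numbers, not a stated default): with a
+            // table_base_size of 64 MiB, each window is capped at 3200 MiB.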
+            run.growing_windows().take_while(|window| {
+                let size = window.iter().map(Table::file_size).sum::<u64>();
+                size <= (50 * table_base_size)
+            })
+        })
+        .filter_map(|window| {
+            if hidden_set.is_blocked(window.iter().map(Table::id)) {
+                return None;
+            }
 
-                // TODO: toggling this statement can deadlock compactions because if there are only larger-than-overshoot
-                // compactions, they would not be chosen
-                // if curr_level_size < overshoot {
-                //     return None;
-                // }
+            let key_range = aggregate_run_key_range(window);
 
-                if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
-                    // IMPORTANT: Compaction is blocked because of other
-                    // on-going compaction
-                    return None;
-                }
+            // Pull in contained tables from ALL runs in curr_level
+            let curr_level_pull_in: Vec<&Table> = curr_level
+                .iter()
+                .flat_map(|run| run.get_contained_cmp(&key_range, cmp))
+                .collect();
 
-                let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
+            let curr_level_size = curr_level_pull_in
+                .iter()
+                .map(|t| Table::file_size(t))
+                .sum::<u64>();
 
-                let compaction_bytes = curr_level_size + next_level_size;
+            if curr_level_size == 0 {
+                return None;
+            }
 
-                #[expect(clippy::cast_precision_loss)]
-                let write_amp = (next_level_size as f32) / (curr_level_size as f32);
+            if hidden_set.is_blocked(curr_level_pull_in.iter().map(|t| Table::id(t))) {
+                return None;
+            }
 
-                Some((window, curr_level_pull_in, write_amp, compaction_bytes))
-            })
-            // Find the compaction with the smallest write set
-            .min_by_key(|(_, _, _waf, bytes)| *bytes)
-            .map(|(window, curr_level_pull_in, _, _)| {
-                let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
-                ids.extend(curr_level_pull_in.iter().map(Table::id));
-                (ids, false)
-            })
-    } else {
-        None
-    }
+            let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
+            let compaction_bytes = curr_level_size + next_level_size;
+
+            Some((window, curr_level_pull_in, compaction_bytes))
+        })
+        .min_by_key(|(_, _, bytes)| *bytes)
+        .map(|(window, curr_level_pull_in, _)| {
+            let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
+            ids.extend(curr_level_pull_in.iter().map(|t| Table::id(t)));
+            (ids, false)
+        })
 }
 
 #[doc(hidden)]
@@ -456,8 +458,9 @@ impl CompactionStrategy for Strategy {
     }
 
     #[expect(clippy::too_many_lines)]
-    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
+    fn choose(&self, version: &Version, config: &Config, state: &CompactionState) -> Choice {
         assert!(version.level_count() == 7, "should have exactly 7 levels");
+        let cmp = config.comparator.as_ref();
 
         // Trivial move into Lmax
         'trivial_lmax: {
@@ -489,8 +492,8 @@ impl CompactionStrategy for Strategy {
             let lmax = version.level(lmax_index).expect("last level should exist");
 
             if !lmax
-                .aggregate_key_range()
-                .overlaps_with_key_range(&l0.aggregate_key_range())
+                .aggregate_key_range_cmp(cmp)
+                .overlaps_with_key_range_cmp(&l0.aggregate_key_range_cmp(cmp), cmp)
             {
                 return Choice::Move(CompactionInput {
                     table_ids: l0.list_ids(),
@@ -573,12 +576,12 @@ impl CompactionStrategy for Strategy {
                 break 'trivial;
             }
 
-            let key_range = first_level.aggregate_key_range();
+            let key_range = first_level.aggregate_key_range_cmp(cmp);
 
             // Get overlapping tables in next level
             let get_overlapping = target_level
                 .iter()
-                .flat_map(|run| run.get_overlapping(&key_range))
+                .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
                 .map(Table::id)
                 .next();
 
@@ -719,12 +722,12 @@ impl CompactionStrategy for Strategy {
 
             let mut table_ids = first_level.list_ids();
 
-            let key_range = first_level.aggregate_key_range();
+            let key_range = first_level.aggregate_key_range_cmp(cmp);
 
             // Get overlapping tables in next level
             let target_level_overlapping_table_ids: Vec<_> = target_level
                 .iter()
-                .flat_map(|run| run.get_overlapping(&key_range))
+                .flat_map(|run| run.get_overlapping_cmp(&key_range, cmp))
                 .map(Table::id)
                 .collect();
 
@@ -762,7 +765,7 @@ impl CompactionStrategy for Strategy {
             for run in l2.iter() {
                 for input_run in target_level.iter().chain(first_level.iter()) {
                     for t in input_run.iter() {
-                        for l2t in run.get_overlapping(t.key_range()) {
+                        for l2t in run.get_overlapping_cmp(t.key_range(), cmp) {
                             table_ids.insert(Table::id(l2t));
                         }
                     }
                 }
@@ -816,19 +819,15 @@ impl CompactionStrategy for Strategy {
             return Choice::DoNothing;
         };
 
-        debug_assert!(level.is_disjoint(), "level should be disjoint");
-        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");
-
-        #[expect(
-            clippy::expect_used,
-            reason = "first run should exist because score is >0.0"
-        )]
+        // pick_minimal_compaction scans all runs in both levels, handling
+        // transient multi-run states from multi-level compaction (#108, #122).
        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
-            level.first_run().expect("should have exactly one run"),
-            next_level.first_run().map(std::ops::Deref::deref),
+            level,
+            next_level,
            state.hidden_set(),
            overshoot_bytes,
            self.target_size,
+            cmp,
        ) else {
            return Choice::DoNothing;
        };
diff --git a/src/key_range.rs b/src/key_range.rs
index 6e25e1bb8..ed77e6485 100644
--- a/src/key_range.rs
+++ b/src/key_range.rs
@@ -54,9 +54,9 @@ impl KeyRange {
     /// Returns `true` if the key falls within this key range.
     ///
-    /// Uses lexicographic ordering. See [`overlaps_with_key_range_cmp`] for
-    /// custom comparator support; comparable `_cmp` variants for `contains_key`
-    /// and `contains_range` can be added when needed (#116).
+    /// Uses lexicographic ordering. See [`overlaps_with_key_range_cmp`] and
+    /// [`contains_range_cmp`] for custom comparator support; a `contains_key_cmp`
+    /// variant can be added when needed (#116).
     #[must_use]
     pub fn contains_key(&self, key: &[u8]) -> bool {
         let (start, end) = self.as_tuple();
@@ -71,6 +71,19 @@ impl KeyRange {
         start1 <= start2 && end1 >= end2
     }
 
+    /// Like [`contains_range`], but uses a custom comparator for key ordering.
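+    ///
+    /// Sketch, assuming a reverse comparator (`compare(a, b)` is `b.cmp(a)`,
+    /// as in the `run.rs` tests' `ReverseCmp`):
+    ///
+    /// ```ignore
+    /// let outer = KeyRange::new((b"z".into(), b"a".into()));
+    /// let inner = KeyRange::new((b"y".into(), b"b".into()));
+    /// assert!(outer.contains_range_cmp(&inner, &ReverseCmp));
+    /// assert!(!inner.contains_range_cmp(&outer, &ReverseCmp));
+    /// ```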
+    #[must_use]
+    pub fn contains_range_cmp(
+        &self,
+        other: &Self,
+        cmp: &dyn crate::comparator::UserComparator,
+    ) -> bool {
+        let (start1, end1) = self.as_tuple();
+        let (start2, end2) = other.as_tuple();
+        cmp.compare(start1, start2) != std::cmp::Ordering::Greater
+            && cmp.compare(end1, end2) != std::cmp::Ordering::Less
+    }
+
     /// Returns `true` if the `other` overlaps at least partially with this range.
     #[must_use]
     pub fn overlaps_with_key_range(&self, other: &Self) -> bool {
@@ -156,6 +169,33 @@ impl KeyRange {
         Self(min.clone(), max.clone())
     }
+
+    /// Like [`aggregate`], but uses a custom comparator for key ordering.
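+    ///
+    /// Sketch, assuming a reverse comparator like the tests' `ReverseCmp`:
+    ///
+    /// ```ignore
+    /// let ranges = [
+    ///     KeyRange::new((b"z".into(), b"p".into())),
+    ///     KeyRange::new((b"o".into(), b"k".into())),
+    /// ];
+    /// // "z" is the smallest and "k" the largest key under reverse ordering.
+    /// let agg = KeyRange::aggregate_cmp(ranges.iter(), &ReverseCmp);
+    /// assert_eq!(agg, KeyRange::new((b"z".into(), b"k".into())));
+    /// ```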
+    pub fn aggregate_cmp<'a>(
+        mut iter: impl Iterator<Item = &'a Self>,
+        cmp: &dyn crate::comparator::UserComparator,
+    ) -> Self {
+        let Some(first) = iter.next() else {
+            return Self::empty();
+        };
+
+        let mut min = first.min();
+        let mut max = first.max();
+
+        for other in iter {
+            let x = other.min();
+            if cmp.compare(x, min) == std::cmp::Ordering::Less {
+                min = x;
+            }
+
+            let x = other.max();
+            if cmp.compare(x, max) == std::cmp::Ordering::Greater {
+                max = x;
+            }
+        }
+
+        Self(min.clone(), max.clone())
+    }
 }
 
 #[cfg(test)]
diff --git a/src/range.rs b/src/range.rs
index 745e281ef..f4815b35c 100644
--- a/src/range.rs
+++ b/src/range.rs
@@ -317,9 +317,11 @@ impl TreeIter {
                     )]
                     let new_run = Run::new(surviving).expect("non-empty surviving tables");
 
-                    if let Some(reader) =
-                        RunReader::new(Arc::new(new_run), user_range.clone())
-                    {
+                    if let Some(reader) = RunReader::new_cmp(
+                        Arc::new(new_run),
+                        user_range.clone(),
+                        lock.comparator.as_ref(),
+                    ) {
                         iters.push(Box::new(reader.filter(move |item| match item {
                             Ok(item) => seqno_filter(item.key.seqno, seqno),
                             Err(_) => true,
@@ -613,7 +615,9 @@ impl TreeIter {
             }
 
             for run in multi_runs {
-                if let Some(reader) = RunReader::new(run, user_range.clone()) {
+                if let Some(reader) =
+                    RunReader::new_cmp(run, user_range.clone(), lock.comparator.as_ref())
+                {
                     iters.push(Box::new(reader.filter(move |item| match item {
                         Ok(item) => seqno_filter(item.key.seqno, seqno),
                         Err(_) => true,
diff --git a/src/run_reader.rs b/src/run_reader.rs
index a45f5dfa0..701f0deca 100644
--- a/src/run_reader.rs
+++ b/src/run_reader.rs
@@ -41,14 +41,31 @@ pub struct RunReader {
 }
 
 impl RunReader {
+    /// Creates a new `RunReader` using default lexicographic key ordering.
+    ///
+    /// For trees with a custom [`crate::comparator::UserComparator`], use [`new_cmp`] instead.
     #[must_use]
+    #[cfg_attr(
+        not(test),
+        expect(dead_code, reason = "crate-internal API — used by other modules")
+    )]
     pub fn new<R: RangeBounds<Slice> + Clone + Send + 'static>(
         run: Arc<Run<Table>>,
         range: R,
+    ) -> Option<Self> {
+        Self::new_cmp(run, range, &crate::comparator::DefaultUserComparator)
+    }
+
+    /// Like [`new`], but uses a custom comparator for key ordering.
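+    ///
+    /// Sketch of the call shape (hypothetical `table_a`/`table_b`/`range`
+    /// values; `tree_cmp` stands in for the tree's configured comparator):
+    ///
+    /// ```ignore
+    /// let run = Arc::new(Run::new(vec![table_a, table_b]).expect("non-empty"));
+    /// if let Some(reader) = RunReader::new_cmp(run, range, tree_cmp) {
+    ///     // Items stream back in the comparator's order across both tables.
+    ///     for item in reader { /* ... */ }
+    /// }
+    /// ```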
+    #[must_use]
+    pub fn new_cmp<R: RangeBounds<Slice> + Clone + Send + 'static>(
+        run: Arc<Run<Table>>,
+        range: R,
+        cmp: &dyn crate::comparator::UserComparator,
     ) -> Option<Self> {
         assert!(!run.is_empty(), "level reader cannot read empty level");
 
-        let (lo, hi) = run.range_overlap_indexes(&range)?;
+        let (lo, hi) = run.range_overlap_indexes_cmp(&range, cmp)?;
 
         Some(Self::culled(run, range, (Some(lo), Some(hi))))
     }
diff --git a/src/table/util.rs b/src/table/util.rs
index 6a5a9a0e4..39d923b2f 100644
--- a/src/table/util.rs
+++ b/src/table/util.rs
@@ -12,6 +12,11 @@ use std::{path::Path, sync::Arc};
 #[cfg(feature = "metrics")]
 use crate::metrics::Metrics;
 
+/// Returns the bounding key range of a table slice.
+///
+/// Takes `first().min()` and `last().max()` — no comparison needed because
+/// callers pass tables that are already sorted in comparator order (via
+/// `push_cmp` / `sort_by_cmp`). Works correctly for any comparator.
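+///
+/// For example, for a run sorted under a reverse comparator with table key
+/// ranges `(z, p)`, `(o, k)`, `(d, a)`, this returns `(z, a)`.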
 #[must_use]
 pub fn aggregate_run_key_range(tables: &[Table]) -> KeyRange {
     #[expect(clippy::expect_used, reason = "runs are never empty by definition")]
diff --git a/src/version/mod.rs b/src/version/mod.rs
index 49c008fad..905216214 100644
--- a/src/version/mod.rs
+++ b/src/version/mod.rs
@@ -137,6 +137,33 @@ impl Level {
             KeyRange::aggregate(key_ranges.iter())
         }
     }
+
+    /// Like [`aggregate_key_range`], but uses a custom comparator for key ordering.
+    ///
+    /// Per-run aggregation via [`Run::aggregate_key_range`] is comparator-correct
+    /// because runs are sorted in comparator order (ensured by `push_cmp`), so
+    /// `first().min()` and `last().max()` yield the true extremes under the
+    /// configured comparator. The cross-run aggregation then uses `aggregate_cmp`
+    /// to find the global min/max.
+    pub fn aggregate_key_range_cmp(&self, cmp: &dyn crate::comparator::UserComparator) -> KeyRange {
+        if self.run_count() == 1 {
+            #[expect(
+                clippy::expect_used,
+                reason = "we check for run_count, so the first run must exist"
+            )]
+            self.runs
+                .first()
+                .expect("should exist")
+                .aggregate_key_range()
+        } else {
+            let key_ranges = self
+                .iter()
+                .map(|x| Run::aggregate_key_range(x))
+                .collect::<Vec<_>>();
+
+            KeyRange::aggregate_cmp(key_ranges.iter(), cmp)
+        }
+    }
 }
 
 pub struct VersionInner {
@@ -246,6 +273,10 @@ impl Version {
                 })
                 .collect::<crate::Result<Vec<_>>>()?;
 
+            // Tables are in persisted order, which preserves the
+            // comparator-sorted order from when the run was written.
+            // No re-sort needed — the manifest faithfully round-trips
+            // the run's table sequence.
             Ok(Arc::new(
                 #[expect(
                     clippy::expect_used,
diff --git a/src/version/run.rs b/src/version/run.rs
index 80ab96b14..46aa07257 100644
--- a/src/version/run.rs
+++ b/src/version/run.rs
@@ -62,6 +62,25 @@ impl<T> std::ops::Deref for Run<T> {
     }
 }
 
+/// Returns the span between the first and last element matching `pred`.
+///
+/// Note: non-matching elements *between* matches are included. This is
+/// correct for `get_contained` / `get_contained_cmp` where the overlap
+/// window guarantees contiguity of matching tables.
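+///
+/// For example, `trim_slice(&[1, 2, 3, 4, 5], |x| x % 2 == 0)` spans the
+/// first and last even elements and returns `&[2, 3, 4]`; the odd `3` in
+/// between is kept.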
+fn trim_slice<T, F>(s: &[T], pred: F) -> &[T]
+where
+    F: Fn(&T) -> bool,
+{
+    let start = s.iter().position(&pred).unwrap_or(s.len());
+    let end = s.iter().rposition(&pred).map_or(start, |i| i + 1);
+
+    #[expect(
+        clippy::expect_used,
+        reason = "start..end are derived from position/rposition on the same slice"
+    )]
+    s.get(start..end).expect("should be in range")
+}
+
 impl<T: Ranged> Run<T> {
     pub fn new(items: Vec<T>) -> Option<Self> {
         if items.is_empty() {
@@ -195,23 +214,6 @@ impl<T: Ranged> Run<T> {
     /// Returns the sub slice of tables in the run that have
     /// a key range fully contained in the input key range.
     pub fn get_contained<'a>(&'a self, key_range: &KeyRange) -> &'a [T] {
-        fn trim_slice<T, F>(s: &[T], pred: F) -> &[T]
-        where
-            F: Fn(&T) -> bool,
-        {
-            // find first index where pred holds
-            let start = s.iter().position(&pred).unwrap_or(s.len());
-
-            // find last index where pred holds
-            let end = s.iter().rposition(&pred).map_or(start, |i| i + 1);
-
-            #[expect(
-                clippy::expect_used,
-                reason = "start..end are derived from position/rposition on the same slice"
-            )]
-            s.get(start..end).expect("should be in range")
-        }
-
         let range = key_range.min()..=key_range.max();
 
         let Some((lo, hi)) = self.range_overlap_indexes::<_, _>(&range) else {
             return &[];
         };
 
         self.get(lo..=hi)
             .map(|slice| trim_slice(slice, |x| key_range.contains_range(x.key_range())))
             .unwrap_or_default()
     }
 
+    /// Like [`get_contained`], but uses a custom comparator for key ordering.
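+    ///
+    /// For example, with a reverse comparator and tables `(z, p)`, `(o, k)`,
+    /// `(j, e)`, `(d, a)` (as in the tests below), the key range `(z, k)`
+    /// yields the first two tables.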
+    pub fn get_contained_cmp<'a>(
+        &'a self,
+        key_range: &KeyRange,
+        cmp: &dyn UserComparator,
+    ) -> &'a [T] {
+        let range = key_range.min()..=key_range.max();
+
+        let Some((lo, hi)) = self.range_overlap_indexes_cmp::<_, _>(&range, cmp) else {
+            return &[];
+        };
+
+        self.get(lo..=hi)
+            .map(|slice| trim_slice(slice, |x| key_range.contains_range_cmp(x.key_range(), cmp)))
+            .unwrap_or_default()
+    }
+
     /// Returns the indexes of the interval [min, max] of tables that overlap with a given range.
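+    ///
+    /// For example (sketch): if the run's tables cover `[a, c]`, `[d, f]`
+    /// and `[g, i]`, the range `b..=e` yields `Some((0, 1))`.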
     pub fn range_overlap_indexes<K: AsRef<[u8]>, R: RangeBounds<K>>(
         &self,
@@ -542,6 +561,74 @@ mod tests {
         );
     }
 
+    #[test]
+    fn run_range_contained_cmp_reverse() {
+        use crate::comparator::UserComparator;
+        use crate::TableId;
+
+        struct ReverseCmp;
+        impl UserComparator for ReverseCmp {
+            fn name(&self) -> &'static str {
+                "reverse"
+            }
+            fn compare(&self, a: &[u8], b: &[u8]) -> std::cmp::Ordering {
+                b.cmp(a)
+            }
+        }
+
+        // Reverse comparator: tables store (comparator_min, comparator_max).
+        // In reverse order "z" < "p" < "o" < ... < "a", so key ranges are
+        // (z,p), (o,k), (j,e), (d,a) — matching production SST metadata.
+        let items = vec![
+            s(0, "z", "p"),
+            s(1, "o", "k"),
+            s(2, "j", "e"),
+            s(3, "d", "a"),
+        ];
+        let run = Run(items);
+        let cmp = ReverseCmp;
+
+        // Full range contains all
+        assert_eq!(
+            &[0, 1, 2, 3],
+            &*run
+                .get_contained_cmp(&KeyRange::new((b"z".into(), b"a".into())), &cmp)
+                .iter()
+                .map(|x| x.id)
+                .collect::<Vec<_>>(),
+        );
+
+        // Partial: z..k contains tables 0 and 1
+        assert_eq!(
+            &[0, 1],
+            &*run
+                .get_contained_cmp(&KeyRange::new((b"z".into(), b"k".into())), &cmp)
+                .iter()
+                .map(|x| x.id)
+                .collect::<Vec<_>>(),
+        );
+
+        // Exact match: single table
+        assert_eq!(
+            &[2 as TableId],
+            &*run
+                .get_contained_cmp(&KeyRange::new((b"j".into(), b"e".into())), &cmp)
+                .iter()
+                .map(|x| x.id)
+                .collect::<Vec<_>>(),
+        );
+
+        // No table fully contained
+        assert_eq!(
+            &[] as &[TableId],
+            &*run
+                .get_contained_cmp(&KeyRange::new((b"z".into(), b"z".into())), &cmp)
+                .iter()
+                .map(|x| x.id)
+                .collect::<Vec<_>>(),
+        );
+    }
+
     #[test]
     fn run_range_overlaps() {
         let items = vec![
diff --git a/tests/custom_comparator.rs b/tests/custom_comparator.rs
index 9e2404539..1c0273c19 100644
--- a/tests/custom_comparator.rs
+++ b/tests/custom_comparator.rs
@@ -288,6 +288,8 @@ fn u64_comparator_bounded_range_scan() -> lsm_tree::Result<()> {
     Ok(())
 }
 
+// --- Tests from #101: comparator name persistence and mismatch detection ---
+
 #[test]
 fn reopen_with_same_comparator_succeeds() -> lsm_tree::Result<()> {
     let folder = tempfile::tempdir()?;
@@ -423,3 +425,243 @@ fn oversized_comparator_name_rejected_on_create() {
         Err(e) => panic!("expected InvalidInput Io error, got {e:?}"),
     }
 }
+
+// --- Regression tests for #98: Run::push() comparator bug ---
+
+/// Regression test for #98: Run::push() sorted tables lexicographically,
+/// breaking iteration order after compaction with non-lexicographic comparators.
+#[test]
+fn reverse_comparator_after_compaction() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let cmp: SharedComparator = Arc::new(ReverseComparator);
+
+    let tree = Config::new(folder, Default::default(), Default::default())
+        .comparator(cmp)
+        .open()?;
+
+    // Flush two separate SSTs so compaction merges them
+    tree.insert("a", "val_a", 0);
+    tree.insert("b", "val_b", 1);
+    tree.flush_active_memtable(2)?;
+
+    tree.insert("c", "val_c", 3);
+    tree.insert("d", "val_d", 4);
+    tree.flush_active_memtable(5)?;
+
+    // Major compaction merges the two SSTs into one
+    tree.major_compact(u64::MAX, 6)?;
+
+    // Iteration order must be reverse: d, c, b, a
+    let items: Vec<_> = tree
+        .iter(7, None)
+        .map(|g| {
+            let (k, _) = g.into_inner().unwrap();
+            String::from_utf8(k.to_vec()).unwrap()
+        })
+        .collect();
+
+    assert_eq!(items, vec!["d", "c", "b", "a"]);
+
+    // Point reads must still work
+    assert_eq!(tree.get("a", 7)?, Some("val_a".as_bytes().into()));
+    assert_eq!(tree.get("d", 7)?, Some("val_d".as_bytes().into()));
+
+    Ok(())
+}
+
+/// Regression test for #98: verify leveled compaction preserves
+/// comparator ordering across multiple flushes.
+#[test]
+fn reverse_comparator_leveled_compaction() -> lsm_tree::Result<()> {
+    use lsm_tree::compaction::Leveled;
+
+    let folder = tempfile::tempdir()?;
+    let cmp: SharedComparator = Arc::new(ReverseComparator);
+
+    let tree = Config::new(folder, Default::default(), Default::default())
+        .comparator(cmp)
+        .open()?;
+
+    // Create multiple flushes to trigger L0 -> L1 compaction.
+    // Use monotonically increasing seqnos to avoid MVCC visibility issues.
+    let mut seqno: u64 = 0;
+    for batch in 0..4u8 {
+        let base = batch * 3;
+        for i in 0..3u8 {
+            let key = [base + i + b'a'];
+            tree.insert(key, format!("val_{}", key[0] as char), seqno);
+            seqno += 1;
+        }
+        tree.flush_active_memtable(seqno)?;
+        seqno += 1;
+    }
+
+    // Compact with leveled strategy
+    tree.compact(Arc::new(Leveled::default()), seqno)?;
+
+    // All 12 keys should iterate in reverse order
+    let items: Vec<_> = tree
+        .iter(seqno, None)
+        .map(|g| {
+            let (k, _) = g.into_inner().unwrap();
+            String::from_utf8(k.to_vec()).unwrap()
+        })
+        .collect();
+
+    // Reverse of a..l is l, k, j, i, h, g, f, e, d, c, b, a
+    let mut expected: Vec<String> = (b'a'..=b'l').map(|c| String::from(c as char)).collect();
+    expected.reverse();
+
+    assert_eq!(items, expected);
+
+    Ok(())
+}
+
+/// Regression test for #98: merge operators must resolve correctly
+/// after compaction with custom comparator.
+#[test]
+fn reverse_comparator_compaction_with_merge_operator() -> lsm_tree::Result<()> {
+    use lsm_tree::MergeOperator;
+
+    struct ConcatMerge;
+    impl MergeOperator for ConcatMerge {
+        fn merge(
+            &self,
+            _key: &[u8],
+            _base: Option<&[u8]>,
+            operands: &[&[u8]],
+        ) -> lsm_tree::Result<lsm_tree::UserValue> {
+            let mut result = Vec::new();
+            for (i, op) in operands.iter().enumerate() {
+                if i > 0 {
+                    result.push(b',');
+                }
+                result.extend_from_slice(op);
+            }
+            Ok(result.into())
+        }
+    }
+
+    let folder = tempfile::tempdir()?;
+    let cmp: SharedComparator = Arc::new(ReverseComparator);
+
+    let tree = Config::new(folder, Default::default(), Default::default())
+        .comparator(cmp)
+        .with_merge_operator(Some(Arc::new(ConcatMerge)))
+        .open()?;
+
+    // Two flushes with merge values for the same key
+    tree.merge("key", "v1", 0);
+    tree.flush_active_memtable(1)?;
+
+    tree.merge("key", "v2", 2);
+    tree.flush_active_memtable(3)?;
+
+    // Compaction should merge the values correctly
+    tree.major_compact(u64::MAX, 4)?;
+
+    let val = tree.get("key", 5)?;
+    assert_eq!(val, Some(b"v1,v2".as_ref().into()));
+
+    Ok(())
+}
+
+/// Regression test for #98: tombstones should be applied correctly
+/// after compaction with custom comparator.
+#[test]
+fn reverse_comparator_compaction_with_tombstone() -> lsm_tree::Result<()> {
+    let folder = tempfile::tempdir()?;
+    let cmp: SharedComparator = Arc::new(ReverseComparator);
+
+    let tree = Config::new(folder, Default::default(), Default::default())
+        .comparator(cmp)
+        .open()?;
+
+    tree.insert("a", "val_a", 0);
+    tree.insert("b", "val_b", 1);
+    tree.insert("c", "val_c", 2);
+    tree.flush_active_memtable(3)?;
+
+    // Delete "b" in a second flush
+    tree.remove("b", 4);
+    tree.flush_active_memtable(5)?;
+
+    tree.major_compact(u64::MAX, 6)?;
+
+    // "b" should be gone
+    assert_eq!(tree.get("b", 7)?, None);
+
+    // Remaining keys in reverse order
+    let items: Vec<_> = tree
+        .iter(7, None)
+        .map(|g| {
+            let (k, _) = g.into_inner().unwrap();
+            String::from_utf8(k.to_vec()).unwrap()
+        })
+        .collect();
+
+    assert_eq!(items, vec!["c", "a"]);
+
+    Ok(())
+}
+
+/// Exercises RunReader::new_cmp path in range scans.
+/// Needs multiple SSTs in a single disjoint run (L1+) so RunReader
+/// is used instead of single-table fast path.
+#[test]
+fn u64_comparator_range_scan_multi_table_run() -> lsm_tree::Result<()> {
+    use lsm_tree::compaction::Leveled;
+
+    let folder = tempfile::tempdir()?;
+    let cmp: SharedComparator = Arc::new(U64BigEndianComparator);
+
+    let tree = Config::new(folder, Default::default(), Default::default())
+        .comparator(cmp)
+        .open()?;
+
+    // Create 3 flushes with disjoint key ranges → after leveled compaction
+    // they end up as one multi-table run in L1.
+    for &key in &[10u64, 20, 30] {
+        tree.insert(key.to_be_bytes(), format!("v{key}"), key);
+    }
+    tree.flush_active_memtable(0)?;
+
+    for &key in &[40u64, 50, 60] {
+        tree.insert(key.to_be_bytes(), format!("v{key}"), key);
+    }
+    tree.flush_active_memtable(0)?;
+
+    for &key in &[70u64, 80, 90] {
+        tree.insert(key.to_be_bytes(), format!("v{key}"), key);
+    }
+    tree.flush_active_memtable(0)?;
+
+    // Compact into L1 — creates multi-table run
+    tree.compact(Arc::new(Leveled::default()), 100)?;
+
+    // Full range scan — exercises RunReader::new_cmp on multi-table run
+    let items: Vec<u64> = tree
+        .iter(100, None)
+        .map(|g| {
+            let (k, _) = g.into_inner().unwrap();
+            u64::from_be_bytes(k[..8].try_into().unwrap())
+        })
+        .collect();
+
+    assert_eq!(items, vec![10, 20, 30, 40, 50, 60, 70, 80, 90]);
+
+    // Bounded range scan — exercises RunReader::new_cmp with bounds
+    let lo = 30u64.to_be_bytes();
+    let hi = 70u64.to_be_bytes();
+    let items: Vec<u64> = tree
+        .range(lo..=hi, 100, None)
+        .map(|g| {
+            let (k, _) = g.into_inner().unwrap();
+            u64::from_be_bytes(k[..8].try_into().unwrap())
+        })
+        .collect();
+
+    assert_eq!(items, vec![30, 40, 50, 60, 70]);
+
+    Ok(())
+}
diff --git a/tests/custom_comparator_compaction.rs b/tests/custom_comparator_compaction.rs
index f4835c001..3a315bf08 100644
--- a/tests/custom_comparator_compaction.rs
+++ b/tests/custom_comparator_compaction.rs
@@ -450,7 +450,7 @@ fn reverse_comparator_compaction_with_updates() -> lsm_tree::Result<()> {
 }
 
 #[test]
-#[ignore = "RunReader needs comparator plumbing (#116)"]
+#[ignore = "range bounds interpretation for reverse comparator (#116)"]
 fn reverse_comparator_range_scan_after_compaction() -> lsm_tree::Result<()> {
     let folder = tempfile::tempdir()?;
     let seqno = SequenceNumberCounter::default();
@@ -829,7 +829,7 @@ fn reverse_comparator_merge_after_compaction() -> lsm_tree::Result<()> {
 }
 
 #[test]
-#[ignore = "RunReader needs comparator plumbing (#116)"]
+#[ignore = "range bounds interpretation for reverse comparator (#116)"]
 fn reverse_comparator_merge_range_scan_after_compaction() -> lsm_tree::Result<()> {
     let folder = tempfile::tempdir()?;
     let seqno = SequenceNumberCounter::default();