Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions src/compaction/leveled/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,6 +398,24 @@ impl CompactionStrategy for Strategy {
}
}

// Intra-L0 compaction: merge multiple L0 runs into a single run within L0
// when table count is below the L0→L1 threshold
{
let first_level = version.l0();

if first_level.run_count() > 1
&& first_level.table_count() < usize::from(self.l0_threshold)
&& !version.level_is_busy(0, state.hidden_set())
{
return Choice::Merge(CompactionInput {
table_ids: first_level.list_ids(),
dest_level: 0,
canonical_level: 0,
target_size: self.target_size,
});
}
Comment thread
polaz marked this conversation as resolved.
Comment on lines +406 to +416
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This misses the intention of intra-L0:

  1. it's too expensive (it does not consider the size ratio between L0 and L1)
  2. not the reason why Intra-L0 compaction is typically performed - it is done when L1 is blocked
  3. wrong because the table_count is not relevant to us if all tables form a single run (though rare)
  4. we may not want to merge all tables in L0

}

// Scoring
let mut scores = [(/* score */ 0.0, /* overshoot */ 0u64); 7];

Expand Down
113 changes: 112 additions & 1 deletion src/compaction/leveled/test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::*;
use crate::{AbstractTree, Config, SequenceNumberCounter};
use crate::{AbstractTree, Config, SeqNo, SequenceNumberCounter};
use std::sync::Arc;
use test_log::test;

Expand Down Expand Up @@ -46,6 +46,117 @@ fn leveled_l0_below_limit() -> crate::Result<()> {
Ok(())
}

#[test]
// Verifies that when L0 holds multiple overlapping runs but the table count is
// still below the L0→L1 threshold, compaction consolidates the runs *within*
// L0 (intra-L0) instead of pushing data down to L1, and that no data or
// version visibility is lost in the process.
fn leveled_intra_l0_compaction() -> crate::Result<()> {
    let dir = tempfile::tempdir()?;
    let tree = Config::new(
        dir.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    // Flush 3 overlapping memtables with distinct values (below configured l0_threshold=4).
    // Every flush writes the shared keys "a" and "z", so the resulting tables
    // overlap in key range and each flush becomes its own L0 run.
    for i in 0..3u8 {
        tree.insert("a", [b'v', i].as_slice(), u64::from(i));
        tree.insert([b'k', i].as_slice(), "v", 0);
        tree.insert("z", [b'v', i].as_slice(), u64::from(i));
        tree.flush_active_memtable(0)?;
    }

    assert_eq!(3, tree.table_count());
    assert!(
        tree.l0_run_count() > 1,
        "L0 should have multiple overlapping runs"
    );

    // l0_threshold=4 > 3 tables, so the L0→L1 path must not trigger;
    // a large target size keeps the merge output in a single table.
    let strategy = Arc::new(
        Strategy::default()
            .with_l0_threshold(4)
            .with_table_target_size(128 * 1024 * 1024),
    );
    tree.compact(strategy, 0)?;

    // Intra-L0 compaction should consolidate runs within L0
    assert_eq!(
        1,
        tree.l0_run_count(),
        "L0 should have exactly 1 run after intra-L0 compaction"
    );
    assert_eq!(
        1,
        tree.table_count(),
        "Tables should be merged into 1 after intra-L0 compaction"
    );

    // All data must still be readable with correct values
    for i in 0..3u8 {
        assert!(tree.get([b'k', i].as_slice(), SeqNo::MAX)?.is_some());
    }
    // Latest visible versions should be the last written values
    // (seqno 2 was the highest written for "a" and "z").
    assert_eq!(
        tree.get("a", SeqNo::MAX)?.as_deref(),
        Some([b'v', 2].as_slice()),
    );
    assert_eq!(
        tree.get("z", SeqNo::MAX)?.as_deref(),
        Some([b'v', 2].as_slice()),
    );

    // Verify data stayed in L0 (not pushed to L1)
    assert!(
        tree.current_version()
            .level(1)
            .map_or(true, |l| l.is_empty()),
        "L1 should remain empty after intra-L0 compaction"
    );

    Ok(())
}

#[test]
// Verifies read ordering after an intra-L0 compaction: a run flushed AFTER the
// compaction must be searched before the merged (older) run, so the newest
// write wins point reads. The defect fixed here: stray diff-viewer artifact
// lines ("Comment thread" / "polaz marked this conversation as resolved.")
// were embedded in the function body, making it invalid Rust; they are removed.
fn leveled_intra_l0_preserves_newer_run_ordering() -> crate::Result<()> {
    let dir = tempfile::tempdir()?;
    let tree = Config::new(
        dir.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    // Flush 2 overlapping memtables (below l0_threshold=4); both contain "key",
    // so they form two distinct L0 runs.
    tree.insert("key", "old_1", 0);
    tree.flush_active_memtable(0)?;
    tree.insert("key", "old_2", 1);
    tree.flush_active_memtable(0)?;

    assert_eq!(2, tree.l0_run_count());

    // Intra-L0 compaction merges the 2 runs
    let strategy = Arc::new(
        Strategy::default()
            .with_l0_threshold(4)
            .with_table_target_size(128 * 1024 * 1024),
    );
    tree.compact(strategy, 0)?;
    assert_eq!(1, tree.l0_run_count());

    // Flush a newer memtable AFTER compaction — this run must be searched first
    tree.insert("key", "newest", 2);
    tree.flush_active_memtable(0)?;

    assert_eq!(2, tree.l0_run_count());

    // The newest flush must win: merged (older) run is appended, newer run is at front
    assert_eq!(
        tree.get("key", SeqNo::MAX)?.as_deref(),
        Some(b"newest".as_slice()),
        "newer L0 run must be found before merged (older) run"
    );

    Ok(())
}

#[test]
fn leveled_l0_reached_limit() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
Expand Down
11 changes: 10 additions & 1 deletion src/version/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,16 @@ impl Version {

if level_idx == dest_level {
if let Some(run) = Run::new(new_tables.to_vec()) {
runs.insert(0, run);
if dest_level == 0 {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is flimsy once intra-L0 are not all-or-nothing compactions.

// NOTE: dest_level == 0 in with_merge only occurs for intra-L0
// compaction (memtable flushes use with_new_l0_run, not with_merge).
// Append the merged (older) run so that any concurrently flushed
// (newer) runs remain at the front and are searched first during
// point reads.
runs.push(run);
} else {
runs.insert(0, run);
}
}
}

Expand Down
Loading