6 changes: 6 additions & 0 deletions Cargo.toml
@@ -117,3 +117,9 @@ name = "prefix_bloom"
harness = false
path = "benches/prefix_bloom.rs"
required-features = []

[[bench]]
name = "merge_point_read"
harness = false
path = "benches/merge_point_read.rs"
required-features = []
116 changes: 116 additions & 0 deletions benches/merge_point_read.rs
@@ -0,0 +1,116 @@
use criterion::{criterion_group, criterion_main, Criterion};
use lsm_tree::{AbstractTree, Cache, Config, MergeOperator, SequenceNumberCounter, UserValue};
use std::sync::Arc;
use tempfile::tempdir;

/// Simple counter merge operator for benchmarks.
struct CounterMerge;

impl MergeOperator for CounterMerge {
    fn merge(
        &self,
        _key: &[u8],
        base_value: Option<&[u8]>,
        operands: &[&[u8]],
    ) -> lsm_tree::Result<UserValue> {
        let mut counter: i64 = match base_value {
            Some(bytes) if bytes.len() == 8 => {
                i64::from_le_bytes(bytes.try_into().expect("checked"))
            }
            _ => 0,
        };
        for op in operands {
            if op.len() == 8 {
                counter += i64::from_le_bytes((*op).try_into().expect("checked"));
            }
        }
        Ok(counter.to_le_bytes().to_vec().into())
    }
}

fn merge_point_read_deep_tree(c: &mut Criterion) {
    let mut group = c.benchmark_group("merge point read");
    group.sample_size(100);

    for table_count in [10, 50, 100] {
        // --- Uncached: cold disk reads ---
        let folder = tempdir().unwrap();
        let tree = Config::new(
            &folder,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .use_cache(Arc::new(Cache::with_capacity_bytes(0)))
        .with_merge_operator(Some(Arc::new(CounterMerge)))
        .open()
        .unwrap();

        let mut seqno = 0u64;

        // Base value on disk
        tree.insert("counter", 100_i64.to_le_bytes(), seqno);
        seqno += 1;
        tree.flush_active_memtable(0).unwrap();

        // Create many tables with unrelated keys (bloom should reject these)
        for i in 1..table_count {
            let key = format!("other_{i:04}");
            tree.insert(key, 0_i64.to_le_bytes(), seqno);
            seqno += 1;
            tree.flush_active_memtable(0).unwrap();
        }

        // Merge operand in active memtable
        tree.merge("counter", 1_i64.to_le_bytes(), seqno);
        seqno += 1;

        group.bench_function(format!("merge get, {table_count} tables (uncached)"), |b| {
            b.iter(|| {
                let val = tree.get("counter", seqno).unwrap().unwrap();
                let n = i64::from_le_bytes((*val).try_into().unwrap());
                assert_eq!(n, 101);
            });
        });

        // --- Cached: warm block cache ---
        let folder2 = tempdir().unwrap();
        let tree_cached = Config::new(
            &folder2,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .use_cache(Arc::new(Cache::with_capacity_bytes(64 * 1_024 * 1_024)))
        .with_merge_operator(Some(Arc::new(CounterMerge)))
        .open()
        .unwrap();

        let mut s = 0u64;
        tree_cached.insert("counter", 100_i64.to_le_bytes(), s);
        s += 1;
        tree_cached.flush_active_memtable(0).unwrap();

        for i in 1..table_count {
            let key = format!("other_{i:04}");
            tree_cached.insert(key, 0_i64.to_le_bytes(), s);
            s += 1;
            tree_cached.flush_active_memtable(0).unwrap();
        }

        tree_cached.merge("counter", 1_i64.to_le_bytes(), s);
        s += 1;

        // Warm the cache
        let _ = tree_cached.get("counter", s).unwrap();

        group.bench_function(format!("merge get, {table_count} tables (cached)"), |b| {
            b.iter(|| {
                let val = tree_cached.get("counter", s).unwrap().unwrap();
                let n = i64::from_le_bytes((*val).try_into().unwrap());
                assert_eq!(n, 101);
            });
        });
    }
}

criterion_group!(benches, merge_point_read_deep_tree);
criterion_main!(benches);
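
A quick way to sanity-check the operator outside the bench is to call it directly (a minimal sketch reusing the CounterMerge type above; in normal operation the tree invokes the operator internally during reads):

    // base = 100 plus a single +1 operand folds to 101, matching the bench assertions
    let op = CounterMerge;
    let base = 100_i64.to_le_bytes();
    let operand = 1_i64.to_le_bytes();
    let merged = op.merge(b"counter", Some(&base[..]), &[&operand[..]]).unwrap();
    assert_eq!(i64::from_le_bytes((*merged).try_into().unwrap()), 101);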
68 changes: 0 additions & 68 deletions src/memtable/mod.rs
@@ -144,31 +144,6 @@ impl Memtable {
        })
    }

    /// Collects all entries for a given key with seqno < `seqno`,
    /// ordered by descending sequence number (newest first).
    ///
    /// Used by the merge operator read path to collect all operands for a key.
    // Allocates a Vec and clones entries — acceptable for the merge slow path.
    // A zero-copy iterator API would avoid this but changes the skiplist contract.
    pub(crate) fn get_all_for_key(&self, key: &[u8], seqno: SeqNo) -> Vec<InternalValue> {
        if seqno == 0 {
            return Vec::new();
        }

        // ValueType is not part of InternalKey ordering (only user_key + Reverse(seqno)),
        // so the value type here is arbitrary — it does not affect the seek position.
        let lower_bound = InternalKey::new(key, seqno - 1, ValueType::Value);

        self.items
            .range(lower_bound..)
            .take_while(|entry| &*entry.key().user_key == key)
            .map(|entry| InternalValue {
                key: entry.key().clone(),
                value: entry.value().clone(),
            })
            .collect()
    }

    /// Gets approximate size of memtable in bytes.
    pub fn size(&self) -> u64 {
        self.approximate_size
@@ -693,47 +668,4 @@ mod tests {
            memtable.get(b"abc", 50)
        );
    }

    #[test]
    fn get_all_for_key_seqno_zero_returns_empty() {
        let memtable = Memtable::new(0);
        memtable.insert(crate::InternalValue::from_components(
            "key",
            "val",
            1,
            ValueType::Value,
        ));

        // seqno=0 means nothing is visible — early return
        assert!(memtable.get_all_for_key(b"key", 0).is_empty());
    }

    #[test]
    fn get_all_for_key_returns_all_versions() {
        let memtable = Memtable::new(0);
        memtable.insert(crate::InternalValue::from_components(
            "key",
            "op2",
            3,
            ValueType::MergeOperand,
        ));
        memtable.insert(crate::InternalValue::from_components(
            "key",
            "op1",
            2,
            ValueType::MergeOperand,
        ));
        memtable.insert(crate::InternalValue::from_components(
            "key",
            "base",
            1,
            ValueType::Value,
        ));

        let entries = memtable.get_all_for_key(b"key", 4);
        assert_eq!(entries.len(), 3);
        assert_eq!(entries[0].key.seqno, 3);
        assert_eq!(entries[1].key.seqno, 2);
        assert_eq!(entries[2].key.seqno, 1);
    }
}
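
The `seqno - 1` lower bound in the removed helper works because `InternalKey` orders by `(user_key, Reverse(seqno))`: seeking at `Reverse(seqno - 1)` starts the range at the newest entry strictly below the snapshot. A self-contained sketch of that ordering trick with a plain `BTreeMap` (illustration only; the crate's skiplist is assumed to behave the same way):

    use std::cmp::Reverse;
    use std::collections::BTreeMap;

    fn main() {
        // Same ordering idea as InternalKey: newer versions of a key sort first.
        let mut items: BTreeMap<(&[u8], Reverse<u64>), &str> = BTreeMap::new();
        let key: &[u8] = b"key";
        items.insert((key, Reverse(3)), "op2");
        items.insert((key, Reverse(2)), "op1");
        items.insert((key, Reverse(1)), "base");

        // Collect versions with seqno < 4 by seeking to (key, Reverse(4 - 1)):
        // Reverse(3) <= Reverse(s) holds exactly when s <= 3.
        // (The removed helper early-returned for seqno == 0 to avoid underflow.)
        let snapshot = 4u64;
        let versions: Vec<_> = items
            .range((key, Reverse(snapshot - 1))..)
            .take_while(|((k, _), _)| *k == key)
            .map(|((_, Reverse(s)), v)| (*s, *v))
            .collect();
        assert_eq!(versions, [(3, "op2"), (2, "op1"), (1, "base")]);
    }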
107 changes: 50 additions & 57 deletions src/range.rs
@@ -79,6 +79,12 @@ pub struct IterState {
    /// hash will be skipped entirely during the scan.
    pub(crate) prefix_hash: Option<u64>,

    /// Optional key hash for standard bloom filter pre-filtering.
    ///
    /// When set (typically for single-key point-read pipelines), segments
    /// whose bloom filter reports no match for this hash will be skipped.
    pub(crate) key_hash: Option<u64>,

    /// Optional metrics handle for recording prefix-related statistics (e.g. bloom skips).
    ///
    /// `None` when the caller does not wish to record metrics; this is
@@ -130,6 +136,41 @@ fn range_tombstone_overlaps_bounds(
    overlaps_lo && overlaps_hi
}

/// Checks prefix and key bloom filters for a table.
///
/// Returns `true` if the table should be included (bloom says "maybe" or no
/// filter available), `false` if it can be safely skipped.
fn bloom_passes(state: &IterState, table: &crate::table::Table) -> bool {
    if let Some(prefix_hash) = state.prefix_hash {
        match table.maybe_contains_prefix(prefix_hash) {
            Ok(false) => {
                #[cfg(feature = "metrics")]
                if let Some(m) = &state.metrics {
                    m.prefix_bloom_skips
                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                }
                return false;
            }
            Err(e) => {
                log::debug!("prefix bloom check failed for table {:?}: {e}", table.id());
            }
            _ => {}
        }
    }

    if let Some(key_hash) = state.key_hash {
        match table.bloom_may_contain_key_hash(key_hash) {
            Ok(false) => return false,
            Err(e) => {
                log::debug!("key bloom check failed for table {:?}: {e}", table.id());
            }
            _ => {}
        }
    }

    true
}

impl TreeIter {
    #[expect(
        clippy::too_many_lines,
@@ -231,39 +272,9 @@ impl TreeIter {
                if table.check_key_range_overlap(&(
                    user_range.0.as_ref().map(std::convert::AsRef::as_ref),
                    user_range.1.as_ref().map(std::convert::AsRef::as_ref),
                )) {
                    // If a prefix hash is available (prefix scan with prefix bloom
                    // filters configured), check the bloom filter for the prefix.
                    // Skip the segment if the prefix is definitely absent.
                    if let Some(prefix_hash) = lock.prefix_hash {
                        match table.maybe_contains_prefix(prefix_hash) {
                            Ok(false) => {
                                // Prefix bloom says this segment has no matching keys
                                // — skip it entirely.
                                #[cfg(feature = "metrics")]
                                if let Some(m) = &lock.metrics {
                                    m.prefix_bloom_skips
                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                }
                            }
                            Ok(true) => {
                                single_tables.push(table.clone());
                            }
                            Err(e) => {
                                // On I/O error reading the filter, include the segment
                                // conservatively to avoid missing data. Use debug level
                                // to avoid log noise during transient I/O issues in
                                // prefix-heavy workloads.
                                log::debug!(
                                    "prefix bloom check failed for table {:?}: {e}",
                                    table.id(),
                                );
                                single_tables.push(table.clone());
                            }
                        }
                    } else {
                        single_tables.push(table.clone());
                    }
                }
                )) && bloom_passes(lock, table)
                {
                    single_tables.push(table.clone());
                }
            }
            _ => {
@@ -280,9 +291,11 @@
                );
            }

                // If a prefix hash is available, filter individual tables
                // within the multi-table run using their bloom filters.
                if let Some(prefix_hash) = lock.prefix_hash {
                // If a prefix or key hash is available, filter individual
                // tables within the multi-table run using their bloom
                // filters. This covers both prefix scans (prefix_hash)
                // and point-read merge pipelines (key_hash).
                if lock.prefix_hash.is_some() || lock.key_hash.is_some() {
                    let bounds = (
                        user_range.0.as_ref().map(std::convert::AsRef::as_ref),
                        user_range.1.as_ref().map(std::convert::AsRef::as_ref),
@@ -297,27 +310,7 @@
                                return false;
                            }

                            // On I/O error reading the filter, include the
                            // table conservatively to avoid missing data.
                            let contains = table
                                .maybe_contains_prefix(prefix_hash)
                                .inspect_err(|e| {
                                    log::debug!(
                                        "prefix bloom check failed for table {:?}: {e}",
                                        table.id(),
                                    );
                                })
                                .unwrap_or(true);

                            #[cfg(feature = "metrics")]
                            if !contains {
                                if let Some(m) = &lock.metrics {
                                    m.prefix_bloom_skips
                                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
                                }
                            }

                            contains
                            bloom_passes(lock, table)
                        })
                        .cloned()
                        .collect();
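
Both call sites now share the same inclusion rule: a table is dropped only on a definite bloom negative, while a positive, a missing filter, or an I/O error all keep it. Stated as a tiny decision function (a sketch of the semantics only; include_on is hypothetical, not code from this change):

    // How a single filter probe maps to include/skip.
    fn include_on(check: Option<lsm_tree::Result<bool>>) -> bool {
        match check {
            None => true,             // no hash or no filter: must read the table
            Some(Ok(true)) => true,   // bloom says "maybe present": must read
            Some(Ok(false)) => false, // definite negative: safe to skip
            Some(Err(_)) => true,     // filter unreadable: include conservatively
        }
    }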