Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 0 additions & 68 deletions src/memtable/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,31 +144,6 @@ impl Memtable {
})
}

/// Collects all entries for a given key with seqno < `seqno`,
/// ordered by descending sequence number (newest first).
///
/// Used by the merge operator read path to collect all operands for a key.
// Allocates a Vec and clones entries — acceptable for the merge slow-path.
// A zero-copy iterator API would avoid this but changes the skiplist contract.
/// Returns every entry stored for `key` whose sequence number is
/// strictly below `seqno`, ordered newest first.
///
/// The merge-operator read path uses this to gather all operands for a
/// key before applying the merge function.
// NOTE: entries are cloned into a fresh Vec — acceptable for the merge
// slow-path. A borrowing iterator would avoid the copies but would
// change the skiplist contract.
pub(crate) fn get_all_for_key(&self, key: &[u8], seqno: SeqNo) -> Vec<InternalValue> {
    // A visibility cutoff of 0 makes nothing visible.
    if seqno == 0 {
        return Vec::new();
    }

    // Ordering only considers (user_key, Reverse(seqno)); the value type
    // passed here is irrelevant to where the seek lands.
    let start = InternalKey::new(key, seqno - 1, ValueType::Value);

    let mut out = Vec::new();
    for entry in self.items.range(start..) {
        // Stop as soon as the scan leaves this user key.
        if &*entry.key().user_key != key {
            break;
        }
        out.push(InternalValue {
            key: entry.key().clone(),
            value: entry.value().clone(),
        });
    }
    out
}

/// Gets approximate size of memtable in bytes.
pub fn size(&self) -> u64 {
self.approximate_size
Expand Down Expand Up @@ -693,47 +668,4 @@ mod tests {
memtable.get(b"abc", 50)
);
}

#[test]
fn get_all_for_key_seqno_zero_returns_empty() {
    let memtable = Memtable::new(0);
    memtable.insert(crate::InternalValue::from_components(
        "key",
        "val",
        1,
        ValueType::Value,
    ));

    // A visibility cutoff of 0 excludes everything — the early-return
    // path must produce an empty Vec.
    let entries = memtable.get_all_for_key(b"key", 0);
    assert!(entries.is_empty());
}

#[test]
fn get_all_for_key_returns_all_versions() {
    let memtable = Memtable::new(0);

    // Three versions of the same key: two merge operands stacked on a
    // base value, inserted newest-first.
    for (value, seqno, value_type) in [
        ("op2", 3, ValueType::MergeOperand),
        ("op1", 2, ValueType::MergeOperand),
        ("base", 1, ValueType::Value),
    ] {
        memtable.insert(crate::InternalValue::from_components(
            "key", value, seqno, value_type,
        ));
    }

    // Every version below seqno 4 is returned, newest first.
    let entries = memtable.get_all_for_key(b"key", 4);
    let seqnos: Vec<_> = entries.iter().map(|e| e.key.seqno).collect();
    assert_eq!(seqnos, [3, 2, 1]);
}
}
107 changes: 50 additions & 57 deletions src/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,12 @@ pub struct IterState {
/// hash will be skipped entirely during the scan.
pub(crate) prefix_hash: Option<u64>,

/// Optional key hash for standard bloom filter pre-filtering.
///
/// When set (typically for single-key point-read pipelines), segments
/// whose bloom filter reports no match for this hash will be skipped.
pub(crate) key_hash: Option<u64>,

/// Optional metrics handle for recording prefix-related statistics (e.g. bloom skips).
///
/// `None` when the caller does not wish to record metrics; this is
Expand Down Expand Up @@ -130,6 +136,41 @@ fn range_tombstone_overlaps_bounds(
overlaps_lo && overlaps_hi
}

/// Checks prefix and key bloom filters for a table.
///
/// Returns `true` if the table should be included (bloom says "maybe" or no
/// filter available), `false` if it can be safely skipped.
fn bloom_passes(state: &IterState, table: &crate::table::Table) -> bool {
Comment thread
polaz marked this conversation as resolved.
if let Some(prefix_hash) = state.prefix_hash {
match table.maybe_contains_prefix(prefix_hash) {
Ok(false) => {
#[cfg(feature = "metrics")]
if let Some(m) = &state.metrics {
m.prefix_bloom_skips
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
return false;
}
Err(e) => {
log::debug!("prefix bloom check failed for table {:?}: {e}", table.id(),);
}
_ => {}
}
}

if let Some(key_hash) = state.key_hash {
match table.bloom_may_contain_key_hash(key_hash) {
Ok(false) => return false,
Err(e) => {
log::debug!("key bloom check failed for table {:?}: {e}", table.id(),);
}
_ => {}
}
}

true
}

impl TreeIter {
#[expect(
clippy::too_many_lines,
Expand Down Expand Up @@ -231,39 +272,9 @@ impl TreeIter {
if table.check_key_range_overlap(&(
user_range.0.as_ref().map(std::convert::AsRef::as_ref),
user_range.1.as_ref().map(std::convert::AsRef::as_ref),
)) {
// If a prefix hash is available (prefix scan with prefix bloom
// filters configured), check the bloom filter for the prefix.
// Skip the segment if the prefix is definitely absent.
if let Some(prefix_hash) = lock.prefix_hash {
match table.maybe_contains_prefix(prefix_hash) {
Ok(false) => {
// Prefix bloom says this segment has no matching keys
// — skip it entirely.
#[cfg(feature = "metrics")]
if let Some(m) = &lock.metrics {
m.prefix_bloom_skips
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}
Ok(true) => {
single_tables.push(table.clone());
}
Err(e) => {
// On I/O error reading the filter, include the segment
// conservatively to avoid missing data. Use debug level
// to avoid log noise during transient I/O issues in
// prefix-heavy workloads.
log::debug!(
"prefix bloom check failed for table {:?}: {e}",
table.id(),
);
single_tables.push(table.clone());
}
}
} else {
single_tables.push(table.clone());
}
)) && bloom_passes(lock, table)
{
single_tables.push(table.clone());
}
Comment thread
polaz marked this conversation as resolved.
}
_ => {
Expand All @@ -280,9 +291,11 @@ impl TreeIter {
);
}

// If a prefix hash is available, filter individual tables
// within the multi-table run using their bloom filters.
if let Some(prefix_hash) = lock.prefix_hash {
// If a prefix or key hash is available, filter individual
// tables within the multi-table run using their bloom
// filters. This covers both prefix scans (prefix_hash)
// and point-read merge pipelines (key_hash).
if lock.prefix_hash.is_some() || lock.key_hash.is_some() {
let bounds = (
user_range.0.as_ref().map(std::convert::AsRef::as_ref),
user_range.1.as_ref().map(std::convert::AsRef::as_ref),
Expand All @@ -297,27 +310,7 @@ impl TreeIter {
return false;
}

// On I/O error reading the filter, include the
// table conservatively to avoid missing data.
let contains = table
.maybe_contains_prefix(prefix_hash)
.inspect_err(|e| {
log::debug!(
"prefix bloom check failed for table {:?}: {e}",
table.id(),
);
})
.unwrap_or(true);

#[cfg(feature = "metrics")]
if !contains {
if let Some(m) = &lock.metrics {
m.prefix_bloom_skips
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
}

contains
bloom_passes(lock, table)
})
.cloned()
.collect();
Expand Down
48 changes: 36 additions & 12 deletions src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -780,25 +780,24 @@ impl Table {
self.metadata.key_range.overlaps_with_bounds(bounds)
}

/// Checks the bloom filter for a prefix hash.
/// Checks the full-table bloom filter for a hash value.
///
/// Returns `Ok(true)` if the prefix may exist in this table (or if no
/// filter is available), `Ok(false)` if the prefix is definitely absent.
/// Returns `Ok(true)` if the hash may exist in the filter (or if no full
/// filter is available), `Ok(false)` if the hash is definitely absent.
///
/// This is used by prefix scans to skip segments that contain no keys
/// with a matching prefix. The prefix must have been indexed at write
/// time via a [`PrefixExtractor`](crate::PrefixExtractor).
pub(crate) fn maybe_contains_prefix(&self, prefix_hash: u64) -> crate::Result<bool> {
/// Handles full (non-partitioned) filters directly. Partitioned / TLI
/// filters are keyed by user key, not raw hash, so this method returns
/// `Ok(true)` conservatively for those types.
fn bloom_may_contain_hash(&self, hash: u64) -> crate::Result<bool> {
// Full (non-partitioned) filter — single bloom covers the entire table
if let Some(block) = &self.pinned_filter_block {
return block.maybe_contains_hash(prefix_hash);
return block.maybe_contains_hash(hash);
}

// Partitioned / TLI filters: partition index is keyed by user key, not
// prefix hash — we would need to scan ALL partitions to check the prefix,
// raw hash — we would need to scan ALL partitions to check,
// which is O(partitions) I/O and defeats the purpose of bloom skip.
// Returning Ok(true) is correct (conservative: segment is NOT skipped).
// Future: accept prefix bounds to seek overlapping partitions only.
if self.pinned_filter_index.is_some() || self.regions.filter_tli.is_some() {
return Ok(true);
}
Expand All @@ -813,13 +812,38 @@ impl Table {
CompressionType::None, // NOTE: Filter blocks are never compressed (crate invariant)
)?;
let block = FilterBlock::new(block);
return block.maybe_contains_hash(prefix_hash);
return block.maybe_contains_hash(hash);
}

// No filter available — cannot rule out the prefix
// No filter available — cannot rule out the hash
Ok(true)
}

/// Checks the bloom filter for a prefix hash.
///
/// Returns `Ok(true)` if the prefix may exist in this table (or if no
/// filter is available), `Ok(false)` if the prefix is definitely absent.
///
/// This is used by prefix scans to skip segments that contain no keys
/// with a matching prefix. The prefix must have been indexed at write
/// time via a [`PrefixExtractor`](crate::PrefixExtractor).
///
/// Thin delegation: prefix hashes and key hashes share the same
/// full-table filter lookup, so both wrappers forward to the common
/// hash check.
pub(crate) fn maybe_contains_prefix(&self, prefix_hash: u64) -> crate::Result<bool> {
    self.bloom_may_contain_hash(prefix_hash)
}

/// Checks the bloom filter for a precomputed key hash.
///
/// Returns `Ok(true)` if the key may exist in this table (or if no
/// filter is available), `Ok(false)` if the key is definitely absent.
///
/// Used by the point-read merge pipeline to pre-filter disk tables
/// before building range iterators. For partitioned or TLI filter
/// configurations, the underlying check returns `Ok(true)` conservatively,
/// so pre-filtering is best-effort and configuration-dependent.
///
/// Thin delegation: shares the full-table hash lookup with the prefix
/// wrapper above, since both operate on raw 64-bit hashes.
pub(crate) fn bloom_may_contain_key_hash(&self, key_hash: u64) -> crate::Result<bool> {
    self.bloom_may_contain_hash(key_hash)
}
Comment thread
polaz marked this conversation as resolved.

/// Returns the highest effective sequence number in the table.
///
/// For tables produced by flush/compaction (`global_seqno == 0`), this
Expand Down
Loading
Loading