diff --git a/src/range.rs b/src/range.rs
index cfcc05990..745e281ef 100644
--- a/src/range.rs
+++ b/src/range.rs
@@ -88,6 +88,14 @@ pub struct IterState {
     /// whose bloom filter reports no match for this hash will be skipped.
     pub(crate) key_hash: Option<u64>,
 
+    /// Optional user key for partition-aware bloom filter seeking.
+    ///
+    /// When set alongside `key_hash`, enables partitioned/TLI bloom filters
+    /// to seek directly to the relevant partition instead of returning the
+    /// conservative `Ok(true)` fallback. Only set for single-key pipelines
+    /// (e.g. `resolve_merge_via_pipeline`).
+    pub(crate) bloom_key: Option<UserKey>,
+
     /// Optional metrics handle for recording prefix-related statistics (e.g. bloom skips).
     ///
     /// `None` when the caller does not wish to record metrics; this is
@@ -161,8 +169,20 @@ fn bloom_passes(state: &IterState, table: &crate::table::Table) -> bool {
         }
     }
 
+    // bloom_key without key_hash is meaningless — catch misuse early
+    debug_assert!(
+        state.bloom_key.is_none() || state.key_hash.is_some(),
+        "bloom_key requires key_hash to be set"
+    );
+
     if let Some(key_hash) = state.key_hash {
-        match table.bloom_may_contain_key_hash(key_hash) {
+        let result = if let Some(bloom_key) = &state.bloom_key {
+            // UserKey (Slice) implements Deref, coerces to &[u8]
+            table.bloom_may_contain_key(bloom_key, key_hash)
+        } else {
+            table.bloom_may_contain_key_hash(key_hash)
+        };
+        match result {
             Ok(false) => return false,
             Err(e) => {
                 log::debug!("key bloom check failed for table {:?}: {e}", table.id(),);
diff --git a/src/table/mod.rs b/src/table/mod.rs
index 243c3d179..3c1c067e2 100644
--- a/src/table/mod.rs
+++ b/src/table/mod.rs
@@ -889,6 +889,58 @@ impl Table {
         self.bloom_may_contain_hash(key_hash)
     }
 
+    /// Checks the bloom filter for a key, with partition-aware seeking.
+    ///
+    /// Unlike [`bloom_may_contain_key_hash`](Self::bloom_may_contain_key_hash)
+    /// which falls back to `Ok(true)` for partitioned filters, this method
+    /// uses the user key to seek the partition index and check only the
+    /// matching partition's bloom filter.
+    ///
+    /// `key_hash` must be the xxh3 hash of `key` (pre-computed by the caller
+    /// to avoid redundant hashing — same pattern as [`Table::get`]).
+    pub(crate) fn bloom_may_contain_key(&self, key: &[u8], key_hash: u64) -> crate::Result<bool> {
+        debug_assert_eq!(
+            crate::table::filter::standard_bloom::Builder::get_hash(key),
+            key_hash,
+            "bloom_may_contain_key: key_hash must be BloomBuilder::get_hash(key)"
+        );
+
+        // Full (non-partitioned) filter — delegate to hash-only path.
+        // A table has either pinned_filter_block (full) or pinned_filter_index
+        // (partitioned), never both — checked at construction time.
+        if self.pinned_filter_block.is_some() {
+            return self.bloom_may_contain_hash(key_hash);
+        }
+
+        // Partitioned filter with pinned TLI — seek to the matching partition
+        if let Some(filter_idx) = &self.pinned_filter_index {
+            let mut iter = filter_idx.iter(self.comparator.clone());
+            iter.seek(key, crate::seqno::MAX_SEQNO);
+
+            if let Some(filter_block_handle) = iter.next() {
+                let filter_block_handle = filter_block_handle.materialize(filter_idx.as_slice());
+
+                let block = self.load_block(
+                    &filter_block_handle.into_inner(),
+                    BlockType::Filter,
+                    CompressionType::None,
+                )?;
+                let block = FilterBlock::new(block);
+                return block.maybe_contains_hash(key_hash);
+            }
+
+            // iter.next() == None means the key is beyond all partition
+            // boundaries (seek found no ceiling entry in the TLI, which is
+            // ordered by each partition's last user key). The key cannot
+            // exist in this table. Same logic as Table::get (line ~265).
+            return Ok(false);
+        }
+
+        // Unpinned filter — fall through to hash-only path (handles both
+        // unpinned full filters and the no-filter case)
+        self.bloom_may_contain_hash(key_hash)
+    }
+
     /// Returns the highest effective sequence number in the table.
     ///
     /// For tables produced by flush/compaction (`global_seqno == 0`), this
diff --git a/src/table/tests.rs b/src/table/tests.rs
index 6b3a0e3d6..8ac5456d8 100644
--- a/src/table/tests.rs
+++ b/src/table/tests.rs
@@ -1814,3 +1814,98 @@ fn meta_seqno_kv_max_corruption_returns_invalid_data() -> crate::Result<()> {
 
     Ok(())
 }
+
+/// bloom_may_contain_key with full (non-partitioned) filter delegates to
+/// bloom_may_contain_hash. Both methods agree for full filters.
+#[test]
+fn bloom_may_contain_key_full_filter() -> crate::Result<()> {
+    let items: Vec<InternalValue> = ["a", "c", "e"]
+        .iter()
+        .enumerate()
+        .map(|(i, &k)| {
+            InternalValue::from_components(k, "v", i as u64 + 1, crate::ValueType::Value)
+        })
+        .collect();
+
+    test_with_table(
+        &items,
+        |table| {
+            let hash_a = BloomBuilder::get_hash(b"a");
+            let hash_b = BloomBuilder::get_hash(b"b");
+
+            // Existing key: both methods must accept
+            assert!(
+                table.bloom_may_contain_key(b"a", hash_a)?,
+                "bloom_may_contain_key must not reject existing key"
+            );
+            assert!(
+                table.bloom_may_contain_key_hash(hash_a)?,
+                "bloom_may_contain_key_hash must not reject existing key"
+            );
+
+            // For full filters, bloom_may_contain_key delegates to the same
+            // hash-only path, so both methods return the same result.
+            let key_result = table.bloom_may_contain_key(b"b", hash_b)?;
+            let hash_result = table.bloom_may_contain_key_hash(hash_b)?;
+            assert_eq!(
+                key_result, hash_result,
+                "full filter: key-based and hash-only should agree"
+            );
+
+            Ok(())
+        },
+        None,
+        Some(|w: Writer| w.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(10.0))),
+    )
+}
+
+/// bloom_may_contain_key with partitioned filter seeks the correct partition
+/// and returns Ok(false) for a key beyond all partition boundaries.
+///
+/// Contrast: bloom_may_contain_key_hash returns Ok(true) conservatively
+/// for the same key because it cannot seek partitions by hash alone.
+/// This is the core behavioral improvement introduced by this PR.
+#[test]
+fn bloom_may_contain_key_partitioned_filter() -> crate::Result<()> {
+    let items: Vec<InternalValue> = (0u64..100)
+        .map(|i| {
+            let key = format!("key_{i:04}");
+            InternalValue::from_components(key, "v", i + 1, crate::ValueType::Value)
+        })
+        .collect();
+
+    test_with_table(
+        &items,
+        |table| {
+            // Key that exists: both methods must accept
+            let hash_exist = BloomBuilder::get_hash(b"key_0050");
+            assert!(
+                table.bloom_may_contain_key(b"key_0050", hash_exist)?,
+                "bloom must not reject existing key in partitioned filter"
+            );
+
+            // Key beyond all partitions: with a pinned partition index, key-based
+            // seek finds no ceiling and must return Ok(false).
+            // Note: pinned_filter_index is always loaded when filter_tli exists
+            // (unconditional in Table::recover), so this is always the partition-aware path.
+            let hash_beyond = BloomBuilder::get_hash(b"zzz_beyond");
+            assert!(
+                !table.bloom_may_contain_key(b"zzz_beyond", hash_beyond)?,
+                "key beyond all partitions should be rejected when partition index is available"
+            );
+
+            // Hash-only path always returns Ok(true) conservatively for partitioned filters
+            assert!(
+                table.bloom_may_contain_key_hash(hash_beyond)?,
+                "hash-only bloom check should remain conservative for partitioned filters"
+            );
+
+            Ok(())
+        },
+        None,
+        Some(|w: Writer| {
+            w.use_bloom_policy(BloomConstructionPolicy::BitsPerKey(10.0))
+                .use_partitioned_filter()
+        }),
+    )
+}
diff --git a/src/tree/mod.rs b/src/tree/mod.rs
index d24176e33..dd022981d 100644
--- a/src/tree/mod.rs
+++ b/src/tree/mod.rs
@@ -821,6 +821,10 @@ impl Tree {
         use crate::range::{IterState, TreeIter};
 
         let key_hash = crate::table::filter::standard_bloom::Builder::get_hash(key);
+        // NOTE: Slice::from(&[u8]) copies the key (small, typically < 100 bytes).
+        // This runs once per merge resolution, not per-table — cost is negligible
+        // compared to the I/O saved by partition-aware bloom filtering.
+        let bloom_key = crate::Slice::from(key);
 
         let comparator = version.active_memtable.comparator.clone();
         let iter_state = IterState {
@@ -830,6 +834,7 @@ impl Tree {
             comparator,
             prefix_hash: None,
             key_hash: Some(key_hash),
+            bloom_key: Some(bloom_key),
             #[cfg(feature = "metrics")]
             metrics: None,
         };
@@ -906,6 +911,7 @@ impl Tree {
             comparator,
             prefix_hash,
             key_hash: None,
+            bloom_key: None,
             #[cfg(feature = "metrics")]
             metrics: None,
         };
@@ -1295,6 +1301,7 @@ impl Tree {
             comparator: self.config.comparator.clone(),
             prefix_hash,
             key_hash: None,
+            bloom_key: None,
             #[cfg(feature = "metrics")]
             metrics: Some(self.0.metrics.clone()),
         };
diff --git a/tests/partitioned_bloom_skip.rs b/tests/partitioned_bloom_skip.rs
new file mode 100644
index 000000000..60ffe7c73
--- /dev/null
+++ b/tests/partitioned_bloom_skip.rs
@@ -0,0 +1,194 @@
+use lsm_tree::MergeOperator;
+
+/// i64 summation merge operator shared across merge pipeline tests.
+struct SumMerge;
+impl MergeOperator for SumMerge {
+    fn merge(
+        &self,
+        _key: &[u8],
+        base_value: Option<&[u8]>,
+        operands: &[&[u8]],
+    ) -> lsm_tree::Result<lsm_tree::UserValue> {
+        let mut sum: i64 = base_value
+            .map(|b| {
+                i64::from_le_bytes(
+                    b.try_into()
+                        .expect("invalid base value length for i64 in SumMerge"),
+                )
+            })
+            .unwrap_or(0);
+        for op in operands {
+            sum += i64::from_le_bytes(
+                (*op)
+                    .try_into()
+                    .expect("invalid operand length for i64 in SumMerge"),
+            );
+        }
+        Ok(sum.to_le_bytes().to_vec().into())
+    }
+}
+
+/// Tests that partitioned bloom filters are consulted for non-matching keys
+/// via the Table::get path (which has partition-aware bloom seeking).
+///
+/// Metrics confirm that a filter lookup occurred for a non-matching key.
+#[test_log::test]
+#[cfg(feature = "metrics")]
+fn partitioned_bloom_skip_for_point_reads() -> lsm_tree::Result<()> {
+    use lsm_tree::{
+        config::PinningPolicy, get_tmp_folder, AbstractTree, Config, SequenceNumberCounter,
+        MAX_SEQNO,
+    };
+
+    let folder = get_tmp_folder();
+    let path = folder.path();
+
+    let seqno = SequenceNumberCounter::default();
+
+    let tree = Config::new(path, seqno.clone(), SequenceNumberCounter::default())
+        .filter_block_partitioning_policy(PinningPolicy::all(true))
+        .open()?;
+
+    tree.insert("a", "val_a", seqno.next());
+    tree.insert("c", "val_c", seqno.next());
+    tree.flush_active_memtable(0)?;
+
+    assert!(tree.get("b", MAX_SEQNO)?.is_none());
+
+    // Bloom filters are probabilistic — a false positive for "b" is possible
+    // (though unlikely at 10 bpk with 2 keys, FPR ~0.8%). We assert the filter
+    // was queried; the skip fires in the common case.
+    assert!(
+        tree.metrics().filter_queries() >= 1,
+        "expected at least one filter query for non-matching key, got {}",
+        tree.metrics().filter_queries()
+    );
+
+    assert!(tree.get("a", MAX_SEQNO)?.is_some());
+    assert!(tree.get("c", MAX_SEQNO)?.is_some());
+
+    Ok(())
+}
+
+/// Tests that a key beyond all partition boundaries is correctly rejected.
+///
+/// For keys beyond the table's key range, the tree/run selection layer
+/// (e.g. `Run::get_for_key_cmp`) skips the table via a key-range overlap
+/// check before `Table::get` (and thus before any bloom lookup). The unit
+/// test in `table::tests` covers the `bloom_may_contain_key` `Ok(false)`
+/// path in `Table::get` directly.
+#[test_log::test]
+fn partitioned_bloom_skip_beyond_partitions() -> lsm_tree::Result<()> {
+    use lsm_tree::{
+        config::PinningPolicy, get_tmp_folder, AbstractTree, Config, SequenceNumberCounter,
+        MAX_SEQNO,
+    };
+
+    let folder = get_tmp_folder();
+    let path = folder.path();
+
+    let seqno = SequenceNumberCounter::default();
+
+    let tree = Config::new(path, seqno.clone(), SequenceNumberCounter::default())
+        .filter_block_partitioning_policy(PinningPolicy::all(true))
+        .open()?;
+
+    tree.insert("a", "val_a", seqno.next());
+    tree.insert("b", "val_b", seqno.next());
+    tree.flush_active_memtable(0)?;
+
+    assert!(tree.get("z", MAX_SEQNO)?.is_none());
+    assert!(tree.get("a", MAX_SEQNO)?.is_some());
+
+    Ok(())
+}
+
+/// Exercises bloom_may_contain_key through the merge pipeline
+/// (resolve_merge_via_pipeline → TreeIter → bloom_passes → bloom_may_contain_key).
+///
+/// With a merge operator, point reads go through the iterator pipeline where
+/// bloom_key enables partition-aware filtering. Correctness of the merge
+/// result (110 = merge(100, [10])) confirms the pipeline executes without
+/// errors through the new bloom_may_contain_key code path.
+///
+/// Note: io_skipped_by_filter is only incremented by Table::get, not by
+/// bloom_passes in the pipeline path, so we assert correctness not metrics.
+#[test_log::test]
+fn partitioned_bloom_skip_merge_pipeline() -> lsm_tree::Result<()> {
+    use lsm_tree::{
+        config::PinningPolicy, get_tmp_folder, AbstractTree, Config, SequenceNumberCounter,
+        MAX_SEQNO,
+    };
+
+    let folder = get_tmp_folder();
+    let path = folder.path();
+
+    let seqno = SequenceNumberCounter::default();
+
+    let tree = Config::new(path, seqno.clone(), SequenceNumberCounter::default())
+        .filter_block_partitioning_policy(PinningPolicy::all(true))
+        .with_merge_operator(Some(std::sync::Arc::new(SumMerge)))
+        .open()?;
+
+    // Table 1: base value for "counter"
+    tree.insert("counter", &100_i64.to_le_bytes(), seqno.next());
+    tree.flush_active_memtable(0)?;
+
+    // Table 2: keys that bracket "counter" so key_range_overlap passes,
+    // but bloom filter does NOT contain "counter" — bloom is the deciding filter.
+    tree.insert("aaa", &1_i64.to_le_bytes(), seqno.next());
+    tree.insert("zzz", &2_i64.to_le_bytes(), seqno.next());
+    tree.flush_active_memtable(0)?;
+
+    // Merge operand in active memtable — triggers resolve_merge_via_pipeline
+    tree.merge("counter", 10_i64.to_le_bytes(), seqno.next());
+
+    let result = tree.get("counter", MAX_SEQNO)?;
+    assert!(result.is_some());
+
+    let value = i64::from_le_bytes(result.unwrap().as_ref().try_into().unwrap());
+    assert_eq!(110, value, "merge(100, [10]) = 110");
+
+    Ok(())
+}
+
+/// Exercises bloom_may_contain_key with a full (non-partitioned) filter
+/// through the merge pipeline — covers the delegation to bloom_may_contain_hash.
+///
+/// Same note as above: pipeline bloom skips don't increment io_skipped_by_filter.
+#[test_log::test]
+fn full_filter_bloom_skip_merge_pipeline() -> lsm_tree::Result<()> {
+    use lsm_tree::{
+        config::PinningPolicy, get_tmp_folder, AbstractTree, Config, SequenceNumberCounter,
+        MAX_SEQNO,
+    };
+
+    let folder = get_tmp_folder();
+    let path = folder.path();
+
+    let seqno = SequenceNumberCounter::default();
+
+    let tree = Config::new(path, seqno.clone(), SequenceNumberCounter::default())
+        .filter_block_partitioning_policy(PinningPolicy::all(false))
+        .with_merge_operator(Some(std::sync::Arc::new(SumMerge)))
+        .open()?;
+
+    tree.insert("counter", &100_i64.to_le_bytes(), seqno.next());
+    tree.flush_active_memtable(0)?;
+
+    // Keys that bracket "counter" so key_range_overlap passes,
+    // but bloom filter does NOT contain "counter".
+    tree.insert("aaa", &1_i64.to_le_bytes(), seqno.next());
+    tree.insert("zzz", &2_i64.to_le_bytes(), seqno.next());
+    tree.flush_active_memtable(0)?;
+
+    tree.merge("counter", 10_i64.to_le_bytes(), seqno.next());
+
+    let result = tree.get("counter", MAX_SEQNO)?;
+    assert!(result.is_some());
+
+    let value = i64::from_le_bytes(result.unwrap().as_ref().try_into().unwrap());
+    assert_eq!(110, value, "merge(100, [10]) = 110");
+
+    Ok(())
+}