diff --git a/src/table/data_block/iter.rs b/src/table/data_block/iter.rs index e8c605ba4..c8b4ba663 100644 --- a/src/table/data_block/iter.rs +++ b/src/table/data_block/iter.rs @@ -8,7 +8,7 @@ use crate::{ block::{Decoder, ParsedItem}, data_block::DataBlockParsedItem, }, - InternalValue, + InternalValue, SeqNo, }; /// The data block iterator handles double-ended scans over a data block @@ -34,15 +34,31 @@ impl<'a> Iter<'a> { true } - pub fn seek(&mut self, needle: &[u8]) -> bool { - // Find the restart interval whose head key is the last one strictly below `needle`. - // The decoder then performs a linear scan within that interval; we stop as soon as we - // reach a key ≥ needle. This minimizes parsing work while preserving correctness. - if !self - .decoder - .inner_mut() - .seek(|head_key, _| head_key < needle, false) - { + /// Seeks to the last restart interval whose head key is strictly below the + /// target `needle`, or equal to it with a seqno that is at least the given + /// snapshot boundary. + /// + /// Here `seqno` is a snapshot boundary: point reads return the first item + /// with `item.seqno < seqno`. Using the internal key ordering + /// (`user_key` ASC, `seqno` DESC), this skips restart intervals that can only + /// contain versions newer than the snapshot, so any visible version for + /// `needle` will be found within roughly one restart interval of the + /// resulting position. + pub fn seek_to_key_seqno(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + self.decoder.inner_mut().seek( + |head_key, head_seqno| match head_key.cmp(needle) { + std::cmp::Ordering::Less => true, + std::cmp::Ordering::Equal => head_seqno >= seqno, + std::cmp::Ordering::Greater => false, + }, + false, + ) + } + + pub fn seek(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + // Reuse the seqno-aware binary search from `seek_to_key_seqno`, then + // follow up with a linear scan to position at the exact key. + if !self.seek_to_key_seqno(needle, seqno) { return false; } @@ -75,9 +91,14 @@ impl<'a> Iter<'a> { } } - pub fn seek_upper(&mut self, needle: &[u8]) -> bool { - // Reverse-bound seek: position the high scanner at the first restart whose head key is - // ≤ needle, then walk backwards inside the interval until we find a key ≤ needle. + /// Reverse inclusive seek: position at the last key `<= needle`. + /// + /// `seqno` is accepted for API uniformity with the forward seek methods + /// ([`seek`], [`seek_exclusive`]) but is **intentionally unused** here. + /// Backward binary search cannot leverage seqno because intervals are + /// visited from the selected one toward index 0 — a tighter predicate + /// would skip intervals that may contain the visible version. + pub fn seek_upper(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { if !self .decoder .inner_mut() @@ -112,15 +133,10 @@ impl<'a> Iter<'a> { } } - pub fn seek_exclusive(&mut self, needle: &[u8]) -> bool { - // Exclusive lower bound: identical to `seek`, except we must not yield entries equal to - // `needle`. We therefore keep consuming while keys compare equal and only stop once we - // observe a strictly greater key. - if !self - .decoder - .inner_mut() - .seek(|head_key, _| head_key < needle, false) - { + pub fn seek_exclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + // Exclusive lower bound: same seqno-aware binary search, but the linear + // scan below skips entries equal to `needle`. + if !self.seek_to_key_seqno(needle, seqno) { return false; } @@ -144,9 +160,11 @@ impl<'a> Iter<'a> { } } - pub fn seek_upper_exclusive(&mut self, needle: &[u8]) -> bool { - // Exclusive upper bound: mirror of `seek_upper`. We must not include entries equal to - // `needle`, so we consume equals from the high end until we see a strictly smaller key. + /// Reverse exclusive seek: position at the last key `< needle`. + /// + /// See [`seek_upper`] for why `seqno` is accepted but unused in reverse + /// seeks. + pub fn seek_upper_exclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { if !self .decoder .inner_mut() diff --git a/src/table/data_block/iter_test.rs b/src/table/data_block/iter_test.rs index 8ff8fcdc0..1086d7b65 100644 --- a/src/table/data_block/iter_test.rs +++ b/src/table/data_block/iter_test.rs @@ -5,7 +5,7 @@ mod tests { block::{BlockType, Header, ParsedItem}, Block, DataBlock, }, - Checksum, InternalValue, Slice, + Checksum, InternalValue, SeqNo, Slice, ValueType::{Tombstone, Value}, }; use test_log::test; @@ -71,8 +71,8 @@ mod tests { { let mut iter = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&110u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&110u64.to_be_bytes(), SeqNo::MAX); let iter = iter.map(|x| x.materialize(data_block.as_slice())); assert_eq!( @@ -83,8 +83,8 @@ mod tests { { let mut iter: crate::table::data_block::Iter<'_> = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&110u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&110u64.to_be_bytes(), SeqNo::MAX); let iter = iter.map(|x| x.materialize(data_block.as_slice())); assert_eq!( @@ -95,8 +95,8 @@ mod tests { { let mut iter = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&110u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&110u64.to_be_bytes(), SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); let mut count = 0; @@ -145,8 +145,8 @@ mod tests { { let mut iter = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&109u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&109u64.to_be_bytes(), SeqNo::MAX); let iter = iter.map(|x| x.materialize(data_block.as_slice())); assert_eq!( @@ -157,8 +157,8 @@ mod tests { { let mut iter: crate::table::data_block::Iter<'_> = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&109u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&109u64.to_be_bytes(), SeqNo::MAX); let iter = iter.map(|x| x.materialize(data_block.as_slice())); assert_eq!( @@ -169,8 +169,8 @@ mod tests { { let mut iter = data_block.iter(); - iter.seek(&10u64.to_be_bytes()); - iter.seek_upper(&109u64.to_be_bytes()); + iter.seek(&10u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&109u64.to_be_bytes(), SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); let mut count = 0; @@ -218,8 +218,8 @@ mod tests { }); let mut iter = data_block.iter(); - iter.seek(&5u64.to_be_bytes()); - iter.seek_upper(&9u64.to_be_bytes()); + iter.seek(&5u64.to_be_bytes(), SeqNo::MAX); + iter.seek_upper(&9u64.to_be_bytes(), SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); let mut count = 0; @@ -345,7 +345,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"d"), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -386,7 +386,7 @@ mod tests { { let mut iter = data_block.iter(); - assert!(!iter.seek(b"a"), "should not seek"); + assert!(!iter.seek(b"a", SeqNo::MAX), "should not seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -398,7 +398,7 @@ mod tests { { let mut iter = data_block.iter(); - assert!(!iter.seek_upper(b"g"), "should not seek"); + assert!(!iter.seek_upper(b"g", SeqNo::MAX), "should not seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -410,7 +410,7 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"b"), "should seek"); + assert!(iter.seek_upper(b"b", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -425,7 +425,7 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek(b"f"), "should seek"); + assert!(iter.seek(b"f", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -466,8 +466,8 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek(b"c"), "should seek"); - assert!(iter.seek_upper(b"d"), "should seek"); + assert!(iter.seek(b"c", SeqNo::MAX), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -507,7 +507,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"b"), "should seek"); + assert!(iter.seek_upper(b"b", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -548,8 +548,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek(b"d"), "should seek"); - assert!(iter.seek_upper(b"d"), "should seek"); + assert!(iter.seek(b"d", SeqNo::MAX), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -564,8 +564,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"d"), "should seek"); - assert!(iter.seek(b"d"), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); + assert!(iter.seek(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -580,8 +580,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek(b"d"), "should seek"); - assert!(iter.seek_upper(b"d"), "should seek"); + assert!(iter.seek(b"d", SeqNo::MAX), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -602,8 +602,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"d"), "should seek"); - assert!(iter.seek(b"d"), "should seek"); + assert!(iter.seek_upper(b"d", SeqNo::MAX), "should seek"); + assert!(iter.seek(b"d", SeqNo::MAX), "should seek"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -651,8 +651,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek(b"f"), "should seek"); - iter.seek_upper(b"e"); + assert!(iter.seek(b"f", SeqNo::MAX), "should seek"); + iter.seek_upper(b"e", SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -662,8 +662,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek(b"f"), "should seek"); - iter.seek_upper(b"e"); + assert!(iter.seek(b"f", SeqNo::MAX), "should seek"); + iter.seek_upper(b"e", SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -673,8 +673,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"e"), "should seek"); - iter.seek(b"f"); + assert!(iter.seek_upper(b"e", SeqNo::MAX), "should seek"); + iter.seek(b"f", SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -684,8 +684,8 @@ mod tests { { let mut iter = data_block.iter(); - assert!(iter.seek_upper(b"e"), "should seek"); - iter.seek(b"f"); + assert!(iter.seek_upper(b"e", SeqNo::MAX), "should seek"); + iter.seek(b"f", SeqNo::MAX); let mut iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -721,7 +721,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek(b"b"), "should seek correctly"); + assert!(iter.seek(b"b", SeqNo::MAX), "should seek correctly"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -758,7 +758,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek(b"d"), "should seek correctly"); + assert!(iter.seek(b"d", SeqNo::MAX), "should seek correctly"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -798,7 +798,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(iter.seek(b"f"), "should seek correctly"); + assert!(iter.seek(b"f", SeqNo::MAX), "should seek correctly"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -838,7 +838,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(!iter.seek(b"a"), "should not find exact match"); + assert!(!iter.seek(b"a", SeqNo::MAX), "should not find exact match"); let iter = iter.map(|item| item.materialize(&data_block.inner.data)); @@ -875,7 +875,7 @@ mod tests { let mut iter = data_block.iter(); - assert!(!iter.seek(b"g"), "should not find exact match"); + assert!(!iter.seek(b"g", SeqNo::MAX), "should not find exact match"); assert!(iter.next().is_none(), "should not collect any items"); } @@ -1270,11 +1270,151 @@ mod tests { assert_eq!(data_block.iter().count(), items.len()); let mut iter = data_block.iter(); - iter.seek(&[0]); - iter.seek_upper(&[0]); + iter.seek(&[0], SeqNo::MAX); + iter.seek_upper(&[0], SeqNo::MAX); assert_eq!(0, iter.count()); Ok(()) } + + /// Verifies that `seek(needle, seqno)` with a seqno-aware predicate still + /// positions the iterator correctly when a key has many versions spanning + /// multiple restart intervals. + #[test] + fn data_block_seek_seqno_aware() -> crate::Result<()> { + // Build a block where key "b" has 10 versions (seqno 10..1) with + // restart_interval=2, so versions span 5 restart intervals. + let mut items = Vec::new(); + for seqno in (1..=10).rev() { + items.push(InternalValue::from_components(b"b", b"", seqno, Value)); + } + + for restart_interval in [1, 2, 3, 5] { + let bytes = DataBlock::encode_into_vec(&items, restart_interval, 0.0)?; + let data_block = DataBlock::new(Block { + data: bytes.into(), + header: Header { + block_type: BlockType::Data, + checksum: Checksum::from_raw(0), + data_length: 0, + uncompressed_length: 0, + }, + }); + + // With SeqNo::MAX, seek behaves like key-only (no seqno filtering). + { + let mut iter = data_block.iter(); + assert!( + iter.seek(b"b", SeqNo::MAX), + "should find key with MAX seqno" + ); + let entry = iter.next().expect("should have entry"); + let materialized = entry.materialize(&data_block.inner.data); + assert_eq!(materialized.key.user_key.as_ref(), b"b"); + // First version returned is the newest (seqno 10). + assert_eq!(materialized.key.seqno, 10); + } + + // With a specific snapshot seqno, the binary search lands on the + // restart interval containing (or nearest to) the target seqno. + // The first entry returned is the head of that interval. + { + let mut iter = data_block.iter(); + assert!(iter.seek(b"b", 5), "should find key with snapshot seqno 5"); + let entry = iter.next().expect("should have entry"); + let materialized = entry.materialize(&data_block.inner.data); + assert_eq!(materialized.key.user_key.as_ref(), b"b"); + // The landing entry's seqno must be >= the snapshot boundary, + // proving the seqno-aware predicate skipped past older intervals. + assert!( + materialized.key.seqno >= 5, + "restart_interval={restart_interval}: landing seqno {} should be >= snapshot 5", + materialized.key.seqno, + ); + // With restart_interval=1 each entry is its own interval, so + // the predicate lands exactly on the target seqno — a key-only + // seek would land on seqno 10 instead. + if restart_interval == 1 { + assert_eq!( + materialized.key.seqno, 5, + "with restart_interval=1, seqno-aware seek must land exactly on target" + ); + } + } + } + + Ok(()) + } + + /// Verifies that `seek` with seqno still works correctly when the block + /// contains multiple distinct keys with versions. + #[test] + fn data_block_seek_seqno_aware_mixed_keys() -> crate::Result<()> { + let items = vec![ + InternalValue::from_components(b"a", b"", 10, Value), + InternalValue::from_components(b"a", b"", 5, Value), + InternalValue::from_components(b"b", b"", 10, Value), + InternalValue::from_components(b"b", b"", 7, Value), + InternalValue::from_components(b"b", b"", 3, Value), + InternalValue::from_components(b"c", b"", 10, Value), + ]; + + for restart_interval in [1, 2, 3] { + let bytes = DataBlock::encode_into_vec(&items, restart_interval, 0.0)?; + let data_block = DataBlock::new(Block { + data: bytes.into(), + header: Header { + block_type: BlockType::Data, + checksum: Checksum::from_raw(0), + data_length: 0, + uncompressed_length: 0, + }, + }); + + // Forward seek with seqno narrows restart interval selection. + { + let mut iter = data_block.iter(); + assert!(iter.seek(b"b", 5), "should find b at snapshot 5"); + let entry = iter.next().expect("should have entry"); + let mat = entry.materialize(&data_block.inner.data); + assert_eq!(mat.key.user_key.as_ref(), b"b"); + // Landing seqno must be >= snapshot boundary. + assert!( + mat.key.seqno >= 5, + "restart_interval={restart_interval}: seqno {} should be >= 5", + mat.key.seqno, + ); + // With restart_interval=1, seqno-aware seek lands on (b,7) — + // the last head with seqno >= 5 — whereas key-only would land + // on (b,10). + if restart_interval == 1 { + assert_eq!(mat.key.seqno, 7); + } + } + + // Exclusive forward seek with seqno. + { + let mut iter = data_block.iter(); + assert!( + iter.seek_exclusive(b"b", 5), + "should find entry > b at snapshot 5" + ); + let entry = iter.next().expect("should have entry"); + let mat = entry.materialize(&data_block.inner.data); + assert_eq!(mat.key.user_key.as_ref(), b"c"); + } + + // Upper seek still works with seqno (predicate unchanged for backward). + { + let mut iter = data_block.iter(); + assert!(iter.seek_upper(b"b", 5), "should find upper bound b"); + let entry = iter.next_back().expect("should have entry"); + let mat = entry.materialize(&data_block.inner.data); + assert_eq!(mat.key.user_key.as_ref(), b"b"); + } + } + + Ok(()) + } } diff --git a/src/table/data_block/mod.rs b/src/table/data_block/mod.rs index 7104ccfaa..4a906bba2 100644 --- a/src/table/data_block/mod.rs +++ b/src/table/data_block/mod.rs @@ -407,7 +407,6 @@ impl DataBlock { .map(|reader| reader.bucket_count()) } - // TODO: handle seqno more nicely (make Key generic, so we can do binary search over (key, seqno)) #[must_use] pub fn point_read(&self, needle: &[u8], seqno: SeqNo) -> Option { let iter = if let Some(hash_index_reader) = self.get_hash_index_reader() { @@ -416,10 +415,10 @@ impl DataBlock { return None; } MARKER_CONFLICT => { - // NOTE: Fallback to binary search + // NOTE: Fallback to seqno-aware binary search let mut iter = self.iter(); - if !iter.seek(needle) { + if !iter.seek_to_key_seqno(needle, seqno) { return None; } @@ -437,8 +436,9 @@ impl DataBlock { } else { let mut iter = self.iter(); - // NOTE: Fallback to binary search - if !iter.seek(needle) { + // NOTE: Seqno-aware binary search reduces linear scanning by skipping most + // restart intervals that contain only versions newer than the target seqno + if !iter.seek_to_key_seqno(needle, seqno) { return None; } @@ -449,14 +449,14 @@ impl DataBlock { for item in iter { match item.compare_key(needle, &self.inner.data) { std::cmp::Ordering::Greater => { - // We are before our searched key/seqno + // We are past our searched key return None; } std::cmp::Ordering::Equal => { // If key is same as needle, check sequence number } std::cmp::Ordering::Less => { - // We are past our searched key + // We are before our searched key continue; } } @@ -1233,4 +1233,194 @@ mod tests { Ok(()) } + + #[test] + fn data_block_point_read_seqno_aware_seek() -> crate::Result<()> { + // Key "a" with seqno 5,4,3,2,1 — point_read("a", seqno=3) + // returns the first version with seqno < 3, i.e., v2 ("a2") + let items = [ + InternalValue::from_components(b"a", b"a5", 5, Value), + InternalValue::from_components(b"a", b"a4", 4, Value), + InternalValue::from_components(b"a", b"a3", 3, Value), + InternalValue::from_components(b"a", b"a2", 2, Value), + InternalValue::from_components(b"a", b"a1", 1, Value), + ]; + + // Test across various restart intervals: at restart_interval=1 every item + // is a restart head so binary search lands exactly; at larger intervals it + // may scan within the restart range but must still return the correct version. + for restart_interval in 1..=4 { + let bytes = DataBlock::encode_into_vec(&items, restart_interval, 0.0)?; + + let data_block = DataBlock::new(Block { + data: bytes.into(), + header: Header { + block_type: BlockType::Data, + checksum: Checksum::from_raw(0), + data_length: 0, + uncompressed_length: 0, + }, + }); + + // seqno=4 → should see version with seqno=3 (first with seqno < 4) + assert_eq!( + Some(items[2].clone()), + data_block.point_read(b"a", 4), + "restart_interval={restart_interval}: seqno=4 should return v3", + ); + + // seqno=3 → should see version with seqno=2 + assert_eq!( + Some(items[3].clone()), + data_block.point_read(b"a", 3), + "restart_interval={restart_interval}: seqno=3 should return v2", + ); + + // seqno=6 → should see latest version (seqno=5) + assert_eq!( + Some(items[0].clone()), + data_block.point_read(b"a", 6), + "restart_interval={restart_interval}: seqno=6 should return v5", + ); + + // seqno=1 → no visible version (all seqno >= 1) + assert!( + data_block.point_read(b"a", 1).is_none(), + "restart_interval={restart_interval}: seqno=1 should return None", + ); + + // Non-existent key + assert!( + data_block.point_read(b"b", SeqNo::MAX).is_none(), + "restart_interval={restart_interval}: key 'b' should not exist", + ); + } + + Ok(()) + } + + #[test] + fn data_block_point_read_seqno_aware_seek_mixed_keys() -> crate::Result<()> { + // Multiple keys with multiple versions + let items = [ + InternalValue::from_components(b"a", b"a3", 3, Value), + InternalValue::from_components(b"a", b"a2", 2, Value), + InternalValue::from_components(b"a", b"a1", 1, Value), + InternalValue::from_components(b"b", b"b5", 5, Value), + InternalValue::from_components(b"b", b"b4", 4, Value), + InternalValue::from_components(b"b", b"b3", 3, Value), + InternalValue::from_components(b"b", b"b2", 2, Value), + InternalValue::from_components(b"b", b"b1", 1, Value), + InternalValue::from_components(b"c", b"c1", 1, Value), + ]; + + for restart_interval in 1..=4 { + let bytes = DataBlock::encode_into_vec(&items, restart_interval, 0.0)?; + + let data_block = DataBlock::new(Block { + data: bytes.into(), + header: Header { + block_type: BlockType::Data, + checksum: Checksum::from_raw(0), + data_length: 0, + uncompressed_length: 0, + }, + }); + + // Read "b" at seqno=4 → should return version with seqno=3 + assert_eq!( + Some(items[5].clone()), + data_block.point_read(b"b", 4), + "restart_interval={restart_interval}: b@4 should return b3", + ); + + // Read "a" at seqno=2 → should return version with seqno=1 + assert_eq!( + Some(items[2].clone()), + data_block.point_read(b"a", 2), + "restart_interval={restart_interval}: a@2 should return a1", + ); + + // Read "c" at seqno=2 → should return version with seqno=1 + assert_eq!( + Some(items[8].clone()), + data_block.point_read(b"c", 2), + "restart_interval={restart_interval}: c@2 should return c1", + ); + } + + Ok(()) + } + + #[test] + fn data_block_point_read_seqno_aware_seek_hash_conflict() -> crate::Result<()> { + // Multiple versions of the same key with a hash index enabled. + // Duplicate user keys hash to the same bucket, producing MARKER_CONFLICT, + // which forces point_read through the seek_to_key_seqno fallback path. + let items = [ + InternalValue::from_components(b"a", b"a5", 5, Value), + InternalValue::from_components(b"a", b"a4", 4, Value), + InternalValue::from_components(b"a", b"a3", 3, Value), + InternalValue::from_components(b"a", b"a2", 2, Value), + InternalValue::from_components(b"a", b"a1", 1, Value), + ]; + + for restart_interval in 1..=4 { + let bytes = DataBlock::encode_into_vec(&items, restart_interval, 1.33)?; + + let data_block = DataBlock::new(Block { + data: bytes.into(), + header: Header { + block_type: BlockType::Data, + checksum: Checksum::from_raw(0), + data_length: 0, + uncompressed_length: 0, + }, + }); + + // Verify hash index is present and the duplicate key triggers conflict + assert!( + data_block + .hash_bucket_count() + .expect("should have built hash index") + > 0, + "restart_interval={restart_interval}: hash index should be built", + ); + + // seqno=4 -> first version with seqno < 4, i.e. seqno=3 + assert_eq!( + Some(items[2].clone()), + data_block.point_read(b"a", 4), + "restart_interval={restart_interval}: seqno=4 should return v3 via conflict fallback", + ); + + // seqno=3 -> seqno=2 + assert_eq!( + Some(items[3].clone()), + data_block.point_read(b"a", 3), + "restart_interval={restart_interval}: seqno=3 should return v2 via conflict fallback", + ); + + // seqno=6 -> latest (seqno=5) + assert_eq!( + Some(items[0].clone()), + data_block.point_read(b"a", 6), + "restart_interval={restart_interval}: seqno=6 should return v5 via conflict fallback", + ); + + // seqno=1 -> no visible version + assert!( + data_block.point_read(b"a", 1).is_none(), + "restart_interval={restart_interval}: seqno=1 should return None via conflict fallback", + ); + + // Non-existent key + assert!( + data_block.point_read(b"z", SeqNo::MAX).is_none(), + "restart_interval={restart_interval}: key 'z' should not exist", + ); + } + + Ok(()) + } } diff --git a/src/table/iter.rs b/src/table/iter.rs index b03b69da1..3809caa9b 100644 --- a/src/table/iter.rs +++ b/src/table/iter.rs @@ -37,20 +37,20 @@ self_cell!( ); impl OwnedDataBlockIter { - fn seek_lower_inclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { - self.with_dependent_mut(|_, m| m.seek(needle /* TODO: , seqno */)) + fn seek_lower_inclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + self.with_dependent_mut(|_, m| m.seek(needle, seqno)) } - fn seek_upper_inclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { - self.with_dependent_mut(|_, m| m.seek_upper(needle /* TODO: , seqno */)) + fn seek_upper_inclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + self.with_dependent_mut(|_, m| m.seek_upper(needle, seqno)) } - fn seek_lower_exclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { - self.with_dependent_mut(|_, m| m.seek_exclusive(needle /* TODO: , seqno */)) + fn seek_lower_exclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + self.with_dependent_mut(|_, m| m.seek_exclusive(needle, seqno)) } - fn seek_upper_exclusive(&mut self, needle: &[u8], _seqno: SeqNo) -> bool { - self.with_dependent_mut(|_, m| m.seek_upper_exclusive(needle /* TODO: , seqno */)) + fn seek_upper_exclusive(&mut self, needle: &[u8], seqno: SeqNo) -> bool { + self.with_dependent_mut(|_, m| m.seek_upper_exclusive(needle, seqno)) } pub fn seek_lower_bound(&mut self, bound: &Bound, seqno: SeqNo) -> bool {