Skip to content
Merged
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
51 commits
Select commit Hold shift + click to select a range
0c95794
feat: add merge operators for commutative LSM operations
polaz Mar 18, 2026
0ea3ea8
fix(merge): correct MVCC safety and operand collection in merge paths
polaz Mar 18, 2026
62ce167
test(merge): blob tree merge, sealed memtable resolution, edge cases
polaz Mar 18, 2026
a42b0c5
fix(merge): correct reverse merge trigger, clarify Indirection handling
polaz Mar 18, 2026
0431741
fix(merge): remove coverage(off) from trait (unsupported target)
polaz Mar 18, 2026
bb4ea63
fix(merge): prevent operand loss in compaction GC and table scan
polaz Mar 18, 2026
83eb390
docs(merge): document idempotency contract for MergeOperator
polaz Mar 18, 2026
0aa9e95
refactor(merge): avoid Arc clone in forward merge, add #[must_use], r…
polaz Mar 18, 2026
4bdac68
fix(merge): handle Indirection base in compaction merge resolution
polaz Mar 18, 2026
bc060b3
fix(merge): do not drain entries after Indirection in merge resolution
polaz Mar 18, 2026
c15e08c
docs(merge): qualify intra-doc links for cross-module references
polaz Mar 18, 2026
82ab1e7
fix(merge): pass Indirection through as base, document lazy alloc lim…
polaz Mar 18, 2026
60dee44
fix(merge): reject merge when base is Indirection (blob pointer)
polaz Mar 18, 2026
4e54100
fix(merge): correct Indirection handling in merge resolution paths
polaz Mar 21, 2026
f85ad44
fix(lint): remove orphaned expect(clippy::expect_used) attribute
polaz Mar 21, 2026
7a8a424
docs(merge): add TODO for range tombstone interaction
polaz Mar 21, 2026
0286fe9
test(merge): add unit and integration tests for edge cases
polaz Mar 21, 2026
54aa9f7
feat(merge): add range tombstone awareness to merge resolution
polaz Mar 21, 2026
1e256e9
test(merge): BlobTree indirection fallback, seqno=0 boundary, RT+flush
polaz Mar 21, 2026
33452ed
fix(merge): pending buffer pipeline bypass, partial merge type, RT he…
polaz Mar 21, 2026
e6fef94
fix(merge): RT head suppression in reverse None branch, targeted tests
polaz Mar 21, 2026
fd74f2c
test(merge): memtable get_all_for_key edge cases, RT sealed/cross-layer
polaz Mar 21, 2026
1eed669
fix(test): use Memtable::new(0) and flush-based RT sealed test
polaz Mar 21, 2026
ea3b394
docs(merge): partial merge keeps MergeOperand, not Value
polaz Mar 21, 2026
f704bfa
fix(merge): complete doctest, correct insert arity, deduplicate doc
polaz Mar 21, 2026
5e86556
fix(merge): skip seqno zeroing for preserved MergeOperands
polaz Mar 21, 2026
1922ec5
fix(lint): expect too_many_lines on CompactionStream::next
polaz Mar 21, 2026
561b63e
docs(merge): hide doctest setup lines from codecov diff
polaz Mar 21, 2026
0985389
refactor(merge): simplify expired-tail drain, no MergeOperand guard n…
polaz Mar 21, 2026
9fcbb4d
perf(merge): reuse key_entries buffer across next_back() calls
polaz Mar 21, 2026
a016f2f
fix(merge): per-source RT cutoffs, seqno=0 sentinel, reusable buffer
polaz Mar 21, 2026
9d9091c
fix(lint): remove unused mut, unfulfilled too_many_lines expect
polaz Mar 21, 2026
f4ac637
fix(merge): partial merge seqno zeroing, RT on no-operator path, reve…
polaz Mar 21, 2026
86806b7
fix(lint): CompactionStream::next under 100 lines
polaz Mar 21, 2026
35bbc19
fix(lint): restore missing brace, keep CompactionStream::next at 100 …
polaz Mar 21, 2026
289ed56
fix(merge): add RT suppression check to multi_get no-operator path
polaz Mar 21, 2026
05be183
perf(merge): avoid clone in get match, O(1) pending pop, remove unuse…
polaz Mar 21, 2026
d3fe671
refactor(merge): extract resolve_or_passthrough for get/multi_get
polaz Mar 21, 2026
6c28eaa
refactor(merge): use &dyn MergeOperator in resolve_or_passthrough
polaz Mar 21, 2026
4b246ed
perf(merge): drain key_entries_buf instead of mem::take
polaz Mar 21, 2026
def410c
fix(merge): document InternalKey ordering, pass TempDir by reference
polaz Mar 21, 2026
501be93
docs(merge): note seek-based optimization opportunity in range scan
polaz Mar 21, 2026
622091f
fix(merge): only preserve MergeOperand type for Value replacement
polaz Mar 22, 2026
8b73bbf
docs(merge): document partial merge re-stability and panic propagation
polaz Mar 22, 2026
0c394d1
test(merge): disk range scan RT, point lookup base, tombstone, cross-…
polaz Mar 22, 2026
ebca11f
fix(error): remove duplicate MergeOperator enum variant
polaz Mar 22, 2026
5799df9
docs(merge): document L0 overlap limitation in resolve_merge_get
polaz Mar 22, 2026
5944026
docs(merge): correct RT suppression semantics in test and flush path
polaz Mar 22, 2026
18d9b1f
fix(merge): collect all disk entries before processing in resolve_mer…
polaz Mar 22, 2026
c7b6d0c
fix(lint): remove unused key_hash and dead found_base assignments
polaz Mar 22, 2026
5592216
perf(merge): gate RT check on merge_operator presence in next()
polaz Mar 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion src/abstract_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ pub trait AbstractTree: sealed::Sealed {
.map(|mt| mt.iter().map(Ok))
.collect::<Vec<_>>(),
);
let stream = CompactionStream::new(merger, seqno_threshold);
let stream = CompactionStream::new(merger, seqno_threshold)
.with_merge_operator(self.tree_config().merge_operator.clone());
Comment thread
coderabbitai[bot] marked this conversation as resolved.

drop(version_history);

Expand Down Expand Up @@ -700,6 +701,41 @@ pub trait AbstractTree: sealed::Sealed {
/// Will return `Err` if an IO error occurs.
fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64);

/// Writes a merge operand for a key.
///
/// The operand is stored as a partial update that will be combined with
/// other operands and/or a base value via the configured [`crate::MergeOperator`]
/// during reads and compaction.
///
/// Returns the added item's size and new size of the memtable.
///
/// # Examples
///
/// ```
/// # let folder = tempfile::tempdir()?;
/// # use lsm_tree::{AbstractTree, Config, MergeOperator, UserValue};
/// # use std::sync::Arc;
/// # struct SumMerge;
/// # impl MergeOperator for SumMerge {
/// # fn merge(&self, _key: &[u8], base: Option<&[u8]>, operands: &[&[u8]]) -> lsm_tree::Result<UserValue> {
/// # let mut sum: i64 = base.map_or(0, |b| i64::from_le_bytes(b.try_into().unwrap()));
/// # for op in operands { sum += i64::from_le_bytes((*op).try_into().unwrap()); }
/// # Ok(sum.to_le_bytes().to_vec().into())
/// # }
/// # }
/// # let tree = Config::new(folder, Default::default(), Default::default())
/// # .with_merge_operator(Some(Arc::new(SumMerge)))
/// # .open()?;
/// tree.merge("counter", 1_i64.to_le_bytes(), 0);
/// # Ok::<(), lsm_tree::Error>(())
/// ```
fn merge<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
operand: V,
seqno: SeqNo,
) -> (u64, u64);

/// Removes an item from the tree.
///
/// The tombstone marker of this delete operation will vanish when it
Expand Down
17 changes: 13 additions & 4 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ impl AbstractTree for BlobTree {
&range,
seqno,
index,
None, // BlobTree does not use merge operators for prefix scans
prefix_hash,
)
.map(move |kv| {
Expand All @@ -277,15 +278,14 @@ impl AbstractTree for BlobTree {
let tree = self.clone();

Box::new(
crate::Tree::create_internal_range(super_version.clone(), &range, seqno, index).map(
move |kv| {
crate::Tree::create_internal_range(super_version.clone(), &range, seqno, index, None)
.map(move |kv| {
IterGuardImpl::Blob(Guard {
tree: tree.clone(),
version: super_version.version.clone(),
kv,
})
},
),
}),
)
}

Expand Down Expand Up @@ -661,6 +661,15 @@ impl AbstractTree for BlobTree {
.collect()
}

fn merge<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
operand: V,
seqno: SeqNo,
) -> (u64, u64) {
self.index.merge(key, operand, seqno)
}
Comment thread
polaz marked this conversation as resolved.

fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
self.index.remove(key, seqno)
}
Expand Down
4 changes: 3 additions & 1 deletion src/compaction/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,9 @@ impl<'a> ItemAccessor<'a> {
/// This method will return an error if blob retrieval fails.
pub fn value(&self) -> crate::Result<UserValue> {
match self.item.key.value_type {
crate::ValueType::Value => Ok(self.item.value.clone()),
// MergeOperand: return raw operand bytes so filters can inspect them.
// The compaction pipeline preserves MergeOperand type on Replace.
crate::ValueType::Value | crate::ValueType::MergeOperand => Ok(self.item.value.clone()),
crate::ValueType::Indirection => {
Comment thread
polaz marked this conversation as resolved.
Comment thread
polaz marked this conversation as resolved.
// Resolve and read the value from a blob file.
let mut reader = &self.item.value[..];
Expand Down
Loading
Loading