feat: merge operators for commutative LSM operations #28
📝 Walkthrough
Adds persisted `MergeOperand` value type, a public `MergeOperator` trait, and end-to-end merge support: merge-aware point-reads, MVCC and range iteration, compaction merging, memtable helpers, config wiring, an `AbstractTree::merge` API, error variant, and comprehensive tests.
Sequence Diagrams
sequenceDiagram
participant Client
participant Tree as AbstractTree
participant Mem as Memtable
participant SST as SSTables
participant MergeOp as MergeOperator
Client->>Tree: get(key, seqno)
Tree->>Mem: scan memtables (new→old)
Tree->>SST: scan SST runs (new→old)
Mem-->>Tree: InternalValue(s)
SST-->>Tree: InternalValue(s)
alt MergeOperator configured
Tree->>Tree: collect operands (oldest→newest) until boundary (Value/Tombstone/Indirection/RT)
Tree->>MergeOp: merge(key, base_opt, operands)
MergeOp-->>Tree: UserValue
Tree-->>Client: merged value (Value)
else
Tree-->>Client: return raw operand/value bytes
end
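The point-read flow in the diagram above can be sketched roughly as follows. This is a minimal illustration, not the PR's `resolve_merge_get`: `Entry`, `MergeFn`, `resolve_get`, and `sum_u64` are hypothetical names, the operator is modelled as a plain function pointer rather than the `MergeOperator` trait, and the `Indirection` and range-tombstone boundaries are left out.

```rust
/// Simplified stand-ins for the entry kinds named in the diagram.
#[derive(Debug, Clone, PartialEq)]
enum Entry {
    Value(Vec<u8>),
    MergeOperand(Vec<u8>),
    Tombstone,
}

/// Hypothetical operator shape: (key, optional base, operands oldest -> newest).
type MergeFn = fn(&[u8], Option<&[u8]>, &[&[u8]]) -> Vec<u8>;

/// Resolves a point read over one key's versions, ordered newest -> oldest.
fn resolve_get(key: &[u8], versions: &[Entry], merge: Option<MergeFn>) -> Option<Vec<u8>> {
    let Some(merge) = merge else {
        // No operator configured: hand back the newest entry's raw bytes.
        return match versions.first()? {
            Entry::Value(v) | Entry::MergeOperand(v) => Some(v.clone()),
            Entry::Tombstone => None,
        };
    };

    let mut operands: Vec<&[u8]> = Vec::new();
    let mut base: Option<&[u8]> = None;
    for entry in versions {
        match entry {
            Entry::MergeOperand(v) => operands.push(v.as_slice()),
            // A Value becomes the base; a Tombstone means "no base". Both stop the scan.
            Entry::Value(v) => {
                base = Some(v.as_slice());
                break;
            }
            Entry::Tombstone => break,
        }
    }
    if operands.is_empty() {
        return base.map(<[u8]>::to_vec);
    }
    operands.reverse(); // the operator receives operands in chronological order
    Some(merge(key, base, &operands))
}

/// Example operator (an assumption): sums little-endian u64 deltas onto the base.
fn sum_u64(_key: &[u8], base: Option<&[u8]>, operands: &[&[u8]]) -> Vec<u8> {
    fn decode(bytes: &[u8]) -> u64 {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(bytes); // panics unless the input is exactly 8 bytes
        u64::from_le_bytes(buf)
    }
    let total = base.map_or(0, decode) + operands.iter().map(|o| decode(o)).sum::<u64>();
    total.to_le_bytes().to_vec()
}
```

Entries older than the boundary are never visited, which is why a `Value` or `Tombstone` ends the scan.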
sequenceDiagram
participant Client
participant Mvcc as MvccStream
participant Source as Underlying Iterator
participant MergeOp as MergeOperator
Client->>Mvcc: next()
Mvcc->>Source: next_item()
Source-->>Mvcc: InternalValue (MergeOperand)
alt Merge operator present & not RT-suppressed
Mvcc->>Mvcc: collect same-key entries until boundary
Mvcc->>MergeOp: merge(key, base_opt, operands_chronological)
MergeOp-->>Mvcc: merged UserValue
Mvcc-->>Client: synthesized InternalValue(Value)
else
Mvcc-->>Client: MergeOperand as-is
end
sequenceDiagram
participant Compaction as CompactionStream
participant Source as SourceIterator
participant MergeOp as MergeOperator
participant Dest as Compaction Output
Compaction->>Source: iterate items
Source-->>Compaction: InternalValues (grouped by user-key)
loop per key group
alt merge_operator configured AND below GC threshold AND boundary found
Compaction->>Compaction: collect operands + base
Compaction->>MergeOp: merge(key, base_opt, operands)
MergeOp-->>Compaction: merged UserValue
Compaction->>Dest: emit single Value (maybe zero seqno)
else
Compaction->>Dest: emit items unchanged
end
end
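As a rough sketch of the per-key decision in the compaction diagram above: the group is collapsed only when an operator is configured, the newest entry is a merge operand below the GC threshold, and a boundary is found. All names here (`Item`, `Kind`, `try_collapse`, `compact_group`, `concat`) are hypothetical, and the merged entry simply keeps the head's seqno instead of modelling the "maybe zero seqno" case.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Value,
    MergeOperand,
    Tombstone,
}

#[derive(Debug, Clone, PartialEq)]
struct Item {
    seqno: u64,
    kind: Kind,
    value: Vec<u8>,
}

/// Hypothetical operator shape: (optional base, operands oldest -> newest).
type MergeFn = fn(Option<&[u8]>, &[&[u8]]) -> Vec<u8>;

/// Returns the collapsed entry, or `None` when the group must be emitted unchanged.
/// `group` holds one user key's versions, ordered newest -> oldest.
fn try_collapse(group: &[Item], gc_seqno_threshold: u64, merge: Option<MergeFn>) -> Option<Item> {
    let merge = merge?;
    let head = group.first()?;
    if head.kind != Kind::MergeOperand || head.seqno >= gc_seqno_threshold {
        return None;
    }
    let mut operands: Vec<&[u8]> = Vec::new();
    let mut base: Option<&[u8]> = None;
    let mut boundary_found = false;
    for item in group {
        match item.kind {
            Kind::MergeOperand => operands.push(item.value.as_slice()),
            Kind::Value => {
                base = Some(item.value.as_slice());
                boundary_found = true;
                break;
            }
            Kind::Tombstone => {
                boundary_found = true;
                break;
            }
        }
    }
    if !boundary_found {
        // An older base may live outside this compaction, so keep the operands.
        return None;
    }
    operands.reverse();
    Some(Item { seqno: head.seqno, kind: Kind::Value, value: merge(base, &operands) })
}

fn compact_group(group: Vec<Item>, gc_seqno_threshold: u64, merge: Option<MergeFn>) -> Vec<Item> {
    match try_collapse(&group, gc_seqno_threshold, merge) {
        Some(merged) => vec![merged],
        None => group,
    }
}

/// Example operator (an assumption): appends each operand to the base bytes.
fn concat(base: Option<&[u8]>, operands: &[&[u8]]) -> Vec<u8> {
    let mut out = base.map_or_else(Vec::new, <[u8]>::to_vec);
    for op in operands {
        out.extend_from_slice(op);
    }
    out
}
```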
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~65 minutes
🚥 Pre-merge checks: ✅ 5 passed
Codecov Report: ❌ Patch coverage is
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/compaction/filter.rs (1)
142-169: ⚠️ Potential issue | 🟡 Minor
Fix Clippy error: merge identical match arms.
The pipeline failure indicates that match arms for `Value` (line 144) and `MergeOperand` (line 165) have identical bodies. This causes `clippy -D warnings` to fail.
🔧 Proposed fix to merge identical arms
     pub fn value(&self) -> crate::Result<UserValue> {
         match self.item.key.value_type {
    -        crate::ValueType::Value => Ok(self.item.value.clone()),
    +        crate::ValueType::Value | crate::ValueType::MergeOperand => {
    +            Ok(self.item.value.clone())
    +        }
             crate::ValueType::Indirection => {
                 // Resolve and read the value from a blob file.
                 let mut reader = &self.item.value[..];
                 let indirection = BlobIndirection::decode_from(&mut reader)?;
                 let vhandle = indirection.vhandle;
                 let value = self
                     .shared
                     .get_indirect_value(&self.item.key.user_key, &vhandle)?;
                 if let Some(value) = value {
                     Ok(value)
                 } else {
                     log::error!(
                         "failed to read referenced blob file during execution of compaction filter. key: {:?}, vptr: {indirection:?}",
                         self.item.key,
                     );
                     Err(crate::Error::Unrecoverable)
                 }
             }
    -        crate::ValueType::MergeOperand => Ok(self.item.value.clone()),
             crate::ValueType::WeakTombstone | crate::ValueType::Tombstone => {
                 unreachable!("tombstones are filtered out before calling filter")
             }
         }
     }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/compaction/filter.rs` around lines 142 - 169, The match in value() has identical bodies for crate::ValueType::Value and crate::ValueType::MergeOperand; combine those two variants into one arm (e.g., crate::ValueType::Value | crate::ValueType::MergeOperand) that returns Ok(self.item.value.clone()) to satisfy Clippy, leaving the Indirection and tombstone arms unchanged and keeping the BlobIndirection branch using shared.get_indirect_value as-is.
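For readers unfamiliar with the lint, here is a self-contained illustration of the same fix outside the crate. The enum and function are hypothetical stand-ins, and the lint involved is presumably `clippy::match_same_arms`, which the report above does not name.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum ValueType {
    Value,
    Indirection,
    MergeOperand,
    Tombstone,
    WeakTombstone,
}

#[derive(Debug, PartialEq)]
enum Payload {
    /// The bytes are stored inline and can be returned directly.
    Inline(Vec<u8>),
    /// The bytes are a pointer that would need a blob lookup.
    NeedsBlobLookup,
    /// Nothing to return.
    Deleted,
}

fn payload(value_type: ValueType, bytes: &[u8]) -> Payload {
    match value_type {
        // Both variants carry inline bytes, so they share one arm via an or-pattern
        // instead of repeating the same body twice.
        ValueType::Value | ValueType::MergeOperand => Payload::Inline(bytes.to_vec()),
        ValueType::Indirection => Payload::NeedsBlobLookup,
        ValueType::Tombstone | ValueType::WeakTombstone => Payload::Deleted,
    }
}
```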
🧹 Nitpick comments (1)
src/mvcc_stream.rs (1)
90-103: Consider making `resolve_merge_single` an associated function.
The pipeline reports an unused `self` parameter. Since this method only uses the `merge_op` argument passed in, it could be an associated function. However, keeping it as a method maintains API consistency with `resolve_merge_forward` and `resolve_merge_buffered`.
To silence the lint while keeping the method pattern, you can use `&self` and reference `self.merge_operator` internally instead of passing `merge_op` as a parameter:
♻️ Option 1: Use self.merge_operator internally
     fn resolve_merge_single(
         &self,
         entry: &InternalValue,
    -    merge_op: &dyn MergeOperator,
     ) -> crate::Result<InternalValue> {
    +    let merge_op = self.merge_operator.as_ref()
    +        .ok_or(crate::Error::Unrecoverable)?;
         let operand_refs: Vec<&[u8]> = vec![entry.value.as_ref()];
    -    let merged = merge_op.merge(&entry.key.user_key, None, &operand_refs)?;
    +    let merged = merge_op.merge(&entry.key.user_key, None, &operand_refs)?;
♻️ Option 2: Allow the lint with justification
    +    #[expect(clippy::unused_self, reason = "maintains API consistency with other resolve methods")]
         fn resolve_merge_single(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/mvcc_stream.rs` around lines 90 - 103, The method resolve_merge_single currently has an unused self; change it to use the instance merge operator instead of the passed parameter: remove the merge_op parameter from resolve_merge_single and call self.merge_operator (or whatever field name holds the MergeOperator) inside the function to perform the merge, keeping the rest of the logic and returning InternalValue; this preserves method shape consistent with resolve_merge_forward and resolve_merge_buffered while eliminating the unused-self lint.
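Option 1 might look like this in isolation. It is only a sketch: `Stream`, `Error`, `MergeFn`, and `concat` are simplified stand-ins for the crate's types, with the operator reduced to a function pointer so the example compiles on its own.

```rust
/// Hypothetical operator shape for this sketch: (key, operands) -> merged bytes.
type MergeFn = fn(&[u8], &[&[u8]]) -> Vec<u8>;

#[derive(Debug, PartialEq)]
enum Error {
    Unrecoverable,
}

struct Stream {
    merge_operator: Option<MergeFn>,
}

impl Stream {
    /// Reads the operator from `self`, so `self` is used and no lint fires.
    fn resolve_merge_single(&self, key: &[u8], operand: &[u8]) -> Result<Vec<u8>, Error> {
        let merge_op = self.merge_operator.as_ref().ok_or(Error::Unrecoverable)?;
        Ok(merge_op(key, &[operand]))
    }
}

/// Example operator (an assumption): concatenates all operands.
fn concat(_key: &[u8], operands: &[&[u8]]) -> Vec<u8> {
    operands.concat()
}
```

The trade-off is a new runtime failure path: a caller that reaches this method without a configured operator now gets an error instead of a compile-time guarantee.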
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/compaction/stream.rs`:
- Line 182: Replace the redundant closure in the creation of operand_refs by
using the method reference AsRef::as_ref instead of |v| v.as_ref(); locate the
expression creating operand_refs from operands (operands.iter().map(|v|
v.as_ref()).collect()) and change the map call to use AsRef::as_ref so the
result still yields a Vec<&[u8]>.
- Around line 127-131: Update the doc comment for the function
resolve_merge_operands to mark the type name as code by wrapping MergeOperand in
backticks (e.g., change "first MergeOperand entry" to "first `MergeOperand`
entry"); ensure any other occurrences of MergeOperand in that doc block are
similarly backticked so the markdown linter stops complaining.
In `@src/mvcc_stream.rs`:
- Line 78: Replace the redundant closures used when mapping operands to byte
slices with method references; change the closures like operands.iter().map(|v|
v.as_ref()) to use operands.iter().map(AsRef::as_ref) (apply the same change at
both occurrences around the lines referencing operand_refs).
- Around line 105-117: The doc comment needs backticks around MergeOperand and
the match on self.merge_operator should be refactored to a let-else for clarity:
update the comment above resolve_merge_buffered to use `MergeOperand`, then
replace the match ... None branch with a let Some(op) = &self.merge_operator
else { ... } that performs the same early return of the newest entry
(entries.into_iter().last().ok_or(crate::Error::Unrecoverable)); keep the rest
of resolve_merge_buffered using op as before.
In `@src/tree/mod.rs`:
- Around line 655-661: resolve_merge_get now takes a reference to InternalValue,
so update the call site inside the ValueType::MergeOperand arm to pass the entry
by reference rather than moving it; i.e., call Self::resolve_merge_get with a
borrowed entry (use &entry) and keep passing merge_op.as_ref() as before so the
signatures match.
- Around line 681-689: The call to Self::resolve_merge_get should pass the merge
operator by reference to match the updated signature; locate the branch handling
ValueType::MergeOperand where config.merge_operator is Some(merge_op) and change
the argument you pass (currently merge_op.as_ref() or similar) to pass a
reference to the operator (e.g., &**merge_op or &*merge_op depending on its
container) so that the resolve_merge_get call receives a &MergeOperator
consistent with its new signature; update only the argument expression in the
Self::resolve_merge_get invocation.
- Around line 769-867: The resolve_merge_get function has doc, signature,
closure and duplicate-operand bugs: add backticks around `MergeOperand` in the
doc comment; change the parameter first_entry from InternalValue to
&InternalValue (adjust uses to borrow instead of consuming/cloning); avoid
re-processing the same entry by skipping any scanned entry whose key.seqno
equals first_entry.key.seqno in the active memtable loop, the sealed_memtables
loops, and after table.get(...) in the disk table loop; and replace the
redundant closure operands.iter().map(|v| v.as_ref()) with a method reference
operands.iter().map(UserValue::as_ref) to satisfy clippy. Ensure you update all
internal references (process_entry(&first_entry) stays valid) and any callers of
resolve_merge_get to pass a reference.
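Two of the mechanical suggestions in the inline comments, the `AsRef::as_ref` method reference and the `let`-`else` early return, can be tried in isolation. The function names below are hypothetical, and the buffered entries are assumed to be ordered oldest to newest so that `last()` yields the newest one.

```rust
/// Hypothetical operator shape for this sketch: operands -> merged bytes.
type MergeFn = fn(&[&[u8]]) -> Vec<u8>;

/// Borrows every operand as a byte slice.
fn operand_refs(operands: &[Vec<u8>]) -> Vec<&[u8]> {
    // Same result as `.map(|v| v.as_ref())`, without the redundant closure.
    operands.iter().map(AsRef::as_ref).collect()
}

/// Merges the buffered entries, or falls back to the newest one when no operator is set.
fn resolve_buffered(entries: Vec<Vec<u8>>, merge_operator: Option<MergeFn>) -> Option<Vec<u8>> {
    // `let`-`else` keeps the early return next to the check and leaves the
    // happy path unindented, unlike a `match` with a `None` arm.
    let Some(op) = merge_operator else {
        return entries.into_iter().last();
    };
    Some(op(&operand_refs(&entries)))
}

/// Example operator (an assumption): concatenates all operands.
fn concat(operands: &[&[u8]]) -> Vec<u8> {
    operands.concat()
}
```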
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 962ed1d3-4725-4431-a290-d03aea1cfd43
📒 Files selected for processing (17)
- src/abstract_tree.rs
- src/blob_tree/mod.rs
- src/compaction/filter.rs
- src/compaction/stream.rs
- src/compaction/worker.rs
- src/config/mod.rs
- src/error.rs
- src/key.rs
- src/lib.rs
- src/memtable/mod.rs
- src/merge_operator.rs
- src/mvcc_stream.rs
- src/range.rs
- src/tree/mod.rs
- src/value.rs
- src/value_type.rs
- tests/merge_operator.rs
Pull request overview
Adds first-class “merge operand” support to the LSM to enable commutative updates (e.g., counters / edge mutations) without read-modify-write, by introducing a user-provided MergeOperator and propagating merge-aware behavior through reads, scans, flush, and compaction.
Changes:
- Introduces `MergeOperator` trait and `ValueType::MergeOperand` for storing merge operands.
- Adds merge-aware resolution for point reads (`get`/`multi_get`) and range iteration (`MvccStream`), with opt-out behavior when no operator is configured (raw bytes returned).
- Threads merge-operator configuration through `Config`, flushing, and compaction (`CompactionStream`) and adds integration tests for merge semantics.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/merge_operator.rs | New integration tests covering merge behavior across reads/flush/range scans. |
| src/merge_operator.rs | Defines the MergeOperator trait and documentation/example. |
| src/value_type.rs | Adds ValueType::MergeOperand and helper is_merge_operand(). |
| src/value.rs | Adds InternalValue::new_merge_operand constructor. |
| src/config/mod.rs | Adds merge_operator config field and with_merge_operator builder. |
| src/lib.rs | Exposes MergeOperator and registers module. |
| src/tree/mod.rs | Adds AbstractTree::merge, merge-aware get/multi_get, and passes merge operator into range iter; adds point-read merge resolution helper. |
| src/range.rs | Extends iterator state to carry merge operator into MvccStream. |
| src/mvcc_stream.rs | Implements merge-aware forward and reverse iteration resolution. |
| src/memtable/mod.rs | Adds helper to collect all versions for a key within a snapshot bound. |
| src/compaction/worker.rs | Passes merge operator into compaction stream creation. |
| src/compaction/stream.rs | Adds merge-operand collapsing during flush/compaction when below GC threshold. |
| src/compaction/filter.rs | Ensures compaction filters can access MergeOperand values. |
| src/blob_tree/mod.rs | Wires merge() through BlobTree index and passes None operator for iterators (known limitation). |
| src/abstract_tree.rs | Threads merge operator into flush stream. |
| src/key.rs | Debug formatting includes MergeOperand marker. |
| src/error.rs | Adds Error::MergeOperator. |
🧹 Nitpick comments (1)
src/mvcc_stream.rs (1)
216-220: Consider simplifying redundant branches.
Lines 216-220 both return `Some(Ok(tail))`. The `if !key_entries.is_empty()` check is defensive but could be simplified:
    -    if !key_entries.is_empty() {
    -        // Had multi-version key but no merge operands — return newest (tail)
    -        return Some(Ok(tail));
    -    }
         return Some(Ok(tail));
This is a minor readability nit; the logic is correct either way.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/mvcc_stream.rs` around lines 216 - 220, In src/mvcc_stream.rs simplify the redundant branch that returns Some(Ok(tail)) twice: remove the conditional around key_entries and collapse to a single return of Some(Ok(tail)); update the code in the function/method containing key_entries and tail (mvcc stream logic) so only one unconditional return Some(Ok(tail)) remains, preserving behavior but improving readability.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 73c305da-418b-46ea-a26a-fbe139ff3a25
📒 Files selected for processing (8)
- src/compaction/filter.rs
- src/compaction/stream.rs
- src/memtable/mod.rs
- src/mvcc_stream.rs
- src/table/iter.rs
- src/table/util.rs
- src/tree/mod.rs
- tests/merge_operator.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/merge_operator.rs
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/mvcc_stream.rs`:
- Around line 211-215: The reverse-iteration branch is incorrectly invoking
merge resolution whenever any historical merge operand exists; change it to only
call resolve_merge_buffered when the newest entry (the variable tail) is a
MergeOperand: before pushing tail and returning
resolve_merge_buffered(key_entries), match on tail's enum variant and if it is
MergeOperand then push and call resolve_merge_buffered(key_entries), otherwise
(Value or Tombstone) return the tail as-is (respecting tombstone semantics)
without calling merge. Apply the same conditional check to the other occurrence
that currently uses has_merge_op/has_merge_operand (the block that calls
merge()/resolve_merge_buffered around lines 222–225) so both reverse branches
mirror next() behavior.
In `@tests/merge_operator.rs`:
- Line 1: Remove the unused Guard import from the use statement that currently
lists AbstractTree, Config, Guard, MergeOperator, SequenceNumberCounter, and
UserValue; edit the import so it no longer includes Guard (leaving the other
symbols unchanged) to eliminate the clippy unused-import warning.
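The reverse-iteration rule in the first comment, resolving only when the newest entry is a merge operand, could be sketched like this. `Entry`, `MergeFn`, `resolve_reverse`, and `concat` are hypothetical, and `key_entries` is assumed to hold the older versions of one key ordered oldest to newest, with `tail` being the newest.

```rust
#[derive(Debug, Clone, PartialEq)]
enum Entry {
    Value(Vec<u8>),
    MergeOperand(Vec<u8>),
    Tombstone,
}

/// Hypothetical operator shape: (optional base, operands oldest -> newest).
type MergeFn = fn(Option<&[u8]>, &[&[u8]]) -> Vec<u8>;

fn resolve_reverse(mut key_entries: Vec<Entry>, tail: Entry, merge: MergeFn) -> Entry {
    if !matches!(tail, Entry::MergeOperand(_)) {
        // A newest Value or Tombstone shadows every older operand, so it is
        // returned as-is and the operator is never called.
        return tail;
    }
    key_entries.push(tail);

    let mut operands: Vec<&[u8]> = Vec::new();
    let mut base: Option<&[u8]> = None;
    // Walk newest -> oldest until the first boundary.
    for entry in key_entries.iter().rev() {
        match entry {
            Entry::MergeOperand(v) => operands.push(v.as_slice()),
            Entry::Value(v) => {
                base = Some(v.as_slice());
                break;
            }
            Entry::Tombstone => break,
        }
    }
    operands.reverse();
    Entry::Value(merge(base, &operands))
}

/// Example operator (an assumption): appends each operand to the base bytes.
fn concat(base: Option<&[u8]>, operands: &[&[u8]]) -> Vec<u8> {
    let mut out = base.map_or_else(Vec::new, <[u8]>::to_vec);
    for op in operands {
        out.extend_from_slice(op);
    }
    out
}
```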
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 8af2f6d2-2630-4227-9347-823fdbd8cc24
📒 Files selected for processing (2)
- src/mvcc_stream.rs
- tests/merge_operator.rs
Pull request overview
This PR introduces commutative merge semantics to the LSM by adding a MergeOperator trait and a new on-disk ValueType::MergeOperand, then wiring merge-aware resolution into point reads, range iteration (MVCC stream), and compaction/flush.
Changes:
- Add `MergeOperator` API + config plumbing (`Config::with_merge_operator`) and new `ValueType::MergeOperand`.
- Implement merge-aware iteration via `MvccStream` and merge collapsing during compaction via `CompactionStream`.
- Add a comprehensive integration test suite for merge behavior (`tests/merge_operator.rs`).
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/merge_operator.rs | New integration tests covering merge semantics across reads, scans, flush/compaction. |
| src/value_type.rs | Add ValueType::MergeOperand and helper is_merge_operand(). |
| src/value.rs | Add InternalValue::new_merge_operand. |
| src/tree/mod.rs | Add AbstractTree::merge, merge-aware get/multi_get, and plumb merge operator into range iteration. |
| src/range.rs | Extend iterator state to carry merge operator into MVCC stream. |
| src/mvcc_stream.rs | Resolve merge operands during forward and reverse iteration when merge operator is configured. |
| src/merge_operator.rs | New MergeOperator trait definition + docs. |
| src/memtable/mod.rs | Add get_all_for_key helper to collect all versions for merge point reads. |
| src/lib.rs | Export MergeOperator; include module. |
| src/key.rs | Add debug tag for MergeOperand. |
| src/error.rs | Add Error::MergeOperator. |
| src/config/mod.rs | Store merge operator in config + builder method. |
| src/compaction/worker.rs | Thread merge operator into compaction stream creation. |
| src/compaction/stream.rs | Collapse merge operands below GC watermark via merge operator; add compaction tests. |
| src/compaction/filter.rs | Treat MergeOperand as a value for filter access. |
| src/blob_tree/mod.rs | Implement merge for BlobTree; plumb new range iterator signature (merge op not used). |
| src/abstract_tree.rs | Pass configured merge operator into flush/compaction streams. |
| src/table/util.rs | Gate too_many_arguments lint expectation behind metrics feature. |
| src/table/iter.rs | Gate too_many_arguments lint expectation behind metrics feature. |
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/compaction/stream.rs`:
- Around line 131-177: resolve_merge_operands currently only notifies
dropped_callback for the tombstone path, so absorbed entries (head, collected
operands, and any absorbed base/indirection) are not all reported; update
resolve_merge_operands to, after the merge is successfully produced (i.e. right
before returning the resulting InternalValue), iterate the collected entries
(the original head, every value pushed into operands, and the optional
base_value or the tombstone entry) and call watcher.on_dropped exactly once per
absorbed entry via self.dropped_callback if present; use the existing symbols
resolve_merge_operands, dropped_callback, on_dropped, head, operands,
base_value, and drain_key to locate the logic and ensure you do not
double-report entries already passed to drain_key (only report those the
function consumed internally).
- Around line 280-317: When peeked.key.seqno < self.gc_seqno_threshold but
head.key is a merge operand with head.key.seqno >= self.gc_seqno_threshold, the
current logic falls into the non-merge branch and calls
self.drain_key(&head.key.user_key), which incorrectly drops older base/operands
needed to resolve the live merge operand; update the branch in the compaction
loop (the block handling peeked.key.seqno < self.gc_seqno_threshold inside the
iterator in stream.rs) to first check if head.key.value_type.is_merge_operand()
AND head.key.seqno >= self.gc_seqno_threshold and, if so, skip draining the tail
(i.e., do not call self.drain_key and instead preserve the older entries so
resolution can occur at read time), keeping the existing merge_operator path for
merge resolution; add a regression test that constructs head = MergeOperand with
seqno >= gc_seqno_threshold and older operands/values with seqno <
gc_seqno_threshold to assert compaction does not drop the older operands needed
to resolve the live merge operand.
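The second comment boils down to a three-way decision about the older versions of a key. The sketch below assumes a merge operator is configured and uses hypothetical names (`Kind`, `Action`, `older_versions_action`); it is not the branch structure in `stream.rs`.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Value,
    MergeOperand,
    Tombstone,
}

#[derive(Debug, PartialEq)]
enum Action {
    /// Leave the older versions in place.
    KeepOlder,
    /// Collapse operands and base into one value, then drop what was absorbed.
    MergeThenDrain,
    /// The head shadows everything older, so the tail can be dropped.
    DrainOlder,
}

fn older_versions_action(
    head_kind: Kind,
    head_seqno: u64,
    peeked_seqno: u64,
    gc_seqno_threshold: u64,
) -> Action {
    if peeked_seqno >= gc_seqno_threshold {
        // The older version may still be visible to a snapshot.
        return Action::KeepOlder;
    }
    match head_kind {
        // A live operand is resolved at read time and still needs its base and
        // older operands, so draining here would lose data.
        Kind::MergeOperand if head_seqno >= gc_seqno_threshold => Action::KeepOlder,
        Kind::MergeOperand => Action::MergeThenDrain,
        Kind::Value | Kind::Tombstone => Action::DrainOlder,
    }
}
```

A regression test for the reported bug would cover the first `MergeOperand` arm: head at or above the threshold, older entries below it.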
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: f5ad3306-098b-4a24-a76b-69116c7e3c24
📒 Files selected for processing (6)
- src/compaction/stream.rs
- src/merge_operator.rs
- src/mvcc_stream.rs
- src/tree/mod.rs
- src/value.rs
- tests/merge_operator.rs
🚧 Files skipped from review as they are similar to previous changes (3)
- src/value.rs
- src/merge_operator.rs
- tests/merge_operator.rs
Pull request overview
Adds optional, user-defined merge operators to support commutative “delta/operand” updates in the LSM (via ValueType::MergeOperand), enabling tree.merge() and merge-aware reads/iteration plus compaction-time collapsing of old operands under an MVCC GC watermark.
Changes:
- Introduces `MergeOperator` trait, config plumbing (`Config::merge_operator` / `with_merge_operator`), and new `AbstractTree::merge()` API.
- Extends MVCC iteration (`MvccStream`) and range iteration state to resolve merge operands during scans (forward + reverse).
- Extends flush/compaction pipeline (`CompactionStream`) to optionally collapse merge operands below the GC watermark, and adds extensive tests.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 3 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/merge_operator.rs | Integration tests covering merge semantics across reads, scans, flush/compaction, MVCC, and BlobTree limitations. |
| src/merge_operator.rs | New public MergeOperator trait definition and docs. |
| src/config/mod.rs | Adds merge_operator to Config and builder method with_merge_operator. |
| src/abstract_tree.rs | Adds merge() to the public tree trait and wires merge operator into flush compaction stream. |
| src/value_type.rs | Adds ValueType::MergeOperand and helper is_merge_operand(). |
| src/value.rs | Adds InternalValue::new_merge_operand() constructor. |
| src/error.rs | Adds Error::MergeOperator variant. |
| src/lib.rs | Exposes MergeOperator from the crate root. |
| src/key.rs | Updates InternalKey debug formatting to include merge operand tag. |
| src/memtable/mod.rs | Adds get_all_for_key() to collect all versions for merge resolution. |
| src/tree/mod.rs | Adds Tree::merge(), merge-aware get()/multi_get(), and passes merge operator into range iteration. |
| src/range.rs | Extends iterator state to carry merge operator and passes it into MvccStream. |
| src/mvcc_stream.rs | Adds merge operand resolution in MVCC iterator for forward and reverse iteration. |
| src/compaction/worker.rs | Wires merge operator into compaction stream creation. |
| src/compaction/stream.rs | Adds merge operand collapsing behavior to compaction/flush stream. |
| src/compaction/filter.rs | Treats MergeOperand like Value for filter accessor value retrieval (raw bytes). |
| src/blob_tree/mod.rs | Adds merge() passthrough; keeps BlobTree reads/scans non-resolving (passes None). |
| src/table/util.rs | Gates too_many_arguments lint expectation on metrics feature. |
| src/table/iter.rs | Gates too_many_arguments lint expectation on metrics feature. |
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/merge_operator.rs`:
- Around line 10-12: Update the public MergeOperator trait docs to explicitly
state that the base_value parameter passed into merge operations (and any
related doc sections around lines 56-60) may already be the result of a previous
merge (e.g., compaction or earlier read resolution) rather than the original
stored value; instruct implementers of MergeOperator::merge (and any helpers
referenced in the trait) to ensure their merge logic is deterministic and stable
across multiple passes so that repeated merging/compaction produces the same
bytes for a key.
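To make the stability requirement concrete, the sketch below uses a hypothetical counter operator (`sum_merge`, not part of the crate) whose result is the same whether the operands are merged in one pass or collapsed first and then merged again with the remainder.

```rust
/// Decodes a little-endian u64; panics unless the input is exactly 8 bytes.
fn decode(bytes: &[u8]) -> u64 {
    let mut buf = [0u8; 8];
    buf.copy_from_slice(bytes);
    u64::from_le_bytes(buf)
}

/// Hypothetical counter operator: adds every operand to the base.
/// `base` may be an original value or the output of an earlier merge pass;
/// the function treats both the same way, which is what makes it stable.
fn sum_merge(base: Option<&[u8]>, operands: &[&[u8]]) -> Vec<u8> {
    let mut total = base.map_or(0, decode);
    for op in operands {
        total = total.wrapping_add(decode(op));
    }
    total.to_le_bytes().to_vec()
}
```

An operator that, for example, stamped the current time into its output or encoded merged bases differently from original values would break this property, since repeated compaction would then change the stored bytes.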
Pull request overview
Adds first-class merge operands and a configurable merge operator to support commutative LSM updates (lazy operand resolution on reads + optional operand collapsing during compaction), enabling efficient mutation-heavy workloads without read-modify-write.
Changes:
- Introduces `MergeOperator` + new on-disk `ValueType::MergeOperand` and wiring through `Config`/public exports.
- Adds merge-aware resolution in `Tree::get`/`multi_get` and in MVCC range iteration (`MvccStream`) for forward + reverse scans.
- Extends compaction to optionally collapse merge operands below the MVCC GC watermark via the configured merge operator.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/merge_operator.rs | New integration test suite covering merge semantics across reads, scans, flush, compaction, and BlobTree limitations. |
| src/value_type.rs | Adds ValueType::MergeOperand and helper is_merge_operand(). |
| src/value.rs | Adds InternalValue::new_merge_operand constructor. |
| src/tree/mod.rs | Merge-aware get/multi_get, point-merge resolution (resolve_merge_get), and plumbs merge operator into range iteration state. |
| src/table/util.rs | Gates too_many_arguments lint suppression to metrics builds only. |
| src/table/iter.rs | Same cfg(metrics) gating for too_many_arguments suppression. |
| src/range.rs | Extends iterator state and passes merge operator into MvccStream. |
| src/mvcc_stream.rs | Adds merge operand resolution during MVCC iteration (forward + reverse) and new unit tests. |
| src/merge_operator.rs | New MergeOperator trait definition and docs. |
| src/memtable/mod.rs | Adds get_all_for_key to collect all versions needed for point-merge resolution. |
| src/lib.rs | Registers module and re-exports MergeOperator. |
| src/key.rs | Extends debug formatting for merge operand keys. |
| src/error.rs | Adds Error::MergeOperator variant. |
| src/config/mod.rs | Stores optional merge operator in config + builder method with_merge_operator. |
| src/compaction/worker.rs | Wires merge operator into compaction stream creation. |
| src/compaction/stream.rs | Adds merge-operand collapsing logic (below GC watermark) and merge-operator plumbing + tests. |
| src/compaction/filter.rs | Treats MergeOperand as value-like for filter accessor. |
| src/blob_tree/mod.rs | Keeps BlobTree range ops non-merge-resolving by passing None merge operator into internal range creation; adds merge() delegation. |
| src/abstract_tree.rs | Plumbs merge operator into the default major-compaction stream and adds AbstractTree::merge. |
Pull request overview
This PR introduces merge operators to support commutative, delta-style updates in the LSM tree by adding a new merge-operand value type and integrating merge resolution into reads, scans, and compaction.
Changes:
- Adds `MergeOperator` trait and `ValueType::MergeOperand` to store merge operands as first-class entries.
- Implements merge-aware point reads (`get`/`multi_get`) and range iteration (forward + reverse) via merge resolution in `Tree` and `MvccStream`.
- Extends compaction to optionally collapse merge operands below the GC watermark (when a merge operator is configured), plus broad test coverage.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| tests/merge_operator.rs | New integration tests covering merge semantics across reads, scans, flush/compaction, MVCC, and BlobTree write/flush behavior. |
| src/merge_operator.rs | Defines the MergeOperator trait and its contract (determinism/stability). |
| src/value_type.rs | Adds MergeOperand = 3 plus helper is_merge_operand(). |
| src/value.rs | Adds InternalValue::new_merge_operand constructor. |
| src/abstract_tree.rs | Adds AbstractTree::merge API and wires merge operator into flush/compaction stream. |
| src/config/mod.rs | Adds Config::merge_operator and with_merge_operator(...). |
| src/error.rs | Adds Error::MergeOperator. |
| src/lib.rs | Registers module and re-exports MergeOperator. |
| src/key.rs | Extends debug formatting for new value type. |
| src/memtable/mod.rs | Adds get_all_for_key helper to collect all versions for merge resolution. |
| src/tree/mod.rs | Adds Tree::merge, merge-aware get/multi_get, and passes merge operator into range iteration state. |
| src/range.rs | Threads merge operator into iterator state and into MvccStream. |
| src/mvcc_stream.rs | Adds merge-aware MVCC stream resolution for forward and reverse iteration; updates unit tests accordingly. |
| src/compaction/worker.rs | Passes configured merge operator into CompactionStream. |
| src/compaction/stream.rs | Adds merge-operand collapsing logic during compaction based on GC watermark and operator presence. |
| src/compaction/filter.rs | Treats MergeOperand as a value payload for filter accessors. |
| src/blob_tree/mod.rs | Adds merge support and adapts range construction to new create_internal_range signature. |
| src/table/util.rs | Makes clippy too_many_arguments expectation conditional on metrics feature. |
| src/table/iter.rs | Makes clippy too_many_arguments expectation conditional on metrics feature. |
Pull request overview
Adds merge-operator support to the LSM-tree to enable commutative “delta” updates (MergeOperand) that can be lazily combined during reads and (selectively) during compaction, avoiding read-modify-write cycles.
Changes:
- Introduces `MergeOperator` trait and `ValueType::MergeOperand`, plus write-path support via `AbstractTree::merge`.
- Makes point reads, range iteration (`MvccStream`), and compaction merge-aware when a merge operator is configured.
- Adds extensive unit + integration test coverage for merge semantics across layers, MVCC, scans, and compaction.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| tests/merge_operator.rs | Integration tests covering merge semantics across reads/scans/flush/compaction (incl. BlobTree write path). |
| src/merge_operator.rs | Defines the MergeOperator trait and its determinism/stability contract. |
| src/value_type.rs | Adds ValueType::MergeOperand and helper is_merge_operand(). |
| src/value.rs | Adds InternalValue::new_merge_operand constructor. |
| src/abstract_tree.rs | Extends public tree API with merge() and wires merge operator into major compaction stream. |
| src/tree/mod.rs | Adds merge-aware get/multi_get, adds merge() write op, and plumbs merge operator into range iterators. |
| src/memtable/mod.rs | Adds get_all_for_key to collect all versions needed for merge resolution. |
| src/range.rs | Extends iterator state with optional merge operator and passes it into MvccStream. |
| src/mvcc_stream.rs | Resolves merge operands during forward iteration and reverse iteration when merge operator is configured. |
| src/compaction/worker.rs | Threads configured merge operator into CompactionStream. |
| src/compaction/stream.rs | Collapses merge operands below GC watermark into a single value when merge operator is configured. |
| src/compaction/filter.rs | Treats MergeOperand as a non-indirection value for filter access. |
| src/blob_tree/mod.rs | Plumbs merge into BlobTree write API; disables merge resolution for BlobTree range/prefix iterators. |
| src/config/mod.rs | Adds merge_operator to config + builder method with_merge_operator. |
| src/lib.rs | Registers module and re-exports MergeOperator. |
| src/error.rs | Adds Error::MergeOperator. |
| src/key.rs | Adds debug formatting tag for merge operands. |
| src/table/util.rs | Makes #[expect(clippy::too_many_arguments)] conditional on metrics feature. |
| src/table/iter.rs | Makes #[expect(clippy::too_many_arguments)] conditional on metrics feature. |
Pull request overview
Introduces merge-operator support to enable commutative, write-only update patterns (merge operands) that are lazily resolved during reads and selectively collapsed during compaction, improving efficiency for workloads that would otherwise require read-modify-write.
Changes:
- Add `MergeOperator` API + `ValueType::MergeOperand` and expose configuration via `Config::with_merge_operator`.
- Implement merge-aware point reads (`get`, `multi_get`) and range iteration (via `MvccStream`).
- Extend compaction to optionally collapse merge operands below the GC watermark and preserve MVCC correctness above it, plus add broad test coverage.
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| tests/merge_operator.rs | New integration tests covering merge semantics across reads, scans, flush/compaction, MVCC, and BlobTree limitations. |
| src/merge_operator.rs | New public MergeOperator trait and documentation for deterministic/stable merge behavior. |
| src/value_type.rs | Add ValueType::MergeOperand and helper is_merge_operand. |
| src/value.rs | Add InternalValue::new_merge_operand constructor. |
| src/config/mod.rs | Add merge_operator to Config and builder method with_merge_operator. |
| src/lib.rs | Wire module + re-export MergeOperator. |
| src/abstract_tree.rs | Add AbstractTree::merge API and pass merge operator into flush/compaction streams. |
| src/tree/mod.rs | Implement merge write path and merge-aware get/multi_get; plumb merge operator into range iteration. |
| src/range.rs | Extend iterator state to carry merge operator into MvccStream. |
| src/mvcc_stream.rs | Resolve merge operands during forward and reverse iteration when a merge operator is configured; add unit tests. |
| src/memtable/mod.rs | Add get_all_for_key helper to gather all versions for merge resolution. |
| src/compaction/worker.rs | Plumb merge operator into CompactionStream creation. |
| src/compaction/stream.rs | Add optional merge-collapsing behavior below GC watermark; preserve operands above watermark; add unit tests. |
| src/compaction/filter.rs | Treat MergeOperand as value bytes for accessor .value() (when not indirection). |
| src/blob_tree/mod.rs | Implement merge by delegating to index tree; keep range iteration merge resolution disabled (per limitation). |
| src/key.rs | Add debug tag for merge operand keys. |
| src/error.rs | Add Error::MergeOperator variant for merge failures. |
| src/table/util.rs | Gate #[expect(clippy::too_many_arguments)] behind metrics to avoid unmet-expect warnings. |
| src/table/iter.rs | Same cfg_attr(metrics, expect(...)) adjustment for constructor. |
dfbef11 to
5799df9
Compare
|
@coderabbitai re-review |
|
🧠 Learnings used✅ Actions performedFull review triggered. |
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/compaction/worker.rs (1)
388-393: ⚠️ Potential issue | 🔴 Critical
Compaction merges are still blind to input range tombstones.
Lines 379-384 gather the input RT set, but the merge stream created here still only receives `merge_operator`. That means compaction can collapse `Value@1 -> RT@3 -> MergeOperand@5` into a merged `Value@5` that already contains the deleted base bytes, and the RT written out later will not suppress them anymore. `CompactionStream` needs the same RT-boundary handling as `MvccStream` before merge resolution is enabled here.
As per coding guidelines, "Flag incorrect merge semantics: tombstones not propagated to lower levels, point deletes applied out of order (Tier 1 - MUST flag)".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/compaction/worker.rs` around lines 388 - 393, Compaction currently calls create_compaction_stream(...) with only merge_operator, causing tombstones (RTs) to be ignored during merge; update the call site (where you construct create_compaction_stream / let Some(mut merge_iter) = ...) to pass the collected input RT set (the RT boundary data gathered earlier) into create_compaction_stream and extend CompactionStream to accept and honor that RT-boundary exactly like MvccStream does before merge resolution; ensure the merge path uses the RT info to suppress deleted base bytes and propagate tombstones downward rather than letting merges resurrect deleted content.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/abstract_tree.rs`:
- Around line 126-127: The compaction/flush path builds a CompactionStream via
CompactionStream::new(...) and sets the merge operator but never propagates the
earlier-collected range_tombstones into the stream, so flush-time merges can
ignore RT boundaries; modify the code that constructs the CompactionStream (the
call to CompactionStream::new and its builder chain where
.with_merge_operator(...) is called) to also supply the collected
range_tombstones (e.g., add a .with_range_tombstones(range_tombstones) or an
equivalent parameter) so the stream has the RT boundaries available when
resolving merges during flush; if CompactionStream lacks such an API, add a
constructor/builder method to CompactionStream (or to the merger used) to accept
and store range_tombstones and use them during merge resolution.
In `@src/tree/mod.rs`:
- Around line 913-956: The loop in get_internal_entry_from_tables() flat-scans
runs newest-last and breaks on the first base/RT, which can miss newer
MergeOperand entries in other L0 runs; change the iteration to mirror the L0
ordering used elsewhere: collect level-0 runs and iterate those newest-first
(reverse order) before scanning other levels, while keeping non-L0 runs'
iteration unchanged; ensure when scanning L0 runs you do not stop the entire
search on found_base/tombstone/RT until all L0 runs are checked (so table.get,
get_for_key(key), process_entry, and is_rt_suppressed checks defer breaking
across L0 run boundaries), and add a regression test where a base/tombstone and
newer MergeOperand live in different L0 runs to validate merge semantics.
---
Outside diff comments:
In `@src/compaction/worker.rs`:
- Around line 388-393: Compaction currently calls create_compaction_stream(...)
with only merge_operator, causing tombstones (RTs) to be ignored during merge;
update the call site (where you construct create_compaction_stream / let
Some(mut merge_iter) = ...) to pass the collected input RT set (the RT boundary
data gathered earlier) into create_compaction_stream and extend CompactionStream
to accept and honor that RT-boundary exactly like MvccStream does before merge
resolution; ensure the merge path uses the RT info to suppress deleted base
bytes and propagate tombstones downward rather than letting merges resurrect
deleted content.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 43a8fe2b-89be-41c1-9334-82dc244dc617
📒 Files selected for processing (17)
- src/abstract_tree.rs
- src/blob_tree/mod.rs
- src/compaction/filter.rs
- src/compaction/stream.rs
- src/compaction/worker.rs
- src/config/mod.rs
- src/error.rs
- src/key.rs
- src/lib.rs
- src/memtable/mod.rs
- src/merge_operator.rs
- src/mvcc_stream.rs
- src/range.rs
- src/tree/mod.rs
- src/value.rs
- src/value_type.rs
- tests/merge_operator.rs
RT@2 suppresses both base@0 and op@1 (seqno < 2), not just base. Flush CompactionStream does not need RT suppression — entries and RTs coexist in output tables, suppression happens at read time.
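The suppression rule in the commit message above can be sketched as a small predicate. This is a minimal sketch, not the crate's code: the `RangeTombstone` struct, its half-open `[start, end)` bounds, and the `is_rt_suppressed` signature are assumptions made for illustration. Only the `seqno < RT seqno` comparison comes from the message, and the per-source visibility cutoffs mentioned elsewhere in the PR are left out.

```rust
/// Hypothetical, simplified range tombstone (assumption, not the crate's type):
/// hides keys in the half-open range [start, end) written before `seqno`.
#[derive(Debug, Clone)]
struct RangeTombstone {
    start: Vec<u8>,
    end: Vec<u8>,
    seqno: u64,
}

/// Returns true if the entry at (`key`, `entry_seqno`) is hidden by any
/// range tombstone. An entry is hidden only when the tombstone covers the
/// key AND the entry is strictly older than the tombstone.
fn is_rt_suppressed(key: &[u8], entry_seqno: u64, rts: &[RangeTombstone]) -> bool {
    rts.iter().any(|rt| {
        key >= rt.start.as_slice() && key < rt.end.as_slice() && entry_seqno < rt.seqno
    })
}
```

With a single tombstone at seqno 2 covering the key, both `base@0` and `op@1` are suppressed, while an operand written at seqno 2 or later stays visible.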
|
@coderabbitai re-review |
|
🧠 Learnings used✅ Actions performedFull review triggered. |
…ge_get L0 runs can overlap and iteration order is not guaranteed newest-first. Collect all matching on-disk entries, sort by descending seqno, then process — ensures newer MergeOperands are seen before older bases/tombstones.
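The collect-then-sort approach described in this commit message can be sketched as follows. The `Entry` enum and `collect_for_merge` function are hypothetical names for illustration and do not mirror the crate's `InternalValue` or `resolve_merge_get`; range tombstones and indirections are ignored here to keep the ordering logic visible.

```rust
/// Hypothetical, simplified view of one on-disk version of a key.
#[derive(Debug, Clone, PartialEq)]
enum Entry {
    Value { seqno: u64, bytes: Vec<u8> },
    Tombstone { seqno: u64 },
    MergeOperand { seqno: u64, bytes: Vec<u8> },
}

impl Entry {
    fn seqno(&self) -> u64 {
        match self {
            Entry::Value { seqno, .. }
            | Entry::Tombstone { seqno }
            | Entry::MergeOperand { seqno, .. } => *seqno,
        }
    }
}

/// Takes entries gathered from all runs in arbitrary order, sorts them
/// newest-first, and walks down to the first boundary (value or tombstone).
/// Returns the base (if any) and the operands in chronological order.
fn collect_for_merge(mut entries: Vec<Entry>) -> (Option<Vec<u8>>, Vec<Vec<u8>>) {
    // Descending seqno: newer operands are seen before older bases.
    entries.sort_by(|a, b| b.seqno().cmp(&a.seqno()));

    let mut base = None;
    let mut operands = Vec::new();
    for entry in entries {
        match entry {
            Entry::MergeOperand { bytes, .. } => operands.push(bytes),
            Entry::Value { bytes, .. } => {
                base = Some(bytes);
                break;
            }
            // A tombstone ends the search; the merge runs with no base.
            Entry::Tombstone { .. } => break,
        }
    }

    // The merge function expects ascending seqno order.
    operands.reverse();
    (base, operands)
}
```

Sorting once up front means the result no longer depends on the order in which overlapping L0 runs happen to be visited.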
|
@coderabbitai re-review |
|
🧠 Learnings used✅ Actions performedFull review triggered. |
Move is_rt_suppressed inside the if-let-Some(merge_op) block so the no-operator passthrough path stays O(1) per key without scanning the range_tombstones vec.
Summary
- `MergeOperator` trait and persisted `ValueType::MergeOperand` support for commutative LSM operations
Technical Details
New persisted type: `ValueType::MergeOperand = 3` stores partial updates (operands) that are lazily combined via a user-provided `MergeOperator` trait.
Write path: `tree.merge(key, operand, seqno)` appends a `MergeOperand` entry to the memtable.
Read path: `tree.get()` and `tree.multi_get()` collect all entries for the key across memtable, sealed memtables, and disk tables. The disk scan collects all visible entries for the key across all runs (handling L0 overlap correctly by sorting in descending seqno order), then applies the merge function in chronological order (ascending seqno). Tombstones and RT-suppressed entries stop the search and produce a merge with no base value. When no merge operator is configured, both `get()` and `multi_get()` check RT suppression before returning raw operand bytes.
Range tombstone interaction: `resolve_merge_get` (point reads) checks each entry against `is_suppressed_by_range_tombstones` before including it in the merge. `MvccStream` (range scans) accepts optional RT state with per-source visibility cutoffs via `with_range_tombstones()` and skips RT-suppressed entries during merge collection (treated as deletion boundaries). If the newest `MergeOperand` itself is RT-suppressed, merge resolution is skipped entirely. The post-`MvccStream` `RangeTombstoneFilter` remains for non-merge entries.
Compaction: `CompactionStream` merges operands below the GC watermark via the merge operator. When a base value or tombstone boundary is found in the same stream, the result is a `Value` (complete merge). When no boundary is found (partial merge, since the base may be in a lower level), the result stays as `MergeOperand` to avoid shadowing the real base. Operands above the watermark are preserved for MVCC. Compaction filters that replace a `MergeOperand` with a `Value` preserve the `MergeOperand` type to prevent base shadowing (Indirection replacement is allowed through to avoid storing blob pointers under the MergeOperand type). Seqno zeroing is skipped for preserved `MergeOperand` entries to avoid creating duplicate internal keys.
Indirection (BlobTree) safety: All merge resolution paths (MvccStream forward/reverse, CompactionStream, resolve_merge_get) detect `ValueType::Indirection` entries and skip merge resolution, because blob pointer bytes are not user data. CompactionStream uses a pending buffer to preserve all consumed operands when hitting an Indirection base (no data loss); pending entries flow through the normal compaction pipeline (filter, GC, seqno zeroing). `resolve_merge_get` falls back to the latest operand instead of returning `Ok(None)`.
Range reads: `MvccStream` resolves merge operands during both forward and reverse iteration. Reverse iteration uses a reusable buffer (`key_entries_buf`) to avoid per-call allocation, and merges only when the newest entry is a `MergeOperand`.
Implementor contract: `MergeOperator::merge` must be deterministic and stable across multiple passes. `base_value` may already be the output of a previous merge (from compaction or earlier read resolution).
Disk Format Compatibility
This PR participates in the fork's disk format V4 boundary.
`V4` is required because the fork now persists semantics that a `V3` reader cannot interpret safely:
- `range_tombstones` section / `BlockType::RangeTombstone`
- `ValueType::MergeOperand` in the on-disk value encoding
API
Known Limitations
- `resolve_merge_get` (point reads) and `MvccStream` (range reads): tracked in "refactor: unify merge resolution via bloom-filtered iterator pipeline" #46 for unification via bloom-filtered iterator pipeline
Test Plan
- `CompactionStream` unit tests: GC boundaries, partial merge type, complete vs partial, indirection base, error propagation, dropped callback, mixed watermark, filter type preservation
- `MvccStream` unit tests: forward/reverse merge, indirection base, error propagation, weak tombstone, RT suppression (base, operand, head forward/reverse)
- `Memtable` unit tests: `get_all_for_key` seqno=0 edge case, multi-version collection
Closes #15
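To make the implementor contract from the description concrete, here is a minimal sketch of an additive counter operator. The trait below is a hypothetical stand-in: its signature (byte-slice arguments, `String` error type) is assumed from the `merge(key, base_opt, operands)` calls in the sequence diagrams and is not copied from `src/merge_operator.rs`. `CounterMerge` and `decode` are likewise illustrative names.

```rust
/// Hypothetical stand-in for the PR's `MergeOperator` trait; the real
/// signature and error type may differ.
trait MergeOperator {
    fn merge(
        &self,
        key: &[u8],
        base_value: Option<&[u8]>,
        operands: &[&[u8]],
    ) -> Result<Vec<u8>, String>;
}

/// Illustrative operator: every value and operand is a little-endian i64,
/// and merging sums them.
struct CounterMerge;

/// Decodes an 8-byte little-endian i64, rejecting any other length.
fn decode(bytes: &[u8]) -> Result<i64, String> {
    if bytes.len() != 8 {
        return Err(format!("expected 8 bytes, got {}", bytes.len()));
    }
    let mut arr = [0u8; 8];
    arr.copy_from_slice(bytes);
    Ok(i64::from_le_bytes(arr))
}

impl MergeOperator for CounterMerge {
    fn merge(
        &self,
        _key: &[u8],
        base_value: Option<&[u8]>,
        operands: &[&[u8]],
    ) -> Result<Vec<u8>, String> {
        // A missing base (no value found, or a tombstone boundary) counts as 0.
        let mut total = match base_value {
            Some(bytes) => decode(bytes)?,
            None => 0,
        };
        // Operands arrive oldest to newest; addition is order-insensitive,
        // and the output has the same encoding as the inputs, so a previous
        // merge result can be fed back in as `base_value`.
        for op in operands {
            total = total.wrapping_add(decode(op)?);
        }
        Ok(total.to_le_bytes().to_vec())
    }
}
```

Merging the operands 5, 7, and -2 in one pass yields 10. Merging 5 and 7 first and then passing that result back as `base_value` together with -2 also yields 10, which is the kind of stability across passes the contract asks for.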
Summary by CodeRabbit
`merge()` method to store merge operands that are combined with base values during access.