Skip to content
Merged
Show file tree
Hide file tree
Changes from 45 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
fcedd42
perf: batch multi_get + PinnableSlice + WriteBatch
polaz Apr 5, 2026
56210bb
test: comprehensive integration tests for batch APIs
polaz Apr 5, 2026
6ef0bbe
refactor(table): extract check_bloom helper, fix batch safety
polaz Apr 5, 2026
c34bca7
fix(tree): hold read guard during batch insert, fix docs + bloom
polaz Apr 5, 2026
06e216e
fix(docs): correct PinnableSlice + WriteBatch semantics, fix filter miss
polaz Apr 5, 2026
10b260f
test(tree): add PinnableSlice + multi_get edge case tests
polaz Apr 5, 2026
23bc77e
fix(docs): align WriteBatch + get_pinned wording with visibility cont…
polaz Apr 5, 2026
698b1cf
fix(write_batch): document duplicate-key semantics, tighten visibility
polaz Apr 5, 2026
8db05fa
perf(blob_tree): batch multi_get for BlobTree, add batch_ops bench
polaz Apr 5, 2026
d2bd527
fix(blob_tree): add range tombstone suppression to batch multi_get
polaz Apr 5, 2026
431e975
fix(table): return global seqnos from Table::get and get_with_block
polaz Apr 5, 2026
8be9dbe
refactor(table): extract point_read_inner to DRY block-index walk
polaz Apr 5, 2026
ef9c21f
fix(tree): add debug_asserts for batch_get_from_tables index contract
polaz Apr 5, 2026
2b04c91
fix(tree): correct L0 fast-path seqno check, fix duplicate-key docs
polaz Apr 5, 2026
11c1f66
fix(docs): correct WriteBatch wording in README
polaz Apr 5, 2026
cb132fe
refactor(tree): unify table point-read walk via TablePointLookup trait
polaz Apr 5, 2026
6aa6d66
fix(write_batch): correct duplicate-key docs, add mixed-op debug_assert
polaz Apr 5, 2026
04af107
perf(tree): add L0 seqno ceiling skip to batch_get_from_tables
polaz Apr 5, 2026
80c3aa7
refactor(tree): extract resolve_pinned_entry helper for get_pinned
polaz Apr 5, 2026
766d34c
refactor(tree): unify resolve_entry via resolve_pinned_entry
polaz Apr 5, 2026
0fbf3f8
fix(write_batch): reject mixed-op duplicates unconditionally
polaz Apr 5, 2026
23da114
fix(docs): align WriteBatch mixed-op doc with unconditional validation
polaz Apr 5, 2026
a6af258
perf(tree): defer sort+hash after memtable phase, bitmap for L0 ceiling
polaz Apr 5, 2026
5b94ac4
refactor(table): reword filter_queries metrics annotation
polaz Apr 5, 2026
fbc99e4
fix(blob_tree): add merge resolution to batch multi_get path
polaz Apr 5, 2026
bebe140
fix(blob_tree): use resolve_merge_via_pipeline directly, not resolve_key
polaz Apr 5, 2026
d243003
perf(bench): reuse fixed keys to prevent memtable growth across itera…
polaz Apr 5, 2026
c6ac265
perf(table): saturating_add for global seqno, hash only remaining keys
polaz Apr 5, 2026
b86ca49
fix(blob_tree): return raw merge operand when operator absent
polaz Apr 5, 2026
3de18ea
perf(blob_tree): defer snapshot acquisition below empty-batch check
polaz Apr 5, 2026
dc56c17
perf(tree): pass (idx, hash) pairs to batch_get_from_tables
polaz Apr 6, 2026
2ba3f05
docs(copilot): add unit struct vs type alias rule
polaz Apr 6, 2026
238d13f
perf(bench): use constant seqno to prevent version accumulation
polaz Apr 6, 2026
a27fbdd
refactor(bench): extract setup_empty_tree helper for write benchmarks
polaz Apr 6, 2026
22e4042
perf(bench): use iter_batched for fresh tree per sample
polaz Apr 6, 2026
5297d52
perf(bench): add black_box to prevent optimizer elision
polaz Apr 6, 2026
a2c1f56
perf(tree): avoid clone in materialize validation, take miss_keys by …
polaz Apr 6, 2026
8f43698
refactor(error): reword MixedOperationBatch as ambiguous semantics
polaz Apr 6, 2026
8660343
refactor(tree): reword resolve_or_passthrough_pinned doc
polaz Apr 6, 2026
4a9d067
refactor(tree): update batch_get_from_tables doc and debug_assert
polaz Apr 6, 2026
5ee39fa
perf(tree): L1+ early exit after covering table miss in batch path
polaz Apr 6, 2026
70f973b
perf(tree): break L1+ single-key walk after covering table miss
polaz Apr 6, 2026
1422860
perf(tree): re-sort after L1+ covered_miss merge, DRY test helpers
polaz Apr 6, 2026
f17cd0a
test(pinnable_slice): improve test robustness
polaz Apr 6, 2026
6e025fb
test(multi_get): blob_tree batch RT suppression, merge, memtable-first
polaz Apr 6, 2026
05707d9
docs(write_batch): clarify mixed-op rejection rationale
polaz Apr 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ This is a **maintained fork** of [fjall-rs/lsm-tree](https://github.com/fjall-rs
- **Unsizing coercions:** `Arc::new(ConcreteType) as Arc<dyn Trait>` is valid Rust — the compiler performs an unsizing coercion from `Arc<T>` to `Arc<dyn Trait>` when `T: Trait`. This is NOT an invalid cast. Do not flag it as a compile error.
- **Feature gates:** Code behind `#[cfg(feature = "...")]` must compile with any combination of features. Variables used only in feature-gated branches must also be feature-gated.
- **Definite initialization:** `let x: T;` without a default value is valid Rust — the compiler tracks initialization per control-flow path and does not drop uninitialized bindings; do not flag this as a bug when the variable is only used in paths that initialize it.
- **Unit structs as values:** `struct Foo;` (unit struct) is both a type AND a value (constructor). Writing `HashMap::with_capacity_and_hasher(n, FxBuildHasher)` is valid when `FxBuildHasher` is a unit struct — do not confuse with type aliases (`type Foo = Bar`) which cannot be used as values.

## Testing Standards

Expand Down
8 changes: 7 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ interval-heap = "0.0.5"
log = "0.4.27"
lz4_flex = { version = "0.13.0", optional = true, default-features = false }
zstd = { version = "0.13", optional = true, default-features = false }
structured-zstd = { version = "0.0.1", optional = true, default-features = false, features = ["std"] }
structured-zstd = { version = "0.0.7", optional = true, default-features = false, features = ["std"] }
quick_cache = { version = "0.6.16", default-features = false, features = [] }
Comment thread
polaz marked this conversation as resolved.
rustc-hash = "2.1.1"
self_cell = "1.2.0"
Expand Down Expand Up @@ -90,3 +90,9 @@ name = "index_block"
harness = false
path = "benches/index_block.rs"
required-features = []

[[bench]]
name = "batch_ops"
harness = false
path = "benches/batch_ops.rs"
required-features = []
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
[![License](https://img.shields.io/badge/license-Apache--2.0-blue)](#license)

> LSM-tree engine for [CoordiNode](https://github.com/structured-world/coordinode), maintained by [Structured World Foundation](https://sw.foundation).
> Derivative work of [fjall-rs/lsm-tree](https://github.com/fjall-rs/lsm-tree), developed independently with diverging features: zstd dictionary compression, custom sequence number generators, multi_get, intra-L0 compaction, and security hardening.
> Derivative work of [fjall-rs/lsm-tree](https://github.com/fjall-rs/lsm-tree), developed independently with diverging features: zstd dictionary compression, custom sequence number generators, multi_get (batch-optimized), PinnableSlice zero-copy reads, WriteBatch seqno-grouped batch writes with caller-controlled atomic visibility, intra-L0 compaction, and security hardening.

> [!IMPORTANT]
> This fork now introduces a fork-specific **disk format V4** compatibility boundary.
Expand Down
128 changes: 128 additions & 0 deletions benches/batch_ops.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use criterion::{BenchmarkId, Criterion, criterion_group, criterion_main};
use lsm_tree::{AbstractTree, AnyTree, Config, SeqNo, SequenceNumberCounter, WriteBatch};

/// Builds a tree containing `n` sequential keys (`key_000000`…) and flushes
/// the active memtable, so subsequent reads exercise the on-disk table path.
///
/// The `TempDir` is returned alongside the tree; dropping it removes the data
/// directory, so callers must keep it alive for the benchmark's duration.
fn setup_tree_with_disk_data(n: u64) -> (AnyTree, tempfile::TempDir) {
    let dir = tempfile::tempdir().unwrap();
    let config = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    );
    let tree = config.open().unwrap();

    (0..n).for_each(|seq| {
        tree.insert(format!("key_{seq:06}"), format!("value_{seq}"), seq);
    });

    // Push everything out of the active memtable onto disk.
    tree.flush_active_memtable(0).unwrap();

    (tree, dir)
}

/// Benchmarks `multi_get` against on-disk data for several batch sizes.
///
/// Each size gets its own 1000-key tree; the batch reads the first `count`
/// keys, all of which exist on disk.
fn bench_multi_get(c: &mut Criterion) {
    let mut group = c.benchmark_group("multi_get");

    for count in [10, 50, 100, 500] {
        let (tree, _folder) = setup_tree_with_disk_data(1000);

        let keys: Vec<String> = (0..count).map(|i| format!("key_{i:06}")).collect();

        group.bench_with_input(BenchmarkId::new("disk", count), &count, |b, _| {
            b.iter(|| {
                // black_box prevents the optimizer from eliding the read.
                std::hint::black_box(tree.multi_get(&keys, SeqNo::MAX).unwrap());
            });
        });
    }

    group.finish();
}

/// Compares `get_pinned` (the zero-copy read path) against the plain `get`
/// path for a single key that resides in an on-disk table.
fn bench_get_pinned(c: &mut Criterion) {
    let mut group = c.benchmark_group("get_pinned");

    let (tree, _folder) = setup_tree_with_disk_data(1000);
    let key = "key_000500";

    group.bench_function("disk_hit", |b| {
        b.iter(|| std::hint::black_box(tree.get_pinned(key, SeqNo::MAX).unwrap()));
    });

    // Baseline: the regular (copying) read path for the same key.
    group.bench_function("get_regular", |b| {
        b.iter(|| std::hint::black_box(tree.get(key, SeqNo::MAX).unwrap()));
    });

    group.finish();
}

/// Opens a fresh, empty tree in a new temporary directory for write
/// benchmarks. The returned `TempDir` must outlive the tree handle.
fn setup_empty_tree() -> (AnyTree, tempfile::TempDir) {
    let dir = tempfile::tempdir().unwrap();
    let cfg = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    );
    (cfg.open().unwrap(), dir)
}

/// Benchmarks batched writes (`apply_batch`) against the equivalent sequence
/// of per-key `insert` calls, using `iter_batched` so every sample starts
/// from a fresh, empty tree.
fn bench_write_batch(c: &mut Criterion) {
    let mut group = c.benchmark_group("write_batch");

    for batch_size in [10, 50, 100, 500] {
        group.bench_with_input(
            BenchmarkId::new("insert", batch_size),
            &batch_size,
            |b, &size| {
                let keys: Vec<String> = (0..size).map(|i| format!("key_{i:04}")).collect();

                b.iter_batched(
                    setup_empty_tree,
                    |(tree, _folder)| {
                        // All entries share one batch and a single seqno (0).
                        let mut batch = WriteBatch::with_capacity(size);
                        keys.iter().for_each(|k| batch.insert(k.as_str(), "value"));
                        tree.apply_batch(batch, 0).unwrap();
                    },
                    criterion::BatchSize::SmallInput,
                );
            },
        );
    }

    // Baseline: identical keys written via one insert() call each.
    for batch_size in [10, 50, 100, 500] {
        group.bench_with_input(
            BenchmarkId::new("individual_inserts", batch_size),
            &batch_size,
            |b, &size| {
                let keys: Vec<String> = (0..size).map(|i| format!("key_{i:04}")).collect();

                b.iter_batched(
                    setup_empty_tree,
                    |(tree, _folder)| {
                        keys.iter().for_each(|k| {
                            tree.insert(k.as_str(), "value", 0);
                        });
                    },
                    criterion::BatchSize::SmallInput,
                );
            },
        );
    }

    group.finish();
}

// Register all batch-API benchmarks with Criterion's default harness
// (Cargo.toml sets `harness = false` for this bench target).
criterion_group!(
benches,
bench_multi_get,
bench_get_pinned,
bench_write_batch
);
criterion_main!(benches);
76 changes: 76 additions & 0 deletions src/abstract_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -554,6 +554,44 @@ pub trait AbstractTree: sealed::Sealed {
/// Will return `Err` if an IO error occurs.
fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<UserValue>>;

/// Retrieves an item from the tree as a [`PinnableSlice`].
///
/// When the value is backed by an on-disk data block, implementations
/// may return [`PinnableSlice::Pinned`] holding a reference to that block's
/// decompressed buffer (avoiding a data copy). Memtable and blob-resolved
/// values use [`PinnableSlice::Owned`]. The default implementation always
/// returns `Owned`; only [`Tree`] overrides with the pinned path.
///
/// The existing [`AbstractTree::get`] method is unaffected.
///
/// # Examples
///
/// ```
/// # let folder = tempfile::tempdir()?;
/// use lsm_tree::{AbstractTree, Config, Tree};
///
/// let tree = Config::new(&folder, Default::default(), Default::default()).open()?;
/// tree.insert("a", "my_value", 0);
///
/// let item = tree.get_pinned("a", 1)?;
/// assert_eq!(item.as_ref().map(|v| v.as_ref()), Some("my_value".as_bytes()));
/// #
/// # Ok::<(), lsm_tree::Error>(())
/// ```
Comment thread
polaz marked this conversation as resolved.
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn get_pinned<K: AsRef<[u8]>>(
&self,
key: K,
seqno: SeqNo,
) -> crate::Result<Option<crate::PinnableSlice>> {
// Default: delegate to get() and wrap as Owned
self.get(key, seqno)
.map(|opt| opt.map(crate::PinnableSlice::owned))
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.

/// Returns `true` if the tree contains the specified key.
///
/// # Examples
Expand Down Expand Up @@ -656,6 +694,44 @@ pub trait AbstractTree: sealed::Sealed {
keys.into_iter().map(|key| self.get(key, seqno)).collect()
}

/// Applies a [`WriteBatch`] with the given sequence number.
///
/// All entries share a single seqno. This is more efficient than individual
/// writes because the version-history lock and memtable size accounting
/// are performed only once for the entire batch.
///
/// **Visibility:** entries become individually visible to concurrent readers
/// as they are inserted. For atomic batch visibility, the caller must
/// publish `seqno` (via `visible_seqno.fetch_max(seqno + 1)`) only
/// **after** this method returns.
///
/// Returns the total bytes added and new size of the memtable.
///
/// # Examples
///
/// ```
/// # let folder = tempfile::tempdir()?;
/// use lsm_tree::{AbstractTree, Config, Tree, WriteBatch};
///
/// let tree = Config::new(&folder, Default::default(), Default::default()).open()?;
///
/// let mut batch = WriteBatch::new();
/// batch.insert("key1", "value1");
/// batch.insert("key2", "value2");
/// batch.remove("key3");
///
/// let (bytes_added, memtable_size) = tree.apply_batch(batch, 0)?;
/// assert!(bytes_added > 0);
/// #
/// # Ok::<(), lsm_tree::Error>(())
/// ```
Comment thread
polaz marked this conversation as resolved.
///
/// # Errors
///
/// Returns [`Error::MixedOperationBatch`](crate::Error::MixedOperationBatch)
/// if the batch contains mixed operation types for the same user key.
fn apply_batch(&self, batch: crate::WriteBatch, seqno: SeqNo) -> crate::Result<(u64, u64)>;

/// Inserts a key-value pair into the tree.
///
/// If the key already exists, the item will be overwritten.
Expand Down
113 changes: 110 additions & 3 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -667,6 +667,10 @@ impl AbstractTree for BlobTree {
self.index.get_highest_persisted_seqno()
}

// Delegates batch application to the underlying index tree; the shared
// seqno and the (bytes added, memtable size) result pass through unchanged.
fn apply_batch(&self, batch: crate::WriteBatch, seqno: SeqNo) -> crate::Result<(u64, u64)> {
self.index.apply_batch(batch, seqno)
}

fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
Expand All @@ -681,16 +685,119 @@ impl AbstractTree for BlobTree {
self.resolve_key(&super_version, key.as_ref(), seqno)
}

#[expect(
    clippy::indexing_slicing,
    reason = "indices are generated from 0..n range, always in bounds"
)]
fn multi_get<K: AsRef<[u8]>>(
    &self,
    keys: impl IntoIterator<Item = K>,
    seqno: SeqNo,
) -> crate::Result<Vec<Option<crate::UserValue>>> {
    let keys: Vec<_> = keys.into_iter().collect();
    let n = keys.len();
    if n == 0 {
        return Ok(Vec::new());
    }

    let super_version = self.index.get_version_for_snapshot(seqno);
    let comparator = self.index.config.comparator.as_ref();

    // For small batches, the batching overhead (sort + hash + scratch
    // vectors) outweighs the benefit — use the simple per-key path.
    if n <= 2 {
        return keys
            .iter()
            .map(|key| self.resolve_key(&super_version, key.as_ref(), seqno))
            .collect();
    }

    // Phase 1: Check memtables (unsorted — defer sort+hash for SST phase)
    let mut internal_entries: Vec<Option<crate::value::InternalValue>> = vec![None; n];
    let mut remaining: Vec<usize> = Vec::with_capacity(n);

    for idx in 0..n {
        let key = keys[idx].as_ref();
        if let Some(entry) = super_version.active_memtable.get(key, seqno) {
            internal_entries[idx] = Some(entry);
            continue;
        }
        if let Some(entry) =
            crate::Tree::get_internal_entry_from_sealed_memtables(&super_version, key, seqno)
        {
            internal_entries[idx] = Some(entry);
            continue;
        }
        remaining.push(idx);
    }

    // Phase 2: Sort + hash only the memtable misses (avoids wasted work
    // when every key was served from a memtable).
    if !remaining.is_empty() {
        remaining.sort_by(|&a, &b| comparator.compare(keys[a].as_ref(), keys[b].as_ref()));

        let miss_keys: Vec<(usize, u64)> = remaining
            .iter()
            .map(|&idx| {
                let hash =
                    crate::table::filter::standard_bloom::Builder::get_hash(keys[idx].as_ref());
                (idx, hash)
            })
            .collect();

        crate::Tree::batch_get_from_tables(
            &super_version.version,
            &keys,
            miss_keys,
            seqno,
            comparator,
            &mut internal_entries,
        )?;
    }

    // Phase 3: Resolve each entry (tombstones, RT suppression, merge, blob indirections)
    let mut results = vec![None; n];
    for idx in 0..n {
        if let Some(item) = internal_entries[idx].take() {
            if item.is_tombstone() {
                continue;
            }
            if crate::Tree::is_suppressed_by_range_tombstones(
                &super_version,
                keys[idx].as_ref(),
                item.key.seqno,
                seqno,
                comparator,
            ) {
                continue;
            }
            // Merge operand resolution. Merge operands in BlobTree are stored
            // inline (not as blob indirection), so the pipeline result is a
            // plain value. Without a merge operator, return raw operand value
            // (same as resolve_key / resolve_pinned_entry behavior).
            if item.key.value_type.is_merge_operand() {
                if let Some(merge_op) = &self.index.config.merge_operator {
                    results[idx] = crate::Tree::resolve_merge_via_pipeline(
                        super_version.clone(),
                        keys[idx].as_ref(),
                        seqno,
                        Arc::clone(merge_op),
                    )?;
                } else {
                    results[idx] = Some(item.value);
                }
                continue;
            }
            // Plain value: follow a possible blob indirection to the value log.
            let (_, v) = resolve_value_handle(
                self.id(),
                self.blobs_folder.as_path(),
                &self.index.config.cache,
                &super_version.version,
                item,
            )?;
            results[idx] = Some(v);
        }
    }

    Ok(results)
}

fn merge<K: Into<UserKey>, V: Into<UserValue>>(
Expand Down
Loading
Loading