Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/abstract_tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -511,6 +511,52 @@ pub trait AbstractTree {
self.get(key, seqno).map(|x| x.is_some())
}

/// Reads multiple keys from the tree.
///
/// Returns a `Vec` with exactly one entry per input key, in the same order
/// as the input. Each entry is `Some(value)` if the key was found, or
/// `None` if it was missing. Duplicate keys in the input produce duplicate
/// entries in the output. This contract is guaranteed for all
/// implementations, including optimized ones.
///
/// Implementations may choose to perform all lookups against a single
/// version snapshot and acquire the version lock only once, which can be
/// more efficient than calling [`AbstractTree::get`] in a loop. The
/// default trait implementation, however, is a convenience wrapper that
/// simply calls [`AbstractTree::get`] for each key and therefore does not
/// guarantee a single-snapshot or single-lock acquisition. Optimized
/// implementations (such as [`Tree`] and [`BlobTree`]) provide the
/// single-snapshot/one-lock behavior.
///
/// # Examples
///
/// ```
/// # let folder = tempfile::tempdir()?;
/// use lsm_tree::{AbstractTree, Config, Tree};
///
/// let tree = Config::new(folder, Default::default(), Default::default()).open()?;
Comment thread
polaz marked this conversation as resolved.
/// tree.insert("a", "value_a", 0);
/// tree.insert("b", "value_b", 1);
///
/// let results = tree.multi_get(["a", "b", "c"], 2)?;
/// assert_eq!(results[0], Some("value_a".as_bytes().into()));
/// assert_eq!(results[1], Some("value_b".as_bytes().into()));
/// assert_eq!(results[2], None);
/// #
/// # Ok::<(), lsm_tree::Error>(())
/// ```
///
/// # Errors
///
/// Will return `Err` if an IO error occurs.
fn multi_get<K: AsRef<[u8]>>(
&self,
keys: impl IntoIterator<Item = K>,
seqno: SeqNo,
) -> crate::Result<Vec<Option<UserValue>>> {
keys.into_iter().map(|key| self.get(key, seqno)).collect()
}
Comment thread
polaz marked this conversation as resolved.

/// Inserts a key-value pair into the tree.
///
/// If the key already exists, the item will be overwritten.
Expand Down
57 changes: 35 additions & 22 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,29 @@ impl BlobTree {
blobs_folder: Arc::new(blobs_folder),
})
}

/// Resolves a single key against a pre-acquired [`SuperVersion`].
Comment thread
polaz marked this conversation as resolved.
fn resolve_key(
&self,
super_version: &crate::version::SuperVersion,
key: &[u8],
seqno: SeqNo,
) -> crate::Result<Option<UserValue>> {
let Some(item) = crate::Tree::get_internal_entry_from_version(super_version, key, seqno)?
else {
return Ok(None);
};

let (_, v) = resolve_value_handle(
self.id(),
self.blobs_folder.as_path(),
&self.index.config.cache,
&super_version.version,
item,
)?;

Ok(Some(v))
}
}

impl AbstractTree for BlobTree {
Expand Down Expand Up @@ -584,30 +607,20 @@ impl AbstractTree for BlobTree {
}

fn get<K: AsRef<[u8]>>(&self, key: K, seqno: SeqNo) -> crate::Result<Option<crate::UserValue>> {
let key = key.as_ref();

#[expect(clippy::expect_used, reason = "lock is expected to not be poisoned")]
let super_version = self
.index
.version_history
.read()
.expect("lock is poisoned")
.get_version_for_snapshot(seqno);

let Some(item) = crate::Tree::get_internal_entry_from_version(&super_version, key, seqno)?
else {
return Ok(None);
};
let super_version = self.index.get_version_for_snapshot(seqno);
self.resolve_key(&super_version, key.as_ref(), seqno)
}

let (_, v) = resolve_value_handle(
self.id(),
self.blobs_folder.as_path(),
&self.index.config.cache,
&super_version.version,
item,
)?;
fn multi_get<K: AsRef<[u8]>>(
&self,
keys: impl IntoIterator<Item = K>,
seqno: SeqNo,
) -> crate::Result<Vec<Option<crate::UserValue>>> {
let super_version = self.index.get_version_for_snapshot(seqno);

Ok(Some(v))
keys.into_iter()
.map(|key| self.resolve_key(&super_version, key.as_ref(), seqno))
.collect()
}

fn remove<K: Into<UserKey>>(&self, key: K, seqno: SeqNo) -> (u64, u64) {
Expand Down
17 changes: 17 additions & 0 deletions src/tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,23 @@ impl AbstractTree for Tree {
.map(|x| x.value))
}

fn multi_get<K: AsRef<[u8]>>(
    &self,
    keys: impl IntoIterator<Item = K>,
    seqno: SeqNo,
) -> crate::Result<Vec<Option<UserValue>>> {
    // Fetch the version for `seqno` once and reuse it for every key in the batch.
    let super_version = self.get_version_for_snapshot(seqno);

    let mut values = Vec::new();

    for key in keys {
        let entry = Self::get_internal_entry_from_version(&super_version, key.as_ref(), seqno)?;

        // Callers only get the user value, not the internal entry.
        values.push(entry.map(|item| item.value));
    }

    Ok(values)
}

fn insert<K: Into<UserKey>, V: Into<UserValue>>(
&self,
key: K,
Expand Down
237 changes: 237 additions & 0 deletions tests/multi_get.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
use lsm_tree::{
get_tmp_folder, AbstractTree, Config, KvSeparationOptions, SeqNo, SequenceNumberCounter,
};
use test_log::test;

#[test]
fn multi_get_all_existing() -> lsm_tree::Result<()> {
    let dir = get_tmp_folder();

    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    // Write 100 entries, each at its own sequence number, then flush them.
    (0..100u64).for_each(|i| {
        tree.insert(format!("key_{i:04}"), format!("value_{i}"), i);
    });

    tree.flush_active_memtable(0)?;

    // Request every written key in a single batch.
    let keys = (0..100u64)
        .map(|i| format!("key_{i:04}"))
        .collect::<Vec<String>>();
    let results = tree.multi_get(&keys, SeqNo::MAX)?;

    assert_eq!(results.len(), 100);

    // Slot `i` must hold the value written for key `i`.
    results.iter().enumerate().for_each(|(i, result)| {
        let expected = format!("value_{i}");
        assert_eq!(
            result.as_deref(),
            Some(expected.as_bytes()),
            "mismatch at index {i}",
        );
    });

    Ok(())
}

#[test]
fn multi_get_mixed_existing_and_missing() -> lsm_tree::Result<()> {
    let dir = get_tmp_folder();

    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    // Only "a", "c" and "e" are written; "b" and "d" never exist.
    for (key, value, seqno) in [("a", "val_a", 0), ("c", "val_c", 1), ("e", "val_e", 2)] {
        tree.insert(key, value, seqno);
    }

    let found = tree.multi_get(["a", "b", "c", "d", "e"], 3)?;

    // Expected outcome per requested key, in request order.
    let expected = [
        Some(b"val_a".as_slice()),
        None,
        Some(b"val_c".as_slice()),
        None,
        Some(b"val_e".as_slice()),
    ];

    assert_eq!(found.len(), 5);
    for (actual, wanted) in found.iter().zip(expected) {
        assert_eq!(actual.as_deref(), wanted);
    }

    Ok(())
}

#[test]
fn multi_get_empty_keys() -> lsm_tree::Result<()> {
    let dir = get_tmp_folder();

    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    tree.insert("a", "val_a", 0);

    // An empty batch must produce an empty result, even on a non-empty tree.
    let no_keys: Vec<&str> = Vec::new();
    let results = tree.multi_get(no_keys, 1)?;
    assert!(results.is_empty());

    Ok(())
}

#[test]
fn multi_get_snapshot_isolation() -> lsm_tree::Result<()> {
    let dir = get_tmp_folder();

    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    // First generation of values.
    tree.insert("a", "v1", 0);
    tree.insert("b", "v1", 1);

    // Second generation, written at higher sequence numbers.
    tree.insert("a", "v2", 2);
    tree.insert("b", "v2", 3);

    // Snapshot semantics: an entry is visible iff entry.seqno < snapshot seqno
    // (memtable lookup uses `seqno - 1` as upper bound, see Memtable::get).
    //
    // - snapshot 2: a@2 (v2) is NOT visible, so both keys read as v1
    // - snapshot 4: both overwrites are visible, so both keys read as v2
    for (snapshot, wanted) in [(2, b"v1".as_slice()), (4, b"v2".as_slice())] {
        let results = tree.multi_get(["a", "b"], snapshot)?;
        assert_eq!(results[0].as_deref(), Some(wanted));
        assert_eq!(results[1].as_deref(), Some(wanted));
    }

    Ok(())
}

#[test]
fn multi_get_with_tombstones() -> lsm_tree::Result<()> {
    let dir = get_tmp_folder();

    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    tree.insert("a", "val_a", 0);
    tree.insert("b", "val_b", 1);

    // Delete "a" after it was written; "b" stays untouched.
    tree.remove("a", 2);

    let results = tree.multi_get(["a", "b"], 3)?;

    // The removed key reads as missing, the other key is still returned.
    assert_eq!(results[0], None);
    assert_eq!(results[1].as_deref(), Some(b"val_b".as_slice()));

    Ok(())
}

#[test]
fn multi_get_from_disk() -> lsm_tree::Result<()> {
    let dir = get_tmp_folder();

    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    // These three entries are flushed out of the active memtable.
    for (key, value, seqno) in [("a", "val_a", 0), ("b", "val_b", 1), ("c", "val_c", 2)] {
        tree.insert(key, value, seqno);
    }
    tree.flush_active_memtable(0)?;

    // This entry is written after the flush, so it stays in the memtable.
    tree.insert("d", "val_d", 3);

    // One batch covering flushed keys, the memtable key and a missing key.
    let found = tree.multi_get(["a", "b", "c", "d", "e"], SeqNo::MAX)?;

    let expected = [
        Some(b"val_a".as_slice()),
        Some(b"val_b".as_slice()),
        Some(b"val_c".as_slice()),
        Some(b"val_d".as_slice()),
        None,
    ];

    assert_eq!(found.len(), 5);
    for (actual, wanted) in found.iter().zip(expected) {
        assert_eq!(actual.as_deref(), wanted);
    }

    Ok(())
}

#[test]
fn multi_get_blob_tree_with_kv_separation() -> lsm_tree::Result<()> {
    let dir = get_tmp_folder();

    // A threshold of 1 separates all values.
    let separation = KvSeparationOptions {
        separation_threshold: 1,
        ..Default::default()
    };

    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .with_kv_separation(Some(separation))
    .open()?;

    let big_val_a = b"aaa".repeat(1000);
    let big_val_b = b"bbb".repeat(1000);
    let big_val_c = b"ccc".repeat(1000);

    tree.insert("a", big_val_a.as_slice(), 0);
    tree.insert("b", big_val_b.as_slice(), 1);

    // "c" is written and then removed again before the flush.
    tree.insert("c", big_val_c.as_slice(), 2);
    tree.remove("c", 3);

    tree.flush_active_memtable(0)?;

    // Verify blob indirections were created
    assert!(tree.blob_file_count() > 0);

    let found = tree.multi_get(["a", "b", "c", "missing"], SeqNo::MAX)?;

    assert_eq!(found.len(), 4);
    assert_eq!(found[0].as_deref(), Some(big_val_a.as_slice()));
    assert_eq!(found[1].as_deref(), Some(big_val_b.as_slice()));
    assert_eq!(found[2], None); // tombstoned
    assert_eq!(found[3], None); // never existed

    Ok(())
}

#[test]
fn multi_get_unsorted_and_duplicate_keys() -> lsm_tree::Result<()> {
    let dir = get_tmp_folder();

    let tree = Config::new(
        &dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .open()?;

    for (key, value, seqno) in [("a", "val_a", 0), ("b", "val_b", 1), ("c", "val_c", 2)] {
        tree.insert(key, value, seqno);
    }

    // The request is out of key order and asks for "a" twice; the output
    // must line up with the request 1:1.
    let found = tree.multi_get(["c", "a", "b", "a", "missing"], 3)?;

    let expected = [
        Some(b"val_c".as_slice()),
        Some(b"val_a".as_slice()),
        Some(b"val_b".as_slice()),
        Some(b"val_a".as_slice()), // duplicate
        None,
    ];

    assert_eq!(found.len(), 5);
    for (actual, wanted) in found.iter().zip(expected) {
        assert_eq!(actual.as_deref(), wanted);
    }

    Ok(())
}
Loading