diff --git a/Cargo.toml b/Cargo.toml index c54096744..6065c6a40 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,12 +55,6 @@ unexpected_cfgs = { level = "warn", check-cfg = [ [package.metadata.cargo-all-features] denylist = [] -[[bench]] -name = "tli" -harness = false -path = "benches/tli.rs" -required-features = [] - [[bench]] name = "merge" harness = false @@ -79,12 +73,6 @@ harness = false path = "benches/bloom.rs" required-features = [] -[[bench]] -name = "block" -harness = false -path = "benches/block.rs" -required-features = ["lz4"] - [[bench]] name = "tree" harness = false @@ -103,8 +91,3 @@ harness = false path = "benches/fd_table.rs" required-features = [] -[[bench]] -name = "partition_point" -harness = false -path = "benches/partition_point.rs" -required-features = [] diff --git a/benches/block.rs b/benches/block.rs deleted file mode 100644 index 0e02eff7a..000000000 --- a/benches/block.rs +++ /dev/null @@ -1,200 +0,0 @@ -use criterion::{criterion_group, criterion_main, Criterion}; -use lsm_tree::{ - coding::Encode, - table::{ - block::{header::Header as BlockHeader, offset::BlockOffset, ItemSize}, - meta::CompressionType, - value_block::ValueBlock, - }, - Checksum, InternalValue, -}; -use rand::Rng; -use std::io::Write; - -/* fn value_block_size(c: &mut Criterion) { - let mut group = c.benchmark_group("ValueBlock::size"); - - for item_count in [10, 100, 1_000] { - group.bench_function(format!("{item_count} items"), |b| { - let items = (0..item_count) - .map(|_| { - InternalValue::from_components( - "a".repeat(16).as_bytes(), - "a".repeat(100).as_bytes(), - 63, - lsm_tree::ValueType::Value, - ) - }) - .collect(); - - let block = ValueBlock { - items, - header: BlockHeader { - compression: CompressionType::Lz4, - checksum: Checksum::from_raw(0), - data_length: 0, - previous_block_offset: 0, - uncompressed_length: 0, - }, - }; - - b.iter(|| { - (&*block.items).size(); - }) - }); - } -} */ - -fn value_block_find(c: &mut Criterion) { - let mut group = c.benchmark_group("ValueBlock::find_latest"); - - for item_count in [10, 100, 1_000, 10_000] { - let mut items = vec![]; - - for item in 0u64..item_count { - items.push(InternalValue::from_components( - item.to_be_bytes(), - b"", - 0, - lsm_tree::ValueType::Value, - )); - } - - let block = ValueBlock { - items: items.into_boxed_slice(), - header: BlockHeader { - compression: CompressionType::Lz4, - checksum: Checksum::from_raw(0), - data_length: 0, - previous_block_offset: BlockOffset(0), - uncompressed_length: 0, - }, - }; - - let mut rng = rand::rng(); - - group.bench_function(format!("{item_count} items (linear)"), |b| { - b.iter(|| { - let needle = rng.random_range(0..item_count).to_be_bytes(); - - let item = block - .items - .iter() - .find(|item| &*item.key.user_key == needle) - .cloned() - .unwrap(); - - assert_eq!(item.key.user_key, needle); - }) - }); - - group.bench_function(format!("{item_count} items (binary search)"), |b| { - b.iter(|| { - let needle = rng.random_range(0..item_count).to_be_bytes(); - - let item = block.get_latest(&needle).unwrap(); - assert_eq!(item.key.user_key, needle); - }) - }); - } -} - -fn encode_block(c: &mut Criterion) { - let mut group = c.benchmark_group("Encode block"); - - for comp_type in [CompressionType::None, CompressionType::Lz4] { - for block_size in [4, 8, 16, 32, 64, 128] { - let block_size = block_size * 1_024; - - let mut size = 0; - - let mut items = vec![]; - - for x in 0u64.. { - let value = InternalValue::from_components( - x.to_be_bytes(), - x.to_string().repeat(50).as_bytes(), - 63, - lsm_tree::ValueType::Value, - ); - - size += value.size(); - - items.push(value); - - if size >= block_size { - break; - } - } - - group.bench_function(format!("{block_size} KiB [{comp_type}]"), |b| { - b.iter(|| { - // Serialize block - let (mut header, data) = - ValueBlock::to_bytes_compressed(&items, BlockOffset(0), comp_type).unwrap(); - }); - }); - } - } -} - -fn load_value_block_from_disk(c: &mut Criterion) { - let mut group = c.benchmark_group("Load block from disk"); - - for comp_type in [CompressionType::None, CompressionType::Lz4] { - for block_size in [4, 8, 16, 32, 64, 128] { - let block_size = block_size * 1_024; - - let mut size = 0; - - let mut items = vec![]; - - for x in 0u64.. { - let value = InternalValue::from_components( - x.to_be_bytes(), - x.to_string().repeat(50).as_bytes(), - 63, - lsm_tree::ValueType::Value, - ); - - size += value.size(); - - items.push(value); - - if size >= block_size { - break; - } - } - - // Serialize block - let (mut header, data) = - ValueBlock::to_bytes_compressed(&items, BlockOffset(0), comp_type).unwrap(); - - let mut file = tempfile::tempfile().unwrap(); - header.encode_into(&mut file).unwrap(); - file.write_all(&data).unwrap(); - - let expected_block = ValueBlock { - items: items.clone().into_boxed_slice(), - header, - }; - - group.bench_function(format!("{block_size} KiB [{comp_type}]"), |b| { - b.iter(|| { - let loaded_block = ValueBlock::from_file(&mut file, BlockOffset(0)).unwrap(); - - assert_eq!(loaded_block.items.len(), expected_block.items.len()); - assert_eq!(loaded_block.header.checksum, expected_block.header.checksum); - }); - }); - } - } -} - -criterion_group!( - benches, - encode_block, - value_block_find, - load_value_block_from_disk, -); -criterion_main!(benches); diff --git a/benches/bloom.rs b/benches/bloom.rs index 1b67e9c4a..4c2d8dfe1 100644 --- a/benches/bloom.rs +++ b/benches/bloom.rs @@ -54,34 +54,6 @@ fn standard_filter_construction(c: &mut Criterion) { }); } -fn blocked_filter_construction(c: &mut Criterion) { - use lsm_tree::table::filter::blocked_bloom::Builder; - - let mut rng = rand::rng(); - - c.bench_function("blocked bloom filter add key, 1M", |b| { - let mut filter = Builder::with_fp_rate(1_000_000, 0.01); - - b.iter(|| { - let mut key = [0; 16]; - rng.fill_bytes(&mut key); - - filter.set_with_hash(Builder::get_hash(&key)); - }); - }); - - c.bench_function("blocked bloom filter add key, 10M", |b| { - let mut filter = Builder::with_fp_rate(10_000_000, 0.01); - - b.iter(|| { - let mut key = [0; 16]; - rng.fill_bytes(&mut key); - - filter.set_with_hash(Builder::get_hash(&key)); - }); - }); -} - fn standard_filter_contains(c: &mut Criterion) { use lsm_tree::table::filter::standard_bloom::Builder; @@ -126,56 +98,10 @@ fn standard_filter_contains(c: &mut Criterion) { } } -fn blocked_filter_contains(c: &mut Criterion) { - use lsm_tree::table::filter::blocked_bloom::Builder; - - let keys = (0..100_000u128) - .map(|x| x.to_be_bytes().to_vec()) - .collect::>(); - - for fpr in [0.1, 0.01, 0.001, 0.0001, 0.00001] { - // NOTE: Purposefully bloat bloom filter size to run into more CPU cache misses - let n = 100_000_000; - - let mut filter = Builder::with_fp_rate(n, fpr); - - for key in &keys { - filter.set_with_hash(Builder::get_hash(key)); - } - - let mut rng = rand::rng(); - - let filter_bytes = filter.build(); - - c.bench_function( - &format!( - "blocked bloom filter contains key, true positive ({}%)", - fpr * 100.0, - ), - |b| { - b.iter(|| { - use lsm_tree::table::filter::blocked_bloom::BlockedBloomFilterReader as Reader; - use rand::seq::IndexedRandom; - - // NOTE: To make the costs more realistic, we - // pretend we are reading the filter straight from the block - let filter = Reader::new(&filter_bytes).unwrap(); - - let sample = keys.choose(&mut rng).unwrap(); - let hash = Builder::get_hash(sample); - assert!(filter.contains_hash(hash)); - }); - }, - ); - } -} - criterion_group!( benches, fast_block_index, standard_filter_construction, - blocked_filter_construction, standard_filter_contains, - blocked_filter_contains, ); criterion_main!(benches); diff --git a/benches/level_manifest.rs b/benches/level_manifest.rs index 1834f8600..af7cdcc13 100644 --- a/benches/level_manifest.rs +++ b/benches/level_manifest.rs @@ -1,5 +1,5 @@ use criterion::{criterion_group, criterion_main, Criterion}; -use lsm_tree::{AbstractTree, Config}; +use lsm_tree::{config::BlockSizePolicy, AbstractTree, Config, SequenceNumberCounter}; fn iterate_segments(c: &mut Criterion) { let mut group = c.benchmark_group("Iterate level manifest"); @@ -10,71 +10,26 @@ fn iterate_segments(c: &mut Criterion) { for segment_count in [0, 1, 5, 10, 100, 500, 1_000, 2_000, 4_000] { group.bench_function(format!("iterate {segment_count} segments"), |b| { let folder = tempfile::tempdir_in(".bench").unwrap(); - let tree = Config::new(folder).data_block_size(1_024).open().unwrap(); + let tree = Config::new( + folder, + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .data_block_size_policy(BlockSizePolicy::all(1_024)) + .open() + .unwrap(); for x in 0_u64..segment_count { tree.insert("a", "b", x); tree.flush_active_memtable(0).unwrap(); } - let levels = tree.levels.read().unwrap(); - b.iter(|| { - assert_eq!(levels.iter().count(), segment_count as usize); + assert_eq!(tree.table_count(), segment_count as usize); }); }); } } -fn find_segment(c: &mut Criterion) { - let mut group = c.benchmark_group("Find segment in disjoint level"); - group.sample_size(10); - - std::fs::create_dir_all(".bench").unwrap(); - - for segment_count in [1u16, 2, 3, 4, 5, 10, 100, 1_000] { - let folder = tempfile::tempdir_in(".bench").unwrap(); - let tree = Config::new(folder).data_block_size(1_024).open().unwrap(); - - for x in 0..segment_count { - tree.insert(x.to_be_bytes(), "", x.into()); - tree.flush_active_memtable(0).unwrap(); - } - - let key = (segment_count / 2).to_be_bytes(); - - group.bench_function( - format!("find segment in {segment_count} segments - binary search"), - |b| { - let levels = tree.levels.read().unwrap(); - let first_level = levels.levels.first().expect("should exist"); - - b.iter(|| { - first_level - .as_disjoint() - .expect("should be disjoint") - .get_segment_containing_key(&key) - .expect("should exist") - }); - }, - ); - - group.bench_function( - format!("find segment in {segment_count} segments - linear search"), - |b| { - let levels = tree.levels.read().unwrap(); - let first_level = levels.levels.first().expect("should exist"); - - b.iter(|| { - first_level - .iter() - .find(|x| x.metadata.key_range.contains_key(&key)) - .expect("should exist"); - }); - }, - ); - } -} - -criterion_group!(benches, iterate_segments, find_segment); +criterion_group!(benches, iterate_segments); criterion_main!(benches); diff --git a/benches/memtable.rs b/benches/memtable.rs index e7d201fe4..6033c0781 100644 --- a/benches/memtable.rs +++ b/benches/memtable.rs @@ -3,7 +3,7 @@ use lsm_tree::{InternalValue, Memtable}; use nanoid::nanoid; fn memtable_get_hit(c: &mut Criterion) { - let memtable = Memtable::default(); + let memtable = Memtable::new(0_u64); memtable.insert(InternalValue::from_components( "abc_w5wa35aw35naw", @@ -25,14 +25,14 @@ fn memtable_get_hit(c: &mut Criterion) { b.iter(|| { assert_eq!( [1, 2, 3], - &*memtable.get(b"abc_w5wa35aw35naw", None).unwrap().value, + &*memtable.get(b"abc_w5wa35aw35naw", u64::MAX).unwrap().value, ) }); }); } fn memtable_get_snapshot(c: &mut Criterion) { - let memtable = Memtable::default(); + let memtable = Memtable::new(0_u64); memtable.insert(InternalValue::from_components( "abc_w5wa35aw35naw", @@ -60,14 +60,14 @@ fn memtable_get_snapshot(c: &mut Criterion) { b.iter(|| { assert_eq!( [1, 2, 3], - &*memtable.get(b"abc_w5wa35aw35naw", Some(1)).unwrap().value, + &*memtable.get(b"abc_w5wa35aw35naw", 1).unwrap().value, ); }); }); } fn memtable_get_miss(c: &mut Criterion) { - let memtable = Memtable::default(); + let memtable = Memtable::new(0_u64); for _ in 0..1_000_000 { memtable.insert(InternalValue::from_components( @@ -79,13 +79,13 @@ fn memtable_get_miss(c: &mut Criterion) { } c.bench_function("memtable get miss", |b| { - b.iter(|| assert!(memtable.get(b"abc_564321", None).is_none())); + b.iter(|| assert!(memtable.get(b"abc_564321", u64::MAX).is_none())); }); } fn memtable_highest_seqno(c: &mut Criterion) { c.bench_function("memtable highest seqno", |b| { - let memtable = Memtable::default(); + let memtable = Memtable::new(0_u64); for x in 0..100_000 { memtable.insert(InternalValue::from_components( diff --git a/benches/merge.rs b/benches/merge.rs index 8072e893c..6b52b145a 100644 --- a/benches/merge.rs +++ b/benches/merge.rs @@ -8,7 +8,7 @@ fn merger(c: &mut Criterion) { c.bench_function(&format!("Merge {num}"), |b| { let memtables = (0..num) .map(|_| { - let table = Memtable::default(); + let table = Memtable::new(0_u64); for _ in 0..100 { table.insert(InternalValue::from_components( @@ -26,7 +26,7 @@ fn merger(c: &mut Criterion) { b.iter_with_large_drop(|| { let iters = memtables .iter() - .map(|x| x.iter().map(Ok)) + .map(|x| x.iter().map(Ok::<_, lsm_tree::Error>)) .map(|x| Box::new(x) as BoxedIterator<'_>) .collect(); @@ -43,7 +43,7 @@ fn mvcc_stream(c: &mut Criterion) { c.bench_function(&format!("MVCC stream {num} versions"), |b| { let memtables = (0..num) .map(|_| { - let table = Memtable::default(); + let table = Memtable::new(0_u64); for key in 'a'..='z' { table.insert(InternalValue::from_components( @@ -61,7 +61,7 @@ fn mvcc_stream(c: &mut Criterion) { b.iter_with_large_drop(|| { let iters = memtables .iter() - .map(|x| x.iter().map(Ok)) + .map(|x| x.iter().map(Ok::<_, lsm_tree::Error>)) .map(|x| Box::new(x) as BoxedIterator<'_>) .collect(); diff --git a/benches/partition_point.rs b/benches/partition_point.rs deleted file mode 100644 index dbbe8382a..000000000 --- a/benches/partition_point.rs +++ /dev/null @@ -1,30 +0,0 @@ -use criterion::{criterion_group, criterion_main, Criterion}; -use lsm_tree::binary_search::partition_point; -use rand::Rng; - -fn bench_partition_point(c: &mut Criterion) { - let mut group = c.benchmark_group("partition_point"); - - let mut rng = rand::rng(); - - for item_count in [10, 100, 1_000, 10_000, 100_000, 1_000_000] { - let items = (0..item_count).collect::>(); - - group.bench_function(format!("native {item_count}"), |b| { - b.iter(|| { - let needle = rng.random_range(0..item_count); - items.partition_point(|&x| x <= needle) - }) - }); - - group.bench_function(format!("rewrite {item_count}"), |b| { - b.iter(|| { - let needle = rng.random_range(0..item_count); - partition_point(&items, |&x| x <= needle) - }) - }); - } -} - -criterion_group!(benches, bench_partition_point); -criterion_main!(benches); diff --git a/benches/tli.rs b/benches/tli.rs deleted file mode 100644 index 1b9dea651..000000000 --- a/benches/tli.rs +++ /dev/null @@ -1,51 +0,0 @@ -use criterion::{criterion_group, criterion_main, Criterion}; -use lsm_tree::table::{ - block::offset::BlockOffset, block_index::KeyedBlockIndex, value_block::CachePolicy, -}; -use rand::Rng; - -fn tli_find_item(c: &mut Criterion) { - use lsm_tree::table::block_index::{block_handle::KeyedBlockHandle, top_level::TopLevelIndex}; - - let mut group = c.benchmark_group("TLI find item"); - - for item_count in [10u64, 100, 1_000, 10_000, 25_000, 100_000] { - let items = { - let mut items = Vec::with_capacity(item_count as usize); - - for x in 0..item_count { - items.push(KeyedBlockHandle { - end_key: x.to_be_bytes().into(), - offset: BlockOffset(x), - }); - } - - items - }; - - let index = TopLevelIndex::from_boxed_slice(items.into()); - - let mut rng = rand::rng(); - - group.bench_function( - format!("TLI get_block_containing_item ({item_count} items)"), - |b| { - b.iter(|| { - let needle = rng.random_range(0..item_count).to_be_bytes(); - - assert_eq!( - needle, - &*index - .get_lowest_block_containing_key(&needle, CachePolicy::Read) - .unwrap() - .unwrap() - .end_key, - ); - }) - }, - ); - } -} - -criterion_group!(benches, tli_find_item); -criterion_main!(benches); diff --git a/benches/tree.rs b/benches/tree.rs index 067308aa4..74a974e8f 100644 --- a/benches/tree.rs +++ b/benches/tree.rs @@ -1,5 +1,8 @@ -use criterion::{criterion_group, criterion_main, Criterion}; -use lsm_tree::{AbstractTree, BlockCache, Config}; +use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput}; +use lsm_tree::{ + config::BlockSizePolicy, AbstractTree, BatchItem, Cache, Config, Guard, SeqNo, + SequenceNumberCounter, +}; use std::sync::Arc; use tempfile::tempdir; @@ -11,10 +14,14 @@ fn full_scan(c: &mut Criterion) { group.bench_function(format!("scan all uncached, {item_count} items"), |b| { let path = tempdir().unwrap(); - let tree = Config::new(path) - .block_cache(BlockCache::with_capacity_bytes(0).into()) - .open() - .unwrap(); + let tree = Config::new( + path.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .use_cache(Arc::new(Cache::with_capacity_bytes(0))) + .open() + .unwrap(); for x in 0_u32..item_count { let key = x.to_be_bytes(); @@ -25,17 +32,21 @@ fn full_scan(c: &mut Criterion) { tree.flush_active_memtable(0).unwrap(); b.iter(|| { - assert_eq!(tree.len(None, None).unwrap(), item_count as usize); + assert_eq!(tree.len(SeqNo::MAX, None).unwrap(), item_count as usize); }) }); group.bench_function(format!("scan all cached, {item_count} items"), |b| { let path = tempdir().unwrap(); - let tree = Config::new(path) - .block_cache(BlockCache::with_capacity_bytes(100_000_000).into()) - .open() - .unwrap(); + let tree = Config::new( + path.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .use_cache(Arc::new(Cache::with_capacity_bytes(100_000_000))) + .open() + .unwrap(); for x in 0_u32..item_count { let key = x.to_be_bytes(); @@ -44,10 +55,10 @@ fn full_scan(c: &mut Criterion) { } tree.flush_active_memtable(0).unwrap(); - assert_eq!(tree.len(None, None).unwrap(), item_count as usize); + assert_eq!(tree.len(SeqNo::MAX, None).unwrap(), item_count as usize); b.iter(|| { - assert_eq!(tree.len(None, None).unwrap(), item_count as usize); + assert_eq!(tree.len(SeqNo::MAX, None).unwrap(), item_count as usize); }) }); } @@ -61,10 +72,14 @@ fn scan_vs_query(c: &mut Criterion) { for size in [100_000, 1_000_000] { let path = tempdir().unwrap(); - let tree = Config::new(path) - .block_cache(BlockCache::with_capacity_bytes(0).into()) - .open() - .unwrap(); + let tree = Config::new( + path.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .use_cache(Arc::new(Cache::with_capacity_bytes(0))) + .open() + .unwrap(); for x in 0..size as u64 { let key = x.to_be_bytes().to_vec(); @@ -73,22 +88,19 @@ fn scan_vs_query(c: &mut Criterion) { } tree.flush_active_memtable(0).unwrap(); - assert_eq!(tree.len(None, None).unwrap(), size); + assert_eq!(tree.len(SeqNo::MAX, None).unwrap(), size); group.sample_size(10); group.bench_function(format!("scan {} (uncached)", size), |b| { b.iter(|| { - let iter = tree.iter(None, None); - let iter = iter.into_iter(); + let iter = tree.iter(SeqNo::MAX, None); let count = iter - .filter(|x| match x { - Ok((key, _)) => { - let buf = &key[..8]; - let (int_bytes, _rest) = buf.split_at(std::mem::size_of::()); - let num = u64::from_be_bytes(int_bytes.try_into().unwrap()); - (60000..60010).contains(&num) - } - Err(_) => false, + .filter_map(|guard| { + let (key, _) = guard.into_inner().ok()?; + let buf = &key[..8]; + let (int_bytes, _rest) = buf.split_at(std::mem::size_of::()); + let num = u64::from_be_bytes(int_bytes.try_into().unwrap()); + (60000..60010).contains(&num).then_some(()) }) .count(); assert_eq!(count, 10); @@ -101,7 +113,7 @@ fn scan_vs_query(c: &mut Criterion) { Included(60000_u64.to_be_bytes().to_vec()), Excluded(60010_u64.to_be_bytes().to_vec()), ), - None, + SeqNo::MAX, None, ); let iter = iter.into_iter(); @@ -115,7 +127,7 @@ fn scan_vs_query(c: &mut Criterion) { Included(60000_u64.to_be_bytes().to_vec()), Excluded(60010_u64.to_be_bytes().to_vec()), ), - None, + SeqNo::MAX, None, ); let iter = iter.into_iter(); @@ -131,10 +143,14 @@ fn scan_vs_prefix(c: &mut Criterion) { for size in [10_000, 100_000, 1_000_000] { let path = tempdir().unwrap(); - let tree = Config::new(path) - .block_cache(BlockCache::with_capacity_bytes(0).into()) - .open() - .unwrap(); + let tree = Config::new( + path.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .use_cache(Arc::new(Cache::with_capacity_bytes(0))) + .open() + .unwrap(); for _ in 0..size { let key = nanoid::nanoid!(); @@ -151,28 +167,30 @@ fn scan_vs_prefix(c: &mut Criterion) { } tree.flush_active_memtable(0).unwrap(); - assert_eq!(tree.len(None, None).unwrap() as u64, size + 10); + assert_eq!(tree.len(SeqNo::MAX, None).unwrap() as u64, size + 10); group.sample_size(10); group.bench_function(format!("scan {} (uncached)", size), |b| { b.iter(|| { - let iter = tree.iter(None, None); - let iter = iter.filter(|x| match x { - Ok((key, _)) => key.starts_with(prefix.as_bytes()), - Err(_) => false, - }); - assert_eq!(iter.count(), 10); + let count = tree + .iter(SeqNo::MAX, None) + .filter_map(|guard| { + let (key, _) = guard.into_inner().ok()?; + key.starts_with(prefix.as_bytes()).then_some(()) + }) + .count(); + assert_eq!(count, 10); }); }); group.bench_function(format!("prefix {} (uncached)", size), |b| { b.iter(|| { - let iter = tree.prefix(prefix, None, None); + let iter = tree.prefix(prefix, SeqNo::MAX, None); assert_eq!(iter.count(), 10); }); }); group.bench_function(format!("prefix rev {} (uncached)", size), |b| { b.iter(|| { - let iter = tree.prefix(prefix, None, None); + let iter = tree.prefix(prefix, SeqNo::MAX, None); assert_eq!(iter.rev().count(), 10); }); }); @@ -186,11 +204,15 @@ fn tree_get_pairs(c: &mut Criterion) { for segment_count in [1, 2, 4, 8, 16, 32, 64, 128, 256, 512] { { let folder = tempfile::tempdir().unwrap(); - let tree = Config::new(folder) - .data_block_size(1_024) - .block_cache(Arc::new(BlockCache::with_capacity_bytes(0))) - .open() - .unwrap(); + let tree = Config::new( + folder.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .data_block_size_policy(BlockSizePolicy::all(1_024)) + .use_cache(Arc::new(Cache::with_capacity_bytes(0))) + .open() + .unwrap(); let mut x = 0_u64; @@ -207,7 +229,7 @@ fn tree_get_pairs(c: &mut Criterion) { &format!("Tree::first_key_value (disjoint), {segment_count} segments"), |b| { b.iter(|| { - assert!(tree.first_key_value(None, None).unwrap().is_some()); + assert!(tree.first_key_value(SeqNo::MAX, None).is_some()); }); }, ); @@ -216,7 +238,7 @@ fn tree_get_pairs(c: &mut Criterion) { &format!("Tree::last_key_value (disjoint), {segment_count} segments"), |b| { b.iter(|| { - assert!(tree.last_key_value(None, None).unwrap().is_some()); + assert!(tree.last_key_value(SeqNo::MAX, None).is_some()); }); }, ); @@ -224,11 +246,15 @@ fn tree_get_pairs(c: &mut Criterion) { { let folder = tempfile::tempdir().unwrap(); - let tree = Config::new(folder) - .data_block_size(1_024) - .block_cache(Arc::new(BlockCache::with_capacity_bytes(0))) - .open() - .unwrap(); + let tree = Config::new( + folder.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .data_block_size_policy(BlockSizePolicy::all(1_024)) + .use_cache(Arc::new(Cache::with_capacity_bytes(0))) + .open() + .unwrap(); let mut x = 0_u64; @@ -247,7 +273,7 @@ fn tree_get_pairs(c: &mut Criterion) { &format!("Tree::first_key_value (non-disjoint), {segment_count} segments"), |b| { b.iter(|| { - assert!(tree.first_key_value(None, None).unwrap().is_some()); + assert!(tree.first_key_value(SeqNo::MAX, None).is_some()); }); }, ); @@ -256,7 +282,7 @@ fn tree_get_pairs(c: &mut Criterion) { &format!("Tree::last_key_value (non-disjoint), {segment_count} segments"), |b| { b.iter(|| { - assert!(tree.last_key_value(None, None).unwrap().is_some()); + assert!(tree.last_key_value(SeqNo::MAX, None).is_some()); }); }, ); @@ -267,11 +293,15 @@ fn tree_get_pairs(c: &mut Criterion) { fn disk_point_read(c: &mut Criterion) { let folder = tempdir().unwrap(); - let tree = Config::new(folder) - .data_block_size(1_024) - .block_cache(Arc::new(BlockCache::with_capacity_bytes(0))) - .open() - .unwrap(); + let tree = Config::new( + folder.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .data_block_size_policy(BlockSizePolicy::all(1_024)) + .use_cache(Arc::new(Cache::with_capacity_bytes(0))) + .open() + .unwrap(); for seqno in 0..5 { tree.insert("a", "b", seqno); @@ -287,29 +317,30 @@ fn disk_point_read(c: &mut Criterion) { let tree = tree.clone(); b.iter(|| { - tree.get("a", None).unwrap().unwrap(); + tree.get("a", SeqNo::MAX).unwrap().unwrap(); }); }); - c.bench_function("point read w/ seqno latest (uncached)", |b| { - let snapshot = tree.snapshot(5); - + c.bench_function("point read w/ seqno (uncached)", |b| { b.iter(|| { - snapshot.get("a").unwrap().unwrap(); + tree.get("a", 5).unwrap().unwrap(); }); }); } fn disjoint_tree_minmax(c: &mut Criterion) { let mut group = c.benchmark_group("Disjoint tree"); - let folder = tempfile::tempdir().unwrap(); - let tree = Config::new(folder) - .data_block_size(1_024) - .block_cache(Arc::new(BlockCache::with_capacity_bytes(0))) - .open() - .unwrap(); + let tree = Config::new( + folder.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .data_block_size_policy(BlockSizePolicy::all(1_024)) + .use_cache(Arc::new(Cache::with_capacity_bytes(0))) + .open() + .unwrap(); tree.insert("a", "a", 0); tree.flush_active_memtable(0).unwrap(); @@ -346,13 +377,23 @@ fn disjoint_tree_minmax(c: &mut Criterion) { group.bench_function("Tree::first_key_value".to_string(), |b| { b.iter(|| { - assert_eq!(&*tree.first_key_value(None, None).unwrap().unwrap().1, b"a"); + let (_, val) = tree + .first_key_value(SeqNo::MAX, None) + .unwrap() + .into_inner() + .unwrap(); + assert_eq!(&*val, b"a"); }); }); group.bench_function("Tree::last_key_value".to_string(), |b| { b.iter(|| { - assert_eq!(&*tree.last_key_value(None, None).unwrap().unwrap().1, b"g"); + let (_, val) = tree + .last_key_value(SeqNo::MAX, None) + .unwrap() + .into_inner() + .unwrap(); + assert_eq!(&*val, b"g"); }); }); } @@ -360,22 +401,78 @@ fn disjoint_tree_minmax(c: &mut Criterion) { fn blob_tree_get(c: &mut Criterion) { let folder = tempfile::tempdir().unwrap(); - let tree = Config::new(folder.path()) - .block_cache(BlockCache::with_capacity_bytes(0).into()) - .open_as_blob_tree() - .unwrap(); + let tree = Config::new( + folder.path(), + SequenceNumberCounter::default(), + SequenceNumberCounter::default(), + ) + .use_cache(Arc::new(Cache::with_capacity_bytes(0))) + .with_kv_separation(Some(Default::default())) + .open() + .unwrap(); let value = b"powek5bowa".repeat(100); tree.insert("mykey", &value, 0); + tree.flush_active_memtable(0).unwrap(); c.bench_function("blob tree get", |b| { b.iter(|| { - tree.get("mykey", None).unwrap().unwrap(); + tree.get("mykey", SeqNo::MAX).unwrap().unwrap(); }); }); } +fn tree_batch_write(c: &mut Criterion) { + let mut group = c.benchmark_group("write batch"); + group.sample_size(10); + + for batch_size in [10, 100, 1_000, 10_000] { + //group.throughput(Throughput::Elements(batch_size)); + + // prepare items outside the timed section + let items: Vec<_> = (0..batch_size) + .map(|i: u64| (i.to_be_bytes().to_vec(), b"value".to_vec())) + .collect(); + + group.bench_function(format!("naive loop, {batch_size} items"), |b| { + b.iter_batched( + || { + let path = tempdir().unwrap(); + let tree = Config::new(path.path(), Default::default(), Default::default()) + .open() + .unwrap(); + (path, tree) + }, + |(_path, tree)| { + for (k, v) in &items { + tree.insert(k.clone(), v.clone(), 0); + } + }, + criterion::BatchSize::PerIteration, + ); + }); + group.bench_function(format!("write_batch, {batch_size} items"), |b| { + b.iter_batched( + || { + let path = tempdir().unwrap(); + let tree = Config::new(path.path(), Default::default(), Default::default()) + .open() + .unwrap(); + let batch = items + .iter() + .map(|(k, v)| BatchItem::Insert(k.clone(), v.clone())); + (path, tree, batch) + }, + |(_path, tree, batch)| { + tree.write_batch(batch, 0); + }, + criterion::BatchSize::PerIteration, + ); + }); + } +} + // TODO: benchmark point read disjoint vs non-disjoint level vs disjoint *tree* // TODO: benchmark .prefix().next() and .next_back(), disjoint and non-disjoint @@ -387,6 +484,7 @@ criterion_group!( full_scan, scan_vs_query, scan_vs_prefix, + tree_batch_write, tree_get_pairs, ); criterion_main!(benches); diff --git a/src/abstract_tree.rs b/src/abstract_tree.rs index d5585dd5b..1456109fd 100644 --- a/src/abstract_tree.rs +++ b/src/abstract_tree.rs @@ -13,6 +13,25 @@ use std::{ pub type RangeItem = crate::Result; +/// A single operation within a write batch. +/// +/// Used with [`AbstractTree::write_batch`] to perform multiple +/// insert and delete operations using a single memtable lookup. +pub enum BatchItem, V: Into> { + /// Insert a key-value pair. + Insert(K, V), + + /// Remove a key (strong tombstone). + Remove(K), + + /// Remove a key (weak tombstone). + /// + /// The tombstone marker will vanish when it collides with its + /// corresponding insertion. Should only be used when a key is + /// only ever written once. + RemoveWeak(K), +} + type FlushToTablesResult = (Vec, Option>); /// Generic Tree API @@ -539,6 +558,39 @@ pub trait AbstractTree { seqno: SeqNo, ) -> (u64, u64); + /// Writes a batch of operations into the tree. + /// + /// This is more efficient than calling [`insert`](AbstractTree::insert), + /// [`remove`](AbstractTree::remove), or [`remove_weak`](AbstractTree::remove_weak) + /// in a loop, because the active memtable is resolved once instead of + /// once per item. + /// + /// Returns the size sum of all inserted entries and the new size of the memtable. + /// + /// # Examples + /// + /// ``` + /// # let folder = tempfile::tempdir()?; + /// use lsm_tree::{AbstractTree, BatchItem, Config, Tree}; + /// + /// let tree = Config::new(folder, Default::default(), Default::default()).open()?; + /// + /// let batch = vec![ + /// BatchItem::Insert("a", "hello"), + /// BatchItem::Insert("b", "world"), + /// BatchItem::Remove("c"), + /// ]; + /// + /// tree.write_batch(batch, 0); + /// # + /// # Ok::<(), lsm_tree::Error>(()) + /// ``` + fn write_batch(&self, it: I, seqno: SeqNo) -> (u64, u64) + where + K: Into, + V: Into, + I: IntoIterator>; + /// Removes an item from the tree. /// /// Returns the added item's size and new size of the memtable. diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 73ea1c119..8ec2875e7 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -10,7 +10,7 @@ pub mod ingest; pub use gc::{FragmentationEntry, FragmentationMap}; use crate::{ - abstract_tree::{AbstractTree, RangeItem}, + abstract_tree::RangeItem, coding::Decode, iter_guard::{IterGuard, IterGuardImpl}, table::Table, @@ -18,7 +18,7 @@ use crate::{ value::InternalValue, version::Version, vlog::{Accessor, BlobFile, BlobFileWriter}, - Cache, Config, Memtable, SeqNo, TableId, TreeId, UserKey, UserValue, + AbstractTree, BatchItem, Cache, Config, Memtable, SeqNo, TableId, TreeId, UserKey, UserValue, }; use handle::BlobIndirection; use std::{ @@ -617,4 +617,13 @@ impl AbstractTree for BlobTree { fn remove_weak>(&self, key: K, seqno: SeqNo) -> (u64, u64) { self.index.remove_weak(key, seqno) } + + fn write_batch(&self, it: I, seqno: SeqNo) -> (u64, u64) + where + K: Into, + V: Into, + I: IntoIterator>, + { + self.index.write_batch(it, seqno) + } } diff --git a/src/lib.rs b/src/lib.rs index d429d71c5..a0bd66b46 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -171,7 +171,7 @@ pub use { }; pub use { - abstract_tree::AbstractTree, + abstract_tree::{AbstractTree, BatchItem}, any_tree::AnyTree, blob_tree::BlobTree, cache::Cache, diff --git a/src/memtable/mod.rs b/src/memtable/mod.rs index ced9c12b7..e2d0cba19 100644 --- a/src/memtable/mod.rs +++ b/src/memtable/mod.rs @@ -166,6 +166,60 @@ impl Memtable { (item_size, size_before + item_size) } + /// Inserts a batch of items into the memtable. + /// + /// Returns the added items' total size and new size of the memtable. + #[doc(hidden)] + pub fn insert_batch(&self, items: I) -> (u64, u64) + where + I: IntoIterator, + { + let items: Vec = items.into_iter().collect(); + let mut total_item_size = 0_u64; + let mut max_seqno: Option = None; + + for item in &items { + #[expect( + clippy::expect_used, + reason = "keys are limited to 16-bit length + values are limited to 32-bit length" + )] + let item_size: u64 = + (item.key.user_key.len() + item.value.len() + std::mem::size_of::()) + .try_into() + .expect("should fit into u64"); + + total_item_size += item_size; + max_seqno = Some(match max_seqno { + Some(current) => current.max(item.key.seqno), + None => item.key.seqno, + }); + } + + if total_item_size == 0 { + return ( + 0, + self.approximate_size + .load(std::sync::atomic::Ordering::Acquire), + ); + } + + let size_before = self + .approximate_size + .fetch_add(total_item_size, std::sync::atomic::Ordering::AcqRel); + + if let Some(seqno) = max_seqno { + self.highest_seqno + .fetch_max(seqno, std::sync::atomic::Ordering::AcqRel); + } + + for item in items { + let key = InternalKey::new(item.key.user_key, item.key.seqno, item.key.value_type); + self.items.insert(key, item.value); + } + + (total_item_size, size_before + total_item_size) + } + /// Returns the highest sequence number in the memtable. pub fn get_highest_seqno(&self) -> Option { if self.is_empty() { @@ -325,6 +379,29 @@ mod tests { ); } + #[test] + fn memtable_insert_batch() { + let memtable = Memtable::new(0); + + let (written_size, new_size) = memtable.insert_batch(vec![ + InternalValue::from_components(b"a".to_vec(), b"1".to_vec(), 1, ValueType::Value), + InternalValue::from_components(b"b".to_vec(), b"2".to_vec(), 3, ValueType::Value), + InternalValue::from_components(b"c".to_vec(), b"3".to_vec(), 2, ValueType::Value), + ]); + + assert_eq!(written_size, new_size); + assert_eq!(Some(3), memtable.get_highest_seqno()); + assert_eq!( + Some(InternalValue::from_components( + b"a".to_vec(), + b"1".to_vec(), + 1, + ValueType::Value, + )), + memtable.get(b"a", SeqNo::MAX) + ); + } + #[test] fn memtable_get_old_version() { let memtable = Memtable::new(0); diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 453e9891c..e04561307 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -20,8 +20,8 @@ use crate::{ value::InternalValue, version::{recovery::recover, SuperVersion, SuperVersions, Version}, vlog::BlobFile, - AbstractTree, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, UserValue, - ValueType, + AbstractTree, BatchItem, Checksum, KvPair, SeqNo, SequenceNumberCounter, TableId, UserKey, + UserValue, ValueType, }; use inner::{TreeId, TreeInner}; use std::{ @@ -664,6 +664,24 @@ impl AbstractTree for Tree { let value = InternalValue::new_weak_tombstone(key, seqno); self.append_entry(value) } + + fn write_batch(&self, it: I, seqno: SeqNo) -> (u64, u64) + where + K: Into, + V: Into, + I: IntoIterator>, + { + let active_memtable = self.active_memtable(); + let items = it.into_iter().map(|item| match item { + BatchItem::Insert(key, value) => { + InternalValue::from_components(key, value, seqno, ValueType::Value) + } + BatchItem::Remove(key) => InternalValue::new_tombstone(key, seqno), + BatchItem::RemoveWeak(key) => InternalValue::new_weak_tombstone(key, seqno), + }); + + active_memtable.insert_batch(items) + } } impl Tree {