Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,9 @@ name = "batch_ops"
harness = false
path = "benches/batch_ops.rs"
required-features = []

# Dictionary-decompression micro-benchmark (benches/zstd_dict.rs).
# `required-features` is deliberately empty: the bench is cfg-gated
# internally and compiles to a no-op when neither the `zstd` nor the
# `zstd-pure` feature is enabled, so `cargo bench` always builds.
[[bench]]
name = "zstd_dict"
harness = false
path = "benches/zstd_dict.rs"
required-features = []
94 changes: 94 additions & 0 deletions benches/zstd_dict.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
// Copyright (c) 2025-present, Structured World Foundation
// This source code is licensed under the Apache 2.0 License
// (found in the LICENSE-APACHE file in the repository)

//! Benchmark: per-block zstd dictionary decompression latency.
//!
//! Measures the cost of `decompress_with_dict` for a single compressed block,
//! both cold (first call, dictionary not yet cached in the backend's TLS /
//! `OnceLock`) and warm (subsequent calls, dictionary already cached).
//!
//! Run with:
//!
//! ```text
//! cargo bench --bench zstd_dict --features zstd # C FFI backend
//! cargo bench --bench zstd_dict --features zstd-pure # pure Rust backend
//! ```

use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
use lsm_tree::compression::ZstdDictionary;

Comment thread
polaz marked this conversation as resolved.
Outdated
Comment thread
polaz marked this conversation as resolved.
Outdated
// --- test fixtures -------------------------------------------------------

/// Pre-trained zstd dictionary (206 bytes).
///
/// Generated with the `zstd` C library from 100 samples × 32 bytes
/// (cycling pattern 0..4).
const DICT: &[u8] = &[
55, 164, 48, 236, 98, 64, 12, 7, 42, 16, 120, 62, 7, 204, 192, 51, 240, 12, 60, 3, 207, 192,
51, 240, 12, 60, 3, 207, 192, 51, 24, 17, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 128, 48,
165, 148, 2, 227, 76, 8, 33, 132, 16, 66, 136, 136, 136, 60, 84, 160, 64, 65, 65, 65, 65, 65,
65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 193, 231, 162,
40, 138, 162, 40, 138, 162, 40, 165, 148, 82, 74, 169, 170, 234, 1, 100, 160, 170, 193, 96, 48,
24, 12, 6, 131, 193, 96, 48, 12, 195, 48, 12, 195, 48, 12, 195, 48, 198, 24, 99, 140, 153, 29,
1, 0, 0, 0, 4, 0, 0, 0, 8, 0, 0, 0, 3, 3, 3, 3, 3, 3, 3, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2,
2, 2, 2, 2, 2, 2,
];

/// Compressed frame for b"hello world hello world hello world" (35 bytes).
const COMPRESSED: &[u8] = &[
40, 181, 47, 253, 35, 98, 64, 12, 7, 35, 149, 0, 0, 96, 104, 101, 108, 108, 111, 32, 119, 111,
114, 108, 100, 32, 1, 0, 175, 75, 18,
];

const PLAINTEXT_LEN: usize = 35;

// -------------------------------------------------------------------------

#[cfg(zstd_any)]
fn bench_decompress_with_dict(c: &mut Criterion) {
    use lsm_tree::compression::{CompressionProvider, ZstdBackend as Backend};

    // Warm benchmark: populate the backend's dictionary cache once before
    // timing starts, so every measured iteration represents the steady-state
    // per-block decompression cost.
    let warm_dict = ZstdDictionary::new(DICT);
    Backend::decompress_with_dict(COMPRESSED, &warm_dict, PLAINTEXT_LEN + 1)
        .expect("pre-warm decompression failed");

    c.bench_function("decompress_with_dict/warm", |b| {
        b.iter(|| {
            Backend::decompress_with_dict(
                std::hint::black_box(COMPRESSED),
                std::hint::black_box(&warm_dict),
                PLAINTEXT_LEN + 1,
            )
            .expect("decompression failed")
        });
    });

    // Cold benchmark: a fresh `ZstdDictionary` per iteration, constructed in
    // the untimed setup closure so only the decompress call is measured.
    //
    // NOTE(review): this is truly "cold" only for the C FFI backend, whose
    // compiled `ZSTD_DDict` lives in the per-instance `OnceLock`. The pure
    // Rust backend keys its thread-local cache on the dictionary id (a hash
    // of the raw bytes), so a fresh instance built from the same bytes still
    // hits the cache — read these numbers with the active backend in mind.
    c.bench_function("decompress_with_dict/cold", |b| {
        b.iter_batched(
            || ZstdDictionary::new(DICT),
            |d| {
                Backend::decompress_with_dict(
                    std::hint::black_box(COMPRESSED),
                    std::hint::black_box(&d),
                    PLAINTEXT_LEN + 1,
                )
                .expect("decompression failed")
            },
            BatchSize::SmallInput,
        );
    });
}

#[cfg(not(zstd_any))]
fn bench_decompress_with_dict(_c: &mut Criterion) {
// Fallback when neither the `zstd` nor the `zstd-pure` feature is enabled:
// register no benchmarks so `cargo bench --bench zstd_dict` still builds
// and exits cleanly instead of failing to compile.
}

criterion_group!(benches, bench_decompress_with_dict);
criterion_main!(benches);
53 changes: 50 additions & 3 deletions src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ pub trait CompressionProvider {
fn compress_with_dict(data: &[u8], level: i32, dict_raw: &[u8]) -> crate::Result<Vec<u8>>;

/// Decompress a zstd frame that was compressed with a dictionary.
///
/// `dict` exposes the raw dictionary bytes **and** a lazily-initialized
/// pre-compiled form (C FFI backend: `ZSTD_DDict`; pure Rust backend:
/// cached `FrameDecoder` in thread-local storage). Backends must use the
/// prepared form to avoid re-parsing the dictionary on every call.
///
/// `capacity` is the expected decompressed size, used to size the output
/// buffer.
///
/// # Errors
///
/// Returns an error if the frame or dictionary cannot be decoded.
fn decompress_with_dict(
    data: &[u8],
    dict: &ZstdDictionary,
    capacity: usize,
) -> crate::Result<Vec<u8>>;
}
Expand Down Expand Up @@ -76,10 +81,33 @@ pub type ZstdBackend = zstd_pure::ZstdPureProvider;
/// let dict = ZstdDictionary::new(samples);
/// ```
#[cfg(zstd_any)]
pub struct ZstdDictionary {
    /// Truncated xxh3 hash of `raw`; a cheap identity used for caching.
    id: u32,

    /// Raw dictionary bytes, shared (not copied) between clones.
    raw: Arc<[u8]>,

    /// Pre-compiled decompressor dictionary, lazily initialized on first use.
    ///
    /// Wrapped in `Arc<OnceLock<…>>` so all clones share the same compiled
    /// instance — `ZSTD_DDict` is created exactly once per unique dictionary,
    /// regardless of how many table readers hold a reference to it.
    ///
    /// Available only with the C FFI backend (`zstd` feature). The pure Rust
    /// backend caches an equivalent `FrameDecoder` in thread-local storage
    /// inside `decompress_with_dict` instead.
    ///
    /// Because this field is cfg-gated, `Clone` is implemented by hand below;
    /// a `#[derive(Clone)]` here would conflict with that manual impl.
    #[cfg(feature = "zstd")]
    prepared: Arc<std::sync::OnceLock<zstd::dict::DecoderDictionary<'static>>>,
}

#[cfg(zstd_any)]
// Hand-written clone: every heap field is shared via an `Arc` refcount bump,
// so cloning is cheap and all clones observe the same lazily-compiled
// `prepared` dictionary (when the `zstd` feature is enabled).
impl Clone for ZstdDictionary {
fn clone(&self) -> Self {
Self {
id: self.id,
raw: Arc::clone(&self.raw),
#[cfg(feature = "zstd")]
prepared: Arc::clone(&self.prepared),
}
}
}

#[cfg(zstd_any)]
Expand All @@ -94,9 +122,28 @@ impl ZstdDictionary {
Self {
id: compute_dict_id(raw),
raw: Arc::from(raw),
#[cfg(feature = "zstd")]
prepared: Arc::new(std::sync::OnceLock::new()),
}
Comment thread
polaz marked this conversation as resolved.
}

/// Returns the lazily-initialized pre-compiled decompressor dictionary.
///
/// On first call this copies the raw bytes into a `ZSTD_DDict` (C
/// library's opaque pre-parsed form) and caches the result inside this
/// `ZstdDictionary`. Subsequent calls — from any thread — return the
/// cached reference with no further allocation or parsing (the cache is
/// a `OnceLock`, so concurrent first calls are safe).
///
/// Using this together with
/// [`zstd::bulk::Decompressor::with_prepared_dictionary`] eliminates the
/// per-block `ZSTD_createDDict` call that was previously paid on every
/// `decompress_with_dict` invocation.
#[cfg(feature = "zstd")]
pub(crate) fn decoder_dict(&self) -> &zstd::dict::DecoderDictionary<'static> {
self.prepared
// `DecoderDictionary::copy` duplicates `raw` into zstd-managed memory,
// so the compiled form does not borrow from `self.raw`.
.get_or_init(|| zstd::dict::DecoderDictionary::copy(&self.raw))
}

/// Returns the dictionary ID (truncated xxh3 hash of the raw bytes).
#[must_use]
pub fn id(&self) -> u32 {
Expand All @@ -116,7 +163,7 @@ impl std::fmt::Debug for ZstdDictionary {
f.debug_struct("ZstdDictionary")
.field("id", &format_args!("{:#010x}", self.id))
.field("size", &self.raw.len())
.finish()
.finish_non_exhaustive() // `prepared` cache omitted — implementation detail
}
}

Expand Down
9 changes: 6 additions & 3 deletions src/compression/zstd_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ impl CompressionProvider for ZstdFfiProvider {

fn decompress_with_dict(
data: &[u8],
dict_raw: &[u8],
dict: &crate::compression::ZstdDictionary,
capacity: usize,
) -> crate::Result<Vec<u8>> {
let mut decompressor = zstd::bulk::Decompressor::with_dictionary(dict_raw)
.map_err(|e| crate::Error::Io(std::io::Error::other(e)))?;
// `dict.decoder_dict()` returns a cached `ZSTD_DDict`, avoiding
// `ZSTD_createDDict` (which re-parses the raw bytes) on every call.
let mut decompressor =
zstd::bulk::Decompressor::with_prepared_dictionary(dict.decoder_dict())
.map_err(|e| crate::Error::Io(std::io::Error::other(e)))?;
decompressor
.decompress(data, capacity)
.map_err(|e| crate::Error::Io(std::io::Error::other(e)))
Expand Down
128 changes: 112 additions & 16 deletions src/compression/zstd_pure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,25 +85,121 @@ impl CompressionProvider for ZstdPureProvider {

/// Decompress `data` using a per-thread cached `FrameDecoder` loaded with
/// `dict`.
///
/// `capacity` pre-sizes the output vector; decode failures are surfaced as
/// `crate::Error::Io`.
fn decompress_with_dict(
    data: &[u8],
    dict: &crate::compression::ZstdDictionary,
    capacity: usize,
) -> crate::Result<Vec<u8>> {
    use structured_zstd::decoding::{Dictionary, FrameDecoder};

    // Thread-local `FrameDecoder` with the dictionary pre-loaded.
    //
    // Parsing a zstd dictionary involves building Huffman and FSE decoding
    // tables — expensive relative to per-block decompression for the small
    // 4–64 KiB blocks typical in LSM-trees. Caching the `FrameDecoder`
    // instance across calls amortises this cost: the dictionary is parsed
    // exactly once per thread, per distinct dictionary.
    //
    // Thread-local storage is appropriate because `FrameDecoder` is not
    // `Send` and each thread decompresses independently; no mutex is
    // needed. If the active dictionary changes (e.g. different table),
    // the decoder is re-initialised transparently.
    thread_local! {
        static TLS_DECODER: std::cell::RefCell<Option<(u32, FrameDecoder)>> =
            const { std::cell::RefCell::new(None) };
    }

    TLS_DECODER.with(|cell| {
        let mut state = cell.borrow_mut();

        // Re-initialise if this is the first call in this thread or if the
        // dictionary has changed (different dict_id → different tables).
        //
        // NOTE(review): cache identity is the 32-bit dict id, a truncated
        // hash of the raw bytes — two distinct dictionaries whose ids
        // collide would silently share a decoder. Presumably ids are unique
        // per store; confirm with the id-generation invariants upstream.
        if !matches!(&*state, Some((id, _)) if *id == dict.id()) {
            let parsed = Dictionary::decode_dict(dict.raw())
                .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?;
            let mut decoder = FrameDecoder::new();
            decoder
                .add_dict(parsed)
                .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?;
            *state = Some((dict.id(), decoder));
        }

        let Some((_, decoder)) = state.as_mut() else {
            // Unreachable: the branch above always initialises `state`;
            // return an error rather than panicking if that ever changes.
            return Err(crate::Error::Io(std::io::Error::other(
                "TLS_DECODER unexpectedly empty after initialisation",
            )));
        };

        // `decode_all_to_vec` calls `init(&mut input)` internally — the
        // mutable borrow advances the slice cursor past the frame header
        // before block decoding begins, which is the correct usage of
        // `FrameDecoder`. The dictionary stored in `decoder.dicts` is
        // reused on every call without re-parsing.
        let mut output = Vec::with_capacity(capacity);
        decoder
            .decode_all_to_vec(data, &mut output)
            .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?;

        Ok(output)
    })
}
}

#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, reason = "test code")]
mod tests {
    use super::*;
    use crate::compression::ZstdDictionary;
    use test_log::test;

    // Pre-generated test vectors for pure-Rust dict decompression.
    //
    // Generated with the `zstd` C library (crate v0.13, `zdict_builder` feature):
    // - Training corpus: 100 samples × 32 bytes (cycling pattern 0..4)
    // - Plaintext: b"hello world hello world hello world"
    //
    // Reproducible with:
    //   zstd::dict::from_continuous(&training_data, &sizes, 1024)
    //   zstd::bulk::Compressor::with_dictionary(3, &dict).compress(plaintext)
    const DICT: &[u8] = &[
        55, 164, 48, 236, 98, 64, 12, 7, 42, 16, 120, 62, 7, 204, 192, 51, 240, 12, 60, 3, 207,
        192, 51, 240, 12, 60, 3, 207, 192, 51, 24, 17, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        128, 48, 165, 148, 2, 227, 76, 8, 33, 132, 16, 66, 136, 136, 136, 60, 84, 160, 64, 65, 65,
        65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65,
        193, 231, 162, 40, 138, 162, 40, 138, 162, 40, 165, 148, 82, 74, 169, 170, 234, 1, 100,
        160, 170, 193, 96, 48, 24, 12, 6, 131, 193, 96, 48, 12, 195, 48, 12, 195, 48, 12, 195, 48,
        198, 24, 99, 140, 153, 29, 1, 0, 0, 0, 4, 0, 0, 0, 8, 0, 0, 0, 3, 3, 3, 3, 3, 3, 3, 3, 0,
        0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
        1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2,
    ];

    /// zstd frame for `PLAINTEXT`, compressed with `DICT` at level 3.
    const COMPRESSED: &[u8] = &[
        40, 181, 47, 253, 35, 98, 64, 12, 7, 35, 149, 0, 0, 96, 104, 101, 108, 108, 111, 32, 119,
        111, 114, 108, 100, 32, 1, 0, 175, 75, 18,
    ];

    const PLAINTEXT: &[u8] = b"hello world hello world hello world";

    #[test]
    fn decompress_with_dict_returns_correct_plaintext() {
        let dict = ZstdDictionary::new(DICT);
        let result = ZstdPureProvider::decompress_with_dict(COMPRESSED, &dict, PLAINTEXT.len() + 1)
            .expect("decompression should succeed");
        assert_eq!(
            result, PLAINTEXT,
            "decompressed output must equal the original plaintext"
        );
    }

    #[test]
    fn decompress_with_dict_is_idempotent_across_repeated_calls() {
        let dict = ZstdDictionary::new(DICT);
        // Call three times to exercise the TLS caching path (second and third
        // calls must reuse the cached FrameDecoder without re-parsing the dict).
        for _ in 0..3 {
            let result =
                ZstdPureProvider::decompress_with_dict(COMPRESSED, &dict, PLAINTEXT.len() + 1)
                    .expect("decompression should succeed on every call");
            assert_eq!(result, PLAINTEXT);
        }
    }
}
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ pub mod checksum;
pub mod coding;

pub mod compaction;
mod compression;
#[doc(hidden)]
pub mod compression;

/// Block-level encryption at rest.
pub mod encryption;
Expand Down
8 changes: 4 additions & 4 deletions src/table/block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ impl Block {

let decompressed = crate::compression::ZstdBackend::decompress_with_dict(
&decrypted,
dict.raw(),
dict,
header.uncompressed_length as usize,
)
.map_err(|_| crate::Error::Decompress(compression))?;
Expand Down Expand Up @@ -420,7 +420,7 @@ impl Block {

let decompressed = crate::compression::ZstdBackend::decompress_with_dict(
&raw_data,
dict.raw(),
dict,
header.uncompressed_length as usize,
)
.map_err(|_| crate::Error::Decompress(compression))?;
Expand Down Expand Up @@ -590,7 +590,7 @@ impl Block {

let decompressed = crate::compression::ZstdBackend::decompress_with_dict(
&decrypted,
dict.raw(),
dict,
parsed_header.uncompressed_length as usize,
)
.map_err(|_| crate::Error::Decompress(compression))?;
Expand Down Expand Up @@ -705,7 +705,7 @@ impl Block {

let decompressed = crate::compression::ZstdBackend::decompress_with_dict(
compressed_data,
dict.raw(),
dict,
parsed_header.uncompressed_length as usize,
)
.map_err(|_| crate::Error::Decompress(compression))?;
Expand Down
Loading