Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,9 @@ name = "batch_ops"
harness = false
path = "benches/batch_ops.rs"
required-features = []

[[bench]]
name = "zstd_dict"
harness = false
path = "benches/zstd_dict.rs"
required-features = []
112 changes: 112 additions & 0 deletions benches/zstd_dict.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) 2025-present, Structured World Foundation
// This source code is licensed under the Apache 2.0 License
// (found in the LICENSE-APACHE file in the repository)

//! Benchmark: per-block zstd dictionary decompression latency.
//!
//! Measures the cost of `decompress_with_dict` for a single compressed block,
//! both cold (first call, dictionary not yet cached in the backend's TLS /
//! `OnceLock`) and warm (subsequent calls, dictionary already cached).
//!
//! Run with:
//!
//! ```text
//! cargo bench --bench zstd_dict --features zstd # C FFI backend
//! cargo bench --bench zstd_dict --features zstd-pure # pure Rust backend
//! ```

use criterion::{Criterion, criterion_group, criterion_main};

#[cfg(zstd_any)]
use criterion::BatchSize;
#[cfg(zstd_any)]
use lsm_tree::compression::ZstdDictionary;

// --- test fixtures (only needed when a zstd backend is enabled) ----------

#[cfg(zstd_any)]
/// Pre-trained zstd dictionary (206 bytes).
///
/// Generated with the `zstd` C library from 100 samples × 32 bytes
/// (cycling pattern 0..4).
const DICT: &[u8] = &[
55, 164, 48, 236, 98, 64, 12, 7, 42, 16, 120, 62, 7, 204, 192, 51, 240, 12, 60, 3, 207, 192,
51, 240, 12, 60, 3, 207, 192, 51, 24, 17, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 128, 48,
165, 148, 2, 227, 76, 8, 33, 132, 16, 66, 136, 136, 136, 60, 84, 160, 64, 65, 65, 65, 65, 65,
65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 193, 231, 162,
40, 138, 162, 40, 138, 162, 40, 165, 148, 82, 74, 169, 170, 234, 1, 100, 160, 170, 193, 96, 48,
24, 12, 6, 131, 193, 96, 48, 12, 195, 48, 12, 195, 48, 12, 195, 48, 198, 24, 99, 140, 153, 29,
1, 0, 0, 0, 4, 0, 0, 0, 8, 0, 0, 0, 3, 3, 3, 3, 3, 3, 3, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2,
2, 2, 2, 2, 2, 2,
];

#[cfg(zstd_any)]
/// Compressed frame for b"hello world hello world hello world" (35 bytes).
const COMPRESSED: &[u8] = &[
40, 181, 47, 253, 35, 98, 64, 12, 7, 35, 149, 0, 0, 96, 104, 101, 108, 108, 111, 32, 119, 111,
114, 108, 100, 32, 1, 0, 175, 75, 18,
];

#[cfg(zstd_any)]
const PLAINTEXT_LEN: usize = 35;

// -------------------------------------------------------------------------

#[cfg(zstd_any)]
fn bench_decompress_with_dict(c: &mut Criterion) {
    use lsm_tree::compression::{CompressionProvider, ZstdBackend as Backend};
    use std::hint::black_box;

    // Output-buffer capacity handed to the backend: the known plaintext
    // size plus one byte of slack.
    let capacity = PLAINTEXT_LEN + 1;

    // Warm benchmark: populate the dictionary cache once up front so the
    // timed iterations measure only steady-state per-block decompression.
    let dict = ZstdDictionary::new(DICT);
    Backend::decompress_with_dict(COMPRESSED, &dict, capacity)
        .expect("pre-warm decompression failed");

    c.bench_function("decompress_with_dict/warm", |b| {
        b.iter(|| {
            Backend::decompress_with_dict(black_box(COMPRESSED), black_box(&dict), capacity)
                .expect("decompression failed")
        });
    });

    // "Cold" benchmark: every iteration receives a freshly constructed
    // `ZstdDictionary` (new `OnceLock` for the C FFI backend; identical
    // dict bytes either way).
    //
    // C FFI backend: genuinely measures first-call cost — the fresh
    // handle's `OnceLock` is empty, so `ZSTD_createDDict` runs on the
    // first decompression and is cached in that handle.
    //
    // Pure Rust backend: the TLS decoder is keyed by the 64-bit content
    // hash, and every iteration shares the same DICT bytes (same hash).
    // After the first iteration the TLS entry stays live, so later
    // iterations exercise the TLS-hit path rather than dict parsing;
    // true cold cost is observable only on the very first iteration of
    // the first run.
    c.bench_function("decompress_with_dict/cold", |b| {
        b.iter_batched(
            || ZstdDictionary::new(DICT),
            |fresh| {
                Backend::decompress_with_dict(
                    black_box(COMPRESSED),
                    black_box(&fresh),
                    capacity,
                )
                .expect("decompression failed")
            },
            BatchSize::SmallInput,
        );
    });
}

#[cfg(not(zstd_any))]
/// No-op stand-in so `criterion_group!` below always has a target to
/// register, even when neither zstd backend feature is enabled.
fn bench_decompress_with_dict(_c: &mut Criterion) {
    // Neither zstd nor zstd-pure feature enabled — nothing to bench.
}

criterion_group!(benches, bench_decompress_with_dict);
criterion_main!(benches);
92 changes: 79 additions & 13 deletions src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ pub trait CompressionProvider {
fn compress_with_dict(data: &[u8], level: i32, dict_raw: &[u8]) -> crate::Result<Vec<u8>>;

/// Decompress a zstd frame that was compressed with a dictionary.
///
/// `dict` exposes the raw dictionary bytes **and** a lazily-initialized
/// pre-compiled form (C FFI backend: `ZSTD_DDict`; pure Rust backend:
/// cached `FrameDecoder` in thread-local storage). Backends must use the
/// prepared form to avoid re-parsing the dictionary on every call.
fn decompress_with_dict(
data: &[u8],
dict_raw: &[u8],
dict: &ZstdDictionary,
capacity: usize,
) -> crate::Result<Vec<u8>>;
}
Expand Down Expand Up @@ -76,10 +81,36 @@ pub type ZstdBackend = zstd_pure::ZstdPureProvider;
/// let dict = ZstdDictionary::new(samples);
/// ```
#[cfg(zstd_any)]
#[derive(Clone)]
pub struct ZstdDictionary {
id: u32,
/// Full 64-bit xxh3 hash used as the collision-resistant cache key for the
/// thread-local `FrameDecoder` in the pure Rust backend. The public
/// `id() -> u32` method returns the lower 32 bits for external consumers.
id: u64,
raw: Arc<[u8]>,

/// Pre-compiled decompressor dictionary, lazily initialized on first use.
///
/// Wrapped in `Arc<OnceLock<…>>` so all clones share the same compiled
/// instance — `ZSTD_DDict` is created exactly once per unique dictionary,
/// regardless of how many table readers hold a reference to it.
///
///
/// Available only with the C FFI backend (`zstd` feature). The pure Rust
/// backend caches an equivalent `FrameDecoder` in thread-local storage
/// inside `decompress_with_dict` instead.
#[cfg(feature = "zstd")]
prepared: Arc<std::sync::OnceLock<zstd::dict::DecoderDictionary<'static>>>,
}

#[cfg(zstd_any)]
// All fields are cheap to clone: `id` is `Copy`, and the `Arc` fields are
// refcount bumps — no dictionary bytes are copied, and every clone shares
// the same lazily-compiled `prepared` cache.
// NOTE(review): a `#[derive(Clone)]` would presumably generate an
// equivalent impl (cfg-gated fields included); confirm whether the manual
// impl is intentional before simplifying.
impl Clone for ZstdDictionary {
    fn clone(&self) -> Self {
        Self {
            id: self.id,
            raw: Arc::clone(&self.raw),
            #[cfg(feature = "zstd")]
            prepared: Arc::clone(&self.prepared),
        }
    }
}

#[cfg(zstd_any)]
Expand All @@ -94,12 +125,47 @@ impl ZstdDictionary {
Self {
id: compute_dict_id(raw),
raw: Arc::from(raw),
#[cfg(feature = "zstd")]
prepared: Arc::new(std::sync::OnceLock::new()),
}
}

/// Returns the dictionary ID (truncated xxh3 hash of the raw bytes).
/// Returns the lazily-initialized pre-compiled decompressor dictionary.
///
/// On first call this copies the raw bytes into a `ZSTD_DDict` (C
/// library's opaque pre-parsed form) and caches the result inside this
/// `ZstdDictionary`. Subsequent calls — from any thread — return the
/// cached reference with no further allocation or parsing.
///
/// Using this together with
/// [`zstd::bulk::Decompressor::with_prepared_dictionary`] eliminates the
/// per-block `ZSTD_createDDict` call that was previously paid on every
/// `decompress_with_dict` invocation.
#[cfg(feature = "zstd")]
pub(crate) fn decoder_dict(&self) -> &zstd::dict::DecoderDictionary<'static> {
self.prepared
.get_or_init(|| zstd::dict::DecoderDictionary::copy(&self.raw))
}

/// Returns a 32-bit dictionary fingerprint (lower 32 bits of xxh3).
///
/// Intended for display and external interop (e.g., matching against the
/// dict ID embedded in a zstd frame header). For internal cache keying
/// use [`id64`](ZstdDictionary::id64) to avoid hash collisions.
#[must_use]
#[expect(
clippy::cast_possible_truncation,
reason = "intentional: public API returns 32-bit fingerprint"
)]
pub fn id(&self) -> u32 {
self.id as u32
}

/// Returns the full 64-bit xxh3 fingerprint used as a collision-resistant
/// cache key inside the pure Rust backend's TLS decoder.
#[cfg(all(feature = "zstd-pure", not(feature = "zstd")))]
#[must_use]
pub(crate) fn id64(&self) -> u64 {
self.id
}

Expand All @@ -114,20 +180,20 @@ impl ZstdDictionary {
impl std::fmt::Debug for ZstdDictionary {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ZstdDictionary")
.field("id", &format_args!("{:#010x}", self.id))
.field("id", &format_args!("{:#018x}", self.id))
.field("size", &self.raw.len())
.finish()
.finish_non_exhaustive() // `prepared` cache omitted — implementation detail
}
}

/// Compute a 32-bit dictionary ID from raw bytes via truncated xxh3.
/// Compute the full 64-bit xxh3 dictionary fingerprint.
///
/// The full 64-bit value is used as the collision-resistant cache key inside
/// the pure Rust backend's thread-local `FrameDecoder`. The public `id()`
/// method returns only the lower 32 bits for backward-compatible display.
#[cfg(zstd_any)]
#[expect(
clippy::cast_possible_truncation,
reason = "intentionally truncated to 32-bit fingerprint"
)]
fn compute_dict_id(raw: &[u8]) -> u32 {
xxhash_rust::xxh3::xxh3_64(raw) as u32
fn compute_dict_id(raw: &[u8]) -> u64 {
xxhash_rust::xxh3::xxh3_64(raw)
}

/// Compression algorithm to use
Expand Down
9 changes: 6 additions & 3 deletions src/compression/zstd_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ impl CompressionProvider for ZstdFfiProvider {

fn decompress_with_dict(
data: &[u8],
dict_raw: &[u8],
dict: &crate::compression::ZstdDictionary,
capacity: usize,
) -> crate::Result<Vec<u8>> {
let mut decompressor = zstd::bulk::Decompressor::with_dictionary(dict_raw)
.map_err(|e| crate::Error::Io(std::io::Error::other(e)))?;
// `dict.decoder_dict()` returns a cached `ZSTD_DDict`, avoiding
// `ZSTD_createDDict` (which re-parses the raw bytes) on every call.
let mut decompressor =
zstd::bulk::Decompressor::with_prepared_dictionary(dict.decoder_dict())
.map_err(|e| crate::Error::Io(std::io::Error::other(e)))?;
decompressor
.decompress(data, capacity)
.map_err(|e| crate::Error::Io(std::io::Error::other(e)))
Expand Down
Loading
Loading