Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,9 @@ name = "batch_ops"
harness = false
path = "benches/batch_ops.rs"
required-features = []

# Per-block zstd dictionary decompression latency (cold vs. warm dict cache).
[[bench]]
name = "zstd_dict"
harness = false
path = "benches/zstd_dict.rs"
required-features = []
112 changes: 112 additions & 0 deletions benches/zstd_dict.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) 2025-present, Structured World Foundation
// This source code is licensed under the Apache 2.0 License
// (found in the LICENSE-APACHE file in the repository)

//! Benchmark: per-block zstd dictionary decompression latency.
//!
//! Measures the cost of `decompress_with_dict` for a single compressed block,
//! both cold (first call, dictionary not yet cached in the backend's TLS /
//! `OnceLock`) and warm (subsequent calls, dictionary already cached).
//!
//! Run with:
//!
//! ```text
//! cargo bench --bench zstd_dict --features zstd # C FFI backend
//! cargo bench --bench zstd_dict --features zstd-pure # pure Rust backend
//! ```

use criterion::{Criterion, criterion_group, criterion_main};

#[cfg(zstd_any)]
use criterion::BatchSize;
#[cfg(zstd_any)]
use lsm_tree::compression::ZstdDictionary;

// --- test fixtures (only needed when a zstd backend is enabled) ----------

#[cfg(zstd_any)]
/// Pre-trained zstd dictionary (206 bytes).
///
/// Generated with the `zstd` C library from 100 samples × 32 bytes
/// (cycling pattern 0..4).
const DICT: &[u8] = &[
55, 164, 48, 236, 98, 64, 12, 7, 42, 16, 120, 62, 7, 204, 192, 51, 240, 12, 60, 3, 207, 192,
51, 240, 12, 60, 3, 207, 192, 51, 24, 17, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 128, 48,
165, 148, 2, 227, 76, 8, 33, 132, 16, 66, 136, 136, 136, 60, 84, 160, 64, 65, 65, 65, 65, 65,
65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 193, 231, 162,
40, 138, 162, 40, 138, 162, 40, 165, 148, 82, 74, 169, 170, 234, 1, 100, 160, 170, 193, 96, 48,
24, 12, 6, 131, 193, 96, 48, 12, 195, 48, 12, 195, 48, 12, 195, 48, 198, 24, 99, 140, 153, 29,
1, 0, 0, 0, 4, 0, 0, 0, 8, 0, 0, 0, 3, 3, 3, 3, 3, 3, 3, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1,
1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2,
2, 2, 2, 2, 2, 2,
];

#[cfg(zstd_any)]
/// Compressed frame for b"hello world hello world hello world" (35 bytes).
const COMPRESSED: &[u8] = &[
40, 181, 47, 253, 35, 98, 64, 12, 7, 35, 149, 0, 0, 96, 104, 101, 108, 108, 111, 32, 119, 111,
114, 108, 100, 32, 1, 0, 175, 75, 18,
];

#[cfg(zstd_any)]
const PLAINTEXT_LEN: usize = 35;

// -------------------------------------------------------------------------

#[cfg(zstd_any)]
fn bench_decompress_with_dict(c: &mut Criterion) {
    use lsm_tree::compression::{CompressionProvider, ZstdBackend as Backend};

    // Shared timed body: decompress the fixture block with the given handle.
    // `black_box` keeps the optimizer from const-folding the inputs away.
    let decode = |dict: &ZstdDictionary| {
        Backend::decompress_with_dict(
            std::hint::black_box(COMPRESSED),
            std::hint::black_box(dict),
            PLAINTEXT_LEN + 1,
        )
        .expect("decompression failed")
    };

    // Steady-state case: populate the dictionary cache once up front, so the
    // timed iterations only see the per-block decompression cost.
    let shared_dict = ZstdDictionary::new(DICT);
    Backend::decompress_with_dict(COMPRESSED, &shared_dict, PLAINTEXT_LEN + 1)
        .expect("pre-warm decompression failed");

    c.bench_function("decompress_with_dict/warm", |b| {
        b.iter(|| decode(&shared_dict));
    });

    // "Cold" case: the setup closure hands every iteration a brand-new
    // `ZstdDictionary` (fresh `OnceLock`, same dict bytes).
    //
    // C FFI backend: genuinely cold — each fresh handle has an unpopulated
    // `OnceLock`, so `ZSTD_createDDict` runs on the first decompression and
    // is cached in that handle.
    //
    // Pure Rust backend: not truly cold after the first iteration — its TLS
    // decoder is keyed by the 64-bit content hash, and all iterations share
    // the same DICT bytes (same hash), so the TLS entry stays live and later
    // iterations measure the TLS-hit path rather than dict parsing. The real
    // cold cost for that backend only shows on the very first iteration of
    // the first run.
    c.bench_function("decompress_with_dict/cold", |b| {
        b.iter_batched(
            || ZstdDictionary::new(DICT),
            |fresh| decode(&fresh),
            BatchSize::SmallInput,
        );
    });
}

#[cfg(not(zstd_any))]
fn bench_decompress_with_dict(_c: &mut Criterion) {
    // Neither zstd nor zstd-pure feature enabled — nothing to bench.
    // The no-op stub keeps `criterion_group!` below satisfied so this bench
    // target still compiles without either feature.
}

// Register the benchmark group and let criterion generate `main`
// (`harness = false` in Cargo.toml hands the entry point to us).
criterion_group!(benches, bench_decompress_with_dict);
criterion_main!(benches);
96 changes: 82 additions & 14 deletions src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,14 @@ pub trait CompressionProvider {
fn compress_with_dict(data: &[u8], level: i32, dict_raw: &[u8]) -> crate::Result<Vec<u8>>;

/// Decompress a zstd frame that was compressed with a dictionary.
///
/// `dict` exposes the raw dictionary bytes **and** a lazily-initialized
/// pre-compiled form (C FFI backend: `ZSTD_DDict`; pure Rust backend:
/// cached `FrameDecoder` in thread-local storage). Backends must use the
/// prepared form to avoid re-parsing the dictionary on every call.
fn decompress_with_dict(
data: &[u8],
dict_raw: &[u8],
dict: &ZstdDictionary,
capacity: usize,
) -> crate::Result<Vec<u8>>;
}
Expand Down Expand Up @@ -76,10 +81,37 @@ pub type ZstdBackend = zstd_pure::ZstdPureProvider;
/// let dict = ZstdDictionary::new(samples);
/// ```
// NOTE(review): the pasted diff contained BOTH the removed lines (stale
// `#[derive(Clone)]`, old `id: u32` field) and the added ones; keeping both
// is invalid Rust (duplicate `id`, derive conflicting with the manual
// `impl Clone`). This is the reconstructed post-merge definition.
#[cfg(zstd_any)]
pub struct ZstdDictionary {
    /// Full 64-bit xxh3 hash used as the collision-resistant cache key for the
    /// thread-local `FrameDecoder` in the pure Rust backend. The public
    /// `id() -> u32` method returns the lower 32 bits for external consumers.
    id: u64,

    /// Raw dictionary bytes; `Arc<[u8]>` makes handle clones O(1).
    raw: Arc<[u8]>,

    /// Pre-compiled decompressor dictionary, lazily initialized on first use.
    ///
    /// Wrapped in `Arc<OnceLock<…>>` so all clones of the same
    /// `ZstdDictionary` share one compiled instance. With the C FFI backend,
    /// `ZSTD_DDict` is therefore created at most once per dictionary handle,
    /// regardless of how many table readers hold a clone of that handle.
    ///
    /// Available only with the C FFI backend (`zstd` feature). The pure Rust
    /// backend caches an equivalent `FrameDecoder` in thread-local storage
    /// inside `decompress_with_dict` instead.
    #[cfg(feature = "zstd")]
    prepared: Arc<std::sync::OnceLock<zstd::dict::DecoderDictionary<'static>>>,
}

#[cfg(zstd_any)]
impl Clone for ZstdDictionary {
    /// Cheap clone: copies the 64-bit id and bumps the `Arc` refcounts, so
    /// the raw bytes — and, with the C FFI backend, the compiled-dictionary
    /// cache — are shared between clones rather than duplicated.
    ///
    /// NOTE(review): hand-written rather than `#[derive(Clone)]`, presumably
    /// to keep the `Arc::clone` sharing explicit — confirm derive wasn't
    /// avoided for another reason.
    fn clone(&self) -> Self {
        Self {
            id: self.id,
            raw: Arc::clone(&self.raw),
            #[cfg(feature = "zstd")]
            prepared: Arc::clone(&self.prepared),
        }
    }
}

#[cfg(zstd_any)]
Expand All @@ -88,18 +120,54 @@ impl ZstdDictionary {
///
/// The raw bytes should be a pre-trained zstd dictionary (e.g., output
/// of `zstd::dict::from_continuous` or `zstd --train`). The dictionary
/// ID is computed as a truncated xxh3 hash of the content.
/// ID is stored as a full 64-bit xxh3 hash; the public [`ZstdDictionary::id`]
/// method returns the lower 32 bits for external consumers.
#[must_use]
pub fn new(raw: &[u8]) -> Self {
Self {
id: compute_dict_id(raw),
raw: Arc::from(raw),
#[cfg(feature = "zstd")]
prepared: Arc::new(std::sync::OnceLock::new()),
}
Comment thread
polaz marked this conversation as resolved.
}

/// Returns the dictionary ID (truncated xxh3 hash of the raw bytes).
/// Returns the lazily-initialized pre-compiled decompressor dictionary.
///
/// On first call this copies the raw bytes into a `ZSTD_DDict` (C
/// library's opaque pre-parsed form) and caches the result inside this
/// `ZstdDictionary`. Subsequent calls — from any thread — return the
/// cached reference with no further allocation or parsing.
///
/// Using this together with
/// [`zstd::bulk::Decompressor::with_prepared_dictionary`] eliminates the
/// per-block `ZSTD_createDDict` call that was previously paid on every
/// `decompress_with_dict` invocation.
#[cfg(feature = "zstd")]
pub(crate) fn decoder_dict(&self) -> &zstd::dict::DecoderDictionary<'static> {
self.prepared
.get_or_init(|| zstd::dict::DecoderDictionary::copy(&self.raw))
}

    /// Returns a 32-bit dictionary fingerprint (lower 32 bits of xxh3).
    ///
    /// Intended for display and external interop (e.g., matching against the
    /// dict ID embedded in a zstd frame header). For internal cache keying
    /// use [`id64`](ZstdDictionary::id64) to avoid hash collisions.
    #[must_use]
    #[expect(
        clippy::cast_possible_truncation,
        reason = "intentional: public API returns 32-bit fingerprint"
    )]
    pub fn id(&self) -> u32 {
        self.id as u32
    }

/// Returns the full 64-bit xxh3 fingerprint used as a collision-resistant
/// cache key inside the pure Rust backend's TLS decoder.
#[cfg(all(feature = "zstd-pure", not(feature = "zstd")))]
#[must_use]
pub(crate) fn id64(&self) -> u64 {
self.id
}

Expand All @@ -114,20 +182,20 @@ impl ZstdDictionary {
impl std::fmt::Debug for ZstdDictionary {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ZstdDictionary")
.field("id", &format_args!("{:#010x}", self.id))
.field("id", &format_args!("{:#018x}", self.id))
.field("size", &self.raw.len())
.finish()
.finish_non_exhaustive() // `prepared` cache omitted — implementation detail
}
}

/// Compute the full 64-bit xxh3 dictionary fingerprint.
///
/// The full 64-bit value is used as the collision-resistant cache key inside
/// the pure Rust backend's thread-local `FrameDecoder`. The public `id()`
/// method returns only the lower 32 bits for backward-compatible display.
//
// NOTE(review): the pasted diff contained both the removed `-> u32` version
// (with its now-obsolete `#[expect(clippy::cast_possible_truncation)]`) and
// the added `-> u64` version; only the post-merge form is kept here.
#[cfg(zstd_any)]
fn compute_dict_id(raw: &[u8]) -> u64 {
    xxhash_rust::xxh3::xxh3_64(raw)
}

/// Compression algorithm to use
Expand Down
9 changes: 6 additions & 3 deletions src/compression/zstd_ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ impl CompressionProvider for ZstdFfiProvider {

fn decompress_with_dict(
data: &[u8],
dict_raw: &[u8],
dict: &crate::compression::ZstdDictionary,
capacity: usize,
) -> crate::Result<Vec<u8>> {
let mut decompressor = zstd::bulk::Decompressor::with_dictionary(dict_raw)
.map_err(|e| crate::Error::Io(std::io::Error::other(e)))?;
// `dict.decoder_dict()` returns a cached `ZSTD_DDict`, avoiding
// `ZSTD_createDDict` (which re-parses the raw bytes) on every call.
let mut decompressor =
zstd::bulk::Decompressor::with_prepared_dictionary(dict.decoder_dict())
.map_err(|e| crate::Error::Io(std::io::Error::other(e)))?;
decompressor
.decompress(data, capacity)
.map_err(|e| crate::Error::Io(std::io::Error::other(e)))
Expand Down
Loading
Loading