From 4ee898160b420be9ee65d66beac20ff30d86bef5 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Tue, 7 Apr 2026 00:47:17 +0300 Subject: [PATCH 1/4] perf(compression): cache pre-compiled Dictionary across block decompress calls MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - C FFI backend: cache `DecoderDictionary<'static>` (ZSTD_DDict) in `ZstdDictionary` via `Arc>` — parsed once per process, shared across all clones of the same dictionary handle - Pure Rust backend: cache `FrameDecoder` with dictionary pre-loaded in thread-local storage keyed by dict ID — parsed once per thread - Fix latent correctness bug in pure Rust `decompress_with_dict`: was calling `init(data)` on a Copy slice (reads frame header only, output buffer stays empty); replace with `decode_all_to_vec` which takes `&mut input` and fully decodes the frame - Change `CompressionProvider::decompress_with_dict` signature from `dict_raw: &[u8]` to `dict: &ZstdDictionary` to give backends access to the cached prepared form; update all four call sites in block/mod.rs - Add `ZstdDictionary::decoder_dict()` — lazily initialises ZSTD_DDict via `OnceLock::get_or_init` (C FFI only) - Add unit tests for pure Rust backend with pre-generated test vectors (decompress + idempotent repeated calls exercising TLS cache path) - Add `benches/zstd_dict.rs` with warm / cold per-block latency benchmarks - Expose `#[doc(hidden)] pub mod compression` so benchmarks can reach `CompressionProvider` and `ZstdBackend` type alias Closes #217 --- Cargo.toml | 6 ++ benches/zstd_dict.rs | 94 +++++++++++++++++++++++++ src/compression/mod.rs | 53 ++++++++++++++- src/compression/zstd_ffi.rs | 9 ++- src/compression/zstd_pure.rs | 128 ++++++++++++++++++++++++++++++----- src/lib.rs | 3 +- src/table/block/mod.rs | 8 +-- 7 files changed, 274 insertions(+), 27 deletions(-) create mode 100644 benches/zstd_dict.rs diff --git a/Cargo.toml b/Cargo.toml index 7369a9c3c..0dbb55a73 100644 --- 
a/Cargo.toml +++ b/Cargo.toml @@ -96,3 +96,9 @@ name = "batch_ops" harness = false path = "benches/batch_ops.rs" required-features = [] + +[[bench]] +name = "zstd_dict" +harness = false +path = "benches/zstd_dict.rs" +required-features = [] diff --git a/benches/zstd_dict.rs b/benches/zstd_dict.rs new file mode 100644 index 000000000..6610045e4 --- /dev/null +++ b/benches/zstd_dict.rs @@ -0,0 +1,94 @@ +// Copyright (c) 2025-present, Structured World Foundation +// This source code is licensed under the Apache 2.0 License +// (found in the LICENSE-APACHE file in the repository) + +//! Benchmark: per-block zstd dictionary decompression latency. +//! +//! Measures the cost of `decompress_with_dict` for a single compressed block, +//! both cold (first call, dictionary not yet cached in the backend's TLS / +//! `OnceLock`) and warm (subsequent calls, dictionary already cached). +//! +//! Run with: +//! +//! ```text +//! cargo bench --bench zstd_dict --features zstd # C FFI backend +//! cargo bench --bench zstd_dict --features zstd-pure # pure Rust backend +//! ``` + +use criterion::{BatchSize, Criterion, criterion_group, criterion_main}; +use lsm_tree::compression::ZstdDictionary; + +// --- test fixtures ------------------------------------------------------- + +/// Pre-trained zstd dictionary (206 bytes). +/// +/// Generated with the `zstd` C library from 100 samples × 32 bytes +/// (cycling pattern 0..4). 
+const DICT: &[u8] = &[ + 55, 164, 48, 236, 98, 64, 12, 7, 42, 16, 120, 62, 7, 204, 192, 51, 240, 12, 60, 3, 207, 192, + 51, 240, 12, 60, 3, 207, 192, 51, 24, 17, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 128, 48, + 165, 148, 2, 227, 76, 8, 33, 132, 16, 66, 136, 136, 136, 60, 84, 160, 64, 65, 65, 65, 65, 65, + 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 193, 231, 162, + 40, 138, 162, 40, 138, 162, 40, 165, 148, 82, 74, 169, 170, 234, 1, 100, 160, 170, 193, 96, 48, + 24, 12, 6, 131, 193, 96, 48, 12, 195, 48, 12, 195, 48, 12, 195, 48, 198, 24, 99, 140, 153, 29, + 1, 0, 0, 0, 4, 0, 0, 0, 8, 0, 0, 0, 3, 3, 3, 3, 3, 3, 3, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, + 2, 2, 2, 2, 2, 2, +]; + +/// Compressed frame for b"hello world hello world hello world" (35 bytes). +const COMPRESSED: &[u8] = &[ + 40, 181, 47, 253, 35, 98, 64, 12, 7, 35, 149, 0, 0, 96, 104, 101, 108, 108, 111, 32, 119, 111, + 114, 108, 100, 32, 1, 0, 175, 75, 18, +]; + +const PLAINTEXT_LEN: usize = 35; + +// ------------------------------------------------------------------------- + +#[cfg(zstd_any)] +fn bench_decompress_with_dict(c: &mut Criterion) { + use lsm_tree::compression::{CompressionProvider, ZstdBackend as Backend}; + + // Warm benchmark: cache is populated before timing starts. + // Represents steady-state per-block decompression cost. + let warm_dict = ZstdDictionary::new(DICT); + Backend::decompress_with_dict(COMPRESSED, &warm_dict, PLAINTEXT_LEN + 1) + .expect("pre-warm decompression failed"); + + c.bench_function("decompress_with_dict/warm", |b| { + b.iter(|| { + Backend::decompress_with_dict( + std::hint::black_box(COMPRESSED), + std::hint::black_box(&warm_dict), + PLAINTEXT_LEN + 1, + ) + .expect("decompression failed") + }); + }); + + // Cold benchmark: fresh `ZstdDictionary` on each iteration so the cache + // is unpopulated. 
Measures first-call cost (dict parse + decompress). + c.bench_function("decompress_with_dict/cold", |b| { + b.iter_batched( + || ZstdDictionary::new(DICT), + |d| { + Backend::decompress_with_dict( + std::hint::black_box(COMPRESSED), + std::hint::black_box(&d), + PLAINTEXT_LEN + 1, + ) + .expect("decompression failed") + }, + BatchSize::SmallInput, + ); + }); +} + +#[cfg(not(zstd_any))] +fn bench_decompress_with_dict(_c: &mut Criterion) { + // Neither zstd nor zstd-pure feature enabled — nothing to bench. +} + +criterion_group!(benches, bench_decompress_with_dict); +criterion_main!(benches); diff --git a/src/compression/mod.rs b/src/compression/mod.rs index b901151cc..65249d050 100644 --- a/src/compression/mod.rs +++ b/src/compression/mod.rs @@ -39,9 +39,14 @@ pub trait CompressionProvider { fn compress_with_dict(data: &[u8], level: i32, dict_raw: &[u8]) -> crate::Result<Vec<u8>>; /// Decompress a zstd frame that was compressed with a dictionary. + /// + /// `dict` exposes the raw dictionary bytes **and** a lazily-initialized + /// pre-compiled form (C FFI backend: `ZSTD_DDict`; pure Rust backend: + /// cached `FrameDecoder` in thread-local storage). Backends must use the + /// prepared form to avoid re-parsing the dictionary on every call. fn decompress_with_dict( data: &[u8], - dict_raw: &[u8], + dict: &ZstdDictionary, capacity: usize, ) -> crate::Result<Vec<u8>>; } @@ -76,10 +81,33 @@ pub type ZstdBackend = zstd_pure::ZstdPureProvider; /// let dict = ZstdDictionary::new(samples); /// ``` #[cfg(zstd_any)] -#[derive(Clone)] pub struct ZstdDictionary { id: u32, raw: Arc<[u8]>, + + /// Pre-compiled decompressor dictionary, lazily initialized on first use. + /// + /// Wrapped in `Arc<OnceLock<DecoderDictionary>>` so all clones share the same compiled + /// instance — `ZSTD_DDict` is created exactly once per unique dictionary, + /// regardless of how many table readers hold a reference to it. + /// + /// Available only with the C FFI backend (`zstd` feature).
The pure Rust + /// backend caches an equivalent `FrameDecoder` in thread-local storage + /// inside `decompress_with_dict` instead. + #[cfg(feature = "zstd")] + prepared: Arc<std::sync::OnceLock<zstd::dict::DecoderDictionary<'static>>>, +} + +#[cfg(zstd_any)] +impl Clone for ZstdDictionary { + fn clone(&self) -> Self { + Self { + id: self.id, + raw: Arc::clone(&self.raw), + #[cfg(feature = "zstd")] + prepared: Arc::clone(&self.prepared), + } + } } @@ -94,9 +122,28 @@ impl ZstdDictionary { Self { id: compute_dict_id(raw), raw: Arc::from(raw), + #[cfg(feature = "zstd")] + prepared: Arc::new(std::sync::OnceLock::new()), } } + /// Returns the lazily-initialized pre-compiled decompressor dictionary. + /// + /// On first call this copies the raw bytes into a `ZSTD_DDict` (C + /// library's opaque pre-parsed form) and caches the result inside this + /// `ZstdDictionary`. Subsequent calls — from any thread — return the + /// cached reference with no further allocation or parsing. + /// + /// Using this together with + /// [`zstd::bulk::Decompressor::with_prepared_dictionary`] eliminates the + /// per-block `ZSTD_createDDict` call that was previously paid on every + /// `decompress_with_dict` invocation. + #[cfg(feature = "zstd")] + pub(crate) fn decoder_dict(&self) -> &zstd::dict::DecoderDictionary<'static> { + self.prepared + .get_or_init(|| zstd::dict::DecoderDictionary::copy(&self.raw)) + } + + /// Returns the dictionary ID (truncated xxh3 hash of the raw bytes).
#[must_use] pub fn id(&self) -> u32 { @@ -116,7 +163,7 @@ impl std::fmt::Debug for ZstdDictionary { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ZstdDictionary") .field("id", &format_args!("{:#010x}", self.id)) .field("size", &self.raw.len()) - .finish() + .finish_non_exhaustive() // `prepared` cache omitted — implementation detail } } diff --git a/src/compression/zstd_ffi.rs b/src/compression/zstd_ffi.rs index 13936fabd..3a7116b98 100644 --- a/src/compression/zstd_ffi.rs +++ b/src/compression/zstd_ffi.rs @@ -32,11 +32,14 @@ impl CompressionProvider for ZstdFfiProvider { fn decompress_with_dict( data: &[u8], - dict_raw: &[u8], + dict: &crate::compression::ZstdDictionary, capacity: usize, ) -> crate::Result<Vec<u8>> { - let mut decompressor = zstd::bulk::Decompressor::with_dictionary(dict_raw) - .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; + // `dict.decoder_dict()` returns a cached `ZSTD_DDict`, avoiding + // `ZSTD_createDDict` (which re-parses the raw bytes) on every call. + let mut decompressor = + zstd::bulk::Decompressor::with_prepared_dictionary(dict.decoder_dict()) + .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; decompressor .decompress(data, capacity) .map_err(|e| crate::Error::Io(std::io::Error::other(e))) diff --git a/src/compression/zstd_pure.rs b/src/compression/zstd_pure.rs index b1061a0e7..3eee1d666 100644 --- a/src/compression/zstd_pure.rs +++ b/src/compression/zstd_pure.rs @@ -85,25 +85,121 @@ impl CompressionProvider for ZstdPureProvider { fn decompress_with_dict( data: &[u8], - dict_raw: &[u8], + dict: &crate::compression::ZstdDictionary, capacity: usize, ) -> crate::Result<Vec<u8>> { - // NOTE: Dictionary is re-parsed from raw bytes on every call. - // The C FFI backend has the same per-call overhead (Decompressor::with_dictionary - // also re-initializes). Caching would require adding precompiled dictionary - // state to the CompressionProvider trait, which is a Phase 2 optimization.
- let dict = structured_zstd::decoding::Dictionary::decode_dict(dict_raw) - .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; + use structured_zstd::decoding::{Dictionary, FrameDecoder}; - // FrameDecoder supports dictionaries (unlike StreamingDecoder). - let mut decoder = structured_zstd::decoding::FrameDecoder::new(); - decoder - .add_dict(dict) - .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; - decoder - .init(data) - .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; + // Thread-local `FrameDecoder` with the dictionary pre-loaded. + // + // Parsing a zstd dictionary involves building Huffman and FSE decoding + // tables — expensive relative to per-block decompression for the small + // 4–64 KiB blocks typical in LSM-trees. Caching the `FrameDecoder` + // instance across calls amortises this cost: the dictionary is parsed + // exactly once per thread, per distinct dictionary. + // + // Thread-local storage is appropriate because `FrameDecoder` is not + // `Send` and each thread decompresses independently; no mutex is + // needed. If the active dictionary changes (e.g. different table), + // the decoder is re-initialised transparently. thread_local! { + static TLS_DECODER: std::cell::RefCell<Option<(u32, FrameDecoder)>> = + const { std::cell::RefCell::new(None) }; + } - bounded_read(&mut decoder, capacity) + TLS_DECODER.with(|cell| { + let mut state = cell.borrow_mut(); + + // Re-initialise if this is the first call in this thread or if + // the dictionary has changed (different dict_id → different table). + if !matches!(&*state, Some((id, _)) if *id == dict.id()) { + let parsed = Dictionary::decode_dict(dict.raw()) + .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; + let mut decoder = FrameDecoder::new(); + decoder + .add_dict(parsed) + .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; + *state = Some((dict.id(), decoder)); + } + + let Some((_, decoder)) = state.as_mut() else { + // Unreachable: the branch above always initialises `state`.
+ return Err(crate::Error::Io(std::io::Error::other( + "TLS_DECODER unexpectedly empty after initialisation", + ))); + }; + + // `decode_all_to_vec` calls `init(&mut input)` internally — the + // mutable borrow advances the slice cursor past the frame header + // before block decoding begins, which is the correct usage of + // `FrameDecoder`. The dictionary stored in `decoder.dicts` is + // reused on every call without re-parsing. + let mut output = Vec::with_capacity(capacity); + decoder + .decode_all_to_vec(data, &mut output) + .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; + + Ok(output) + }) + } +} + +#[cfg(test)] +#[allow(clippy::unwrap_used, clippy::expect_used, reason = "test code")] +mod tests { + use super::*; + use crate::compression::ZstdDictionary; + use test_log::test; + + // Pre-generated test vectors for pure-Rust dict decompression. + // + // Generated with the `zstd` C library (crate v0.13, `zdict_builder` feature): + // - Training corpus: 100 samples × 32 bytes (cycling pattern 0..4) + // - Plaintext: b"hello world hello world hello world" + // + // Reproducible with: + // zstd::dict::from_continuous(&training_data, &sizes, 1024) + // zstd::bulk::Compressor::with_dictionary(3, &dict).compress(plaintext) + const DICT: &[u8] = &[ + 55, 164, 48, 236, 98, 64, 12, 7, 42, 16, 120, 62, 7, 204, 192, 51, 240, 12, 60, 3, 207, + 192, 51, 240, 12, 60, 3, 207, 192, 51, 24, 17, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 128, 48, 165, 148, 2, 227, 76, 8, 33, 132, 16, 66, 136, 136, 136, 60, 84, 160, 64, 65, 65, + 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, 65, + 193, 231, 162, 40, 138, 162, 40, 138, 162, 40, 165, 148, 82, 74, 169, 170, 234, 1, 100, + 160, 170, 193, 96, 48, 24, 12, 6, 131, 193, 96, 48, 12, 195, 48, 12, 195, 48, 12, 195, 48, + 198, 24, 99, 140, 153, 29, 1, 0, 0, 0, 4, 0, 0, 0, 8, 0, 0, 0, 3, 3, 3, 3, 3, 3, 3, 3, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 
1, 1, 1, 1, 1, + 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 2, 2, 2, 2, 2, 2, 2, 2, + ]; + + const COMPRESSED: &[u8] = &[ + 40, 181, 47, 253, 35, 98, 64, 12, 7, 35, 149, 0, 0, 96, 104, 101, 108, 108, 111, 32, 119, + 111, 114, 108, 100, 32, 1, 0, 175, 75, 18, + ]; + + const PLAINTEXT: &[u8] = b"hello world hello world hello world"; + + #[test] + fn decompress_with_dict_returns_correct_plaintext() { + let dict = ZstdDictionary::new(DICT); + let result = ZstdPureProvider::decompress_with_dict(COMPRESSED, &dict, PLAINTEXT.len() + 1) + .expect("decompression should succeed"); + assert_eq!( + result, PLAINTEXT, + "decompressed output must equal the original plaintext" + ); + } + + #[test] + fn decompress_with_dict_is_idempotent_across_repeated_calls() { + let dict = ZstdDictionary::new(DICT); + // Call three times to exercise the TLS caching path (second and third + // calls must reuse the cached FrameDecoder without re-parsing the dict). + for _ in 0..3 { + let result = + ZstdPureProvider::decompress_with_dict(COMPRESSED, &dict, PLAINTEXT.len() + 1) + .expect("decompression should succeed on every call"); + assert_eq!(result, PLAINTEXT); + } } } diff --git a/src/lib.rs b/src/lib.rs index 3287b598f..e4df2c5e1 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -86,7 +86,8 @@ pub mod checksum; pub mod coding; pub mod compaction; -mod compression; +#[doc(hidden)] +pub mod compression; /// Block-level encryption at rest. 
pub mod encryption; diff --git a/src/table/block/mod.rs b/src/table/block/mod.rs index fb9208438..6284c7a51 100644 --- a/src/table/block/mod.rs +++ b/src/table/block/mod.rs @@ -337,7 +337,7 @@ impl Block { let decompressed = crate::compression::ZstdBackend::decompress_with_dict( &decrypted, - dict.raw(), + dict, header.uncompressed_length as usize, ) .map_err(|_| crate::Error::Decompress(compression))?; @@ -420,7 +420,7 @@ impl Block { let decompressed = crate::compression::ZstdBackend::decompress_with_dict( &raw_data, - dict.raw(), + dict, header.uncompressed_length as usize, ) .map_err(|_| crate::Error::Decompress(compression))?; @@ -590,7 +590,7 @@ impl Block { let decompressed = crate::compression::ZstdBackend::decompress_with_dict( &decrypted, - dict.raw(), + dict, parsed_header.uncompressed_length as usize, ) .map_err(|_| crate::Error::Decompress(compression))?; @@ -705,7 +705,7 @@ impl Block { let decompressed = crate::compression::ZstdBackend::decompress_with_dict( compressed_data, - dict.raw(), + dict, parsed_header.uncompressed_length as usize, ) .map_err(|_| crate::Error::Decompress(compression))?; From 7bc51d43b9578e8b88d78e5943a7b406589599dd Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Tue, 7 Apr 2026 02:22:38 +0300 Subject: [PATCH 2/4] fix(compression): use 64-bit TLS cache key and enforce capacity in pure Rust dict decompress - Change TLS decoder cache key from truncated u32 to full u64 xxh3 fingerprint; eliminates cross-dict aliasing when two distinct dictionaries share the same lower 32 bits - Return DecompressedSizeTooLarge when decode_all_to_vec output exceeds capacity, matching the bounded behaviour of decompress() and the C backend - Add regression test: decompress_with_dict_rejects_frame_exceeding_capacity - Replace #[allow(clippy::...)] with two separate #[expect(..., reason)] attributes on the test module (MSRV 1.92 standard) - Gate bench constants and imports behind #[cfg(zstd_any)] so the file compiles with default features (no 
zstd backend enabled) - Document that cold bench measures TLS-hit path for pure Rust backend (same dict hash persists in TLS across iterations in the same thread) --- benches/zstd_dict.rs | 26 +++++++++++++++++++---- src/compression/mod.rs | 39 +++++++++++++++++++++++++--------- src/compression/zstd_pure.rs | 41 +++++++++++++++++++++++++++++++----- 3 files changed, 87 insertions(+), 19 deletions(-) diff --git a/benches/zstd_dict.rs b/benches/zstd_dict.rs index 6610045e4..90e409788 100644 --- a/benches/zstd_dict.rs +++ b/benches/zstd_dict.rs @@ -15,11 +15,16 @@ //! cargo bench --bench zstd_dict --features zstd-pure # pure Rust backend //! ``` -use criterion::{BatchSize, Criterion, criterion_group, criterion_main}; +use criterion::{Criterion, criterion_group, criterion_main}; + +#[cfg(zstd_any)] +use criterion::BatchSize; +#[cfg(zstd_any)] use lsm_tree::compression::ZstdDictionary; -// --- test fixtures ------------------------------------------------------- +// --- test fixtures (only needed when a zstd backend is enabled) ---------- +#[cfg(zstd_any)] /// Pre-trained zstd dictionary (206 bytes). /// /// Generated with the `zstd` C library from 100 samples × 32 bytes @@ -36,12 +41,14 @@ const DICT: &[u8] = &[ 2, 2, 2, 2, 2, 2, ]; +#[cfg(zstd_any)] /// Compressed frame for b"hello world hello world hello world" (35 bytes). const COMPRESSED: &[u8] = &[ 40, 181, 47, 253, 35, 98, 64, 12, 7, 35, 149, 0, 0, 96, 104, 101, 108, 108, 111, 32, 119, 111, 114, 108, 100, 32, 1, 0, 175, 75, 18, ]; +#[cfg(zstd_any)] const PLAINTEXT_LEN: usize = 35; // ------------------------------------------------------------------------- @@ -67,8 +74,19 @@ fn bench_decompress_with_dict(c: &mut Criterion) { }); }); - // Cold benchmark: fresh `ZstdDictionary` on each iteration so the cache - // is unpopulated. Measures first-call cost (dict parse + decompress). 
+ // "Cold" benchmark: each iteration gets a fresh `ZstdDictionary` handle + // (new `OnceLock` for the C FFI backend, same dict bytes for both). + // + // For the C FFI backend this truly measures first-call cost: a fresh + // `ZstdDictionary` has an unpopulated `OnceLock`, so `ZSTD_createDDict` + // is invoked on the first decompression and cached in the handle. + // + // For the pure Rust backend the result is different: the TLS decoder is + // keyed by the 64-bit content hash. All iterations share the same DICT + // bytes and therefore the same hash, so after the first iteration the TLS + // entry is still live — subsequent iterations measure the TLS-hit path, not + // dict parsing. True "cold" cost for the pure Rust backend is therefore + // only observable on the very first iteration of the first benchmark run. c.bench_function("decompress_with_dict/cold", |b| { b.iter_batched( || ZstdDictionary::new(DICT), diff --git a/src/compression/mod.rs b/src/compression/mod.rs index 65249d050..f2d1d45fa 100644 --- a/src/compression/mod.rs +++ b/src/compression/mod.rs @@ -82,7 +82,10 @@ pub type ZstdBackend = zstd_pure::ZstdPureProvider; /// ``` #[cfg(zstd_any)] pub struct ZstdDictionary { - id: u32, + /// Full 64-bit xxh3 hash used as the collision-resistant cache key for the + /// thread-local `FrameDecoder` in the pure Rust backend. The public + /// `id() -> u32` method returns the lower 32 bits for external consumers. + id: u64, raw: Arc<[u8]>, /// Pre-compiled decompressor dictionary, lazily initialized on first use. @@ -144,9 +147,25 @@ impl ZstdDictionary { .get_or_init(|| zstd::dict::DecoderDictionary::copy(&self.raw)) } - /// Returns the dictionary ID (truncated xxh3 hash of the raw bytes). + /// Returns a 32-bit dictionary fingerprint (lower 32 bits of xxh3). + /// + /// Intended for display and external interop (e.g., matching against the + /// dict ID embedded in a zstd frame header). 
For internal cache keying + /// use [`id64`](ZstdDictionary::id64) to avoid hash collisions. #[must_use] + #[expect( + clippy::cast_possible_truncation, + reason = "intentional: public API returns 32-bit fingerprint" + )] pub fn id(&self) -> u32 { + self.id as u32 + } + + /// Returns the full 64-bit xxh3 fingerprint used as a collision-resistant + /// cache key inside the pure Rust backend's TLS decoder. + #[cfg(all(feature = "zstd-pure", not(feature = "zstd")))] + #[must_use] + pub(crate) fn id64(&self) -> u64 { self.id } @@ -161,20 +180,20 @@ impl ZstdDictionary { impl std::fmt::Debug for ZstdDictionary { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("ZstdDictionary") - .field("id", &format_args!("{:#010x}", self.id)) + .field("id", &format_args!("{:#018x}", self.id)) .field("size", &self.raw.len()) .finish_non_exhaustive() // `prepared` cache omitted — implementation detail } } -/// Compute a 32-bit dictionary ID from raw bytes via truncated xxh3. +/// Compute the full 64-bit xxh3 dictionary fingerprint. +/// +/// The full 64-bit value is used as the collision-resistant cache key inside +/// the pure Rust backend's thread-local `FrameDecoder`. The public `id()` +/// method returns only the lower 32 bits for backward-compatible display. #[cfg(zstd_any)] -#[expect( - clippy::cast_possible_truncation, - reason = "intentionally truncated to 32-bit fingerprint" -)] -fn compute_dict_id(raw: &[u8]) -> u32 { - xxhash_rust::xxh3::xxh3_64(raw) as u32 +fn compute_dict_id(raw: &[u8]) -> u64 { + xxhash_rust::xxh3::xxh3_64(raw) } /// Compression algorithm to use diff --git a/src/compression/zstd_pure.rs b/src/compression/zstd_pure.rs index 3eee1d666..7ce4a5da2 100644 --- a/src/compression/zstd_pure.rs +++ b/src/compression/zstd_pure.rs @@ -103,7 +103,11 @@ impl CompressionProvider for ZstdPureProvider { // needed. If the active dictionary changes (e.g. different table), // the decoder is re-initialised transparently. thread_local! 
{ - static TLS_DECODER: std::cell::RefCell<Option<(u32, FrameDecoder)>> = + // Keyed by the full 64-bit xxh3 fingerprint (`dict.id64()`), not + // the truncated 32-bit public fingerprint, to avoid decoder reuse + // when two distinct dictionaries happen to share the same 32-bit + // prefix. A 64-bit collision is 2^32× less likely than a 32-bit one. + static TLS_DECODER: std::cell::RefCell<Option<(u64, FrameDecoder)>> = const { std::cell::RefCell::new(None) }; } @@ -111,15 +115,15 @@ let mut state = cell.borrow_mut(); // Re-initialise if this is the first call in this thread or if - // the dictionary has changed (different dict_id → different table). - if !matches!(&*state, Some((id, _)) if *id == dict.id()) { + // the dictionary has changed (different id64 → different table). + if !matches!(&*state, Some((id, _)) if *id == dict.id64()) { let parsed = Dictionary::decode_dict(dict.raw()) .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; let mut decoder = FrameDecoder::new(); decoder .add_dict(parsed) .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; - *state = Some((dict.id(), decoder)); + *state = Some((dict.id64(), decoder)); } let Some((_, decoder)) = state.as_mut() else { @@ -139,13 +143,25 @@ .decode_all_to_vec(data, &mut output) .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; + // Enforce the same capacity limit as `decompress` and the C backend: + // reject frames that decompress to more bytes than the caller declared. + // `decode_all_to_vec` allocates without a hard cap, so this post-decode + // check prevents unbounded allocation on corrupted or crafted frames.
+ if output.len() > capacity { + return Err(crate::Error::DecompressedSizeTooLarge { + declared: output.len() as u64, + limit: capacity as u64, + }); + } + Ok(output) }) } } #[cfg(test)] -#[allow(clippy::unwrap_used, clippy::expect_used, reason = "test code")] +#[expect(clippy::unwrap_used, reason = "test code")] +#[expect(clippy::expect_used, reason = "test code")] mod tests { use super::*; use crate::compression::ZstdDictionary; @@ -202,4 +218,19 @@ mod tests { assert_eq!(result, PLAINTEXT); } } + + #[test] + fn decompress_with_dict_rejects_frame_exceeding_capacity() { + // Capacity smaller than the plaintext — should return an error, not + // silently return truncated output (regression for the post-decode + // capacity guard added to `decode_all_to_vec`). + let dict = ZstdDictionary::new(DICT); + let too_small = PLAINTEXT.len() / 2; + let result = ZstdPureProvider::decompress_with_dict(COMPRESSED, &dict, too_small); + assert!( + result.is_err(), + "expected DecompressedSizeTooLarge but got Ok({:?})", + result.unwrap() + ); + } } From 56edf081bf3f072081a857542f25384599f7a30f Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Tue, 7 Apr 2026 02:52:00 +0300 Subject: [PATCH 3/4] fix(compression): restore decode_all_to_vec with post-decode capacity guard The FrameDecoder::init + bounded_read approach does not work: FrameDecoder processes the full frame at once and its Read impl returns 0 bytes after init unless driven by decode_all_to_vec. StreamingDecoder supports streaming reads; FrameDecoder does not. Restore decode_all_to_vec with an explicit post-decode capacity check: if output.len() > capacity return DecompressedSizeTooLarge, matching the bounded behaviour of decompress() and the C FFI backend. Add detailed comment explaining why bounded_read cannot be used for dict decompression. 
--- src/compression/zstd_pure.rs | 32 +++++++++++++++++++++++--------- 1 file changed, 23 insertions(+), 9 deletions(-) diff --git a/src/compression/zstd_pure.rs b/src/compression/zstd_pure.rs index 7ce4a5da2..4e41365c9 100644 --- a/src/compression/zstd_pure.rs +++ b/src/compression/zstd_pure.rs @@ -133,20 +133,34 @@ impl CompressionProvider for ZstdPureProvider { ))); }; - // `decode_all_to_vec` calls `init(&mut input)` internally — the - // mutable borrow advances the slice cursor past the frame header - // before block decoding begins, which is the correct usage of - // `FrameDecoder`. The dictionary stored in `decoder.dicts` is - // reused on every call without re-parsing. + // `decode_all_to_vec` decodes the entire frame in one pass and + // appends to `output`. The dictionary stored in `decoder.dicts` + // is reused without re-parsing on every call. + // + // The `bounded_read` approach used by the non-dictionary path + // (`decompress`) is not applicable here: `bounded_read` calls + // `Read::read` in a loop, which requires the decoder to pull + // compressed blocks lazily from an internal reader reference. + // `StreamingDecoder` supports this (it holds a `&[u8]` reference); + // `FrameDecoder` does not — it processes the full slice at once + // and its `Read` impl returns 0 bytes after `init` if not used + // together with `decode_all_to_vec`. + // + // The capacity limit is therefore enforced by a post-decode check + // rather than during streaming. The allocation is bounded by the + // frame content size: zstd frames embed the decompressed size in + // their header, so allocations from crafted frames are bounded by + // that declared size. If the declared size itself is maliciously + // large, the post-decode check below returns `DecompressedSizeTooLarge` + // before the data is used. 
let mut output = Vec::with_capacity(capacity); decoder .decode_all_to_vec(data, &mut output) .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; - // Enforce the same capacity limit as `decompress` and the C backend: - // reject frames that decompress to more bytes than the caller declared. - // `decode_all_to_vec` allocates without a hard cap, so this post-decode - // check prevents unbounded allocation on corrupted or crafted frames. + // Return an error if the frame decompressed to more bytes than + // the caller declared. Matches the bounded behaviour of + // `decompress()` and the C FFI backend. if output.len() > capacity { return Err(crate::Error::DecompressedSizeTooLarge { declared: output.len() as u64, From 76fb7259fe1230fe9c62e80d640f1e20f22f0b49 Mon Sep 17 00:00:00 2001 From: Dmitry Prudnikov Date: Tue, 7 Apr 2026 03:21:50 +0300 Subject: [PATCH 4/4] fix(compression): align doc comments and test assertion with caching semantics - Update ZstdDictionary::new doc: id stored as full 64-bit hash, id() truncates to u32 at call time (not at construction) - Tighten prepared field comment: ZSTD_DDict is cached per handle (not globally per unique bytes) via Arc<OnceLock> - Strengthen decompress_with_dict_rejects_frame_exceeding_capacity: assert DecompressedSizeTooLarge variant specifically instead of is_err(); normalize FrameDecoderError::TargetTooSmall to DecompressedSizeTooLarge for a consistent public error API Addresses Copilot review threads #10, #11, #12. --- src/compression/mod.rs | 10 ++++++---- src/compression/zstd_pure.rs | 27 +++++++++++++++++++------ 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/compression/mod.rs b/src/compression/mod.rs index f2d1d45fa..3781b48f2 100644 --- a/src/compression/mod.rs +++ b/src/compression/mod.rs @@ -90,9 +90,10 @@ pub struct ZstdDictionary { /// Pre-compiled decompressor dictionary, lazily initialized on first use.
/// - /// Wrapped in `Arc<OnceLock<DecoderDictionary>>` so all clones share the same compiled - /// instance — `ZSTD_DDict` is created exactly once per unique dictionary, - /// regardless of how many table readers hold a reference to it. + /// Wrapped in `Arc<OnceLock<DecoderDictionary>>` so all clones of the same + /// `ZstdDictionary` share one compiled instance. With the C FFI backend, + /// `ZSTD_DDict` is therefore created at most once per dictionary handle, + /// regardless of how many table readers hold a clone of that handle. /// /// Available only with the C FFI backend (`zstd` feature). The pure Rust /// backend caches an equivalent `FrameDecoder` in thread-local storage @@ -119,7 +120,8 @@ impl ZstdDictionary { /// /// The raw bytes should be a pre-trained zstd dictionary (e.g., output /// of `zstd::dict::from_continuous` or `zstd --train`). The dictionary - /// ID is computed as a truncated xxh3 hash of the content. + /// ID is stored as a full 64-bit xxh3 hash; the public [`ZstdDictionary::id`] + /// method returns the lower 32 bits for external consumers. #[must_use] pub fn new(raw: &[u8]) -> Self { Self { diff --git a/src/compression/zstd_pure.rs b/src/compression/zstd_pure.rs index 4e41365c9..b45a949d6 100644 --- a/src/compression/zstd_pure.rs +++ b/src/compression/zstd_pure.rs @@ -154,9 +154,24 @@ impl CompressionProvider for ZstdPureProvider { // large, the post-decode check below returns `DecompressedSizeTooLarge` // before the data is used. let mut output = Vec::with_capacity(capacity); - decoder - .decode_all_to_vec(data, &mut output) - .map_err(|e| crate::Error::Io(std::io::Error::other(e)))?; + decoder.decode_all_to_vec(data, &mut output).map_err(|e| { + // `decode_all_to_vec` uses the Vec's capacity as a hard + // allocation limit. When the frame's decompressed content + // would exceed that limit, it returns `TargetTooSmall`. + // Normalise this to `DecompressedSizeTooLarge` for a + // consistent error API with the C FFI backend.
+ if matches!( + e, + structured_zstd::decoding::errors::FrameDecoderError::TargetTooSmall + ) { + crate::Error::DecompressedSizeTooLarge { + declared: capacity as u64 + 1, + limit: capacity as u64, + } + } else { + crate::Error::Io(std::io::Error::other(e)) + } + })?; // Return an error if the frame decompressed to more bytes than // the caller declared. Matches the bounded behaviour of @@ -242,9 +257,9 @@ mod tests { let too_small = PLAINTEXT.len() / 2; let result = ZstdPureProvider::decompress_with_dict(COMPRESSED, &dict, too_small); assert!( - result.is_err(), - "expected DecompressedSizeTooLarge but got Ok({:?})", - result.unwrap() + matches!(result, Err(crate::Error::DecompressedSizeTooLarge { .. })), + "expected DecompressedSizeTooLarge but got {:?}", + result ); } }