Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
1172d68
feat(compression): zstd dictionary compression support
polaz Mar 23, 2026
432e198
fix(compression): thread zstd dictionary through Scanner read path
polaz Mar 23, 2026
f905939
fix(compression): use Option<u32> for dict mismatch, ErrorKind::Unsup…
polaz Mar 23, 2026
28f2e8d
fix(compression): adapt zstd dict arms to refactored encrypt pipeline
polaz Mar 23, 2026
10b9d51
style(compression): format zstd dict test call sites
polaz Mar 23, 2026
34b58bf
test(compression): block-level zstd dictionary roundtrip and error paths
polaz Mar 23, 2026
7e338b1
test(compression): validate range scan key-value content with zstd dict
polaz Mar 23, 2026
d33146c
fix(compression): pass Fs to blob Writer::new in zstd dict test
polaz Mar 23, 2026
1c1544e
fix(compression): validate zstd dictionary at config open, reject Zst…
polaz Mar 23, 2026
2f38450
fix(compression): validate only data block policies for zstd dictionary
polaz Mar 23, 2026
a80e9df
fix(compression): validate dict_id at table recovery, fix per-level test
polaz Mar 23, 2026
0c556f0
style(test): annotate Guard trait import used by into_inner()
polaz Mar 23, 2026
edaa2da
fix(test): remove flaky zero-id assertion from dictionary determinism…
polaz Mar 23, 2026
7757297
docs(compression): clarify auto-deref in zstd dictionary validation loop
polaz Mar 23, 2026
8cf95bc
fix(compression): reject ZstdDict blob compression at open, remove te…
polaz Mar 23, 2026
6ae13dd
fix(compression): add missing zstd_dict param to multi_writer test re…
polaz Mar 23, 2026
dc27d0d
refactor(compression): explicit ref pattern in dict validation loop
polaz Mar 23, 2026
b7d5b96
test(compression): assert exact wire bytes and use real serializer in…
polaz Mar 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,15 @@ Allows using `LZ4` compression, powered by [`lz4_flex`](https://github.com/PSeit

*Disabled by default.*

### zstd

Allows using `Zstd` compression, powered by [`zstd`](https://github.com/gyscos/zstd-rs).
Supports both regular zstd (`CompressionType::Zstd`) and dictionary compression
(`CompressionType::ZstdDict`) for improved ratios on small table blocks (4–64 KiB).
Blob-file dictionary compression is currently not supported.

Comment thread
polaz marked this conversation as resolved.
*Disabled by default.*

### bytes

Uses [`bytes`](https://github.com/tokio-rs/bytes) as the underlying `Slice` type.
Expand Down
2 changes: 2 additions & 0 deletions src/blob_tree/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ impl<'a> BlobIngestion<'a> {
false,
false,
index.config.encryption.clone(),
#[cfg(feature = "zstd")]
index.config.zstd_dictionary.clone(),
index.config.comparator.clone(),
#[cfg(feature = "metrics")]
index.metrics.clone(),
Expand Down
8 changes: 8 additions & 0 deletions src/blob_tree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,12 @@ impl AbstractTree for BlobTree {
table_writer.use_prefix_extractor(self.index.config.prefix_extractor.clone());
table_writer = table_writer.use_encryption(self.index.config.encryption.clone());

#[cfg(feature = "zstd")]
{
table_writer =
table_writer.use_zstd_dictionary(self.index.config.zstd_dictionary.clone());
}

#[expect(
clippy::expect_used,
reason = "cannot create blob tree without defining kv separation options"
Expand Down Expand Up @@ -541,6 +547,8 @@ impl AbstractTree for BlobTree {
pin_filter,
pin_index,
self.index.config.encryption.clone(),
#[cfg(feature = "zstd")]
self.index.config.zstd_dictionary.clone(),
self.index.config.comparator.clone(),
#[cfg(feature = "metrics")]
self.index.metrics.clone(),
Expand Down
11 changes: 9 additions & 2 deletions src/compaction/flavour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ pub(super) fn prepare_table_writer(
let last_level = (version.level_count() - 1) as u8;
let is_last_level = payload.dest_level == last_level;

Ok(table_writer
let table_writer = table_writer
.use_data_block_restart_interval(data_block_restart_interval)
.use_index_block_restart_interval(index_block_restart_interval)
.use_data_block_compression(data_block_compression)
Expand Down Expand Up @@ -123,7 +123,12 @@ pub(super) fn prepare_table_writer(
None => BloomConstructionPolicy::BitsPerKey(0.0),
}
}
}))
});

#[cfg(feature = "zstd")]
let table_writer = table_writer.use_zstd_dictionary(opts.config.zstd_dictionary.clone());

Ok(table_writer)
}

// TODO: find a better name
Expand Down Expand Up @@ -383,6 +388,8 @@ impl StandardCompaction {
pin_filter,
pin_index,
opts.config.encryption.clone(),
#[cfg(feature = "zstd")]
opts.config.zstd_dictionary.clone(),
opts.config.comparator.clone(),
#[cfg(feature = "metrics")]
opts.metrics.clone(),
Expand Down
227 changes: 227 additions & 0 deletions src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,83 @@ use crate::coding::{Decode, Encode};
use byteorder::{ReadBytesExt, WriteBytesExt};
use std::io::{Read, Write};

#[cfg(feature = "zstd")]
use std::sync::Arc;

/// Pre-trained zstd dictionary for improved compression of small blocks.
///
/// Zstd dictionaries significantly improve compression ratios for blocks
/// in the 4–64 KiB range typical of LSM-trees, especially when data has
/// recurring patterns (e.g., structured keys, repeated prefixes,
/// JSON/MessagePack values).
///
/// The dictionary is identified by a 32-bit ID derived from its content
/// (truncated xxh3 hash). This ID is stored alongside compressed blocks
/// so readers can detect dictionary mismatches.
///
/// Cloning is cheap: the bytes live behind an [`Arc`], so clones share the
/// same allocation instead of copying the dictionary.
///
/// # Example
///
/// ```ignore
/// use lsm_tree::ZstdDictionary;
///
/// // The input must already be a *trained* dictionary (e.g. the output of
/// // `zstd --train`), not the raw samples it was trained on.
/// let dict_bytes: &[u8] = &trained_dictionary;
/// let dict = ZstdDictionary::new(dict_bytes);
/// ```
#[cfg(feature = "zstd")]
#[derive(Clone)]
pub struct ZstdDictionary {
    /// 32-bit content fingerprint of `raw`, computed once at construction.
    id: u32,
    /// The dictionary bytes, shared between clones.
    raw: Arc<[u8]>,
}

#[cfg(feature = "zstd")]
impl ZstdDictionary {
    /// Builds a dictionary handle from the bytes of a pre-trained zstd dictionary.
    ///
    /// `raw` is expected to be a trained dictionary (for example the output
    /// of `zstd::dict::from_continuous` or `zstd --train`). The bytes are
    /// copied into a shared allocation, and the dictionary ID is derived from
    /// them as a truncated xxh3 hash.
    #[must_use]
    pub fn new(raw: &[u8]) -> Self {
        // Fingerprint first, then take our own reference-counted copy of the bytes.
        let id = compute_dict_id(raw);
        let raw: Arc<[u8]> = raw.into();

        Self { id, raw }
    }

    /// Returns the dictionary ID (truncated xxh3 hash of the raw bytes).
    #[must_use]
    pub fn id(&self) -> u32 {
        self.id
    }

    /// Returns a view of the raw dictionary bytes.
    #[must_use]
    pub fn raw(&self) -> &[u8] {
        &self.raw[..]
    }
}

#[cfg(feature = "zstd")]
impl std::fmt::Debug for ZstdDictionary {
    /// Prints the ID as zero-padded hex and the dictionary size in bytes.
    ///
    /// The dictionary content itself is deliberately left out of the output;
    /// only its length is shown.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ZstdDictionary");
        out.field("id", &format_args!("{:#010x}", self.id));
        out.field("size", &self.raw.len());
        out.finish()
    }
}

/// Derives the 32-bit dictionary fingerprint from the dictionary bytes.
///
/// Hashes `raw` with 64-bit xxh3 and keeps only the low 32 bits of the digest.
#[cfg(feature = "zstd")]
#[expect(
    clippy::cast_possible_truncation,
    reason = "intentionally truncated to 32-bit fingerprint"
)]
fn compute_dict_id(raw: &[u8]) -> u32 {
    let digest: u64 = xxhash_rust::xxh3::xxh3_64(raw);
    digest as u32
}

/// Compression algorithm to use
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum CompressionType {
Expand Down Expand Up @@ -39,6 +116,24 @@ pub enum CompressionType {
// via CompressionType::Zstd(level) must uphold the 1..=22 invariant.
#[cfg(feature = "zstd")]
Zstd(i32),

/// Zstd compression with a pre-trained dictionary
///
/// Uses a pre-trained dictionary for significantly better compression
/// ratios on small blocks (4–64 KiB), especially when data has recurring
/// patterns.
///
/// `level` is the compression level (1–22), `dict_id` identifies the
/// dictionary (truncated xxh3 hash of the dictionary bytes). The actual
/// dictionary must be provided via [`Config`] or the relevant writer/reader.
#[cfg(feature = "zstd")]
ZstdDict {
/// Compression level (1–22)
level: i32,

/// Dictionary fingerprint for mismatch detection
dict_id: u32,
},
}

impl CompressionType {
Expand Down Expand Up @@ -70,6 +165,21 @@ impl CompressionType {
Self::validate_zstd_level(level)?;
Ok(Self::Zstd(level))
}

/// Builds a [`CompressionType::ZstdDict`] after checking the compression level.
///
/// Pass the value returned by [`ZstdDictionary::id`] as `dict_id`, so the
/// compression type written to disk matches the dictionary that is
/// actually used at runtime.
///
/// # Errors
///
/// Returns an error if `level` is outside the valid range `1..=22`.
#[cfg(feature = "zstd")]
pub fn zstd_dict(level: i32, dict_id: u32) -> crate::Result<Self> {
    // Reject out-of-range levels before the variant is ever constructed.
    Self::validate_zstd_level(level)?;

    let compression = Self::ZstdDict { level, dict_id };
    Ok(compression)
}
}

impl std::fmt::Display for CompressionType {
Expand All @@ -85,6 +195,9 @@ impl std::fmt::Display for CompressionType {

#[cfg(feature = "zstd")]
Self::Zstd(_) => "zstd",

#[cfg(feature = "zstd")]
Self::ZstdDict { .. } => "zstd+dict",
}
)
}
Expand Down Expand Up @@ -117,6 +230,21 @@ impl Encode for CompressionType {
)]
writer.write_i8(*level as i8)?;
}

#[cfg(feature = "zstd")]
Self::ZstdDict { level, dict_id } => {
writer.write_u8(4)?;
debug_assert!(
(1..=22).contains(level),
"zstd level {level} outside valid range 1..=22"
);
#[expect(
clippy::cast_possible_truncation,
reason = "level range 1..=22 fits i8"
)]
writer.write_i8(*level as i8)?;
byteorder::WriteBytesExt::write_u32::<byteorder::LittleEndian>(writer, *dict_id)?;
}
}

Ok(())
Expand All @@ -141,6 +269,14 @@ impl Decode for CompressionType {
Ok(Self::Zstd(level))
}

#[cfg(feature = "zstd")]
4 => {
let level = i32::from(reader.read_i8()?);
Self::validate_zstd_level(level)?;
let dict_id = byteorder::ReadBytesExt::read_u32::<byteorder::LittleEndian>(reader)?;
Ok(Self::ZstdDict { level, dict_id })
}

tag => Err(crate::Error::InvalidTag(("CompressionType", tag))),
}
}
Expand Down Expand Up @@ -220,5 +356,96 @@ mod tests {
let result = CompressionType::decode_from(&mut &corrupted[..]);
assert!(result.is_err(), "level 23 should be rejected on decode");
}

#[test]
fn compression_serialize_zstd_dict() {
    // Expected wire layout: tag byte 4, level 3 as i8, then
    // dict_id 0xDEAD_BEEF in little-endian byte order.
    let expected: [u8; 6] = [4, 3, 0xEF, 0xBE, 0xAD, 0xDE];

    let compression = CompressionType::ZstdDict {
        level: 3,
        dict_id: 0xDEAD_BEEF,
    };
    let wire = compression.encode_into_vec();

    assert_eq!(wire, expected);
}
Comment thread
polaz marked this conversation as resolved.

#[test]
fn compression_roundtrip_zstd_dict() {
    // Exercise both ends of the valid level range (1 and 22) plus interior
    // values; the level is narrowed to a single byte on the wire, so the
    // upper bound is worth pinning explicitly.
    for level in [1, 3, 9, 19, 22] {
        // Dictionary IDs cover zero, a small value, a mixed bit pattern and
        // the all-ones value.
        for dict_id in [0, 1, 0xDEAD_BEEF, u32::MAX] {
            let original = CompressionType::ZstdDict { level, dict_id };
            let serialized = original.encode_into_vec();
            let decoded =
                CompressionType::decode_from(&mut &serialized[..]).expect("decode failed");
            assert_eq!(
                original, decoded,
                "roundtrip mismatch for level {level}, dict_id {dict_id:#010x}"
            );
        }
    }
}

#[test]
fn compression_display_zstd_dict() {
    // The display name does not depend on the level or the dictionary ID.
    let compression = CompressionType::ZstdDict {
        level: 3,
        dict_id: 42,
    };

    assert_eq!(compression.to_string(), "zstd+dict");
}

#[test]
fn compression_zstd_dict_rejects_invalid_level() {
    // Just below the range, just above it, negative, and far out of range.
    let out_of_range: [i32; 4] = [0, 23, -1, 200];

    for invalid_level in out_of_range {
        assert!(
            CompressionType::zstd_dict(invalid_level, 42).is_err(),
            "level {invalid_level} should be rejected"
        );
    }
}

#[test]
fn compression_zstd_dict_decode_rejects_invalid_level() {
    // Start from a well-formed encoding so that only the level byte is wrong.
    let valid = CompressionType::ZstdDict {
        level: 3,
        dict_id: 42,
    };
    let mut encoded = valid.encode_into_vec();

    // Byte 0 is the variant tag and byte 1 carries the level.
    assert_eq!(encoded[0], 4);
    encoded[1] = 0; // 0 lies outside the valid range 1..=22

    let mut reader = &encoded[..];
    let outcome = CompressionType::decode_from(&mut reader);
    assert!(outcome.is_err(), "level 0 should be rejected on decode");
}

#[test]
fn zstd_dictionary_id_deterministic() {
    // Two dictionaries built from identical bytes must have the same ID.
    let content: &[u8] = b"sample dictionary content for testing";

    let first_id = ZstdDictionary::new(content).id();
    let second_id = ZstdDictionary::new(content).id();

    assert_eq!(first_id, second_id);
}

#[test]
fn zstd_dictionary_different_content_different_id() {
    // These two specific inputs are expected to yield different IDs.
    let id_one = ZstdDictionary::new(b"dictionary one").id();
    let id_two = ZstdDictionary::new(b"dictionary two").id();

    assert_ne!(id_one, id_two);
}

#[test]
fn zstd_dictionary_raw_roundtrip() {
    // The bytes handed to `new` must come back unchanged from `raw()`.
    let input: &[u8] = b"my dictionary bytes";

    let stored = ZstdDictionary::new(input);

    assert_eq!(stored.raw(), input);
}

#[test]
fn zstd_dictionary_debug_format() {
    // Debug output should name the type and report the byte length (4 for b"test").
    let rendered = format!("{:?}", ZstdDictionary::new(b"test"));

    assert!(rendered.contains("ZstdDictionary"));
    assert!(rendered.contains("size: 4"));
}
}
}
Loading
Loading