From 887f92901ae6d7311b84d00d35bbd59a0b873697 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Sat, 30 May 2026 20:27:30 -0700 Subject: [PATCH 1/4] feat(parquet): truncate manifest bounds for STRING/BINARY/FIXED to match Java Apply Iceberg's default 16-unit bound truncation to manifest lower/upper bounds for STRING (codepoint-based), BINARY, and FIXED (byte-based) when collecting per-row-group statistics in `MinMaxColAggregator`. This mirrors Java's `org.apache.iceberg.util.UnicodeUtil#truncateStringMin/Max` and `BinaryUtil#truncateBinaryMin/Max`, called from `ParquetUtil#updateMin/Max`. Without this, long values produced manifest bounds that exceeded the conventional 16-unit budget and didn't agree with bounds Spark/Java would have written for the same data. Upper-bound truncation: take the 16-unit prefix, then increment the last unit; on overflow drop that position and try the previous one. If every position in the prefix is at max, we cannot produce a sound upper bound and drop it (matches Java semantics; lower bound is still recorded). For STRING upper bounds we walk past UTF-16 surrogates (U+D800-U+DFFF) when incrementing because Rust's `char::from_u32` rejects them; Java's `Character.isValidCodePoint` accepts surrogates, but skipping them in Rust preserves monotonic ordering for valid UTF-8. Tests added (18): - 13 helper unit tests covering short input, long input, overflow drop, all-max fallback, and the UTF-16 surrogate skip - 4 aggregator tests for STRING/BINARY truncation behavior and the drop-only-upper case - 1 end-to-end tokio test that writes long-string rows through ParquetWriter and asserts the resulting `data_file.lower_bounds()` / `upper_bounds()` --- .../src/writer/file_writer/parquet_writer.rs | 434 +++++++++++++++++- 1 file changed, 426 insertions(+), 8 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 840d1a5f16..7ab06f7ecc 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -39,8 +39,8 @@ use crate::arrow::{ use crate::io::{FileIO, FileWrite, OutputFile}; use crate::spec::{ DataContentType, DataFileBuilder, DataFileFormat, Datum, ListType, Literal, MapType, - NestedFieldRef, PartitionSpec, PrimitiveType, Schema, SchemaRef, SchemaVisitor, Struct, - StructType, TableMetadata, Type, visit_schema, + NestedFieldRef, PartitionSpec, PrimitiveLiteral, PrimitiveType, Schema, SchemaRef, + SchemaVisitor, Struct, StructType, TableMetadata, Type, visit_schema, }; use crate::transform::create_transform_function; use crate::writer::{CurrentFileStatus, DataFile}; @@ -218,6 +218,105 @@ pub struct ParquetWriter { nan_value_count_visitor: NanValueCountVisitor, } +/// Default Iceberg manifest bound truncation length, matching Java's +/// `MetricsModes.Truncate(16)` default for STRING and the conventional +/// 16-byte truncation for BINARY/FIXED. +const ICEBERG_BOUND_TRUNCATE_LENGTH: usize = 16; + +/// Returns a string of at most `length` Unicode code points that is `<= input`. +/// Mirrors `org.apache.iceberg.util.UnicodeUtil#truncateStringMin`. +fn truncate_string_min(input: &str, length: usize) -> String { + match input.char_indices().nth(length) { + Some((byte_offset, _)) => input[..byte_offset].to_string(), + None => input.to_string(), + } +} + +/// Returns a string of at most `length` Unicode code points that is `> input`, +/// or `None` when no such bound exists (e.g. all `length` leading code points +/// are `char::MAX`). Mirrors `org.apache.iceberg.util.UnicodeUtil#truncateStringMax`. +fn truncate_string_max(input: &str, length: usize) -> Option { + let mut prefix: Vec = input.chars().take(length + 1).collect(); + if prefix.len() <= length { + return Some(input.to_string()); + } + prefix.truncate(length); + for i in (0..prefix.len()).rev() { + let mut next_cp = prefix[i] as u32 + 1; + while next_cp <= 0x10FFFF { + if let Some(c) = char::from_u32(next_cp) { + prefix[i] = c; + return Some(prefix.into_iter().take(i + 1).collect()); + } + next_cp += 1; + } + } + None +} + +/// Returns at most `length` bytes of `input` (a valid lower bound). +/// Mirrors `org.apache.iceberg.util.BinaryUtil#truncateBinaryMin`. +fn truncate_binary_min(input: &[u8], length: usize) -> Vec { + if input.len() <= length { + input.to_vec() + } else { + input[..length].to_vec() + } +} + +/// Returns at most `length` bytes that compare strictly greater than `input`, +/// or `None` when the leading `length` bytes are all `0xFF`. +/// Mirrors `org.apache.iceberg.util.BinaryUtil#truncateBinaryMax`. +fn truncate_binary_max(input: &[u8], length: usize) -> Option> { + if input.len() <= length { + return Some(input.to_vec()); + } + let mut prefix = input[..length].to_vec(); + for i in (0..prefix.len()).rev() { + if prefix[i] != u8::MAX { + prefix[i] += 1; + prefix.truncate(i + 1); + return Some(prefix); + } + } + None +} + +/// Apply Iceberg manifest truncation for STRING/BINARY/FIXED lower bounds. +/// Other primitive types are returned unchanged. +fn truncate_lower_bound(ty: &PrimitiveType, datum: Datum) -> Datum { + match (ty, datum.literal()) { + (PrimitiveType::String, PrimitiveLiteral::String(s)) => { + Datum::string(truncate_string_min(s, ICEBERG_BOUND_TRUNCATE_LENGTH)) + } + (PrimitiveType::Binary, PrimitiveLiteral::Binary(bytes)) => { + Datum::binary(truncate_binary_min(bytes, ICEBERG_BOUND_TRUNCATE_LENGTH)) + } + (PrimitiveType::Fixed(_), PrimitiveLiteral::Binary(bytes)) => { + Datum::fixed(truncate_binary_min(bytes, ICEBERG_BOUND_TRUNCATE_LENGTH)) + } + _ => datum, + } +} + +/// Apply Iceberg manifest truncation for STRING/BINARY/FIXED upper bounds. +/// Returns `None` only for STRING/BINARY/FIXED inputs whose truncated prefix +/// cannot be incremented (no valid upper bound at the truncate length). +fn truncate_upper_bound(ty: &PrimitiveType, datum: Datum) -> Option { + match (ty, datum.literal()) { + (PrimitiveType::String, PrimitiveLiteral::String(s)) => { + truncate_string_max(s, ICEBERG_BOUND_TRUNCATE_LENGTH).map(Datum::string) + } + (PrimitiveType::Binary, PrimitiveLiteral::Binary(bytes)) => { + truncate_binary_max(bytes, ICEBERG_BOUND_TRUNCATE_LENGTH).map(Datum::binary) + } + (PrimitiveType::Fixed(_), PrimitiveLiteral::Binary(bytes)) => { + truncate_binary_max(bytes, ICEBERG_BOUND_TRUNCATE_LENGTH).map(Datum::fixed) + } + _ => Some(datum), + } +} + /// Used to aggregate min and max value of each column. struct MinMaxColAggregator { lower_bounds: HashMap, @@ -258,6 +357,12 @@ impl MinMaxColAggregator { } /// Update statistics + /// + /// Inexact (truncated) Parquet bounds are kept: a Parquet-prefix-truncated + /// min is still `<=` every value in the column, and a Parquet-truncated max + /// is still `>=` every value. Both are then re-truncated to the Iceberg + /// manifest bound length to match Java semantics + /// (`org.apache.iceberg.parquet.ParquetUtil#updateMin/updateMax`). fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> { let Some(ty) = self .schema @@ -275,26 +380,26 @@ impl MinMaxColAggregator { )); }; - if value.min_is_exact() { + if value.min_bytes_opt().is_some() { let Some(min_datum) = get_parquet_stat_min_as_datum(&ty, &value)? else { return Err(Error::new( ErrorKind::Unexpected, format!("Statistics {value} is not match with field type {ty}."), )); }; - - self.update_state_min(field_id, min_datum); + self.update_state_min(field_id, truncate_lower_bound(&ty, min_datum)); } - if value.max_is_exact() { + if value.max_bytes_opt().is_some() { let Some(max_datum) = get_parquet_stat_max_as_datum(&ty, &value)? else { return Err(Error::new( ErrorKind::Unexpected, format!("Statistics {value} is not match with field type {ty}."), )); }; - - self.update_state_max(field_id, max_datum); + if let Some(truncated) = truncate_upper_bound(&ty, max_datum) { + self.update_state_max(field_id, truncated); + } } Ok(()) @@ -861,6 +966,64 @@ mod tests { Ok(()) } + #[tokio::test] + async fn test_parquet_writer_truncates_long_string_bounds() -> Result<()> { + let temp_dir = TempDir::new().unwrap(); + let file_io = FileIO::new_with_fs(); + let location_gen = DefaultLocationGenerator::with_data_location( + temp_dir.path().to_str().unwrap().to_string(), + ); + let file_name_gen = + DefaultFileNameGenerator::new("test".to_string(), None, DataFileFormat::Parquet); + + let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![ + Field::new("s", DataType::Utf8, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "0".to_string(), + )])), + ])); + + // Strings ≫ 16 codepoints. The smallest sorts to "a..." and the largest + // to "z...", so manifest bounds must be 16-codepoint truncations of each. + let values = vec![ + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa".to_string(), + "mmmmmmmmmmmmmmmmmmmmmmmmmmmmmm".to_string(), + "zzzzzzzzzzzzzzzzzzzzzzzzzzzzzz".to_string(), + ]; + let col = Arc::new(arrow_array::StringArray::from(values)) as ArrayRef; + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![col]).unwrap(); + + let output_file = file_io.new_output( + location_gen.generate_location(None, &file_name_gen.generate_file_name()), + )?; + let mut pw = ParquetWriterBuilder::new( + WriterProperties::builder().build(), + Arc::new(arrow_schema.as_ref().try_into().unwrap()), + ) + .build(output_file) + .await?; + pw.write(&batch).await?; + let res = pw.close().await?; + let data_file = res + .into_iter() + .next() + .unwrap() + .content(DataContentType::Data) + .partition(Struct::empty()) + .partition_spec_id(0) + .build() + .unwrap(); + + let lower = data_file.lower_bounds().get(&0).expect("lower bound set"); + let upper = data_file.upper_bounds().get(&0).expect("upper bound set"); + + // Lower: 16 'a's; Upper: 15 'z's followed by '{' (z+1). + assert_eq!(lower, &Datum::string("aaaaaaaaaaaaaaaa")); + assert_eq!(upper, &Datum::string("zzzzzzzzzzzzzzz{")); + + Ok(()) + } + #[tokio::test] async fn test_parquet_writer_with_complex_schema() -> Result<()> { let temp_dir = TempDir::new().unwrap(); @@ -2279,4 +2442,259 @@ mod tests { assert_eq!(lower_bounds, HashMap::from([(0, Datum::int(i32::MIN))])); assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))])); } + + #[test] + fn test_truncate_string_min_short_input_unchanged() { + // ≤ length code points: returned as-is. + assert_eq!(truncate_string_min("abc", 16), "abc"); + assert_eq!(truncate_string_min("", 16), ""); + let exactly_16 = "0123456789abcdef"; + assert_eq!(truncate_string_min(exactly_16, 16), exactly_16); + } + + #[test] + fn test_truncate_string_min_long_input_truncates_codepoints() { + let s = "0123456789abcdefXYZ"; // 19 ASCII + assert_eq!(truncate_string_min(s, 16), "0123456789abcdef"); + // Multi-byte: each Greek letter is 2 UTF-8 bytes but 1 code point. + let greek = "α".repeat(20); + let truncated = truncate_string_min(&greek, 16); + assert_eq!(truncated.chars().count(), 16); + assert!(truncated.chars().all(|c| c == 'α')); + // 4-byte UTF-8 (emoji) at exact boundary. + let emoji_str = "😀".repeat(20); + let truncated = truncate_string_min(&emoji_str, 16); + assert_eq!(truncated.chars().count(), 16); + } + + #[test] + fn test_truncate_string_max_short_input_unchanged() { + assert_eq!(truncate_string_max("abc", 16), Some("abc".to_string())); + let exactly_16 = "0123456789abcdef"; + assert_eq!( + truncate_string_max(exactly_16, 16), + Some(exactly_16.to_string()) + ); + } + + #[test] + fn test_truncate_string_max_long_input_increments_last_codepoint() { + // ASCII: last 'f' (0x66) → 'g' (0x67). + let s = "0123456789abcdefXYZ"; + assert_eq!( + truncate_string_max(s, 16), + Some("0123456789abcdeg".to_string()) + ); + // Multi-byte: 'α' (U+03B1) → 'β' (U+03B2). + let s = "α".repeat(20); + let truncated = truncate_string_max(&s, 4).unwrap(); + assert_eq!(truncated, "αααβ"); + } + + #[test] + fn test_truncate_string_max_overflow_drops_position() { + // Last code point is char::MAX (U+10FFFF). Increment fails → drop and try i-1. + let mut s = String::from("ab"); + s.push(char::MAX); + s.push('x'); + // length=3 code points; input has 4 → triggers truncation. + let truncated = truncate_string_max(&s, 3).unwrap(); + // Position 2 fails to increment; position 1 ('b' → 'c'); result "ac" (length 2). + assert_eq!(truncated, "ac"); + } + + #[test] + fn test_truncate_string_max_skips_utf16_surrogates() { + // Last code point at U+D7FF; +1 lands on a surrogate (0xD800), which is + // not a Rust char. Helper must skip to U+E000 for a valid bound. + let mut s = String::from("a"); + s.push(char::from_u32(0xD7FF).unwrap()); + s.push('x'); + let truncated = truncate_string_max(&s, 2).unwrap(); + assert_eq!(truncated.chars().count(), 2); + let last = truncated.chars().nth(1).unwrap() as u32; + assert_eq!(last, 0xE000); + } + + #[test] + fn test_truncate_string_max_all_max_returns_none() { + let s: String = std::iter::repeat_n(char::MAX, 20).collect(); + assert_eq!(truncate_string_max(&s, 16), None); + } + + #[test] + fn test_truncate_binary_min_short_input_unchanged() { + assert_eq!(truncate_binary_min(b"abc", 16), b"abc".to_vec()); + let exactly_16: Vec = (0..16).collect(); + assert_eq!(truncate_binary_min(&exactly_16, 16), exactly_16); + } + + #[test] + fn test_truncate_binary_min_long_input_truncates() { + let bytes: Vec = (0..32).collect(); + let truncated = truncate_binary_min(&bytes, 16); + assert_eq!(truncated, (0..16).collect::>()); + } + + #[test] + fn test_truncate_binary_max_short_input_unchanged() { + assert_eq!(truncate_binary_max(b"abc", 16), Some(b"abc".to_vec())); + } + + #[test] + fn test_truncate_binary_max_long_input_increments_last_byte() { + let bytes: Vec = (0..32).collect(); + // First 16 bytes are 0..15. Last byte 15 → 16. + let mut expected: Vec = (0..15).collect(); + expected.push(16); + assert_eq!(truncate_binary_max(&bytes, 16), Some(expected)); + } + + #[test] + fn test_truncate_binary_max_drops_trailing_0xff() { + let mut bytes = vec![0xFEu8]; + bytes.extend(std::iter::repeat_n(0xFFu8, 31)); + // length=16. Prefix is [0xFE, 0xFF, 0xFF, ..., 0xFF]. + // Walk from i=15 to i=0: positions 15..1 are 0xFF (skip), position 0 is 0xFE → 0xFF. + // Result is the 1-byte vec [0xFF]. + assert_eq!(truncate_binary_max(&bytes, 16), Some(vec![0xFFu8])); + } + + #[test] + fn test_truncate_binary_max_all_ff_returns_none() { + let bytes = vec![0xFFu8; 32]; + assert_eq!(truncate_binary_max(&bytes, 16), None); + } + + #[test] + fn test_min_max_aggregator_keeps_inexact_string_stats() { + // Regression: previously, inexact (Parquet-truncated) stats were + // dropped, leaving long-string columns with no manifest bounds. + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "s", Type::Primitive(PrimitiveType::String)) + .with_id(0) + .into(), + ]) + .build() + .unwrap(), + ); + let mut agg = MinMaxColAggregator::new(schema); + let stats = Statistics::ByteArray( + ValueStatistics::new( + Some("aaaaaaaaaaaaaaaaaaaaaaaaaaaa".as_bytes().into()), + Some("zzzzzzzzzzzzzzzzzzzzzzzzzzzz".as_bytes().into()), + None, + None, + false, + ) + .with_min_is_exact(false) + .with_max_is_exact(false), + ); + agg.update(0, stats).unwrap(); + let (lower, upper) = agg.produce(); + assert_eq!( + lower.get(&0), + Some(&Datum::string("aaaaaaaaaaaaaaaa")) // 16 'a's + ); + // 'z' (0x7A) → '{' (0x7B); 16 codepoints total. + assert_eq!(upper.get(&0), Some(&Datum::string("zzzzzzzzzzzzzzz{"))); + } + + #[test] + fn test_min_max_aggregator_truncates_long_string_bounds() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "s", Type::Primitive(PrimitiveType::String)) + .with_id(0) + .into(), + ]) + .build() + .unwrap(), + ); + let mut agg = MinMaxColAggregator::new(schema); + // Exact stats but values > 16 codepoints. + let stats = Statistics::ByteArray(ValueStatistics::new( + Some("0123456789abcdefXYZ".as_bytes().into()), + Some("0123456789abcdefXYZ".as_bytes().into()), + None, + None, + false, + )); + agg.update(0, stats).unwrap(); + let (lower, upper) = agg.produce(); + assert_eq!(lower.get(&0), Some(&Datum::string("0123456789abcdef"))); + assert_eq!(upper.get(&0), Some(&Datum::string("0123456789abcdeg"))); + } + + #[test] + fn test_min_max_aggregator_truncates_long_binary_bounds() { + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "b", Type::Primitive(PrimitiveType::Binary)) + .with_id(0) + .into(), + ]) + .build() + .unwrap(), + ); + let mut agg = MinMaxColAggregator::new(schema); + let long: Vec = (0..32).collect(); + let stats = Statistics::ByteArray( + ValueStatistics::new( + Some(long.clone().into()), + Some(long.clone().into()), + None, + None, + false, + ) + .with_min_is_exact(false) + .with_max_is_exact(false), + ); + agg.update(0, stats).unwrap(); + let (lower, upper) = agg.produce(); + assert_eq!( + lower.get(&0), + Some(&Datum::binary((0..16).collect::>())) + ); + let mut expected_upper: Vec = (0..15).collect(); + expected_upper.push(16); + assert_eq!(upper.get(&0), Some(&Datum::binary(expected_upper))); + } + + #[test] + fn test_min_max_aggregator_drops_only_upper_when_unboundable() { + // String of all char::MAX: lower truncates fine, upper has no valid bound. + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, "s", Type::Primitive(PrimitiveType::String)) + .with_id(0) + .into(), + ]) + .build() + .unwrap(), + ); + let mut agg = MinMaxColAggregator::new(schema); + let s: String = std::iter::repeat_n(char::MAX, 20).collect(); + let stats = Statistics::ByteArray(ValueStatistics::new( + Some(s.as_bytes().into()), + Some(s.as_bytes().into()), + None, + None, + false, + )); + agg.update(0, stats).unwrap(); + let (lower, upper) = agg.produce(); + assert!(lower.contains_key(&0)); + // No valid 16-codepoint upper bound exists. + assert!(!upper.contains_key(&0)); + } } From 14f9faf2914c23f4b51844e864a84afabf43db77 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Sat, 30 May 2026 20:37:40 -0700 Subject: [PATCH 2/4] fix(parquet_writer): preserve Fixed(N) type and drop upper on unbounded chunk Round 1 review fixes for the manifest-bound truncation in MinMaxColAggregator: - truncate_lower_bound / truncate_upper_bound for Fixed(N) now keep the column's declared PrimitiveType::Fixed(N) instead of re-typing as Fixed() via Datum::fixed. Use Datum::new(ty, Binary(bytes)) so downstream code that introspects datum.data_type() continues to see the schema's declared length. - MinMaxColAggregator now tracks an upper_unbounded set. When truncate_upper_bound returns None for any row group's max, the column's partial upper bound is dropped and further updates are blocked. Without this, an earlier row group's small upper bound could be left in place while a later row group's true max strictly exceeds it, producing a manifest upper_bound < true_max and breaking scan-time pruning. - Doc-comment on update() corrected: Java's ParquetUtil#updateMin/updateMax does not consult isMinExact/isMaxExact; we mirror that. - Doc on truncate_string_max calls out the Java-equivalent UTF-16 surrogate jump from U+D7FF to U+E000 (relevant for #2486). - Note on truncate-then-compare equivalence with Java's compare-then-truncate added inline. - Tests: extracted single_primitive_field_schema helper; added test_min_max_aggregator_merges_truncated_strings_across_row_groups, test_min_max_aggregator_drops_upper_after_unbounded_row_group, test_truncate_lower_upper_bound_fixed_preserves_declared_type, test_min_max_aggregator_truncates_long_fixed_bounds. --- .../src/writer/file_writer/parquet_writer.rs | 259 ++++++++++++++---- 1 file changed, 203 insertions(+), 56 deletions(-) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 7ab06f7ecc..14cf876893 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -17,7 +17,7 @@ //! The module contains the file writer for parquet file format. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Arc; use arrow_schema::SchemaRef as ArrowSchemaRef; @@ -235,6 +235,12 @@ fn truncate_string_min(input: &str, length: usize) -> String { /// Returns a string of at most `length` Unicode code points that is `> input`, /// or `None` when no such bound exists (e.g. all `length` leading code points /// are `char::MAX`). Mirrors `org.apache.iceberg.util.UnicodeUtil#truncateStringMax`. +/// +/// Note: when the last codepoint of the truncated prefix is `U+D7FF`, this +/// implementation increments to `U+E000` (skipping the UTF-16 surrogate gap) +/// because Rust strings cannot hold surrogates. Java's `incrementCodePoint` +/// performs the same jump, so the produced bound matches Java for any input +/// that itself contains no surrogates — i.e. every valid `&str`. fn truncate_string_max(input: &str, length: usize) -> Option { let mut prefix: Vec = input.chars().take(length + 1).collect(); if prefix.len() <= length { @@ -284,6 +290,10 @@ fn truncate_binary_max(input: &[u8], length: usize) -> Option> { /// Apply Iceberg manifest truncation for STRING/BINARY/FIXED lower bounds. /// Other primitive types are returned unchanged. +/// +/// For `Fixed(N)`, the produced `Datum` keeps the column's declared type even +/// when the byte length shrinks, so the in-memory bound continues to satisfy +/// the schema (`Datum::fixed(bytes)` would re-type as `Fixed(bytes.len())`). fn truncate_lower_bound(ty: &PrimitiveType, datum: Datum) -> Datum { match (ty, datum.literal()) { (PrimitiveType::String, PrimitiveLiteral::String(s)) => { @@ -292,9 +302,10 @@ fn truncate_lower_bound(ty: &PrimitiveType, datum: Datum) -> Datum { (PrimitiveType::Binary, PrimitiveLiteral::Binary(bytes)) => { Datum::binary(truncate_binary_min(bytes, ICEBERG_BOUND_TRUNCATE_LENGTH)) } - (PrimitiveType::Fixed(_), PrimitiveLiteral::Binary(bytes)) => { - Datum::fixed(truncate_binary_min(bytes, ICEBERG_BOUND_TRUNCATE_LENGTH)) - } + (PrimitiveType::Fixed(_), PrimitiveLiteral::Binary(bytes)) => Datum::new( + ty.clone(), + PrimitiveLiteral::Binary(truncate_binary_min(bytes, ICEBERG_BOUND_TRUNCATE_LENGTH)), + ), _ => datum, } } @@ -302,6 +313,9 @@ fn truncate_lower_bound(ty: &PrimitiveType, datum: Datum) -> Datum { /// Apply Iceberg manifest truncation for STRING/BINARY/FIXED upper bounds. /// Returns `None` only for STRING/BINARY/FIXED inputs whose truncated prefix /// cannot be incremented (no valid upper bound at the truncate length). +/// +/// For `Fixed(N)`, the produced `Datum` keeps the column's declared type even +/// when the byte length shrinks; see [`truncate_lower_bound`] for rationale. fn truncate_upper_bound(ty: &PrimitiveType, datum: Datum) -> Option { match (ty, datum.literal()) { (PrimitiveType::String, PrimitiveLiteral::String(s)) => { @@ -311,7 +325,8 @@ fn truncate_upper_bound(ty: &PrimitiveType, datum: Datum) -> Option { truncate_binary_max(bytes, ICEBERG_BOUND_TRUNCATE_LENGTH).map(Datum::binary) } (PrimitiveType::Fixed(_), PrimitiveLiteral::Binary(bytes)) => { - truncate_binary_max(bytes, ICEBERG_BOUND_TRUNCATE_LENGTH).map(Datum::fixed) + truncate_binary_max(bytes, ICEBERG_BOUND_TRUNCATE_LENGTH) + .map(|truncated| Datum::new(ty.clone(), PrimitiveLiteral::Binary(truncated))) } _ => Some(datum), } @@ -321,6 +336,12 @@ fn truncate_upper_bound(ty: &PrimitiveType, datum: Datum) -> Option { struct MinMaxColAggregator { lower_bounds: HashMap, upper_bounds: HashMap, + /// Fields whose upper bound was unboundable (truncate-and-increment + /// returned `None`) for at least one row group. The contributing row + /// group's true max may exceed any partial upper bound aggregated from + /// earlier row groups, so we drop the column's upper bound entirely + /// rather than emit one that could be `< true_max`. + upper_unbounded: HashSet, schema: SchemaRef, } @@ -330,6 +351,7 @@ impl MinMaxColAggregator { Self { lower_bounds: HashMap::new(), upper_bounds: HashMap::new(), + upper_unbounded: HashSet::new(), schema, } } @@ -358,11 +380,21 @@ impl MinMaxColAggregator { /// Update statistics /// - /// Inexact (truncated) Parquet bounds are kept: a Parquet-prefix-truncated - /// min is still `<=` every value in the column, and a Parquet-truncated max - /// is still `>=` every value. Both are then re-truncated to the Iceberg - /// manifest bound length to match Java semantics - /// (`org.apache.iceberg.parquet.ParquetUtil#updateMin/updateMax`). + /// Java's `ParquetUtil#updateMin/updateMax` does not consult + /// `isMinExact`/`isMaxExact`; it always feeds the parquet-reported value + /// through `BinaryUtil`/`UnicodeUtil` truncation (gated only on + /// `hasNonNullValue`). We mirror that by using `min_bytes_opt`/ + /// `max_bytes_opt` to detect presence and re-truncating to the Iceberg + /// manifest bound length. A parquet-prefix-truncated min is still + /// `<=` every value, and a parquet-truncated max is still `>=` every + /// value, so the secondary Iceberg truncation is sound. + /// + /// Note on truncate-then-compare: Java truncates then stores; we truncate + /// then min/max-merge across row groups. Both produce identical results + /// because prefix truncation is order-preserving (`a <= b` implies + /// `truncate_min(a) <= truncate_min(b)` and `truncate_max(a) <= + /// truncate_max(b)`), so merging truncated values is equivalent to + /// truncating the merged value. fn update(&mut self, field_id: i32, value: Statistics) -> Result<()> { let Some(ty) = self .schema @@ -397,8 +429,20 @@ impl MinMaxColAggregator { format!("Statistics {value} is not match with field type {ty}."), )); }; - if let Some(truncated) = truncate_upper_bound(&ty, max_datum) { - self.update_state_max(field_id, truncated); + match truncate_upper_bound(&ty, max_datum) { + Some(truncated) if !self.upper_unbounded.contains(&field_id) => { + self.update_state_max(field_id, truncated); + } + Some(_) => { + // Field already marked unbounded; ignore further upper updates. + } + None => { + // No representable upper bound for this row group's max; + // drop the column's aggregated upper bound entirely so we + // never emit a bound that is `< true_max`. + self.upper_bounds.remove(&field_id); + self.upper_unbounded.insert(field_id); + } } } @@ -2443,6 +2487,20 @@ mod tests { assert_eq!(upper_bounds, HashMap::from([(0, Datum::int(i32::MAX))])); } + fn single_primitive_field_schema(name: &str, ty: PrimitiveType) -> SchemaRef { + Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(0, name, Type::Primitive(ty)) + .with_id(0) + .into(), + ]) + .build() + .unwrap(), + ) + } + #[test] fn test_truncate_string_min_short_input_unchanged() { // ≤ length code points: returned as-is. @@ -2570,17 +2628,7 @@ mod tests { fn test_min_max_aggregator_keeps_inexact_string_stats() { // Regression: previously, inexact (Parquet-truncated) stats were // dropped, leaving long-string columns with no manifest bounds. - let schema = Arc::new( - Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - NestedField::required(0, "s", Type::Primitive(PrimitiveType::String)) - .with_id(0) - .into(), - ]) - .build() - .unwrap(), - ); + let schema = single_primitive_field_schema("s", PrimitiveType::String); let mut agg = MinMaxColAggregator::new(schema); let stats = Statistics::ByteArray( ValueStatistics::new( @@ -2605,17 +2653,7 @@ mod tests { #[test] fn test_min_max_aggregator_truncates_long_string_bounds() { - let schema = Arc::new( - Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - NestedField::required(0, "s", Type::Primitive(PrimitiveType::String)) - .with_id(0) - .into(), - ]) - .build() - .unwrap(), - ); + let schema = single_primitive_field_schema("s", PrimitiveType::String); let mut agg = MinMaxColAggregator::new(schema); // Exact stats but values > 16 codepoints. let stats = Statistics::ByteArray(ValueStatistics::new( @@ -2633,17 +2671,7 @@ mod tests { #[test] fn test_min_max_aggregator_truncates_long_binary_bounds() { - let schema = Arc::new( - Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - NestedField::required(0, "b", Type::Primitive(PrimitiveType::Binary)) - .with_id(0) - .into(), - ]) - .build() - .unwrap(), - ); + let schema = single_primitive_field_schema("b", PrimitiveType::Binary); let mut agg = MinMaxColAggregator::new(schema); let long: Vec = (0..32).collect(); let stats = Statistics::ByteArray( @@ -2671,17 +2699,7 @@ mod tests { #[test] fn test_min_max_aggregator_drops_only_upper_when_unboundable() { // String of all char::MAX: lower truncates fine, upper has no valid bound. - let schema = Arc::new( - Schema::builder() - .with_schema_id(1) - .with_fields(vec![ - NestedField::required(0, "s", Type::Primitive(PrimitiveType::String)) - .with_id(0) - .into(), - ]) - .build() - .unwrap(), - ); + let schema = single_primitive_field_schema("s", PrimitiveType::String); let mut agg = MinMaxColAggregator::new(schema); let s: String = std::iter::repeat_n(char::MAX, 20).collect(); let stats = Statistics::ByteArray(ValueStatistics::new( @@ -2697,4 +2715,133 @@ mod tests { // No valid 16-codepoint upper bound exists. assert!(!upper.contains_key(&0)); } + + #[test] + fn test_min_max_aggregator_merges_truncated_strings_across_row_groups() { + // Two row groups produce different truncation outputs; aggregator must + // pick the lex-min lower and lex-max upper across both. + let schema = single_primitive_field_schema("s", PrimitiveType::String); + let mut agg = MinMaxColAggregator::new(schema); + let stats_a = Statistics::ByteArray(ValueStatistics::new( + Some("bbbbbbbbbbbbbbbbXX".as_bytes().into()), + Some("yyyyyyyyyyyyyyyyXX".as_bytes().into()), + None, + None, + false, + )); + let stats_b = Statistics::ByteArray(ValueStatistics::new( + Some("aaaaaaaaaaaaaaaaXX".as_bytes().into()), + Some("zzzzzzzzzzzzzzzzXX".as_bytes().into()), + None, + None, + false, + )); + agg.update(0, stats_a).unwrap(); + agg.update(0, stats_b).unwrap(); + let (lower, upper) = agg.produce(); + assert_eq!(lower.get(&0), Some(&Datum::string("aaaaaaaaaaaaaaaa"))); + assert_eq!(upper.get(&0), Some(&Datum::string("zzzzzzzzzzzzzzz{"))); + } + + #[test] + fn test_min_max_aggregator_drops_upper_after_unbounded_row_group() { + // First row group produces a normal upper; second row group's max is + // all char::MAX and cannot produce a 16-codepoint upper, which means + // the file's true max strictly exceeds anything we could safely emit. + // The aggregator must drop the column's upper bound entirely. + let schema = single_primitive_field_schema("s", PrimitiveType::String); + let mut agg = MinMaxColAggregator::new(schema); + + let stats_a = Statistics::ByteArray(ValueStatistics::new( + Some("aaaaaaaaaaaaaaaaXX".as_bytes().into()), + Some("yyyyyyyyyyyyyyyyXX".as_bytes().into()), + None, + None, + false, + )); + let max_string: String = std::iter::repeat_n(char::MAX, 20).collect(); + let stats_b = Statistics::ByteArray(ValueStatistics::new( + Some("aaaaaaaaaaaaaaaaXX".as_bytes().into()), + Some(max_string.as_bytes().into()), + None, + None, + false, + )); + agg.update(0, stats_a).unwrap(); + agg.update(0, stats_b).unwrap(); + let (lower, upper) = agg.produce(); + assert_eq!(lower.get(&0), Some(&Datum::string("aaaaaaaaaaaaaaaa"))); + assert!(!upper.contains_key(&0)); + + // Subsequent row groups with representable maxes must not re-add + // the upper bound after it has been declared unbounded. + let stats_c = Statistics::ByteArray(ValueStatistics::new( + Some("aaaaaaaaaaaaaaaaXX".as_bytes().into()), + Some("ccccccccccccccccXX".as_bytes().into()), + None, + None, + false, + )); + let mut agg = + MinMaxColAggregator::new(single_primitive_field_schema("s", PrimitiveType::String)); + let max_first = Statistics::ByteArray(ValueStatistics::new( + Some("aaaaaaaaaaaaaaaaXX".as_bytes().into()), + Some(max_string.as_bytes().into()), + None, + None, + false, + )); + agg.update(0, max_first).unwrap(); + agg.update(0, stats_c).unwrap(); + let (_, upper) = agg.produce(); + assert!(!upper.contains_key(&0)); + } + + #[test] + fn test_truncate_lower_upper_bound_fixed_preserves_declared_type() { + // Truncating a Fixed(20) value to 16 bytes must keep PrimitiveType::Fixed(20), + // not re-type as Fixed(16). + let ty = PrimitiveType::Fixed(20); + let input: Vec = (0..20).collect(); + let lower = truncate_lower_bound( + &ty, + Datum::new(ty.clone(), PrimitiveLiteral::Binary(input.clone())), + ); + assert_eq!(lower.data_type(), &ty); + + let upper = + truncate_upper_bound(&ty, Datum::new(ty.clone(), PrimitiveLiteral::Binary(input))) + .expect("upper must be representable"); + assert_eq!(upper.data_type(), &ty); + } + + #[test] + fn test_min_max_aggregator_truncates_long_fixed_bounds() { + let ty = PrimitiveType::Fixed(20); + let schema = single_primitive_field_schema("f", ty.clone()); + let mut agg = MinMaxColAggregator::new(schema); + let long: Vec = (0..20).collect(); + let stats = Statistics::FixedLenByteArray(ValueStatistics::new( + Some(long.clone().into()), + Some(long.clone().into()), + None, + None, + false, + )); + agg.update(0, stats).unwrap(); + let (lower, upper) = agg.produce(); + + let expected_lower = Datum::new( + ty.clone(), + PrimitiveLiteral::Binary((0..16).collect::>()), + ); + let mut expected_upper_bytes: Vec = (0..15).collect(); + expected_upper_bytes.push(16); + let expected_upper = Datum::new(ty.clone(), PrimitiveLiteral::Binary(expected_upper_bytes)); + + assert_eq!(lower.get(&0), Some(&expected_lower)); + assert_eq!(upper.get(&0), Some(&expected_upper)); + assert_eq!(lower.get(&0).unwrap().data_type(), &ty); + assert_eq!(upper.get(&0).unwrap().data_type(), &ty); + } } From 3658da93362888e66d6c1bd60fc3e192e3ada262 Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Sat, 30 May 2026 22:49:57 -0700 Subject: [PATCH 3/4] test(parquet_writer): lock roundtrip contract for truncated Fixed(N) datum MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit After truncation, a Fixed(N) Datum carries fewer than N bytes in its literal even though the declared type says N. Add a regression test that exercises the two paths downstream consumers actually use: - `Datum::to_bytes()` — manifest single-value serialization writes the literal bytes verbatim regardless of declared Fixed length. - `PartialOrd` — wildcards on Fixed length and compares lex on raw bytes, so two truncated Fixed datums (16 bytes typed Fixed(20)) order correctly relative to each other. This locks the contract that future changes to Datum / PrimitiveLiteral serialization must preserve. --- .../src/writer/file_writer/parquet_writer.rs | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs index 14cf876893..5b88caeef5 100644 --- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs +++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs @@ -2844,4 +2844,28 @@ mod tests { assert_eq!(lower.get(&0).unwrap().data_type(), &ty); assert_eq!(upper.get(&0).unwrap().data_type(), &ty); } + + #[test] + fn test_truncated_fixed_datum_roundtrip() { + use std::cmp::Ordering; + // After truncation, a Fixed(N) Datum holds N-typed metadata but its + // literal carries fewer than N bytes. Verify the two paths that + // downstream consumers exercise — `to_bytes()` and `partial_cmp` + // (used by manifest serialization and the aggregator's update_state_*) + // — tolerate that mismatch and produce the truncated bytes verbatim. + let ty = PrimitiveType::Fixed(20); + let truncated = Datum::new(ty.clone(), PrimitiveLiteral::Binary((0..16).collect())); + let other_truncated = Datum::new( + ty.clone(), + PrimitiveLiteral::Binary((0..15).chain([16]).collect()), + ); + + // to_bytes serializes the literal bytes regardless of declared Fixed length. + let encoded = truncated.to_bytes().expect("to_bytes succeeds"); + assert_eq!(encoded.as_slice(), &(0u8..16).collect::>()[..]); + + // partial_cmp wildcards on Fixed length and compares lex on raw bytes. + assert!(truncated < other_truncated); + assert_eq!(truncated.partial_cmp(&truncated), Some(Ordering::Equal)); + } } From 6702559d967a87d00bad08ba25ada176caa056fb Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Sun, 31 May 2026 10:31:38 -0700 Subject: [PATCH 4/4] chore(ci): retrigger CI (transient infra flake on check (macos-latest))