From fa4b19a928656a15351ac8b629945f28e2402c73 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 3 Jun 2026 17:09:58 -0500 Subject: [PATCH 1/3] fix(parquet): bound page byte size for large variable-width values - closes https://github.com/apache/arrow-rs/issues/10061 The column writer only checks the data/dictionary page byte limit *after* each `write_batch_size` mini-batch, so a batch of large variable-width values piles into a single oversized page before the check fires (we've observed multi-GiB data pages and large dictionary-page overshoot at default settings). Make the mini-batch size byte-budget aware in the generic column writer: - `ColumnValueEncoder::count_values_within_byte_budget{,_gather}` (default `None` = "no estimate, stay batched"), with a concrete impl on `ColumnValueEncoderImpl` driven by `plain_encoded_byte_size`. Fixed-width physical types answer in one division; only variable-width BYTE_ARRAY/FLBA walk values, stopping at the first that overruns. - `LevelDataRef::value_count` converts a chunk's level span into a leaf value count (O(1) for flat columns, def-level scan when nullable/nested). - `ByteBudgetChunker` picks the largest sub-batch that fits one page budget. The common case (small or fixed-width values) returns the whole chunk with no value inspection, so the hot path is unchanged. During dictionary encoding it sizes against the dictionary page's remaining budget instead, since the data page then holds only small RLE indices. - `write_batch_internal` consults the chunker per chunk and, only when a chunk would overflow, routes through `write_granular_chunk`, which sub-batches so the post-write page check fires in time. Repeated/nested columns step on record (rep == 0) boundaries so a record never spans pages. Includes the `ColumnWriterImpl`-level regression tests (data page, list, nullable, FLBA, dictionary spill, dictionary page bound). 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/column/writer/byte_budget_chunker.rs | 204 +++++++ parquet/src/column/writer/encoder.rs | 141 +++++ parquet/src/column/writer/mod.rs | 531 +++++++++++++++++- 3 files changed, 870 insertions(+), 6 deletions(-) create mode 100644 parquet/src/column/writer/byte_budget_chunker.rs diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs new file mode 100644 index 000000000000..56c7b4c6f86d --- /dev/null +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! See [`ByteBudgetChunker`] for byte-budget-aware mini-batch sizing. + +use crate::basic::Type; +use crate::column::writer::LevelDataRef; +use crate::column::writer::encoder::ColumnValueEncoder; +use crate::file::properties::WriterProperties; +use crate::schema::types::ColumnDescriptor; + +/// Picks byte-budget-aware mini-batch sizes for one column. +/// +/// The parquet column writer checks the data page byte limit only *after* +/// each mini-batch finishes writing. 
Mini-batches are sized in rows +/// (`write_batch_size`, default 1024), so for BYTE_ARRAY columns whose +/// values are large (e.g. multi-MiB blobs) a single mini-batch can buffer +/// GiB into one page before the limit is consulted. +/// +/// This isolates the per-chunk decision that prevents that: given a chunk's +/// level data and the input values, pick the largest `sub_batch_size` such +/// that one mini-batch fits in one page byte budget. For the overwhelmingly +/// common case (small or fixed-width values) the answer is just `chunk_size` +/// and the decision is O(1) on the column type — only when the input might +/// overflow does the chunker consult the encoder's byte estimate. +pub(crate) struct ByteBudgetChunker { + /// Configured data page byte limit for the column. + page_byte_limit: usize, + /// Max definition level of the column; a level equal to this marks a + /// present (non-null) leaf value. Used to count values per chunk. + max_def_level: i16, + /// `true` when no chunk of `base_batch_size` values can ever overflow + /// `page_byte_limit` regardless of input. Set once at column open from + /// the physical type's known per-value byte size; lets the per-chunk + /// decision short-circuit with no work for every numeric, bool, or + /// narrow `FIXED_LEN_BYTE_ARRAY` column. + static_always_fits: bool, + /// Configured dictionary page byte limit for the column. + dict_page_byte_limit: usize, + /// As [`Self::static_always_fits`] but for the dictionary page: `true` + /// when one `base_batch_size` mini-batch of this fixed-width type cannot + /// overshoot `dict_page_byte_limit` by more than one mini-batch's worth. 
+ static_dict_always_fits: bool, +} + +impl ByteBudgetChunker { + #[inline] + pub(crate) fn new( + descr: &ColumnDescriptor, + props: &WriterProperties, + base_batch_size: usize, + ) -> Self { + let page_byte_limit = props.column_data_page_size_limit(descr.path()); + let dict_page_byte_limit = props.column_dictionary_page_size_limit(descr.path()); + let static_bytes_per_value = match descr.physical_type() { + Type::BOOLEAN => Some(1), + Type::INT32 | Type::FLOAT => Some(std::mem::size_of::()), + Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::()), + Type::INT96 => Some(12), + Type::FIXED_LEN_BYTE_ARRAY => Some(descr.type_length().max(0) as usize), + Type::BYTE_ARRAY => None, + }; + let static_fits = |limit: usize| { + static_bytes_per_value + .map(|b| b.saturating_mul(base_batch_size) <= limit) + .unwrap_or(false) + }; + Self { + page_byte_limit, + max_def_level: descr.max_def_level(), + static_always_fits: static_fits(page_byte_limit), + dict_page_byte_limit, + static_dict_always_fits: static_fits(dict_page_byte_limit), + } + } + + /// Decide how many levels at the start of a chunk belong in one + /// mini-batch, so the mini-batch cannot overflow whichever page is + /// currently accumulating value bytes: the data page when plain-encoding, + /// or the *dictionary* page while dictionary-encoding. A returned value + /// smaller than `chunk_size` triggers granular sub-batching in + /// `write_batch_internal`. + /// + /// While dictionary-encoding, the data page holds only small RLE indices, + /// but the dictionary page accumulates the distinct values themselves — + /// so it is the dictionary page's remaining budget that must bound the + /// mini-batch. The per-mini-batch dictionary spill check would otherwise + /// let one mini-batch of large values balloon the dictionary page. 
+ /// + /// Returns `chunk_size` immediately (no value inspection) when the chunk + /// is empty, or when the column is a fixed-width type whose mini-batches + /// statically cannot overshoot the relevant page. + /// + /// `#[inline]`: this is a tiny per-chunk dispatcher; the actual byte + /// inspection lives in the out-of-line `byte_budget_sub_batch_size`. + #[inline] + pub(crate) fn pick_sub_batch_size( + &self, + encoder: &E, + values: &E::Values, + value_indices: Option<&[usize]>, + chunk_def: LevelDataRef<'_>, + values_offset: usize, + chunk_size: usize, + ) -> usize { + if chunk_size == 0 { + return chunk_size; + } + let budget = if encoder.has_dictionary() { + if self.static_dict_always_fits { + return chunk_size; + } + // Bound the mini-batch by the dictionary page's *remaining* + // budget (it accumulates across mini-batches until it spills). + match encoder.estimated_dict_page_size() { + Some(used) => self.dict_page_byte_limit.saturating_sub(used), + None => return chunk_size, + } + } else { + if self.static_always_fits { + return chunk_size; + } + self.page_byte_limit + }; + self.byte_budget_sub_batch_size::( + values, + value_indices, + chunk_def, + values_offset, + chunk_size, + budget, + ) + } + + /// Inspect value sizes to decide how many of the chunk's values fit in + /// `budget` bytes (the data page or dictionary page remaining budget). + /// + /// `#[inline(never)]` keeps this slow path out of the hot + /// `write_batch_internal` loop; numeric and bool columns never reach it. + #[inline(never)] + fn byte_budget_sub_batch_size( + &self, + values: &E::Values, + value_indices: Option<&[usize]>, + chunk_def: LevelDataRef<'_>, + values_offset: usize, + chunk_size: usize, + budget: usize, + ) -> usize { + // How many of this chunk's levels carry an actual value. 
For a + // non-nullable, unrepeated column every level is a value, so + // `value_count` is O(1) (`Absent`/`Uniform` def levels); only + // nullable or nested columns pay the O(chunk_size) def-level scan. + let vals_in_chunk = chunk_def.value_count(chunk_size, self.max_def_level); + if vals_in_chunk == 0 { + return chunk_size; + } + // Ask the encoder how many of the next values fit in one page byte + // budget. Dispatch on whether the caller supplied gather indices; + // this mirrors how `write_mini_batch` picks `write_gather` vs + // `write`. + let fit = match value_indices { + Some(idx) => { + let end = (values_offset + vals_in_chunk).min(idx.len()); + let start = values_offset.min(end); + E::count_values_within_byte_budget_gather(values, &idx[start..end], budget) + } + None => { + E::count_values_within_byte_budget(values, values_offset, vals_in_chunk, budget) + } + }; + match fit { + None => chunk_size, + Some(values_per_subbatch) => { + // Convert the value count back into a level count. For a + // non-nullable column this is a no-op; for nullable/nested + // columns scale by the chunk's observed value-to-level + // ratio. + let levels_per_subbatch = if vals_in_chunk == chunk_size { + values_per_subbatch + } else { + (values_per_subbatch * chunk_size) + .div_ceil(vals_in_chunk) + .max(1) + }; + chunk_size.min(levels_per_subbatch.max(1)) + } + } + } +} diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 2ea3376ae708..d9adacff4101 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -90,6 +90,42 @@ pub trait ColumnValueEncoder { /// Write the values at the indexes in `indices` to this [`ColumnValueEncoder`] fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>; + /// Returns how many of the first values in + /// `values[offset..offset + len]` go into one `byte_budget` (the value + /// crossing the budget may be included) — i.e. 
how many values fit in a single page byte budget. + /// + /// Returns `len` if every value fits. Returns at least 1 if a single + /// value alone exceeds the budget, matching parquet's "at least one + /// value per data page" rule. + /// + /// `None` means "no cheap estimate available"; the caller stays on + /// the batched fast path and lets the post-write + /// `should_add_data_page` check handle bounding. + /// + /// Implementations should short-circuit aggressively: the typical + /// case is "everything fits, return `len`", and the next-most-common + /// case is "one wide value, return 1." The variable-width walk only + /// needs to be precise when the chunk is genuinely near the budget. + fn count_values_within_byte_budget( + _values: &Self::Values, + _offset: usize, + _len: usize, + _byte_budget: usize, + ) -> Option { + None + } + + /// As [`Self::count_values_within_byte_budget`] but using gather + /// `indices` rather than a contiguous range. Returns the number of + /// `indices` that fit, not the maximum index value. + fn count_values_within_byte_budget_gather( + _values: &Self::Values, + _indices: &[usize], + _byte_budget: usize, + ) -> Option { + None + } + /// Returns the number of buffered values fn num_values(&self) -> usize; @@ -247,6 +283,39 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { self.write_slice(&slice) } + fn count_values_within_byte_budget( + values: &[T::T], + offset: usize, + len: usize, + byte_budget: usize, + ) -> Option { + // Clamp so that a caller-supplied `len` that overruns the input + // (e.g. a level/value mismatch the encoder will reject later) + // returns an estimate instead of panicking here. 
+ let end = (offset + len).min(values.len()); + let start = offset.min(end); + count_within_budget::( + end - start, + byte_budget, + values[start..end].iter().map(Some), + ) + } + + fn count_values_within_byte_budget_gather( + values: &[T::T], + indices: &[usize], + byte_budget: usize, + ) -> Option { + // `values.get` yields `None` for an out-of-range index (defensive + // against a level/value mismatch the encoder rejects later); such a + // position is counted but contributes no bytes. + count_within_budget::( + indices.len(), + byte_budget, + indices.iter().map(|&i| values.get(i)), + ) + } + fn num_values(&self) -> usize { self.num_values } @@ -411,3 +480,75 @@ where } } } + +/// Plain-encoded byte cost of a single value of type `T::T`. +/// +/// Derived from [`ParquetValueType::dict_encoding_size`] (which returns +/// `(per-value overhead, value-bytes)`) so we don't add a parallel +/// per-value-size hook to the trait. Mirrors the dispatch in +/// `KeyStorage::push` (`encodings/encoding/dict_encoder.rs`). +/// +/// Placed at the end of the module deliberately. Inserting it above the +/// `ColumnValueEncoder` trait shifts the trait and `ColumnValueEncoderImpl` +/// within the compiled module enough to perturb downstream code placement, +/// which measurably regresses unrelated arrow-writer string benchmarks +/// (~5-9% on `string` / `string_and_binary_view`). Defining it last keeps +/// the hot encoder code at the offsets it has on `main`. +#[inline] +fn plain_encoded_byte_size(value: &T::T) -> usize { + let (overhead, bytes) = value.dict_encoding_size(); + match ::PHYSICAL_TYPE { + // Plain BYTE_ARRAY = 4-byte length prefix + payload. + Type::BYTE_ARRAY => overhead + bytes, + // Plain FLBA = raw bytes only; `dict_encoding_size`'s length prefix + // is irrelevant here, so the encoder passes `type_length` directly. 
+ Type::FIXED_LEN_BYTE_ARRAY => bytes, + // Numeric/bool are short-circuited by the caller via + // `mem::size_of`, so this is unreachable in practice; fall back to + // `overhead` defensively. + _ => overhead, + } +} + +/// How many leading values fit in `byte_budget` bytes, shared by the two +/// `ColumnValueEncoder::count_values_within_byte_budget*` methods (one walks a +/// contiguous slice, the other gathers by index). +/// +/// `n` is the answer when everything fits; `vals` yields each candidate value, +/// or `None` for a position that should still be counted but contributes no +/// bytes (an out-of-range gather index). The boundary value that crosses the +/// budget is included in the count so the caller's page-flush check trips on +/// this mini-batch rather than leaving a sliver for the next page; this also +/// catches a lone outlier wherever it lands among small values. +/// +/// Defined at the end of the module alongside `plain_encoded_byte_size` for +/// the same reason — see that function's note on code placement and the +/// `string` / `string_and_binary_view` benchmarks. +#[inline] +fn count_within_budget<'a, T: DataType>( + n: usize, + byte_budget: usize, + vals: impl Iterator>, +) -> Option +where + T::T: 'a, +{ + // Fixed-size physical types have a constant per-value byte cost, so the + // answer is one division — no walk needed. + let phys = ::PHYSICAL_TYPE; + if phys != Type::BYTE_ARRAY && phys != Type::FIXED_LEN_BYTE_ARRAY { + let per = std::mem::size_of::().max(1); + return Some((byte_budget / per).max(1).min(n)); + } + // Variable-width: accumulate, exit at the first value past the budget. 
+ let mut cum: usize = 0; + for (i, v) in vals.enumerate() { + if let Some(v) = v { + cum = cum.saturating_add(plain_encoded_byte_size::(v)); + } + if cum > byte_budget { + return Some(i + 1); + } + } + Some(n) +} diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 4e53230bbf89..090350f53aae 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -49,8 +49,11 @@ use crate::file::properties::{ use crate::file::statistics::{Statistics, ValueStatistics}; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; +mod byte_budget_chunker; pub(crate) mod encoder; +use byte_budget_chunker::ByteBudgetChunker; + macro_rules! downcast_writer { ($e:expr, $i:ident, $b:expr) => { match $e { @@ -374,6 +377,24 @@ impl<'a> LevelDataRef<'a> { Self::Uniform { value, .. } => Self::Uniform { value, count: len }, } } + + /// Count of positions in this slice that represent an actual value + /// (definition level equal to `max_def`). `Absent` means the column has + /// `max_def == 0` and every position is a value, so the implicit count + /// is the caller-supplied `total`. + pub(crate) fn value_count(self, total: usize, max_def: i16) -> usize { + match self { + Self::Absent => total, + Self::Materialized(values) => values.iter().filter(|&&d| d == max_def).count(), + Self::Uniform { value, count } => { + if value == max_def { + count + } else { + 0 + } + } + } + } } /// Typed column writer for a primitive column. 
@@ -545,6 +566,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } else { self.props.write_batch_size() }; + let chunker = ByteBudgetChunker::new(&self.descr, &self.props, base_batch_size); while levels_offset < num_levels { let mut end_offset = num_levels.min(levels_offset + base_batch_size); @@ -555,14 +577,45 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { } } - values_offset += self.write_mini_batch( + let chunk_size = end_offset - levels_offset; + let chunk_def = def_levels.slice(levels_offset, chunk_size); + let chunk_rep = rep_levels.slice(levels_offset, chunk_size); + + // Key decision point: can we write this whole chunk as one + // mini-batch (the common case — small or fixed-width values, no + // further page-size accounting needed), or must we fall back to + // byte-budget-aware sub-batching to keep a page from overshooting + // `data_page_size_limit`? `pick_sub_batch_size` returns + // `chunk_size` for the former. + let sub_batch_size = chunker.pick_sub_batch_size( + &self.encoder, values, - values_offset, value_indices, - end_offset - levels_offset, - def_levels.slice(levels_offset, end_offset - levels_offset), - rep_levels.slice(levels_offset, end_offset - levels_offset), - )?; + chunk_def, + values_offset, + chunk_size, + ); + + if sub_batch_size >= chunk_size { + values_offset += self.write_mini_batch( + values, + values_offset, + value_indices, + chunk_size, + chunk_def, + chunk_rep, + )?; + } else { + values_offset += self.write_granular_chunk( + values, + values_offset, + value_indices, + chunk_size, + chunk_def, + chunk_rep, + sub_batch_size, + )?; + } levels_offset = end_offset; } @@ -713,6 +766,69 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> { }) } + /// Writes a chunk in `sub_batch_size`-level sub-batches, checking the + /// data page byte limit after each. This keeps the page size close to + /// `data_page_size_limit` instead of overshooting it by a whole chunk. 
+ /// + /// For repeated/nested columns sub-batches step from one `rep == 0` + /// boundary to the next so a record never spans data pages, matching + /// the parquet format rule. + /// + /// Returns the total number of values consumed across all sub-batches. + /// + /// `#[inline(never)]` keeps this slow path — only reached for + /// variable-width columns whose values need page splitting — out of + /// the hot `write_batch_internal` loop. + #[allow(clippy::too_many_arguments)] + #[inline(never)] + fn write_granular_chunk( + &mut self, + values: &E::Values, + values_offset: usize, + value_indices: Option<&[usize]>, + chunk_size: usize, + chunk_def: LevelDataRef<'_>, + chunk_rep: LevelDataRef<'_>, + sub_batch_size: usize, + ) -> Result { + // The chunker always sizes a sub-batch to at least one level, so each + // iteration below makes progress (`sub_end > sub_start`). + debug_assert!(sub_batch_size >= 1, "chunker must size at least one level"); + let mut values_consumed = 0; + let mut sub_start = 0; + while sub_start < chunk_size { + let sub_end = match chunk_rep { + LevelDataRef::Materialized(levels) => { + // Pack up to `sub_batch_size` levels per mini-batch, then + // extend to the next record boundary (rep == 0) so a + // record never spans data pages. Packing whole records + // rather than stepping one record at a time avoids + // calling `write_mini_batch` per record: records average + // only a handful of levels, so a record-at-a-time step + // would issue many more mini-batches than necessary. 
+ let mut e = (sub_start + sub_batch_size).min(chunk_size); + while e < chunk_size && levels[e] != 0 { + e += 1; + } + e + } + _ => (sub_start + sub_batch_size).min(chunk_size), + }; + let sub_len = sub_end - sub_start; + let written = self.write_mini_batch( + values, + values_offset + values_consumed, + value_indices, + sub_len, + chunk_def.slice(sub_start, sub_len), + chunk_rep.slice(sub_start, sub_len), + )?; + values_consumed += written; + sub_start = sub_end; + } + Ok(values_consumed) + } + /// Creates a new streaming level encoder appropriate for the writer version. fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder { match props.writer_version() { @@ -2676,6 +2792,310 @@ mod tests { assert_eq!(other_values, vec![10]); } + #[test] + fn test_column_writer_caps_page_size_for_large_byte_array_values() { + // Regression: the post-write data page byte limit check only fires + // at mini-batch boundaries, so a 1024-row mini-batch of multi-MiB + // BYTE_ARRAY values used to buffer multiple GiB into a single page + // before the limit was even consulted. With the threshold-based + // granular mode this batch should split into ~one page per value. + let value_size = 64 * 1024; // 64 KiB per value + let page_byte_limit = 16 * 1024; // 16 KiB page limit + let num_rows = 64; + + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + // Default write_batch_size (1024) — without the fix this + // buffers the entire input into a single ~4 MiB page. + .build(); + + let data: Vec<_> = (0..num_rows) + .map(|i| ByteArray::from(vec![i as u8; value_size])) + .collect(); + let pages = write_and_collect_pages::(props, 0, 0, &data, None, None); + + // Every value must end up somewhere. 
+ let total_values: u32 = pages.data_pages.iter().map(|(_, n)| n).sum(); + assert_eq!(total_values as usize, num_rows); + // Without the fix this assertion fired with one ~4 MiB page; the + // threshold splits the input so that no page holds more than a + // single oversized value's worth of bytes. + assert!( + pages.data_pages.len() >= num_rows / 2, + "expected pages to be cut close to one per value, got {:?}", + pages.data_pages, + ); + // Each page must be bounded by roughly one value's worth of bytes; + // parquet allows a single oversized value to occupy a page by + // itself but never lets us pile many of them together. + for (size, _) in &pages.data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound ({}B) — pages {:?}", + value_size + 64, + pages.data_pages, + ); + } + } + + #[test] + fn test_column_writer_caps_page_size_for_large_values_in_list() { + // Coverage for the Materialized-rep branch of + // `write_granular_chunk`. The flat-column regression test + // exercises the per-level step; this exercises the + // record-by-record step used when rep levels are present. + // + // Column is `list` (max_def = 1, max_rep = 1) + // with 3 records of 3 large blobs each. The page byte limit is + // smaller than a single blob, so granular mode kicks in, and the + // Materialized-rep arm of `write_granular_chunk` steps from one + // `rep == 0` boundary to the next so a record never spans pages. 
+ let value_size = 32 * 1024; + let page_byte_limit = 16 * 1024; + let values_per_record = 3; + let num_records = 3; + let num_values = values_per_record * num_records; + + // rep levels: 0, 1, 1, 0, 1, 1, 0, 1, 1 + let mut rep_levels = Vec::with_capacity(num_values); + for _ in 0..num_records { + rep_levels.push(0i16); + rep_levels.extend(std::iter::repeat_n(1i16, values_per_record - 1)); + } + let def_levels = vec![1i16; num_values]; + + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + .build(); + + let data: Vec<_> = (0..num_values) + .map(|i| ByteArray::from(vec![i as u8; value_size])) + .collect(); + let pages = write_and_collect_pages::( + props, + 1, + 1, + &data, + Some(&def_levels), + Some(&rep_levels), + ); + let data_pages = pages.data_pages; + + // The Materialized-rep arm groups levels by record, and each + // record's bytes blow the page byte limit on its own, so we get + // exactly one page per record. + assert_eq!( + data_pages.len(), + num_records, + "expected one data page per record, got {data_pages:?}" + ); + for (bytes, n_values) in &data_pages { + assert_eq!( + *n_values as usize, values_per_record, + "each page must hold a whole record's leaves, got {data_pages:?}" + ); + // Each page is one full record (its leaves cannot be split), + // so allow up to `values_per_record` blobs of payload plus a + // small fudge for level encoding overhead. + let upper_bound = values_per_record * (value_size + 16); + assert!( + *bytes <= upper_bound, + "page size {bytes} exceeds whole-record bound ({upper_bound}); pages {data_pages:?}" + ); + } + } + + #[test] + fn test_column_writer_caps_page_size_with_nullable_large_values() { + // Coverage for `LevelDataRef::value_count` on Materialized def + // levels: a nullable column with mixed nulls and large values. 
+ // `value_count` must return the actual non-null count so the + // byte estimate reflects bytes that will actually be written, + // not the level count. + let value_size = 32 * 1024; + let page_byte_limit = 16 * 1024; + let num_levels = 32; + + // Alternating null / non-null: 16 nulls and 16 values. + let def_levels: Vec = (0..num_levels as i16).map(|i| i % 2).collect(); + let num_values = def_levels.iter().filter(|&&d| d == 1).count(); + + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + .build(); + + let data: Vec<_> = (0..num_values) + .map(|i| ByteArray::from(vec![i as u8; value_size])) + .collect(); + let pages = + write_and_collect_pages::(props, 1, 0, &data, Some(&def_levels), None); + let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect(); + + // With 16 actual values of 32 KiB each and a 16 KiB page limit, + // every non-null value should get its own page (plus possibly + // adjacent nulls). At minimum, the number of pages must be + // roughly the value count, not 1 (which is what `main` produced). + assert!( + data_pages.len() >= num_values / 2, + "expected at least {} pages for {num_values} large values, got {} pages: {data_pages:?}", + num_values / 2, + data_pages.len(), + ); + // No page contains more than ~one value's worth of payload bytes. + for size in &data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound; pages {data_pages:?}" + ); + } + } + + #[test] + fn test_column_writer_dict_enabled_large_values_post_spill() { + // While dictionary encoding is active the data page holds only + // small RLE indices, so the chunker bounds each mini-batch by the + // dictionary page's remaining budget instead of the data page + // limit (see `ByteBudgetChunker::pick_sub_batch_size`). 
Once the + // dictionary spills (each value is large + unique), plain + // encoding takes over and the byte-budget sub-batch kicks in. + // + // This test makes sure the writer survives that transition and + // produces bounded pages thereafter. + let value_size = 64 * 1024; + let page_byte_limit = 16 * 1024; + let num_rows = 32; + + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(true) + // Force a small dict so it spills quickly even though + // each value here is unique. + .set_dictionary_page_size_limit(1024) + .set_data_page_size_limit(page_byte_limit) + // Small mini-batches so dict fallback happens part-way + // through the input, leaving subsequent mini-batches to + // exercise the post-spill plain-encoding path that the + // page-size fix actually targets. + .set_write_batch_size(4) + .build(); + + let data: Vec<_> = (0..num_rows) + .map(|i| ByteArray::from(vec![i as u8; value_size])) + .collect(); + let pages = write_and_collect_pages::(props, 0, 0, &data, None, None); + let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect(); + + // After spill, plain encoding writes one ~64 KiB value per page. + // Without the fix, post-spill writes still buffered all 32 + // values into a single ~2 MiB page. + assert!( + data_pages.len() >= num_rows / 2, + "expected >= {} data pages after dict spill, got {} ({data_pages:?})", + num_rows / 2, + data_pages.len(), + ); + for size in &data_pages { + assert!( + *size <= value_size + 64, + "page size {size} exceeds one-value bound; pages {data_pages:?}" + ); + } + } + + #[test] + fn test_column_writer_caps_dictionary_page_size() { + // A column of large *distinct* values with dictionary encoding on: + // the dictionary page accumulates the values themselves, and its + // spill check runs only once per mini-batch. 
Without bounding the + // dictionary-encoding mini-batch, one `write_batch_size` mini-batch + // would intern `write_batch_size * value_size` bytes into the + // dictionary page before the check fires (~16 MiB here). The chunker + // must sub-batch the dictionary-encoding phase too. + let value_size = 8 * 1024; + let dict_page_limit = 64 * 1024; + let num_rows = 2048; + + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(true) + .set_dictionary_page_size_limit(dict_page_limit) + .build(); + + let data: Vec<_> = (0..num_rows) + .map(|i| { + // each value distinct, so the dictionary cannot dedup them + let mut v = vec![0u8; value_size]; + v[..8].copy_from_slice(&(i as u64).to_le_bytes()); + ByteArray::from(v) + }) + .collect(); + let pages = write_and_collect_pages::(props, 0, 0, &data, None, None); + let dict_page_size = pages.dict_page_size; + + assert!( + dict_page_size > 0, + "expected the column to dictionary-encode" + ); + // Bounded near the limit (~2x from the post-mini-batch check). Before + // the fix the dictionary page reached num_rows * value_size (~16 MiB, + // 256x the limit). + assert!( + dict_page_size <= 3 * dict_page_limit, + "dictionary page {dict_page_size} exceeds 3x the {dict_page_limit} limit", + ); + } + + #[test] + fn test_column_writer_caps_page_size_for_fixed_len_byte_array() { + // Coverage for the `FIXED_LEN_BYTE_ARRAY` arm of + // `plain_encoded_byte_size`. With `type_length = 1`, each plain-encoded + // value is one byte, so a 4-byte page byte limit forces the + // sub-batch sizer to write ~4 values per page rather than one + // page for the whole batch. 
+ let page_byte_limit = 4; + let num_values = 128; + + let props = WriterProperties::builder() + .set_writer_version(WriterVersion::PARQUET_1_0) + .set_dictionary_enabled(false) + .set_encoding(Encoding::PLAIN) + .set_data_page_size_limit(page_byte_limit) + .build(); + + let data: Vec<_> = (0..num_values) + .map(|i| { + let mut fla = FixedLenByteArray::default(); + fla.set_data(Bytes::from(vec![i as u8])); + fla + }) + .collect(); + let pages = + write_and_collect_pages::(props, 0, 0, &data, None, None); + let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect(); + + // Without the fix this is a single 128-byte page; with the fix + // the byte budget caps each page at ~`page_byte_limit` bytes. + assert!( + data_pages.len() >= num_values / 8, + "expected pages capped by byte budget, got {data_pages:?}" + ); + for size in &data_pages { + assert!( + *size <= page_byte_limit * 4, + "page size {size} larger than expected; pages {data_pages:?}" + ); + } + } + #[test] fn test_bool_statistics() { let stats = statistics_roundtrip::(&[true, false, false, true]); @@ -4309,6 +4729,69 @@ mod tests { get_typed_column_writer::(column_writer) } + /// Pages collected by [`write_and_collect_pages`]. + struct CollectedPages { + /// `(compressed byte size, value count)` for every data page, in order. + data_pages: Vec<(usize, u32)>, + /// Largest dictionary page seen, or 0 if the column wasn't dict-encoded. + dict_page_size: usize, + } + + /// Writes `data` (with optional def/rep levels) through a raw + /// `ColumnWriterImpl` configured by `props`, then re-reads the file and + /// returns its page layout. Shared by the page-size regression tests so + /// each only has to express its props, input, and assertions. 
+ fn write_and_collect_pages( + props: WriterProperties, + max_def_level: i16, + max_rep_level: i16, + data: &[T::T], + def_levels: Option<&[i16]>, + rep_levels: Option<&[i16]>, + ) -> CollectedPages { + let mut file = tempfile::tempfile().unwrap(); + let mut write = TrackedWrite::new(&mut file); + let page_writer = Box::new(SerializedPageWriter::new(&mut write)); + let mut writer = + get_test_column_writer::(page_writer, max_def_level, max_rep_level, Arc::new(props)); + writer.write_batch(data, def_levels, rep_levels).unwrap(); + let r = writer.close().unwrap(); + drop(write); + + let read_props = ReaderProperties::builder() + .set_backward_compatible_lz4(false) + .build(); + let mut page_reader = Box::new( + SerializedPageReader::new_with_properties( + Arc::new(file), + &r.metadata, + r.rows_written as usize, + None, + Arc::new(read_props), + ) + .unwrap(), + ); + + let mut collected = CollectedPages { + data_pages: Vec::new(), + dict_page_size: 0, + }; + while let Some(page) = page_reader.get_next_page().unwrap() { + match page.page_type() { + PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => { + collected + .data_pages + .push((page.buffer().len(), page.num_values())); + } + PageType::DICTIONARY_PAGE => { + collected.dict_page_size = collected.dict_page_size.max(page.buffer().len()); + } + _ => {} + } + } + collected + } + /// Returns column reader. fn get_test_column_reader( page_reader: Box, @@ -4717,6 +5200,42 @@ mod tests { } } + #[test] + fn test_level_data_ref_value_count() { + // `value_count` is what the byte-budget chunker uses to convert a + // chunk's level span into a leaf-value count. It must work for any + // column shape — flat, nullable, or nested — because the leaf + // values array is decoupled from the rep/def level stream. + let max_def = 2; + // Non-nullable / unrepeated: no def levels materialized — every + // level is a value. + assert_eq!(LevelDataRef::Absent.value_count(64, max_def), 64); + // Uniform run of present values, and of nulls. 
+ assert_eq!( + LevelDataRef::Uniform { + value: max_def, + count: 40 + } + .value_count(40, max_def), + 40 + ); + assert_eq!( + LevelDataRef::Uniform { + value: max_def - 1, + count: 40 + } + .value_count(40, max_def), + 0 + ); + // Materialized def levels (nullable / nested): only levels equal to + // `max_def` are values; empty-list / null levels are not. + let levels = [2i16, 0, 2, 1, 2, 2, 0]; + assert_eq!( + LevelDataRef::Materialized(&levels).value_count(levels.len(), max_def), + 4 + ); + } + #[test] fn test_uniform_def_levels_all_null() { // All-null column: def_level=0 (null) for every slot, no values written. From 0eee8782aa92aab9f9bb7603e1fe5288663b9590 Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 3 Jun 2026 17:10:13 -0500 Subject: [PATCH 2/3] fix(parquet): bound page size for arrow byte-array column writes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement `ColumnValueEncoder::count_values_within_byte_budget_gather` for `ByteArrayEncoder`, the encoder real `ArrowWriter` users hit, so the page-size bound from the previous commit also fires for arrow string/binary columns (the generic path only covered `ColumnValueEncoderImpl`). The impl stays off the hot path for small values via cheap O(1) upper bounds before any per-value walk: - Offset-backed arrays (`Utf8`/`LargeUtf8`/`Binary`/`LargeBinary`): the span `offsets[last+1] - offsets[first]` bounds the chunk payload in O(1); exact even for nullable columns (skipped positions add zero), so sparse `indices` skip the per-value walk too. - View arrays (`Utf8View`/`BinaryView`): lengths live in the low 32 bits of each view word, so an O(1) `n * (max_value_len + 4)` bound skips the scan in the common case; otherwise scan lengths with no data-buffer deref. 
- Dictionary input: treated as always-fitting — dict-encoded arrow input implies values small enough to dedup, the opposite of the blob case this targets, and a per-key walk measurably regressed the bench. Includes the arrow-writer unit tests for granular-mode round-trip and the all-null string column. Co-Authored-By: Claude Opus 4.8 (1M context) --- parquet/src/arrow/arrow_writer/byte_array.rs | 181 ++++++++++++++++++- parquet/src/arrow/arrow_writer/mod.rs | 118 ++++++++++++ 2 files changed, 298 insertions(+), 1 deletion(-) diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 9cb0718b4d84..145431c26465 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -30,10 +30,12 @@ use crate::geospatial::statistics::GeospatialStatistics; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; +use arrow_array::types::ByteArrayType; use arrow_array::{ Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray, - LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, + GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, }; +use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_schema::DataType; macro_rules! downcast_dict_impl { @@ -481,6 +483,84 @@ impl ColumnValueEncoder for ByteArrayEncoder { Ok(()) } + fn count_values_within_byte_budget_gather( + values: &Self::Values, + indices: &[usize], + byte_budget: usize, + ) -> Option { + // `ByteArrayEncoder` only ever writes via `write_gather`, so this + // is the relevant method. + // + // Two-stage walk for the simple offset-buffer byte array types: + // 1. Bound the chunk's total payload from above in + // O(1) via a single subtraction on the offsets buffer.
+ // When the total fits the budget — the overwhelmingly + // common "small values" case — return immediately. + // 2. Otherwise, walk per-value byte sizes from the offsets + // buffer (still cheap, no slice/UTF-8 construction) and + // exit at the first value that pushes the cumulative sum + // past the budget. This bounds skewed distributions: an + // outlier value is caught wherever it lands in the chunk. + let count = match values.data_type() { + DataType::Utf8 => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::LargeUtf8 => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::Binary => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::LargeBinary => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + // View arrays carry each value's length in the low 32 bits of + // its u128 view word, so lengths are scannable without touching + // any data buffer — and the common small-value case skips even + // that scan via an O(1) conservative bound. + DataType::Utf8View => { + let array = values.as_any().downcast_ref::().unwrap(); + count_within_budget_views( + array.views(), + indices, + byte_budget, + max_view_value_len(array.data_buffers()), + ) + } + DataType::BinaryView => { + let array = values.as_any().downcast_ref::().unwrap(); + count_within_budget_views( + array.views(), + indices, + byte_budget, + max_view_value_len(array.data_buffers()), + ) + } + // The values in an arrow dictionary are already small and + // deduplicated, so there is nothing to bound — treat every + // chunk as fitting and stay on the batched path. (A per-value + // walk through dict keys on every chunk also measured ~+30-80% + // slower than `main`.) 
+ DataType::Dictionary(_, _) => indices.len(), + // Every byte-array type `ByteArrayEncoder` is constructed for + // has an explicit arm above. A `Dictionary(value = FixedSizeBinary)` + // column hits the `Dictionary(_, _)` arm (its `values.data_type()` + // is `Dictionary`), and a bare `FixedSizeBinary` column is routed + // to the generic column writer, never this encoder — so no other + // type can reach here. + data_type => unreachable!("ByteArrayEncoder cannot be constructed for {data_type:?}"), + }; + Some(count) + } + fn num_values(&self) -> usize { match &self.dict_encoder { Some(encoder) => encoder.indices.len(), @@ -593,6 +673,105 @@ where } } +/// Upper bound on any single value's byte length in a view array. +fn max_view_value_len(buffers: &[Buffer]) -> usize { + /// Bytes that fit inline in a u128 view word (the rest is len + prefix). + const MAX_INLINE_VIEW_LEN: usize = 12; + // An out-of-line view's data is a contiguous slice of exactly one data + // buffer, so it cannot exceed the largest buffer; inline views hold at + // most `MAX_INLINE_VIEW_LEN`. Loose (a value is usually far smaller than + // a whole buffer) but O(number of buffers) and always sound. + buffers + .iter() + .map(|b| b.len()) + .max() + .unwrap_or(0) + .max(MAX_INLINE_VIEW_LEN) +} + +/// Number of leading `indices` whose cumulative plain-encoded size fits +/// `byte_budget` (boundary value included), for view arrays (`Utf8View`, +/// `BinaryView`). +fn count_within_budget_views( + views: &[u128], + indices: &[usize], + byte_budget: usize, + max_value_len: usize, +) -> usize { + // Each plain-encoded BYTE_ARRAY value carries a 4-byte length prefix, so + // the budget is compared against `value_len + size_of::()` — the + // bytes actually written to the page, not just the payload. + // + // Stage 1: O(1) conservative bound. 
View arrays have no prefix-sum + // offsets buffer, so the exact span subtraction used by + // `count_within_budget_offsets` is unavailable; instead bound every + // value by `max_value_len`. Skips the walk for the common small-value + // case (what view arrays are built for, and where there is nothing to + // bound). + let per_value = max_value_len + std::mem::size_of::(); + if indices.len().saturating_mul(per_value) <= byte_budget { + return indices.len(); + } + // Stage 2: exact per-value scan, reading each length from the low 32 + // bits of its u128 view word (no data-buffer dereference). + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let len = (views[*idx] as u32) as usize; + cum = cum.saturating_add(len + std::mem::size_of::()); + if cum > byte_budget { + return i + 1; + } + } + indices.len() +} + +/// Number of leading `indices` whose cumulative plain-encoded size fits +/// `byte_budget` (boundary value included), for offset-buffer byte arrays +/// (`Utf8`/`LargeUtf8`/`Binary`/`LargeBinary`). +/// +/// `indices` are assumed sorted ascending — they always are here, since +/// they come from `non_null_indices`, which is built in array order. +fn count_within_budget_offsets( + values: &GenericByteArray, + indices: &[usize], + byte_budget: usize, +) -> usize { + if indices.is_empty() { + return 0; + } + let n = indices.len(); + let first = indices[0]; + let last = indices[n - 1]; + let offsets = values.value_offsets(); + // Each plain-encoded value carries a 4-byte length prefix on the page. + let prefix_overhead = std::mem::size_of::(); + + // Stage 1: O(1) span upper bound. The span `offsets[last+1] - + // offsets[first]` covers every array position in `[first, last]`, a + // superset of `indices` — and the skipped positions in a nullable + // column are nulls, usually with zero offset delta, so the span is the + // exact payload or an overestimate. If it fits the budget, every value fits.
Covers the + // common small-value case for both non-null and (sparse) nullable + // columns. + if last >= first { + let payload = (offsets[last + 1] - offsets[first]).as_usize(); + if payload + n * prefix_overhead <= byte_budget { + return n; + } + } + + // Stage 2: scan per-index lengths from the offsets buffer. + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let len = (offsets[idx + 1] - offsets[*idx]).as_usize() + prefix_overhead; + cum = cum.saturating_add(len); + if cum > byte_budget { + return i + 1; + } + } + n +} + /// Computes the min and max for the provided array and indices /// /// This is a free function so it can be used with `downcast_op!` diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 79542caed9b7..86e6cf081fed 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -4904,6 +4904,124 @@ mod tests { assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4); } + #[test] + fn test_arrow_writer_granular_mode_roundtrip() { + // Granular mode subdivides chunks and writes more pages than + // `main`. Make sure the data we write back is bit-identical to + // what went in — page-count assertions elsewhere only prove + // pages were cut, not that the encoded data is correct. + // + // Mix value sizes so that the cumulative-byte-budget cutoff + // lands mid-chunk, exercising both batched and granular paths + // within the same `write_batch_internal` call. 
+ let small = "tiny".to_string(); + let big = "x".repeat(64 * 1024); + let strings: Vec = (0..256) + .map(|i| { + if i % 16 == 0 { + big.clone() + } else { + small.clone() + } + }) + .collect(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(strings.clone())) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap(); + let read = reader.next().unwrap().unwrap(); + assert!(reader.next().is_none(), "expected one batch"); + let col = read + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.len(), strings.len()); + for (i, expected) in strings.iter().enumerate() { + assert_eq!( + col.value(i), + expected.as_str(), + "value mismatch at index {i}" + ); + } + } + + #[test] + fn test_arrow_writer_all_null_string_column() { + // The `LevelDataRef::value_count` Uniform branch with + // `value != max_def` (entirely-null chunk) must return 0 so the + // sub-batch sizer short-circuits to batch mode without trying + // to estimate byte budgets for non-existent values. 
+ let num_rows = 1024; + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8, + true, + )])); + let nulls: Vec> = vec![None; num_rows]; + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(nulls)) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + // Re-parse the file: row group has one column, every row is + // null, all data pages report `num_rows / page_count` rows. + let mut metadata = ParquetMetaDataReader::new(); + metadata.try_parse(&data).unwrap(); + let metadata = metadata.finish().unwrap(); + let row_group = metadata.row_group(0); + let col_meta = row_group.column(0); + assert_eq!(row_group.num_rows() as usize, num_rows); + // Statistics record `null_count = num_rows` — proves every value + // was written as null. + if let Some(stats) = col_meta.statistics() { + assert_eq!( + stats.null_count_opt().unwrap_or(0) as usize, + num_rows, + "expected all-null column to report null_count = num_rows" + ); + } + + let mut reader = + SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap(); + let mut total_values = 0u32; + while let Some(page) = reader.get_next_page().unwrap() { + if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. 
}) { + total_values += page.num_values(); + } + } + assert_eq!( + total_values as usize, num_rows, + "expected every level position to be represented in some page" + ); + } + struct WriteBatchesShape { num_batches: usize, rows_per_batch: usize, From c65423731c50ed5b7ff46f6a63f55fa2c8b661cd Mon Sep 17 00:00:00 2001 From: Adrian Garcia Badaracco <1755071+adriangb@users.noreply.github.com> Date: Wed, 3 Jun 2026 17:10:13 -0500 Subject: [PATCH 3/3] test(parquet): page-layout coverage for byte-budget page sizing Add declarative `LayoutTest` cases covering the arrow write path's page layout under the new byte budget, replacing hand-rolled page-reading loops with exact page counts/sizes: - large `Utf8` strings and `Utf8View` strings (one page per value) - large values inside a list column (record-by-record stepping) - nullable large values (def-level value counting) - dictionary spill then plain-encode transition - FixedSizeBinary byte budget Also updates the existing `test_string` dict-spill expectations: the dictionary page is now bounded at its limit and spills one mini-batch earlier instead of overshooting. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- parquet/tests/arrow_writer_layout.rs | 365 ++++++++++++++++++++++++++- 1 file changed, 357 insertions(+), 8 deletions(-) diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs index b9d997beb289..1c63a3144391 100644 --- a/parquet/tests/arrow_writer_layout.rs +++ b/parquet/tests/arrow_writer_layout.rs @@ -19,14 +19,16 @@ use arrow::array::{Int32Array, StringArray}; use arrow::record_batch::RecordBatch; -use arrow_array::builder::{Int32Builder, ListBuilder}; +use arrow_array::builder::{BinaryBuilder, Int32Builder, ListBuilder}; +use arrow_array::types::Int32Type; +use arrow_array::{DictionaryArray, FixedSizeBinaryArray, StringViewArray}; use bytes::Bytes; use parquet::arrow::ArrowWriter; use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder}; use parquet::basic::{Encoding, PageType}; use parquet::file::metadata::PageIndexPolicy; use parquet::file::metadata::ParquetMetaData; -use parquet::file::properties::{ReaderProperties, WriterProperties}; +use parquet::file::properties::{EnabledStatistics, ReaderProperties, WriterProperties}; use parquet::file::reader::SerializedPageReader; use parquet::schema::types::ColumnPath; use std::sync::Arc; @@ -408,16 +410,16 @@ fn test_string() { columns: vec![ColumnChunk { pages: vec![ Page { - rows: 130, + rows: 126, page_header_size: 38, - compressed_size: 138, + compressed_size: 114, encoding: Encoding::RLE_DICTIONARY, page_type: PageType::DATA_PAGE, }, Page { - rows: 1250, + rows: 1254, page_header_size: 40, - compressed_size: 10000, + compressed_size: 10032, encoding: Encoding::PLAIN, page_type: PageType::DATA_PAGE, }, @@ -429,10 +431,16 @@ fn test_string() { page_type: PageType::DATA_PAGE, }, ], + // The byte-budget chunker sub-batches the dictionary + // phase. 
The mini-batch deliberately includes the value + // that crosses the 1000-byte limit so the spill triggers + // on this chunk rather than carrying a sliver into the + // next page, giving a 126-row dictionary page at 1008 + // bytes. dictionary_page: Some(Page { - rows: 130, + rows: 126, page_header_size: 38, - compressed_size: 1040, + compressed_size: 1008, encoding: Encoding::PLAIN, page_type: PageType::DICTIONARY_PAGE, }), @@ -599,3 +607,344 @@ fn test_per_column_data_page_size_limit() { assert_eq!(col_a_page_count, 16); assert_eq!(col_b_page_count, 1); } + +#[test] +fn test_fixed_size_binary() { + // FixedSizeBinary values larger than the data page byte limit. + let value_size = 1024usize; + let num_rows = 64usize; + let values: Vec = (0..num_rows) + .flat_map(|i| vec![i as u8; value_size]) + .collect(); + let array = + Arc::new(FixedSizeBinaryArray::try_new(value_size as i32, values.into(), None).unwrap()) + as _; + let batch = RecordBatch::try_from_iter([("col", array)]).unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(4096) + .set_write_page_header_statistics(true) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + // 12 pages of 5 values (5 * 1024 = 5120 B, the boundary + // value pushes each page just past the 4096 B limit) plus + // a final page with the remaining 4 values. + pages: (0..12) + .map(|_| Page { + rows: 5, + page_header_size: 157, + compressed_size: 5120, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }) + .chain(std::iter::once(Page { + rows: 4, + page_header_size: 157, + compressed_size: 4096, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + })) + .collect(), + dictionary_page: None, + }], + }], + }, + }); +} + +#[test] +fn test_dictionary() { + // Arrow `DictionaryArray` input. 
+ let num_rows = 2000; + let dict_values = StringArray::from_iter_values(["alpha", "beta", "gamma", "delta"]); + let keys = Int32Array::from_iter_values((0..num_rows).map(|i| i % 4)); + let array = + Arc::new(DictionaryArray::::try_new(keys, Arc::new(dict_values)).unwrap()) as _; + let batch = RecordBatch::try_from_iter([("col", array)]).unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(true) + .set_dictionary_page_size_limit(1000) + .set_data_page_size_limit(1000) + .set_write_batch_size(10) + .set_write_page_header_statistics(true) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + pages: vec![Page { + rows: 2000, + page_header_size: 40, + compressed_size: 505, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }], + dictionary_page: Some(Page { + rows: 4, + page_header_size: 38, + compressed_size: 35, + encoding: Encoding::PLAIN, + page_type: PageType::DICTIONARY_PAGE, + }), + }], + }], + }, + }); +} + +#[test] +fn test_large_string() { + // Large `Utf8` values (64 KiB each) with a 16 KiB data page limit. + // + // Each value already exceeds the page byte budget, so the byte-budget + // chunker in `ByteArrayEncoder` (the offsets-buffer scan in + // `count_within_budget_offsets`) cuts one value per page instead of + // buffering the whole ~2 MiB column into a single page. This drives the + // real `ArrowWriter` user path; the lower-level column writer is covered + // by `test_column_writer_caps_page_size_for_large_byte_array_values`. 
+ let value_size = 64 * 1024; + let strings: Vec = (0..32).map(|_| "x".repeat(value_size)).collect(); + let array = Arc::new(StringArray::from(strings)) as _; + let batch = RecordBatch::try_from_iter([("col", array)]).unwrap(); + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + // Disable statistics so page headers stay small and the layout is + // determined purely by the page-splitting logic under test. + .set_statistics_enabled(EnabledStatistics::None) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + // One 64 KiB value per page (4-byte length prefix + value). + pages: (0..32) + .map(|_| Page { + rows: 1, + page_header_size: 21, + compressed_size: 65540, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }) + .collect(), + dictionary_page: None, + }], + }], + }, + }); +} + +#[test] +fn test_large_string_view() { + // Same bytes and expected layout as `test_large_string`, but the input + // is a `Utf8View` array. View arrays expose no contiguous offsets + // buffer, so the arrow writer bounds pages via the view-specific scan + // (`count_within_budget_views`, reading each value's length from the + // low 32 bits of its view word) rather than the offsets scan. This + // confirms that path caps pages identically. 
+ let value_size = 64 * 1024; + let strings: Vec = (0..32).map(|_| "x".repeat(value_size)).collect(); + let array = Arc::new(StringViewArray::from_iter_values( + strings.iter().map(|s| s.as_str()), + )) as _; + let batch = RecordBatch::try_from_iter([("col", array)]).unwrap(); + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .set_statistics_enabled(EnabledStatistics::None) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + pages: (0..32) + .map(|_| Page { + rows: 1, + page_header_size: 21, + compressed_size: 65540, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }) + .collect(), + dictionary_page: None, + }], + }], + }, + }); +} + +#[test] +fn test_large_values_in_list() { + // `list` with large leaf values, driving the record-by-record + // (Materialized-rep) arm of the byte-budget chunker through the arrow + // path. Because repetition levels are present, a list element's leaves + // can never span pages, so the chunker steps from one `rep == 0` + // boundary to the next. Three records of three 32 KiB blobs each, with + // a 16 KiB page limit, yield exactly one page per record (a whole + // record's ~96 KiB of leaves stays together even though it blows the + // budget — it cannot be split). The raw-writer analogue is + // `test_column_writer_caps_page_size_for_large_values_in_list`. 
+ let value_size = 32 * 1024; + let mut builder = ListBuilder::new(BinaryBuilder::new()); + let mut byte = 0u8; + for _ in 0..3 { + for _ in 0..3 { + builder.values().append_value(vec![byte; value_size]); + byte = byte.wrapping_add(1); + } + builder.append(true); + } + let array = Arc::new(builder.finish()) as _; + let batch = RecordBatch::try_from_iter([("col", array)]).unwrap(); + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .set_statistics_enabled(EnabledStatistics::None) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + // One record (3 leaves + rep/def levels) per page. + pages: (0..3) + .map(|_| Page { + rows: 1, + page_header_size: 21, + compressed_size: 98328, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }) + .collect(), + dictionary_page: None, + }], + }], + }, + }); +} + +#[test] +fn test_nullable_large_values() { + // Nullable `Utf8` column alternating null / 32 KiB value. The byte + // budget must count only the non-null values (the def-level value + // count), not the level count — otherwise the estimate would be wrong + // for sparse columns. With a 16 KiB page limit each 32 KiB value still + // gets its own page (carrying its adjacent leading null), giving 16 + // two-row pages. Mirrors the raw-writer + // `test_column_writer_caps_page_size_with_nullable_large_values`. 
+ let value_size = 32 * 1024; + let big = "x".repeat(value_size); + let values: Vec> = (0..32) + .map(|i| if i % 2 == 1 { Some(big.clone()) } else { None }) + .collect(); + let array = Arc::new(StringArray::from(values)) as _; + let batch = RecordBatch::try_from_iter([("col", array)]).unwrap(); + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .set_statistics_enabled(EnabledStatistics::None) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + // Each page holds one null + one 32 KiB value (2 rows). + pages: (0..16) + .map(|_| Page { + rows: 2, + page_header_size: 21, + compressed_size: 32778, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + }) + .collect(), + dictionary_page: None, + }], + }], + }, + }); +} + +#[test] +fn test_dictionary_spill_large_values() { + // Dictionary encoding is enabled, but each value is large (64 KiB) and + // unique, so the dictionary spills almost immediately (1 KiB dict page + // limit). After the spill, plain encoding takes over and the byte-budget + // sub-batch bounds each page to a single value. The first value is + // interned into the dictionary page (one RLE_DICTIONARY data page + // referencing it); the remaining 31 fall back to PLAIN, one per page. + // Mirrors `test_column_writer_dict_enabled_large_values_post_spill`, + // exercising the same dict→plain transition via the arrow path. + let value_size = 64 * 1024; + let strings: Vec = (0..32) + .map(|i| format!("{i:05}") + &"x".repeat(value_size - 5)) + .collect(); + let array = Arc::new(StringArray::from(strings)) as _; + let batch = RecordBatch::try_from_iter([("col", array)]).unwrap(); + let props = WriterProperties::builder() + .set_dictionary_enabled(true) + // Tiny dict limit so it spills after the first (already oversized) + // value, leaving the rest to exercise the post-spill plain path. 
+ .set_dictionary_page_size_limit(1024) + .set_data_page_size_limit(16 * 1024) + .set_write_batch_size(4) + .set_statistics_enabled(EnabledStatistics::None) + .build(); + + do_test(LayoutTest { + props, + batches: vec![batch], + layout: Layout { + row_groups: vec![RowGroup { + columns: vec![ColumnChunk { + pages: std::iter::once(Page { + // The single value interned before the dict spilled. + rows: 1, + page_header_size: 17, + compressed_size: 2, + encoding: Encoding::RLE_DICTIONARY, + page_type: PageType::DATA_PAGE, + }) + .chain((0..31).map(|_| Page { + // Post-spill plain-encoded values, one per page. + rows: 1, + page_header_size: 21, + compressed_size: 65540, + encoding: Encoding::PLAIN, + page_type: PageType::DATA_PAGE, + })) + .collect(), + dictionary_page: Some(Page { + rows: 1, + page_header_size: 21, + compressed_size: 65540, + encoding: Encoding::PLAIN, + page_type: PageType::DICTIONARY_PAGE, + }), + }], + }], + }, + }); +}