diff --git a/parquet/src/arrow/arrow_writer/byte_array.rs b/parquet/src/arrow/arrow_writer/byte_array.rs index 9cb0718b4d84..145431c26465 100644 --- a/parquet/src/arrow/arrow_writer/byte_array.rs +++ b/parquet/src/arrow/arrow_writer/byte_array.rs @@ -30,10 +30,12 @@ use crate::geospatial::statistics::GeospatialStatistics; use crate::schema::types::ColumnDescPtr; use crate::util::bit_util::num_required_bits; use crate::util::interner::{Interner, Storage}; +use arrow_array::types::ByteArrayType; use arrow_array::{ Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray, - LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, + GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray, }; +use arrow_buffer::{ArrowNativeType, Buffer}; use arrow_schema::DataType; macro_rules! downcast_dict_impl { @@ -481,6 +483,84 @@ impl ColumnValueEncoder for ByteArrayEncoder { Ok(()) } + fn count_values_within_byte_budget_gather( + values: &Self::Values, + indices: &[usize], + byte_budget: usize, + ) -> Option { + // `ByteArrayEncoder` only ever writes via `write_gather`, so this + // is the relevant method. + // + // Two-stage walk for the simple offset-buffer byte array types: + // 1. If indices are contiguous, compute the total payload in + // O(1) via a single subtraction on the offsets buffer. + // When the total fits the budget — the overwhelmingly + // common "small values" case — return immediately. + // 2. Otherwise, walk per-value byte sizes from the offsets + // buffer (still cheap, no slice/UTF-8 construction) and + // exit at the first value that pushes the cumulative sum + // past the budget. This bounds skewed distributions: an + // outlier value is caught wherever it lands in the chunk. 
+ let count = match values.data_type() { + DataType::Utf8 => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::LargeUtf8 => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::Binary => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + DataType::LargeBinary => count_within_budget_offsets( + values.as_any().downcast_ref::().unwrap(), + indices, + byte_budget, + ), + // View arrays carry each value's length in the low 32 bits of + // its u128 view word, so lengths are scannable without touching + // any data buffer — and the common small-value case skips even + // that scan via an O(1) conservative bound. + DataType::Utf8View => { + let array = values.as_any().downcast_ref::().unwrap(); + count_within_budget_views( + array.views(), + indices, + byte_budget, + max_view_value_len(array.data_buffers()), + ) + } + DataType::BinaryView => { + let array = values.as_any().downcast_ref::().unwrap(); + count_within_budget_views( + array.views(), + indices, + byte_budget, + max_view_value_len(array.data_buffers()), + ) + } + // The values in an arrow dictionary are already small and + // deduplicated, so there is nothing to bound — treat every + // chunk as fitting and stay on the batched path. (A per-value + // walk through dict keys on every chunk also measured ~+30-80% + // slower than `main`.) + DataType::Dictionary(_, _) => indices.len(), + // Every byte-array type `ByteArrayEncoder` is constructed for + // has an explicit arm above. A `Dictionary(value = FixedSizeBinary)` + // column hits the `Dictionary(_, _)` arm (its `values.data_type()` + // is `Dictionary`), and a bare `FixedSizeBinary` column is routed + // to the generic column writer, never this encoder — so no other + // type can reach here. 
+ data_type => unreachable!("ByteArrayEncoder cannot be constructed for {data_type:?}"), + }; + Some(count) + } + fn num_values(&self) -> usize { match &self.dict_encoder { Some(encoder) => encoder.indices.len(), @@ -593,6 +673,105 @@ where } } +/// Upper bound on any single value's byte length in a view array. +fn max_view_value_len(buffers: &[Buffer]) -> usize { + /// Bytes that fit inline in a u128 view word (the rest is len + prefix). + const MAX_INLINE_VIEW_LEN: usize = 12; + // An out-of-line view's data is a contiguous slice of exactly one data + // buffer, so it cannot exceed the largest buffer; inline views hold at + // most `MAX_INLINE_VIEW_LEN`. Loose (a value is usually far smaller than + // a whole buffer) but O(number of buffers) and always sound. + buffers + .iter() + .map(|b| b.len()) + .max() + .unwrap_or(0) + .max(MAX_INLINE_VIEW_LEN) +} + +/// Number of leading `indices` whose cumulative plain-encoded size fits +/// `byte_budget` (boundary value included), for view arrays (`Utf8View`, +/// `BinaryView`). +fn count_within_budget_views( + views: &[u128], + indices: &[usize], + byte_budget: usize, + max_value_len: usize, +) -> usize { + // Each plain-encoded BYTE_ARRAY value carries a 4-byte length prefix, so + // the budget is compared against `value_len + size_of::()` — the + // bytes actually written to the page, not just the payload. + // + // Stage 1: O(1) conservative bound. View arrays have no prefix-sum + // offsets buffer, so the exact span subtraction used by + // `count_within_budget_offsets` is unavailable; instead bound every + // value by `max_value_len`. Skips the walk for the common small-value + // case (what view arrays are built for, and where there is nothing to + // bound). 
+ let per_value = max_value_len + std::mem::size_of::(); + if indices.len().saturating_mul(per_value) <= byte_budget { + return indices.len(); + } + // Stage 2: exact per-value scan, reading each length from the low 32 + // bits of its u128 view word (no data-buffer dereference). + let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let len = (views[*idx] as u32) as usize; + cum = cum.saturating_add(len + std::mem::size_of::()); + if cum > byte_budget { + return i + 1; + } + } + indices.len() +} + +/// Number of leading `indices` whose cumulative plain-encoded size fits +/// `byte_budget` (boundary value included), for offset-buffer byte arrays +/// (`Utf8`/`LargeUtf8`/`Binary`/`LargeBinary`). +/// +/// `indices` are assumed sorted ascending — they always are here, since +/// they come from `non_null_indices`, which is built in array order. +fn count_within_budget_offsets( + values: &GenericByteArray, + indices: &[usize], + byte_budget: usize, +) -> usize { + if indices.is_empty() { + return 0; + } + let n = indices.len(); + let first = indices[0]; + let last = indices[n - 1]; + let offsets = values.value_offsets(); + // Each plain-encoded value carries a 4-byte length prefix on the page. + let prefix_overhead = std::mem::size_of::(); + + // Stage 1: O(1) span upper bound. The span `offsets[last+1] - + // offsets[first]` covers every array position in `[first, last]`, a + // superset of `indices`. The skipped positions in a nullable column are + // nulls, which normally have zero offset delta, so the span usually equals + // the exact payload; a null slot with non-zero length only makes it an + // over-estimate. If it fits the budget, every value fits. Covers the common + // small-value case for both non-null and (sparse) nullable columns. + if last >= first { + let payload = (offsets[last + 1] - offsets[first]).as_usize(); + if payload + n * prefix_overhead <= byte_budget { + return n; + } + } + + // Stage 2: scan per-index lengths from the offsets buffer. 
+ let mut cum: usize = 0; + for (i, idx) in indices.iter().enumerate() { + let len = (offsets[idx + 1] - offsets[*idx]).as_usize() + prefix_overhead; + cum = cum.saturating_add(len); + if cum > byte_budget { + return i + 1; + } + } + n +} + /// Computes the min and max for the provided array and indices /// /// This is a free function so it can be used with `downcast_op!` diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 79542caed9b7..86e6cf081fed 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -4904,6 +4904,124 @@ mod tests { assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4); } + #[test] + fn test_arrow_writer_granular_mode_roundtrip() { + // Granular mode subdivides chunks and writes more pages than + // `main`. Make sure the data we write back is bit-identical to + // what went in — page-count assertions elsewhere only prove + // pages were cut, not that the encoded data is correct. + // + // Mix value sizes so that the cumulative-byte-budget cutoff + // lands mid-chunk, exercising both batched and granular paths + // within the same `write_batch_internal` call. 
+ let small = "tiny".to_string(); + let big = "x".repeat(64 * 1024); + let strings: Vec = (0..256) + .map(|i| { + if i % 16 == 0 { + big.clone() + } else { + small.clone() + } + }) + .collect(); + + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(strings.clone())) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap(); + let read = reader.next().unwrap().unwrap(); + assert!(reader.next().is_none(), "expected one batch"); + let col = read + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(col.len(), strings.len()); + for (i, expected) in strings.iter().enumerate() { + assert_eq!( + col.value(i), + expected.as_str(), + "value mismatch at index {i}" + ); + } + } + + #[test] + fn test_arrow_writer_all_null_string_column() { + // The `LevelDataRef::value_count` Uniform branch with + // `value != max_def` (entirely-null chunk) must return 0 so the + // sub-batch sizer short-circuits to batch mode without trying + // to estimate byte budgets for non-existent values. 
+ let num_rows = 1024; + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + ArrowDataType::Utf8, + true, + )])); + let nulls: Vec> = vec![None; num_rows]; + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(StringArray::from(nulls)) as _], + ) + .unwrap(); + + let props = WriterProperties::builder() + .set_dictionary_enabled(false) + .set_data_page_size_limit(16 * 1024) + .build(); + let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap(); + writer.write(&batch).unwrap(); + let data = Bytes::from(writer.into_inner().unwrap()); + + // Re-parse the file: row group has one column, every row is + // null, all data pages report `num_rows / page_count` rows. + let mut metadata = ParquetMetaDataReader::new(); + metadata.try_parse(&data).unwrap(); + let metadata = metadata.finish().unwrap(); + let row_group = metadata.row_group(0); + let col_meta = row_group.column(0); + assert_eq!(row_group.num_rows() as usize, num_rows); + // Statistics record `null_count = num_rows` — proves every value + // was written as null. + if let Some(stats) = col_meta.statistics() { + assert_eq!( + stats.null_count_opt().unwrap_or(0) as usize, + num_rows, + "expected all-null column to report null_count = num_rows" + ); + } + + let mut reader = + SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap(); + let mut total_values = 0u32; + while let Some(page) = reader.get_next_page().unwrap() { + if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. 
}) { + total_values += page.num_values(); + } + } + assert_eq!( + total_values as usize, num_rows, + "expected every level position to be represented in some page" + ); + } + struct WriteBatchesShape { num_batches: usize, rows_per_batch: usize, diff --git a/parquet/src/column/writer/byte_budget_chunker.rs b/parquet/src/column/writer/byte_budget_chunker.rs new file mode 100644 index 000000000000..56c7b4c6f86d --- /dev/null +++ b/parquet/src/column/writer/byte_budget_chunker.rs @@ -0,0 +1,204 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! See [`ByteBudgetChunker`] for byte-budget-aware mini-batch sizing. + +use crate::basic::Type; +use crate::column::writer::LevelDataRef; +use crate::column::writer::encoder::ColumnValueEncoder; +use crate::file::properties::WriterProperties; +use crate::schema::types::ColumnDescriptor; + +/// Picks byte-budget-aware mini-batch sizes for one column. +/// +/// The parquet column writer checks the data page byte limit only *after* +/// each mini-batch finishes writing. Mini-batches are sized in rows +/// (`write_batch_size`, default 1024), so for BYTE_ARRAY columns whose +/// values are large (e.g. 
multi-MiB blobs) a single mini-batch can buffer +/// GiB into one page before the limit is consulted. +/// +/// This isolates the per-chunk decision that prevents that: given a chunk's +/// level data and the input values, pick the largest `sub_batch_size` such +/// that one mini-batch fits in one page byte budget. For the overwhelmingly +/// common case (small or fixed-width values) the answer is just `chunk_size` +/// and the decision is O(1) on the column type — only when the input might +/// overflow does the chunker consult the encoder's byte estimate. +pub(crate) struct ByteBudgetChunker { + /// Configured data page byte limit for the column. + page_byte_limit: usize, + /// Max definition level of the column; a level equal to this marks a + /// present (non-null) leaf value. Used to count values per chunk. + max_def_level: i16, + /// `true` when no chunk of `base_batch_size` values can ever overflow + /// `page_byte_limit` regardless of input. Set once at column open from + /// the physical type's known per-value byte size; lets the per-chunk + /// decision short-circuit with no work for every numeric, bool, or + /// narrow `FIXED_LEN_BYTE_ARRAY` column. + static_always_fits: bool, + /// Configured dictionary page byte limit for the column. + dict_page_byte_limit: usize, + /// As [`Self::static_always_fits`] but for the dictionary page: `true` + /// when one `base_batch_size` mini-batch of this fixed-width type cannot + /// overshoot `dict_page_byte_limit` by more than one mini-batch's worth. 
+ static_dict_always_fits: bool, +} + +impl ByteBudgetChunker { + #[inline] + pub(crate) fn new( + descr: &ColumnDescriptor, + props: &WriterProperties, + base_batch_size: usize, + ) -> Self { + let page_byte_limit = props.column_data_page_size_limit(descr.path()); + let dict_page_byte_limit = props.column_dictionary_page_size_limit(descr.path()); + let static_bytes_per_value = match descr.physical_type() { + Type::BOOLEAN => Some(1), + Type::INT32 | Type::FLOAT => Some(std::mem::size_of::()), + Type::INT64 | Type::DOUBLE => Some(std::mem::size_of::()), + Type::INT96 => Some(12), + Type::FIXED_LEN_BYTE_ARRAY => Some(descr.type_length().max(0) as usize), + Type::BYTE_ARRAY => None, + }; + let static_fits = |limit: usize| { + static_bytes_per_value + .map(|b| b.saturating_mul(base_batch_size) <= limit) + .unwrap_or(false) + }; + Self { + page_byte_limit, + max_def_level: descr.max_def_level(), + static_always_fits: static_fits(page_byte_limit), + dict_page_byte_limit, + static_dict_always_fits: static_fits(dict_page_byte_limit), + } + } + + /// Decide how many levels at the start of a chunk belong in one + /// mini-batch, so the mini-batch cannot overflow whichever page is + /// currently accumulating value bytes: the data page when plain-encoding, + /// or the *dictionary* page while dictionary-encoding. A returned value + /// smaller than `chunk_size` triggers granular sub-batching in + /// `write_batch_internal`. + /// + /// While dictionary-encoding, the data page holds only small RLE indices, + /// but the dictionary page accumulates the distinct values themselves — + /// so it is the dictionary page's remaining budget that must bound the + /// mini-batch. The per-mini-batch dictionary spill check would otherwise + /// let one mini-batch of large values balloon the dictionary page. 
+ /// + /// Returns `chunk_size` immediately (no value inspection) when the chunk + /// is empty, or when the column is a fixed-width type whose mini-batches + /// statically cannot overshoot the relevant page. + /// + /// `#[inline]`: this is a tiny per-chunk dispatcher; the actual byte + /// inspection lives in the out-of-line `byte_budget_sub_batch_size`. + #[inline] + pub(crate) fn pick_sub_batch_size( + &self, + encoder: &E, + values: &E::Values, + value_indices: Option<&[usize]>, + chunk_def: LevelDataRef<'_>, + values_offset: usize, + chunk_size: usize, + ) -> usize { + if chunk_size == 0 { + return chunk_size; + } + let budget = if encoder.has_dictionary() { + if self.static_dict_always_fits { + return chunk_size; + } + // Bound the mini-batch by the dictionary page's *remaining* + // budget (it accumulates across mini-batches until it spills). + match encoder.estimated_dict_page_size() { + Some(used) => self.dict_page_byte_limit.saturating_sub(used), + None => return chunk_size, + } + } else { + if self.static_always_fits { + return chunk_size; + } + self.page_byte_limit + }; + self.byte_budget_sub_batch_size::( + values, + value_indices, + chunk_def, + values_offset, + chunk_size, + budget, + ) + } + + /// Inspect value sizes to decide how many of the chunk's values fit in + /// `budget` bytes (the data page or dictionary page remaining budget). + /// + /// `#[inline(never)]` keeps this slow path out of the hot + /// `write_batch_internal` loop; numeric and bool columns never reach it. + #[inline(never)] + fn byte_budget_sub_batch_size( + &self, + values: &E::Values, + value_indices: Option<&[usize]>, + chunk_def: LevelDataRef<'_>, + values_offset: usize, + chunk_size: usize, + budget: usize, + ) -> usize { + // How many of this chunk's levels carry an actual value. 
For a + // non-nullable, unrepeated column every level is a value, so + // `value_count` is O(1) (`Absent`/`Uniform` def levels); only + // nullable or nested columns pay the O(chunk_size) def-level scan. + let vals_in_chunk = chunk_def.value_count(chunk_size, self.max_def_level); + if vals_in_chunk == 0 { + return chunk_size; + } + // Ask the encoder how many of the next values fit in one page byte + // budget. Dispatch on whether the caller supplied gather indices; + // this mirrors how `write_mini_batch` picks `write_gather` vs + // `write`. + let fit = match value_indices { + Some(idx) => { + let end = (values_offset + vals_in_chunk).min(idx.len()); + let start = values_offset.min(end); + E::count_values_within_byte_budget_gather(values, &idx[start..end], budget) + } + None => { + E::count_values_within_byte_budget(values, values_offset, vals_in_chunk, budget) + } + }; + match fit { + None => chunk_size, + Some(values_per_subbatch) => { + // Convert the value count back into a level count. For a + // non-nullable column this is a no-op; for nullable/nested + // columns scale by the chunk's observed value-to-level + // ratio. + let levels_per_subbatch = if vals_in_chunk == chunk_size { + values_per_subbatch + } else { + (values_per_subbatch * chunk_size) + .div_ceil(vals_in_chunk) + .max(1) + }; + chunk_size.min(levels_per_subbatch.max(1)) + } + } + } +} diff --git a/parquet/src/column/writer/encoder.rs b/parquet/src/column/writer/encoder.rs index 2ea3376ae708..d9adacff4101 100644 --- a/parquet/src/column/writer/encoder.rs +++ b/parquet/src/column/writer/encoder.rs @@ -90,6 +90,42 @@ pub trait ColumnValueEncoder { /// Write the values at the indexes in `indices` to this [`ColumnValueEncoder`] fn write_gather(&mut self, values: &Self::Values, indices: &[usize]) -> Result<()>; + /// Returns the largest `k` such that the first `k` values in + /// `values[offset..offset + len]` encode to at most `byte_budget` + /// bytes — i.e. 
how many values fit in a single page byte budget. + /// + /// Returns `len` if every value fits, and at least 1 even if a single + /// value alone exceeds the budget (parquet's "at least one value per + /// data page" rule). Variable-width walks also count the boundary value. + /// + /// `None` means "no cheap estimate available"; the caller stays on + /// the batched fast path and lets the post-write + /// `should_add_data_page` check handle bounding. + /// + /// Implementations should short-circuit aggressively: the typical + /// case is "everything fits, return `len`", and the next-most-common + /// case is "one wide value, return 1." The variable-width walk only + /// needs to be precise when the chunk is genuinely near the budget. + fn count_values_within_byte_budget( + _values: &Self::Values, + _offset: usize, + _len: usize, + _byte_budget: usize, + ) -> Option { + None + } + + /// As [`Self::count_values_within_byte_budget`] but using gather + /// `indices` rather than a contiguous range. Returns the number of + /// `indices` that fit, not the maximum index value. + fn count_values_within_byte_budget_gather( + _values: &Self::Values, + _indices: &[usize], + _byte_budget: usize, + ) -> Option { + None + } + /// Returns the number of buffered values fn num_values(&self) -> usize; @@ -247,6 +283,39 @@ impl ColumnValueEncoder for ColumnValueEncoderImpl { self.write_slice(&slice) } + fn count_values_within_byte_budget( + values: &[T::T], + offset: usize, + len: usize, + byte_budget: usize, + ) -> Option { + // Clamp so that a caller-supplied `len` that overruns the input + // (e.g. a level/value mismatch the encoder will reject later) + // returns an estimate instead of panicking here. 
+ let end = (offset + len).min(values.len()); + let start = offset.min(end); + count_within_budget::( + end - start, + byte_budget, + values[start..end].iter().map(Some), + ) + } + + fn count_values_within_byte_budget_gather( + values: &[T::T], + indices: &[usize], + byte_budget: usize, + ) -> Option { + // `values.get` yields `None` for an out-of-range index (defensive + // against a level/value mismatch the encoder rejects later); such a + // position is counted but contributes no bytes. + count_within_budget::( + indices.len(), + byte_budget, + indices.iter().map(|&i| values.get(i)), + ) + } + fn num_values(&self) -> usize { self.num_values } @@ -411,3 +480,75 @@ where } } } + +/// Plain-encoded byte cost of a single value of type `T::T`. +/// +/// Derived from [`ParquetValueType::dict_encoding_size`] (which returns +/// `(per-value overhead, value-bytes)`) so we don't add a parallel +/// per-value-size hook to the trait. Mirrors the dispatch in +/// `KeyStorage::push` (`encodings/encoding/dict_encoder.rs`). +/// +/// Placed at the end of the module deliberately. Inserting it above the +/// `ColumnValueEncoder` trait shifts the trait and `ColumnValueEncoderImpl` +/// within the compiled module enough to perturb downstream code placement, +/// which measurably regresses unrelated arrow-writer string benchmarks +/// (~5-9% on `string` / `string_and_binary_view`). Defining it last keeps +/// the hot encoder code at the offsets it has on `main`. +#[inline] +fn plain_encoded_byte_size(value: &T::T) -> usize { + let (overhead, bytes) = value.dict_encoding_size(); + match ::PHYSICAL_TYPE { + // Plain BYTE_ARRAY = 4-byte length prefix + payload. + Type::BYTE_ARRAY => overhead + bytes, + // Plain FLBA = raw bytes only; `dict_encoding_size`'s length prefix + // is irrelevant here, so the encoder passes `type_length` directly. 
+ Type::FIXED_LEN_BYTE_ARRAY => bytes, + // Numeric/bool are short-circuited by the caller via + // `mem::size_of`, so this is unreachable in practice; fall back to + // `overhead` defensively. + _ => overhead, + } +} + +/// How many leading values fit in `byte_budget` bytes, shared by the two +/// `ColumnValueEncoder::count_values_within_byte_budget*` methods (one walks a +/// contiguous slice, the other gathers by index). +/// +/// `n` is the answer when everything fits; `vals` yields each candidate value, +/// or `None` for a position that should still be counted but contributes no +/// bytes (an out-of-range gather index). The boundary value that crosses the +/// budget is included in the count so the caller's page-flush check trips on +/// this mini-batch rather than leaving a sliver for the next page; this also +/// catches a lone outlier wherever it lands among small values. +/// +/// Defined at the end of the module alongside `plain_encoded_byte_size` for +/// the same reason — see that function's note on code placement and the +/// `string` / `string_and_binary_view` benchmarks. +#[inline] +fn count_within_budget<'a, T: DataType>( + n: usize, + byte_budget: usize, + vals: impl Iterator>, +) -> Option +where + T::T: 'a, +{ + // Fixed-size physical types have a constant per-value byte cost, so the + // answer is one division — no walk needed. + let phys = ::PHYSICAL_TYPE; + if phys != Type::BYTE_ARRAY && phys != Type::FIXED_LEN_BYTE_ARRAY { + let per = std::mem::size_of::().max(1); + return Some((byte_budget / per).max(1).min(n)); + } + // Variable-width: accumulate, exit at the first value past the budget. 
+ let mut cum: usize = 0; + for (i, v) in vals.enumerate() { + if let Some(v) = v { + cum = cum.saturating_add(plain_encoded_byte_size::(v)); + } + if cum > byte_budget { + return Some(i + 1); + } + } + Some(n) +} diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs index 4e53230bbf89..090350f53aae 100644 --- a/parquet/src/column/writer/mod.rs +++ b/parquet/src/column/writer/mod.rs @@ -49,8 +49,11 @@ use crate::file::properties::{ use crate::file::statistics::{Statistics, ValueStatistics}; use crate::schema::types::{ColumnDescPtr, ColumnDescriptor}; +mod byte_budget_chunker; pub(crate) mod encoder; +use byte_budget_chunker::ByteBudgetChunker; + macro_rules! downcast_writer { ($e:expr, $i:ident, $b:expr) => { match $e { @@ -374,6 +377,24 @@ impl<'a> LevelDataRef<'a> { Self::Uniform { value, .. } => Self::Uniform { value, count: len }, } } + + /// Count of positions in this slice that represent an actual value + /// (definition level equal to `max_def`). `Absent` means the column has + /// `max_def == 0` and every position is a value, so the implicit count + /// is the caller-supplied `total`. + pub(crate) fn value_count(self, total: usize, max_def: i16) -> usize { + match self { + Self::Absent => total, + Self::Materialized(values) => values.iter().filter(|&&d| d == max_def).count(), + Self::Uniform { value, count } => { + if value == max_def { + count + } else { + 0 + } + } + } + } } /// Typed column writer for a primitive column. 
@@ -545,6 +566,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         } else {
             self.props.write_batch_size()
         };
+        let chunker = ByteBudgetChunker::new(&self.descr, &self.props, base_batch_size);
 
         while levels_offset < num_levels {
             let mut end_offset = num_levels.min(levels_offset + base_batch_size);
@@ -555,14 +577,45 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
                 }
             }
 
-            values_offset += self.write_mini_batch(
+            let chunk_size = end_offset - levels_offset;
+            let chunk_def = def_levels.slice(levels_offset, chunk_size);
+            let chunk_rep = rep_levels.slice(levels_offset, chunk_size);
+
+            // Key decision point: can we write this whole chunk as one
+            // mini-batch (the common case — small or fixed-width values, no
+            // further page-size accounting needed), or must we fall back to
+            // byte-budget-aware sub-batching to keep a page from overshooting
+            // `data_page_size_limit`? `pick_sub_batch_size` returns
+            // `chunk_size` for the former.
+            let sub_batch_size = chunker.pick_sub_batch_size(
+                &self.encoder,
                 values,
-                values_offset,
                 value_indices,
-                end_offset - levels_offset,
-                def_levels.slice(levels_offset, end_offset - levels_offset),
-                rep_levels.slice(levels_offset, end_offset - levels_offset),
-            )?;
+                chunk_def,
+                values_offset,
+                chunk_size,
+            );
+
+            if sub_batch_size >= chunk_size {
+                values_offset += self.write_mini_batch(
+                    values,
+                    values_offset,
+                    value_indices,
+                    chunk_size,
+                    chunk_def,
+                    chunk_rep,
+                )?;
+            } else {
+                values_offset += self.write_granular_chunk(
+                    values,
+                    values_offset,
+                    value_indices,
+                    chunk_size,
+                    chunk_def,
+                    chunk_rep,
+                    sub_batch_size,
+                )?;
+            }
             levels_offset = end_offset;
         }
 
@@ -713,6 +766,69 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
         })
     }
 
+    /// Writes a chunk in `sub_batch_size`-level sub-batches, checking the
+    /// data page byte limit after each. This keeps the page size close to
+    /// `data_page_size_limit` instead of overshooting it by a whole chunk.
+    ///
+    /// For repeated/nested columns sub-batches step from one `rep == 0`
+    /// boundary to the next so a record never spans data pages, matching
+    /// the parquet format rule.
+    ///
+    /// Returns the total number of values consumed across all sub-batches.
+    ///
+    /// `#[inline(never)]` keeps this slow path — only reached for
+    /// variable-width columns whose values need page splitting — out of
+    /// the hot `write_batch_internal` loop.
+    #[allow(clippy::too_many_arguments)]
+    #[inline(never)]
+    fn write_granular_chunk(
+        &mut self,
+        values: &E::Values,
+        values_offset: usize,
+        value_indices: Option<&[usize]>,
+        chunk_size: usize,
+        chunk_def: LevelDataRef<'_>,
+        chunk_rep: LevelDataRef<'_>,
+        sub_batch_size: usize,
+    ) -> Result<usize> {
+        // Invariant: `sub_batch_size >= 1`; the clamp keeps release builds making progress.
+        debug_assert!(sub_batch_size >= 1, "chunker must size at least one level");
+        let sub_batch_size = sub_batch_size.max(1);
+        let mut values_consumed = 0;
+        let mut sub_start = 0;
+        while sub_start < chunk_size {
+            let sub_end = match chunk_rep {
+                LevelDataRef::Materialized(levels) => {
+                    // Pack up to `sub_batch_size` levels per mini-batch, then
+                    // extend to the next record boundary (rep == 0) so a
+                    // record never spans data pages. Packing whole records
+                    // rather than stepping one record at a time avoids
+                    // calling `write_mini_batch` per record: records average
+                    // only a handful of levels, so a record-at-a-time step
+                    // would issue many more mini-batches than necessary.
+                    let mut e = (sub_start + sub_batch_size).min(chunk_size);
+                    while e < chunk_size && levels[e] != 0 {
+                        e += 1;
+                    }
+                    e
+                }
+                _ => (sub_start + sub_batch_size).min(chunk_size),
+            };
+            let sub_len = sub_end - sub_start;
+            let written = self.write_mini_batch(
+                values,
+                values_offset + values_consumed,
+                value_indices,
+                sub_len,
+                chunk_def.slice(sub_start, sub_len),
+                chunk_rep.slice(sub_start, sub_len),
+            )?;
+            values_consumed += written;
+            sub_start = sub_end;
+        }
+        Ok(values_consumed)
+    }
+
     /// Creates a new streaming level encoder appropriate for the writer version.
     fn create_level_encoder(max_level: i16, props: &WriterProperties) -> LevelEncoder {
         match props.writer_version() {
@@ -2676,6 +2792,310 @@ mod tests {
         assert_eq!(other_values, vec![10]);
     }
 
+    #[test]
+    fn test_column_writer_caps_page_size_for_large_byte_array_values() {
+        // Regression: the post-write data page byte limit check only fires
+        // at mini-batch boundaries, so a 1024-row mini-batch of multi-MiB
+        // BYTE_ARRAY values used to buffer multiple GiB into a single page
+        // before the limit was even consulted. With the threshold-based
+        // granular mode this batch should split into ~one page per value.
+        let value_size = 64 * 1024; // 64 KiB per value
+        let page_byte_limit = 16 * 1024; // 16 KiB page limit
+        let num_rows = 64;
+
+        let props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_1_0)
+            .set_dictionary_enabled(false)
+            .set_encoding(Encoding::PLAIN)
+            .set_data_page_size_limit(page_byte_limit)
+            // Default write_batch_size (1024) — without the fix this
+            // buffers the entire input into a single ~4 MiB page.
+            .build();
+
+        let data: Vec<_> = (0..num_rows)
+            .map(|i| ByteArray::from(vec![i as u8; value_size]))
+            .collect();
+        let pages = write_and_collect_pages::<ByteArrayType>(props, 0, 0, &data, None, None);
+
+        // Every value must end up somewhere.
+        let total_values: u32 = pages.data_pages.iter().map(|(_, n)| n).sum();
+        assert_eq!(total_values as usize, num_rows);
+        // Without the fix this assertion fired with one ~4 MiB page; the
+        // threshold splits the input so that no page holds more than a
+        // single oversized value's worth of bytes.
+        assert!(
+            pages.data_pages.len() >= num_rows / 2,
+            "expected pages to be cut close to one per value, got {:?}",
+            pages.data_pages,
+        );
+        // Each page must be bounded by roughly one value's worth of bytes;
+        // parquet allows a single oversized value to occupy a page by
+        // itself but never lets us pile many of them together.
+        for (size, _) in &pages.data_pages {
+            assert!(
+                *size <= value_size + 64,
+                "page size {size} exceeds one-value bound ({}B) — pages {:?}",
+                value_size + 64,
+                pages.data_pages,
+            );
+        }
+    }
+
+    #[test]
+    fn test_column_writer_caps_page_size_for_large_values_in_list() {
+        // Coverage for the Materialized-rep branch of
+        // `write_granular_chunk`. The flat-column regression test
+        // exercises the per-level step; this exercises the
+        // record-by-record step used when rep levels are present.
+        //
+        // Column is a list of BYTE_ARRAY leaves (max_def = 1, max_rep = 1)
+        // with 3 records of 3 large blobs each. The page byte limit is
+        // smaller than a single blob, so granular mode kicks in, and the
+        // Materialized-rep arm of `write_granular_chunk` steps from one
+        // `rep == 0` boundary to the next so a record never spans pages.
+        let value_size = 32 * 1024;
+        let page_byte_limit = 16 * 1024;
+        let values_per_record = 3;
+        let num_records = 3;
+        let num_values = values_per_record * num_records;
+
+        // rep levels: 0, 1, 1, 0, 1, 1, 0, 1, 1
+        let mut rep_levels = Vec::with_capacity(num_values);
+        for _ in 0..num_records {
+            rep_levels.push(0i16);
+            rep_levels.extend(std::iter::repeat_n(1i16, values_per_record - 1));
+        }
+        let def_levels = vec![1i16; num_values];
+
+        let props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_1_0)
+            .set_dictionary_enabled(false)
+            .set_encoding(Encoding::PLAIN)
+            .set_data_page_size_limit(page_byte_limit)
+            .build();
+
+        let data: Vec<_> = (0..num_values)
+            .map(|i| ByteArray::from(vec![i as u8; value_size]))
+            .collect();
+        let pages = write_and_collect_pages::<ByteArrayType>(
+            props,
+            1,
+            1,
+            &data,
+            Some(&def_levels),
+            Some(&rep_levels),
+        );
+        let data_pages = pages.data_pages;
+
+        // The Materialized-rep arm groups levels by record, and each
+        // record's bytes blow the page byte limit on its own, so we get
+        // exactly one page per record.
+        assert_eq!(
+            data_pages.len(),
+            num_records,
+            "expected one data page per record, got {data_pages:?}"
+        );
+        for (bytes, n_values) in &data_pages {
+            assert_eq!(
+                *n_values as usize, values_per_record,
+                "each page must hold a whole record's leaves, got {data_pages:?}"
+            );
+            // Each page is one full record (its leaves cannot be split),
+            // so allow up to `values_per_record` blobs of payload plus a
+            // small fudge for level encoding overhead.
+            let upper_bound = values_per_record * (value_size + 16);
+            assert!(
+                *bytes <= upper_bound,
+                "page size {bytes} exceeds whole-record bound ({upper_bound}); pages {data_pages:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_column_writer_caps_page_size_with_nullable_large_values() {
+        // Coverage for `LevelDataRef::value_count` on Materialized def
+        // levels: a nullable column with mixed nulls and large values.
+        // `value_count` must return the actual non-null count so the
+        // byte estimate reflects bytes that will actually be written,
+        // not the level count.
+        let value_size = 32 * 1024;
+        let page_byte_limit = 16 * 1024;
+        let num_levels = 32;
+
+        // Alternating null / non-null: 16 nulls and 16 values.
+        let def_levels: Vec<i16> = (0..num_levels as i16).map(|i| i % 2).collect();
+        let num_values = def_levels.iter().filter(|&&d| d == 1).count();
+
+        let props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_1_0)
+            .set_dictionary_enabled(false)
+            .set_encoding(Encoding::PLAIN)
+            .set_data_page_size_limit(page_byte_limit)
+            .build();
+
+        let data: Vec<_> = (0..num_values)
+            .map(|i| ByteArray::from(vec![i as u8; value_size]))
+            .collect();
+        let pages =
+            write_and_collect_pages::<ByteArrayType>(props, 1, 0, &data, Some(&def_levels), None);
+        let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect();
+
+        // With 16 actual values of 32 KiB each and a 16 KiB page limit,
+        // every non-null value should get its own page (plus possibly
+        // adjacent nulls). At minimum, the number of pages must be
+        // roughly the value count, not 1 (which is what `main` produced).
+        assert!(
+            data_pages.len() >= num_values / 2,
+            "expected at least {} pages for {num_values} large values, got {} pages: {data_pages:?}",
+            num_values / 2,
+            data_pages.len(),
+        );
+        // No page contains more than ~one value's worth of payload bytes.
+        for size in &data_pages {
+            assert!(
+                *size <= value_size + 64,
+                "page size {size} exceeds one-value bound; pages {data_pages:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_column_writer_dict_enabled_large_values_post_spill() {
+        // While dictionary encoding is active, `has_dictionary()` short-
+        // circuits `estimated_value_bytes` — the byte estimate is plain-
+        // encoded size but dict-encoded pages only store small RLE
+        // indices, so we'd otherwise shrink pages spuriously. Once the
+        // dictionary spills (each value is large + unique), plain
+        // encoding takes over and the byte-budget sub-batch kicks in.
+        //
+        // This test makes sure the writer survives that transition and
+        // produces bounded pages thereafter.
+        let value_size = 64 * 1024;
+        let page_byte_limit = 16 * 1024;
+        let num_rows = 32;
+
+        let props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_1_0)
+            .set_dictionary_enabled(true)
+            // Force a small dict so it spills quickly even though
+            // each value here is unique.
+            .set_dictionary_page_size_limit(1024)
+            .set_data_page_size_limit(page_byte_limit)
+            // Small mini-batches so dict fallback happens part-way
+            // through the input, leaving subsequent mini-batches to
+            // exercise the post-spill plain-encoding path that the
+            // page-size fix actually targets.
+            .set_write_batch_size(4)
+            .build();
+
+        let data: Vec<_> = (0..num_rows)
+            .map(|i| ByteArray::from(vec![i as u8; value_size]))
+            .collect();
+        let pages = write_and_collect_pages::<ByteArrayType>(props, 0, 0, &data, None, None);
+        let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect();
+
+        // After spill, plain encoding writes one ~64 KiB value per page.
+        // Without the fix, post-spill writes still buffered all 32
+        // values into a single ~2 MiB page.
+        assert!(
+            data_pages.len() >= num_rows / 2,
+            "expected >= {} data pages after dict spill, got {} ({data_pages:?})",
+            num_rows / 2,
+            data_pages.len(),
+        );
+        for size in &data_pages {
+            assert!(
+                *size <= value_size + 64,
+                "page size {size} exceeds one-value bound; pages {data_pages:?}"
+            );
+        }
+    }
+
+    #[test]
+    fn test_column_writer_caps_dictionary_page_size() {
+        // A column of large *distinct* values with dictionary encoding on:
+        // the dictionary page accumulates the values themselves, and its
+        // spill check runs only once per mini-batch. Without bounding the
+        // dictionary-encoding mini-batch, one `write_batch_size` mini-batch
+        // would intern `write_batch_size * value_size` bytes into the
+        // dictionary page before the check fires (~16 MiB here). The chunker
+        // must sub-batch the dictionary-encoding phase too.
+        let value_size = 8 * 1024;
+        let dict_page_limit = 64 * 1024;
+        let num_rows = 2048;
+
+        let props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_1_0)
+            .set_dictionary_enabled(true)
+            .set_dictionary_page_size_limit(dict_page_limit)
+            .build();
+
+        let data: Vec<_> = (0..num_rows)
+            .map(|i| {
+                // each value distinct, so the dictionary cannot dedup them
+                let mut v = vec![0u8; value_size];
+                v[..8].copy_from_slice(&(i as u64).to_le_bytes());
+                ByteArray::from(v)
+            })
+            .collect();
+        let pages = write_and_collect_pages::<ByteArrayType>(props, 0, 0, &data, None, None);
+        let dict_page_size = pages.dict_page_size;
+
+        assert!(
+            dict_page_size > 0,
+            "expected the column to dictionary-encode"
+        );
+        // Bounded near the limit (~2x from the post-mini-batch check). Before
+        // the fix the dictionary page reached num_rows * value_size (~16 MiB,
+        // 256x the limit).
+        assert!(
+            dict_page_size <= 3 * dict_page_limit,
+            "dictionary page {dict_page_size} exceeds 3x the {dict_page_limit} limit",
+        );
+    }
+
+    #[test]
+    fn test_column_writer_caps_page_size_for_fixed_len_byte_array() {
+        // Coverage for `ParquetValueType::byte_size` override on
+        // `FixedLenByteArray`. With `type_length = 1`, each plain-encoded
+        // value is one byte, so a 4-byte page byte limit forces the
+        // sub-batch sizer to write ~4 values per page rather than one
+        // page for the whole batch.
+        let page_byte_limit = 4;
+        let num_values = 128;
+
+        let props = WriterProperties::builder()
+            .set_writer_version(WriterVersion::PARQUET_1_0)
+            .set_dictionary_enabled(false)
+            .set_encoding(Encoding::PLAIN)
+            .set_data_page_size_limit(page_byte_limit)
+            .build();
+
+        let data: Vec<_> = (0..num_values)
+            .map(|i| {
+                let mut fla = FixedLenByteArray::default();
+                fla.set_data(Bytes::from(vec![i as u8]));
+                fla
+            })
+            .collect();
+        let pages =
+            write_and_collect_pages::<FixedLenByteArrayType>(props, 0, 0, &data, None, None);
+        let data_pages: Vec<_> = pages.data_pages.iter().map(|(size, _)| *size).collect();
+
+        // Without the fix this is a single 128-byte page; with the fix
+        // the byte budget caps each page at ~`page_byte_limit` bytes.
+        assert!(
+            data_pages.len() >= num_values / 8,
+            "expected pages capped by byte budget, got {data_pages:?}"
+        );
+        for size in &data_pages {
+            assert!(
+                *size <= page_byte_limit * 4,
+                "page size {size} larger than expected; pages {data_pages:?}"
+            );
+        }
+    }
+
     #[test]
     fn test_bool_statistics() {
         let stats = statistics_roundtrip::<BoolType>(&[true, false, false, true]);
@@ -4309,6 +4729,69 @@ mod tests {
         get_typed_column_writer::<T>(column_writer)
     }
 
+    /// Pages collected by [`write_and_collect_pages`].
+    struct CollectedPages {
+        /// `(compressed byte size, value count)` for every data page, in order.
+        data_pages: Vec<(usize, u32)>,
+        /// Largest dictionary page seen, or 0 if the column wasn't dict-encoded.
+        dict_page_size: usize,
+    }
+
+    /// Writes `data` (with optional def/rep levels) through a raw
+    /// `ColumnWriterImpl` configured by `props`, then re-reads the file and
+    /// returns its page layout. Shared by the page-size regression tests so
+    /// each only has to express its props, input, and assertions.
+    fn write_and_collect_pages<T: DataType>(
+        props: WriterProperties,
+        max_def_level: i16,
+        max_rep_level: i16,
+        data: &[T::T],
+        def_levels: Option<&[i16]>,
+        rep_levels: Option<&[i16]>,
+    ) -> CollectedPages {
+        let mut file = tempfile::tempfile().unwrap();
+        let mut write = TrackedWrite::new(&mut file);
+        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+        let mut writer =
+            get_test_column_writer::<T>(page_writer, max_def_level, max_rep_level, Arc::new(props));
+        writer.write_batch(data, def_levels, rep_levels).unwrap();
+        let r = writer.close().unwrap();
+        drop(write);
+
+        let read_props = ReaderProperties::builder()
+            .set_backward_compatible_lz4(false)
+            .build();
+        let mut page_reader = Box::new(
+            SerializedPageReader::new_with_properties(
+                Arc::new(file),
+                &r.metadata,
+                r.rows_written as usize,
+                None,
+                Arc::new(read_props),
+            )
+            .unwrap(),
+        );
+
+        let mut collected = CollectedPages {
+            data_pages: Vec::new(),
+            dict_page_size: 0,
+        };
+        while let Some(page) = page_reader.get_next_page().unwrap() {
+            match page.page_type() {
+                PageType::DATA_PAGE | PageType::DATA_PAGE_V2 => {
+                    collected
+                        .data_pages
+                        .push((page.buffer().len(), page.num_values()));
+                }
+                PageType::DICTIONARY_PAGE => {
+                    collected.dict_page_size = collected.dict_page_size.max(page.buffer().len());
+                }
+                _ => {}
+            }
+        }
+        collected
+    }
+
     /// Returns column reader.
     fn get_test_column_reader<T: DataType>(
         page_reader: Box<dyn PageReader>,
@@ -4717,6 +5200,42 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_level_data_ref_value_count() {
+        // `value_count` is what the byte-budget chunker uses to convert a
+        // chunk's level span into a leaf-value count. It must work for any
+        // column shape — flat, nullable, or nested — because the leaf
+        // values array is decoupled from the rep/def level stream.
+        let max_def = 2;
+        // Non-nullable / unrepeated: no def levels materialized — every
+        // level is a value.
+        assert_eq!(LevelDataRef::Absent.value_count(64, max_def), 64);
+        // Uniform run of present values, and of nulls.
+        assert_eq!(
+            LevelDataRef::Uniform {
+                value: max_def,
+                count: 40
+            }
+            .value_count(40, max_def),
+            40
+        );
+        assert_eq!(
+            LevelDataRef::Uniform {
+                value: max_def - 1,
+                count: 40
+            }
+            .value_count(40, max_def),
+            0
+        );
+        // Materialized def levels (nullable / nested): only levels equal to
+        // `max_def` are values; empty-list / null levels are not.
+        let levels = [2i16, 0, 2, 1, 2, 2, 0];
+        assert_eq!(
+            LevelDataRef::Materialized(&levels).value_count(levels.len(), max_def),
+            4
+        );
+    }
+
     #[test]
     fn test_uniform_def_levels_all_null() {
         // All-null column: def_level=0 (null) for every slot, no values written.
diff --git a/parquet/tests/arrow_writer_layout.rs b/parquet/tests/arrow_writer_layout.rs
index b9d997beb289..1c63a3144391 100644
--- a/parquet/tests/arrow_writer_layout.rs
+++ b/parquet/tests/arrow_writer_layout.rs
@@ -19,14 +19,16 @@
 
 use arrow::array::{Int32Array, StringArray};
 use arrow::record_batch::RecordBatch;
-use arrow_array::builder::{Int32Builder, ListBuilder};
+use arrow_array::builder::{BinaryBuilder, Int32Builder, ListBuilder};
+use arrow_array::types::Int32Type;
+use arrow_array::{DictionaryArray, FixedSizeBinaryArray, StringViewArray};
 use bytes::Bytes;
 use parquet::arrow::ArrowWriter;
 use parquet::arrow::arrow_reader::{ArrowReaderOptions, ParquetRecordBatchReaderBuilder};
 use parquet::basic::{Encoding, PageType};
 use parquet::file::metadata::PageIndexPolicy;
 use parquet::file::metadata::ParquetMetaData;
-use parquet::file::properties::{ReaderProperties, WriterProperties};
+use parquet::file::properties::{EnabledStatistics, ReaderProperties, WriterProperties};
 use parquet::file::reader::SerializedPageReader;
 use parquet::schema::types::ColumnPath;
 use std::sync::Arc;
@@ -408,16 +410,16 @@ fn test_string() {
                 columns: vec![ColumnChunk {
                     pages: vec![
                         Page {
-                            rows: 130,
+                            rows: 126,
                             page_header_size: 38,
-                            compressed_size: 138,
+                            compressed_size: 114,
                             encoding: Encoding::RLE_DICTIONARY,
                             page_type: PageType::DATA_PAGE,
                         },
                         Page {
-                            rows: 1250,
+                            rows: 1254,
                             page_header_size: 40,
-                            compressed_size: 10000,
+                            compressed_size: 10032,
                             encoding: Encoding::PLAIN,
                             page_type: PageType::DATA_PAGE,
                         },
@@ -429,10 +431,16 @@ fn test_string() {
                             page_type: PageType::DATA_PAGE,
                         },
                     ],
+                    // The byte-budget chunker sub-batches the dictionary
+                    // phase. The mini-batch deliberately includes the value
+                    // that crosses the 1000-byte limit so the spill triggers
+                    // on this chunk rather than carrying a sliver into the
+                    // next page, giving a 126-row dictionary page at 1008
+                    // bytes.
                     dictionary_page: Some(Page {
-                        rows: 130,
+                        rows: 126,
                         page_header_size: 38,
-                        compressed_size: 1040,
+                        compressed_size: 1008,
                         encoding: Encoding::PLAIN,
                         page_type: PageType::DICTIONARY_PAGE,
                     }),
@@ -599,3 +607,344 @@ fn test_per_column_data_page_size_limit() {
     assert_eq!(col_a_page_count, 16);
     assert_eq!(col_b_page_count, 1);
 }
+
+#[test]
+fn test_fixed_size_binary() {
+    // 1 KiB FixedSizeBinary values against a 4 KiB data page byte limit.
+    let value_size = 1024usize;
+    let num_rows = 64usize;
+    let values: Vec<u8> = (0..num_rows)
+        .flat_map(|i| vec![i as u8; value_size])
+        .collect();
+    let array =
+        Arc::new(FixedSizeBinaryArray::try_new(value_size as i32, values.into(), None).unwrap())
+            as _;
+    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(false)
+        .set_data_page_size_limit(4096)
+        .set_write_page_header_statistics(true)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    // 12 pages of 5 values (5 * 1024 = 5120 B, the boundary
+                    // value pushes each page just past the 4096 B limit) plus
+                    // a final page with the remaining 4 values.
+                    pages: (0..12)
+                        .map(|_| Page {
+                            rows: 5,
+                            page_header_size: 157,
+                            compressed_size: 5120,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        })
+                        .chain(std::iter::once(Page {
+                            rows: 4,
+                            page_header_size: 157,
+                            compressed_size: 4096,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        }))
+                        .collect(),
+                    dictionary_page: None,
+                }],
+            }],
+        },
+    });
+}
+
+#[test]
+fn test_dictionary() {
+    // Arrow `DictionaryArray` input.
+    let num_rows = 2000;
+    let dict_values = StringArray::from_iter_values(["alpha", "beta", "gamma", "delta"]);
+    let keys = Int32Array::from_iter_values((0..num_rows).map(|i| i % 4));
+    let array =
+        Arc::new(DictionaryArray::<Int32Type>::try_new(keys, Arc::new(dict_values)).unwrap()) as _;
+    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(true)
+        .set_dictionary_page_size_limit(1000)
+        .set_data_page_size_limit(1000)
+        .set_write_batch_size(10)
+        .set_write_page_header_statistics(true)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    pages: vec![Page {
+                        rows: 2000,
+                        page_header_size: 40,
+                        compressed_size: 505,
+                        encoding: Encoding::RLE_DICTIONARY,
+                        page_type: PageType::DATA_PAGE,
+                    }],
+                    dictionary_page: Some(Page {
+                        rows: 4,
+                        page_header_size: 38,
+                        compressed_size: 35,
+                        encoding: Encoding::PLAIN,
+                        page_type: PageType::DICTIONARY_PAGE,
+                    }),
+                }],
+            }],
+        },
+    });
+}
+
+#[test]
+fn test_large_string() {
+    // Large `Utf8` values (64 KiB each) with a 16 KiB data page limit.
+    //
+    // Each value already exceeds the page byte budget, so the byte-budget
+    // chunker in `ByteArrayEncoder` (the offsets-buffer scan in
+    // `count_within_budget_offsets`) cuts one value per page instead of
+    // buffering the whole ~2 MiB column into a single page. This drives the
+    // real `ArrowWriter` user path; the lower-level column writer is covered
+    // by `test_column_writer_caps_page_size_for_large_byte_array_values`.
+    let value_size = 64 * 1024;
+    let strings: Vec<String> = (0..32).map(|_| "x".repeat(value_size)).collect();
+    let array = Arc::new(StringArray::from(strings)) as _;
+    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(false)
+        .set_data_page_size_limit(16 * 1024)
+        // Disable statistics so page headers stay small and the layout is
+        // determined purely by the page-splitting logic under test.
+        .set_statistics_enabled(EnabledStatistics::None)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    // One 64 KiB value per page (4-byte length prefix + value).
+                    pages: (0..32)
+                        .map(|_| Page {
+                            rows: 1,
+                            page_header_size: 21,
+                            compressed_size: 65540,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        })
+                        .collect(),
+                    dictionary_page: None,
+                }],
+            }],
+        },
+    });
+}
+
+#[test]
+fn test_large_string_view() {
+    // Same bytes and expected layout as `test_large_string`, but the input
+    // is a `Utf8View` array. View arrays expose no contiguous offsets
+    // buffer, so the arrow writer bounds pages via the view-specific scan
+    // (`count_within_budget_views`, reading each value's length from the
+    // low 32 bits of its view word) rather than the offsets scan. This
+    // confirms that path caps pages identically.
+    let value_size = 64 * 1024;
+    let strings: Vec<String> = (0..32).map(|_| "x".repeat(value_size)).collect();
+    let array = Arc::new(StringViewArray::from_iter_values(
+        strings.iter().map(|s| s.as_str()),
+    )) as _;
+    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(false)
+        .set_data_page_size_limit(16 * 1024)
+        .set_statistics_enabled(EnabledStatistics::None)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    pages: (0..32)
+                        .map(|_| Page {
+                            rows: 1,
+                            page_header_size: 21,
+                            compressed_size: 65540,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        })
+                        .collect(),
+                    dictionary_page: None,
+                }],
+            }],
+        },
+    });
+}
+
+#[test]
+fn test_large_values_in_list() {
+    // `list<binary>` with large leaf values, driving the record-by-record
+    // (Materialized-rep) arm of the byte-budget chunker through the arrow
+    // path. Because repetition levels are present, a list element's leaves
+    // can never span pages, so the chunker steps from one `rep == 0`
+    // boundary to the next. Three records of three 32 KiB blobs each, with
+    // a 16 KiB page limit, yield exactly one page per record (a whole
+    // record's ~96 KiB of leaves stays together even though it blows the
+    // budget — it cannot be split). The raw-writer analogue is
+    // `test_column_writer_caps_page_size_for_large_values_in_list`.
+    let value_size = 32 * 1024;
+    let mut builder = ListBuilder::new(BinaryBuilder::new());
+    let mut byte = 0u8;
+    for _ in 0..3 {
+        for _ in 0..3 {
+            builder.values().append_value(vec![byte; value_size]);
+            byte = byte.wrapping_add(1);
+        }
+        builder.append(true);
+    }
+    let array = Arc::new(builder.finish()) as _;
+    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(false)
+        .set_data_page_size_limit(16 * 1024)
+        .set_statistics_enabled(EnabledStatistics::None)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    // One record (3 leaves + rep/def levels) per page.
+                    pages: (0..3)
+                        .map(|_| Page {
+                            rows: 1,
+                            page_header_size: 21,
+                            compressed_size: 98328,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        })
+                        .collect(),
+                    dictionary_page: None,
+                }],
+            }],
+        },
+    });
+}
+
+#[test]
+fn test_nullable_large_values() {
+    // Nullable `Utf8` column alternating null / 32 KiB value. The byte
+    // budget must count only the non-null values (the def-level value
+    // count), not the level count — otherwise the estimate would be wrong
+    // for sparse columns. With a 16 KiB page limit each 32 KiB value still
+    // gets its own page (carrying its adjacent leading null), giving 16
+    // two-row pages. Mirrors the raw-writer
+    // `test_column_writer_caps_page_size_with_nullable_large_values`.
+    let value_size = 32 * 1024;
+    let big = "x".repeat(value_size);
+    let values: Vec<Option<String>> = (0..32)
+        .map(|i| if i % 2 == 1 { Some(big.clone()) } else { None })
+        .collect();
+    let array = Arc::new(StringArray::from(values)) as _;
+    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(false)
+        .set_data_page_size_limit(16 * 1024)
+        .set_statistics_enabled(EnabledStatistics::None)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    // Each page holds one null + one 32 KiB value (2 rows).
+                    pages: (0..16)
+                        .map(|_| Page {
+                            rows: 2,
+                            page_header_size: 21,
+                            compressed_size: 32778,
+                            encoding: Encoding::PLAIN,
+                            page_type: PageType::DATA_PAGE,
+                        })
+                        .collect(),
+                    dictionary_page: None,
+                }],
+            }],
+        },
+    });
+}
+
+#[test]
+fn test_dictionary_spill_large_values() {
+    // Dictionary encoding is enabled, but each value is large (64 KiB) and
+    // unique, so the dictionary spills almost immediately (1 KiB dict page
+    // limit). After the spill, plain encoding takes over and the byte-budget
+    // sub-batch bounds each page to a single value. The first value is
+    // interned into the dictionary page (one RLE_DICTIONARY data page
+    // referencing it); the remaining 31 fall back to PLAIN, one per page.
+    // Mirrors `test_column_writer_dict_enabled_large_values_post_spill`,
+    // exercising the same dict→plain transition via the arrow path.
+    let value_size = 64 * 1024;
+    let strings: Vec<String> = (0..32)
+        .map(|i| format!("{i:05}") + &"x".repeat(value_size - 5))
+        .collect();
+    let array = Arc::new(StringArray::from(strings)) as _;
+    let batch = RecordBatch::try_from_iter([("col", array)]).unwrap();
+    let props = WriterProperties::builder()
+        .set_dictionary_enabled(true)
+        // Tiny dict limit so it spills after the first (already oversized)
+        // value, leaving the rest to exercise the post-spill plain path.
+        .set_dictionary_page_size_limit(1024)
+        .set_data_page_size_limit(16 * 1024)
+        .set_write_batch_size(4)
+        .set_statistics_enabled(EnabledStatistics::None)
+        .build();
+
+    do_test(LayoutTest {
+        props,
+        batches: vec![batch],
+        layout: Layout {
+            row_groups: vec![RowGroup {
+                columns: vec![ColumnChunk {
+                    pages: std::iter::once(Page {
+                        // The single value interned before the dict spilled.
+                        rows: 1,
+                        page_header_size: 17,
+                        compressed_size: 2,
+                        encoding: Encoding::RLE_DICTIONARY,
+                        page_type: PageType::DATA_PAGE,
+                    })
+                    .chain((0..31).map(|_| Page {
+                        // Post-spill plain-encoded values, one per page.
+                        rows: 1,
+                        page_header_size: 21,
+                        compressed_size: 65540,
+                        encoding: Encoding::PLAIN,
+                        page_type: PageType::DATA_PAGE,
+                    }))
+                    .collect(),
+                    dictionary_page: Some(Page {
+                        rows: 1,
+                        page_header_size: 21,
+                        compressed_size: 65540,
+                        encoding: Encoding::PLAIN,
+                        page_type: PageType::DICTIONARY_PAGE,
+                    }),
+                }],
+            }],
+        },
+    });
+}