Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 180 additions & 1 deletion parquet/src/arrow/arrow_writer/byte_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ use crate::geospatial::statistics::GeospatialStatistics;
use crate::schema::types::ColumnDescPtr;
use crate::util::bit_util::num_required_bits;
use crate::util::interner::{Interner, Storage};
use arrow_array::types::ByteArrayType;
use arrow_array::{
Array, ArrayAccessor, BinaryArray, BinaryViewArray, DictionaryArray, FixedSizeBinaryArray,
LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
GenericByteArray, LargeBinaryArray, LargeStringArray, StringArray, StringViewArray,
};
use arrow_buffer::{ArrowNativeType, Buffer};
use arrow_schema::DataType;

macro_rules! downcast_dict_impl {
Expand Down Expand Up @@ -481,6 +483,84 @@ impl ColumnValueEncoder for ByteArrayEncoder {
Ok(())
}

fn count_values_within_byte_budget_gather(
values: &Self::Values,
indices: &[usize],
byte_budget: usize,
) -> Option<usize> {
// `ByteArrayEncoder` only ever writes via `write_gather`, so this
// is the relevant method.
//
// Two-stage walk for the simple offset-buffer byte array types:
// 1. If indices are contiguous, compute the total payload in
// O(1) via a single subtraction on the offsets buffer.
// When the total fits the budget — the overwhelmingly
// common "small values" case — return immediately.
// 2. Otherwise, walk per-value byte sizes from the offsets
// buffer (still cheap, no slice/UTF-8 construction) and
// exit at the first value that pushes the cumulative sum
// past the budget. This bounds skewed distributions: an
// outlier value is caught wherever it lands in the chunk.
let count = match values.data_type() {
DataType::Utf8 => count_within_budget_offsets(
values.as_any().downcast_ref::<StringArray>().unwrap(),
indices,
byte_budget,
),
DataType::LargeUtf8 => count_within_budget_offsets(
values.as_any().downcast_ref::<LargeStringArray>().unwrap(),
indices,
byte_budget,
),
DataType::Binary => count_within_budget_offsets(
values.as_any().downcast_ref::<BinaryArray>().unwrap(),
indices,
byte_budget,
),
DataType::LargeBinary => count_within_budget_offsets(
values.as_any().downcast_ref::<LargeBinaryArray>().unwrap(),
indices,
byte_budget,
),
// View arrays carry each value's length in the low 32 bits of
// its u128 view word, so lengths are scannable without touching
// any data buffer — and the common small-value case skips even
// that scan via an O(1) conservative bound.
DataType::Utf8View => {
let array = values.as_any().downcast_ref::<StringViewArray>().unwrap();
count_within_budget_views(
array.views(),
indices,
byte_budget,
max_view_value_len(array.data_buffers()),
)
}
DataType::BinaryView => {
let array = values.as_any().downcast_ref::<BinaryViewArray>().unwrap();
count_within_budget_views(
array.views(),
indices,
byte_budget,
max_view_value_len(array.data_buffers()),
)
}
// The values in an arrow dictionary are already small and
// deduplicated, so there is nothing to bound — treat every
// chunk as fitting and stay on the batched path. (A per-value
// walk through dict keys on every chunk also measured ~+30-80%
// slower than `main`.)
DataType::Dictionary(_, _) => indices.len(),
// Every byte-array type `ByteArrayEncoder` is constructed for
// has an explicit arm above. A `Dictionary(value = FixedSizeBinary)`
// column hits the `Dictionary(_, _)` arm (its `values.data_type()`
// is `Dictionary`), and a bare `FixedSizeBinary` column is routed
// to the generic column writer, never this encoder — so no other
// type can reach here.
data_type => unreachable!("ByteArrayEncoder cannot be constructed for {data_type:?}"),
};
Some(count)
}

fn num_values(&self) -> usize {
match &self.dict_encoder {
Some(encoder) => encoder.indices.len(),
Expand Down Expand Up @@ -593,6 +673,105 @@ where
}
}

/// Upper bound on any single value's byte length in a view array.
fn max_view_value_len(buffers: &[Buffer]) -> usize {
/// Bytes that fit inline in a u128 view word (the rest is len + prefix).
const MAX_INLINE_VIEW_LEN: usize = 12;
// An out-of-line view's data is a contiguous slice of exactly one data
// buffer, so it cannot exceed the largest buffer; inline views hold at
// most `MAX_INLINE_VIEW_LEN`. Loose (a value is usually far smaller than
// a whole buffer) but O(number of buffers) and always sound.
buffers
.iter()
.map(|b| b.len())
.max()
.unwrap_or(0)
.max(MAX_INLINE_VIEW_LEN)
}

/// Number of leading `indices` whose cumulative plain-encoded size fits
/// `byte_budget` (boundary value included), for view arrays (`Utf8View`,
/// `BinaryView`).
fn count_within_budget_views(
views: &[u128],
indices: &[usize],
byte_budget: usize,
max_value_len: usize,
) -> usize {
// Each plain-encoded BYTE_ARRAY value carries a 4-byte length prefix, so
// the budget is compared against `value_len + size_of::<u32>()` — the
// bytes actually written to the page, not just the payload.
//
// Stage 1: O(1) conservative bound. View arrays have no prefix-sum
// offsets buffer, so the exact span subtraction used by
// `count_within_budget_offsets` is unavailable; instead bound every
// value by `max_value_len`. Skips the walk for the common small-value
// case (what view arrays are built for, and where there is nothing to
// bound).
let per_value = max_value_len + std::mem::size_of::<u32>();
if indices.len().saturating_mul(per_value) <= byte_budget {
return indices.len();
}
// Stage 2: exact per-value scan, reading each length from the low 32
// bits of its u128 view word (no data-buffer dereference).
let mut cum: usize = 0;
for (i, idx) in indices.iter().enumerate() {
let len = (views[*idx] as u32) as usize;
cum = cum.saturating_add(len + std::mem::size_of::<u32>());
Comment thread
adriangb marked this conversation as resolved.
if cum > byte_budget {
return i + 1;
}
}
indices.len()
}

/// Number of leading `indices` whose cumulative plain-encoded size fits
/// `byte_budget` (boundary value included), for offset-buffer byte arrays
/// (`Utf8`/`LargeUtf8`/`Binary`/`LargeBinary`).
///
/// `indices` are assumed sorted ascending — they always are here, since
/// they come from `non_null_indices`, which is built in array order.
fn count_within_budget_offsets<T: ByteArrayType>(
values: &GenericByteArray<T>,
indices: &[usize],
byte_budget: usize,
) -> usize {
if indices.is_empty() {
return 0;
}
let n = indices.len();
let first = indices[0];
let last = indices[n - 1];
let offsets = values.value_offsets();
// Each plain-encoded value carries a 4-byte length prefix on the page.
let prefix_overhead = std::mem::size_of::<u32>();

// Stage 1: O(1) span upper bound. The span `offsets[last+1] -
// offsets[first]` covers every array position in `[first, last]`, a
// superset of `indices` — and the skipped positions in a nullable
// column are nulls with zero offset delta, so the span still equals the
// exact payload. If it fits the budget, every value fits. Covers the
// common small-value case for both non-null and (sparse) nullable
// columns.
if last >= first {
let payload = (offsets[last + 1] - offsets[first]).as_usize();
if payload + n * prefix_overhead <= byte_budget {
return n;
}
}

// Stage 2: scan per-index lengths from the offsets buffer.
let mut cum: usize = 0;
for (i, idx) in indices.iter().enumerate() {
let len = (offsets[idx + 1] - offsets[*idx]).as_usize() + prefix_overhead;
cum = cum.saturating_add(len);
if cum > byte_budget {
return i + 1;
}
}
n
}

/// Computes the min and max for the provided array and indices
///
/// This is a free function so it can be used with `downcast_op!`
Expand Down
118 changes: 118 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4904,6 +4904,124 @@ mod tests {
assert_eq!(get_dict_page_size(col1_meta), 1024 * 1024 * 4);
}

#[test]
fn test_arrow_writer_granular_mode_roundtrip() {
// Granular mode subdivides chunks and writes more pages than
// `main`. Make sure the data we write back is bit-identical to
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments comparing to main are confusing I think as once this PR merges it will become main

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will fix

// what went in — page-count assertions elsewhere only prove
// pages were cut, not that the encoded data is correct.
//
// Mix value sizes so that the cumulative-byte-budget cutoff
// lands mid-chunk, exercising both batched and granular paths
// within the same `write_batch_internal` call.
let small = "tiny".to_string();
let big = "x".repeat(64 * 1024);
let strings: Vec<String> = (0..256)
.map(|i| {
if i % 16 == 0 {
big.clone()
} else {
small.clone()
}
})
.collect();

let schema = Arc::new(Schema::new(vec![Field::new(
"col",
ArrowDataType::Utf8,
false,
)]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(StringArray::from(strings.clone())) as _],
Comment thread
adriangb marked this conversation as resolved.
)
.unwrap();

let props = WriterProperties::builder()
.set_dictionary_enabled(false)
.set_data_page_size_limit(16 * 1024)
.build();
let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
writer.write(&batch).unwrap();
let data = Bytes::from(writer.into_inner().unwrap());

let mut reader = ParquetRecordBatchReader::try_new(data, 1024).unwrap();
let read = reader.next().unwrap().unwrap();
assert!(reader.next().is_none(), "expected one batch");
let col = read
.column(0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!(col.len(), strings.len());
for (i, expected) in strings.iter().enumerate() {
assert_eq!(
col.value(i),
expected.as_str(),
"value mismatch at index {i}"
);
}
}

#[test]
fn test_arrow_writer_all_null_string_column() {
// The `LevelDataRef::value_count` Uniform branch with
// `value != max_def` (entirely-null chunk) must return 0 so the
// sub-batch sizer short-circuits to batch mode without trying
// to estimate byte budgets for non-existent values.
let num_rows = 1024;
let schema = Arc::new(Schema::new(vec![Field::new(
"col",
ArrowDataType::Utf8,
true,
)]));
let nulls: Vec<Option<&str>> = vec![None; num_rows];
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(StringArray::from(nulls)) as _],
)
.unwrap();

let props = WriterProperties::builder()
.set_dictionary_enabled(false)
.set_data_page_size_limit(16 * 1024)
.build();
let mut writer = ArrowWriter::try_new(Vec::new(), schema, Some(props)).unwrap();
writer.write(&batch).unwrap();
let data = Bytes::from(writer.into_inner().unwrap());

// Re-parse the file: row group has one column, every row is
// null, all data pages report `num_rows / page_count` rows.
let mut metadata = ParquetMetaDataReader::new();
metadata.try_parse(&data).unwrap();
let metadata = metadata.finish().unwrap();
let row_group = metadata.row_group(0);
let col_meta = row_group.column(0);
assert_eq!(row_group.num_rows() as usize, num_rows);
// Statistics record `null_count = num_rows` — proves every value
// was written as null.
if let Some(stats) = col_meta.statistics() {
assert_eq!(
stats.null_count_opt().unwrap_or(0) as usize,
num_rows,
"expected all-null column to report null_count = num_rows"
);
}

let mut reader =
SerializedPageReader::new(Arc::new(data.clone()), col_meta, num_rows, None).unwrap();
let mut total_values = 0u32;
while let Some(page) = reader.get_next_page().unwrap() {
if matches!(page, Page::DataPage { .. } | Page::DataPageV2 { .. }) {
total_values += page.num_values();
}
}
assert_eq!(
total_values as usize, num_rows,
"expected every level position to be represented in some page"
);
}

struct WriteBatchesShape {
num_batches: usize,
rows_per_batch: usize,
Expand Down
Loading
Loading