Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 180 additions & 11 deletions parquet/src/arrow/arrow_writer/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,15 @@ enum LevelInfoBuilder {
LevelContext, // Context
OffsetBuffer<i32>, // Offsets
Option<NullBuffer>, // Nulls
bool, // is_last_level (child has no nested rep)
),
/// A large list array
LargeList(
Box<LevelInfoBuilder>, // Child Values
LevelContext, // Context
OffsetBuffer<i64>, // Offsets
Option<NullBuffer>, // Nulls
bool, // is_last_level (child has no nested rep)
),
/// A fixed size list array
FixedSizeList(
Expand Down Expand Up @@ -223,22 +225,31 @@ impl LevelInfoBuilder {
DataType::List(_) => {
let list = array.as_list();
let child = Self::try_new(child.as_ref(), ctx, list.values())?;
let is_last = child.child_has_no_nested_rep();
let offsets = list.offsets().clone();
Self::List(Box::new(child), ctx, offsets, list.nulls().cloned())
Self::List(
Box::new(child),
ctx,
offsets,
list.nulls().cloned(),
is_last,
)
}
DataType::LargeList(_) => {
let list = array.as_list();
let child = Self::try_new(child.as_ref(), ctx, list.values())?;
let is_last = child.child_has_no_nested_rep();
let offsets = list.offsets().clone();
let nulls = list.nulls().cloned();
Self::LargeList(Box::new(child), ctx, offsets, nulls)
Self::LargeList(Box::new(child), ctx, offsets, nulls, is_last)
}
DataType::Map(_, _) => {
let map = array.as_map();
let entries = Arc::new(map.entries().clone()) as ArrayRef;
let child = Self::try_new(child.as_ref(), ctx, &entries)?;
let is_last = child.child_has_no_nested_rep();
let offsets = map.offsets().clone();
Self::List(Box::new(child), ctx, offsets, map.nulls().cloned())
Self::List(Box::new(child), ctx, offsets, map.nulls().cloned(), is_last)
}
DataType::FixedSizeList(_, size) => {
let list = array.as_fixed_size_list();
Expand Down Expand Up @@ -274,8 +285,8 @@ impl LevelInfoBuilder {
fn finish(self) -> Vec<ArrayLevels> {
match self {
LevelInfoBuilder::Primitive(v) => vec![v],
LevelInfoBuilder::List(v, _, _, _)
| LevelInfoBuilder::LargeList(v, _, _, _)
LevelInfoBuilder::List(v, _, _, _, _)
| LevelInfoBuilder::LargeList(v, _, _, _, _)
| LevelInfoBuilder::FixedSizeList(v, _, _, _)
| LevelInfoBuilder::ListView(v, _, _, _, _)
| LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(),
Expand All @@ -287,11 +298,11 @@ impl LevelInfoBuilder {
fn write(&mut self, range: Range<usize>) {
match self {
LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range),
LevelInfoBuilder::List(child, ctx, offsets, nulls) => {
Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
LevelInfoBuilder::List(child, ctx, offsets, nulls, is_last) => {
Self::write_list(child, ctx, offsets, nulls.as_ref(), range, *is_last)
}
LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => {
Self::write_list(child, ctx, offsets, nulls.as_ref(), range)
LevelInfoBuilder::LargeList(child, ctx, offsets, nulls, is_last) => {
Self::write_list(child, ctx, offsets, nulls.as_ref(), range, *is_last)
}
LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => {
Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range)
Expand All @@ -308,6 +319,19 @@ impl LevelInfoBuilder {
}
}

/// Returns `true` if the child contains no nested repetition levels, meaning
/// each child element produces exactly one rep_level entry in the leaf.
/// This is true for `Primitive` children and `Struct` trees with no list descendants.
fn child_has_no_nested_rep(&self) -> bool {
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be stored as an member of List element, avoiding querying nested.

match self {
LevelInfoBuilder::Primitive(_) => true,
LevelInfoBuilder::Struct(children, _, _) => {
children.iter().all(|c| c.child_has_no_nested_rep())
}
_ => false,
}
}

/// Write `range` elements from ListArray `array`
///
/// Note: MapArrays are `ListArray<i32>` under the hood and so are dispatched to this method
Expand All @@ -317,6 +341,7 @@ impl LevelInfoBuilder {
offsets: &[O],
nulls: Option<&NullBuffer>,
range: Range<usize>,
is_last_level: bool,
) {
// Fast path: entire list array is null; emit bulk null rep/def levels
if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) {
Expand All @@ -327,6 +352,18 @@ impl LevelInfoBuilder {
return;
}

// Fast path for "last-level list": when the child has no nested rep_levels,
// each child element produces exactly one rep_level entry. We can batch
// contiguous non-empty list slots into a single child.write() call, then
// fix up the rep_levels at list-slot boundaries using offsets directly.
//
// Kept as a separate function so the compiler can optimize write_list's
// hot loop independently (function body size affects codegen quality).
if is_last_level {
Self::write_list_last_level(child, ctx, offsets, nulls, range);
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This doesn't works well on the case of List<Struct<a: List<i32>, b:i32, ...>, once any child has repetition level, the performance would be hurted and cannot batching the write.

return;
}

let offsets = &offsets[range.start..range.end + 1];

let write_non_null_slice =
Expand Down Expand Up @@ -427,6 +464,138 @@ impl LevelInfoBuilder {
}
}

/// Optimized write path for lists whose child has no nested repetition levels.
///
/// When the child is a leaf (or a struct of leaves), each child element maps to
/// exactly one rep_level entry. This lets us batch contiguous non-empty list
/// slots into a single `child.write()` call, then stamp the list-start markers
/// at positions computed directly from offsets — avoiding per-slot `write` +
/// reverse-scan overhead.
fn write_list_last_level<O: OffsetSizeTrait>(
child: &mut LevelInfoBuilder,
ctx: &LevelContext,
offsets: &[O],
nulls: Option<&NullBuffer>,
range: Range<usize>,
) {
let null_offset = range.start;
let offsets = &offsets[range.start..range.end + 1];
let list_start_rep = ctx.rep_level - 1;

let emit_nulls = |child: &mut LevelInfoBuilder, count: usize| {
child.visit_leaves(|leaf| {
leaf.append_rep_level_run(list_start_rep, count);
leaf.append_def_level_run(ctx.def_level - 2, count);
});
};

let emit_empties = |child: &mut LevelInfoBuilder, count: usize| {
child.visit_leaves(|leaf| {
leaf.append_rep_level_run(list_start_rep, count);
leaf.append_def_level_run(ctx.def_level - 1, count);
});
};

let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: &[O]| {
debug_assert!(run_offsets.len() >= 2);
let values_start = run_offsets[0].as_usize();
let values_end = run_offsets[run_offsets.len() - 1].as_usize();
debug_assert!(values_end > values_start);

// Write all leaf values in one batch. Since the child has no nested
// rep, this emits (values_end - values_start) rep_levels all equal
// to ctx.rep_level (= "continuation within list").
child.write(values_start..values_end);

// The first element of each list slot needs rep_level =
// list_start_rep to mark a new list boundary. Because there's a 1:1
// mapping between child elements and rep_level entries, the position
// of each slot's first element is directly computable from offsets.
child.visit_leaves(|leaf| {
let rep_levels = leaf.rep_levels.materialize_mut().unwrap();
let batch_len = values_end - values_start;
let batch_base = rep_levels.len() - batch_len;

for slot_offset in run_offsets.iter().take(run_offsets.len() - 1) {
let list_start_pos = batch_base + (slot_offset.as_usize() - values_start);
rep_levels[list_start_pos] = list_start_rep;
}
});
};

// Classify each slot, detect run boundaries, flush on transition.
#[derive(Clone, Copy, PartialEq)]
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code here is much cleaner...

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

list_primitive_sparse_99pct_null/bloom_filter 1.00 10.9±0.05ms 3.4 GB/sec 1.19 13.0±0.04ms 2.8 GB/sec
list_primitive_sparse_99pct_null/cdc 1.00 22.7±0.11ms 1645.6 MB/sec 1.09 24.7±0.07ms 1510.1 MB/sec
list_primitive_sparse_99pct_null/default 1.00 10.6±0.08ms 3.5 GB/sec 1.21 12.8±0.65ms 2.9 GB/sec
list_primitive_sparse_99pct_null/parquet_2 1.00 10.6±0.12ms 3.4 GB/sec 1.19 12.7±0.02ms 2.9 GB/sec
list_primitive_sparse_99pct_null/zstd 1.00 12.5±0.11ms 2.9 GB/sec 1.16 14.5±0.03ms 2.5 GB/sec
list_primitive_sparse_99pct_null/zstd_parquet_2 1.00 10.8±0.08ms 3.4 GB/sec 1.18 12.8±0.03ms 2.8 GB/sec

I also reproduced this regression on my local env. Then I tried to change this back to the previous plain if-else chain flavor, and the regression seems gone. I wonder if this three-value enum breaks a tight if(false) loop that can be optimized/executed better 🥲

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, so this path can be optimized

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've make some enhancement, this might due to two state becomes three states, making all nulls / all empty a bit slower. I've do some boundary check optimization. After some optimization, it's still 15% slower. This is benchmark vs prev

list_primitive_nullable:    -43%  (2.03 ms)
list_primitive_non_null:    -56%  (1.03 ms)
list_primitive_99pct_null:  +13%  (194 µs,  +23 µs)
nested_list_nullable:       -21%  (3.85 ms)
list_struct_nullable:       -33%  (2.95 ms)

enum SlotKind {
Null,
Empty,
NonEmpty,
}

let num_slots = offsets.len() - 1;
if num_slots == 0 {
return;
}

macro_rules! classify {
($i:expr, $nulls:expr) => {
if !$nulls.is_valid($i + null_offset) {
SlotKind::Null
} else if offsets[$i] == offsets[$i + 1] {
SlotKind::Empty
} else {
SlotKind::NonEmpty
}
};
}

macro_rules! flush_run {
($kind:expr, $start:expr, $end:expr) => {
match $kind {
SlotKind::Null => emit_nulls(child, $end - $start),
SlotKind::Empty => emit_empties(child, $end - $start),
SlotKind::NonEmpty => emit_non_empty_run(child, &offsets[$start..$end + 1]),
}
};
}

match nulls {
Some(nulls) => {
let mut run_kind = classify!(0, nulls);
let mut run_start: usize = 0;
for i in 1..num_slots {
let kind = classify!(i, nulls);
if kind != run_kind {
flush_run!(run_kind, run_start, i);
run_kind = kind;
run_start = i;
}
}
flush_run!(run_kind, run_start, num_slots);
}
None => {
let mut run_kind = if offsets[0] == offsets[1] {
SlotKind::Empty
} else {
SlotKind::NonEmpty
};
let mut run_start: usize = 0;
for i in 1..num_slots {
let kind = if offsets[i] == offsets[i + 1] {
SlotKind::Empty
} else {
SlotKind::NonEmpty
};
if kind != run_kind {
flush_run!(run_kind, run_start, i);
run_kind = kind;
run_start = i;
}
}
flush_run!(run_kind, run_start, num_slots);
}
}
}

/// Write `range` elements from ListViewArray `array`
fn write_list_view<O: OffsetSizeTrait>(
child: &mut LevelInfoBuilder,
Expand Down Expand Up @@ -734,8 +903,8 @@ impl LevelInfoBuilder {
fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) {
match self {
LevelInfoBuilder::Primitive(info) => visit(info),
LevelInfoBuilder::List(c, _, _, _)
| LevelInfoBuilder::LargeList(c, _, _, _)
LevelInfoBuilder::List(c, _, _, _, _)
| LevelInfoBuilder::LargeList(c, _, _, _, _)
| LevelInfoBuilder::FixedSizeList(c, _, _, _)
| LevelInfoBuilder::ListView(c, _, _, _, _)
| LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),
Expand Down
Loading