diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index 10f90f707c08..e577cf73a807 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -118,6 +118,7 @@ enum LevelInfoBuilder { LevelContext, // Context OffsetBuffer, // Offsets Option, // Nulls + bool, // is_last_level (child has no nested rep) ), /// A large list array LargeList( @@ -125,6 +126,7 @@ enum LevelInfoBuilder { LevelContext, // Context OffsetBuffer, // Offsets Option, // Nulls + bool, // is_last_level (child has no nested rep) ), /// A fixed size list array FixedSizeList( @@ -223,22 +225,31 @@ impl LevelInfoBuilder { DataType::List(_) => { let list = array.as_list(); let child = Self::try_new(child.as_ref(), ctx, list.values())?; + let is_last = child.child_has_no_nested_rep(); let offsets = list.offsets().clone(); - Self::List(Box::new(child), ctx, offsets, list.nulls().cloned()) + Self::List( + Box::new(child), + ctx, + offsets, + list.nulls().cloned(), + is_last, + ) } DataType::LargeList(_) => { let list = array.as_list(); let child = Self::try_new(child.as_ref(), ctx, list.values())?; + let is_last = child.child_has_no_nested_rep(); let offsets = list.offsets().clone(); let nulls = list.nulls().cloned(); - Self::LargeList(Box::new(child), ctx, offsets, nulls) + Self::LargeList(Box::new(child), ctx, offsets, nulls, is_last) } DataType::Map(_, _) => { let map = array.as_map(); let entries = Arc::new(map.entries().clone()) as ArrayRef; let child = Self::try_new(child.as_ref(), ctx, &entries)?; + let is_last = child.child_has_no_nested_rep(); let offsets = map.offsets().clone(); - Self::List(Box::new(child), ctx, offsets, map.nulls().cloned()) + Self::List(Box::new(child), ctx, offsets, map.nulls().cloned(), is_last) } DataType::FixedSizeList(_, size) => { let list = array.as_fixed_size_list(); @@ -274,8 +285,8 @@ impl LevelInfoBuilder { fn finish(self) -> Vec { match self { LevelInfoBuilder::Primitive(v) => vec![v], - LevelInfoBuilder::List(v, _, _, _) - | LevelInfoBuilder::LargeList(v, _, _, _) + LevelInfoBuilder::List(v, _, _, _, _) + | LevelInfoBuilder::LargeList(v, _, _, _, _) | LevelInfoBuilder::FixedSizeList(v, _, _, _) | LevelInfoBuilder::ListView(v, _, _, _, _) | LevelInfoBuilder::LargeListView(v, _, _, _, _) => v.finish(), @@ -287,11 +298,11 @@ impl LevelInfoBuilder { fn write(&mut self, range: Range) { match self { LevelInfoBuilder::Primitive(info) => Self::write_leaf(info, range), - LevelInfoBuilder::List(child, ctx, offsets, nulls) => { - Self::write_list(child, ctx, offsets, nulls.as_ref(), range) + LevelInfoBuilder::List(child, ctx, offsets, nulls, is_last) => { + Self::write_list(child, ctx, offsets, nulls.as_ref(), range, *is_last) } - LevelInfoBuilder::LargeList(child, ctx, offsets, nulls) => { - Self::write_list(child, ctx, offsets, nulls.as_ref(), range) + LevelInfoBuilder::LargeList(child, ctx, offsets, nulls, is_last) => { + Self::write_list(child, ctx, offsets, nulls.as_ref(), range, *is_last) } LevelInfoBuilder::FixedSizeList(child, ctx, size, nulls) => { Self::write_fixed_size_list(child, ctx, *size, nulls.as_ref(), range) @@ -308,6 +319,19 @@ impl LevelInfoBuilder { } } + /// Returns `true` if the child contains no nested repetition levels, meaning + /// each child element produces exactly one rep_level entry in the leaf. + /// This is true for `Primitive` children and `Struct` trees with no list descendants. + fn child_has_no_nested_rep(&self) -> bool { + match self { + LevelInfoBuilder::Primitive(_) => true, + LevelInfoBuilder::Struct(children, _, _) => { + children.iter().all(|c| c.child_has_no_nested_rep()) + } + _ => false, + } + } + /// Write `range` elements from ListArray `array` /// /// Note: MapArrays are `ListArray` under the hood and so are dispatched to this method @@ -317,6 +341,7 @@ impl LevelInfoBuilder { offsets: &[O], nulls: Option<&NullBuffer>, range: Range, + is_last_level: bool, ) { // Fast path: entire list array is null; emit bulk null rep/def levels if nulls.is_some_and(|nulls| nulls.null_count() == nulls.len()) { @@ -327,6 +352,18 @@ impl LevelInfoBuilder { return; } + // Fast path for "last-level list": when the child has no nested rep_levels, + // each child element produces exactly one rep_level entry. We can batch + // contiguous non-empty list slots into a single child.write() call, then + // fix up the rep_levels at list-slot boundaries using offsets directly. + // + // Kept as a separate function so the compiler can optimize write_list's + // hot loop independently (function body size affects codegen quality). + if is_last_level { + Self::write_list_last_level(child, ctx, offsets, nulls, range); + return; + } + let offsets = &offsets[range.start..range.end + 1]; let write_non_null_slice = @@ -427,6 +464,138 @@ impl LevelInfoBuilder { } } + /// Optimized write path for lists whose child has no nested repetition levels. + /// + /// When the child is a leaf (or a struct of leaves), each child element maps to + /// exactly one rep_level entry. This lets us batch contiguous non-empty list + /// slots into a single `child.write()` call, then stamp the list-start markers + /// at positions computed directly from offsets — avoiding per-slot `write` + + /// reverse-scan overhead. + fn write_list_last_level( + child: &mut LevelInfoBuilder, + ctx: &LevelContext, + offsets: &[O], + nulls: Option<&NullBuffer>, + range: Range, + ) { + let null_offset = range.start; + let offsets = &offsets[range.start..range.end + 1]; + let list_start_rep = ctx.rep_level - 1; + + let emit_nulls = |child: &mut LevelInfoBuilder, count: usize| { + child.visit_leaves(|leaf| { + leaf.append_rep_level_run(list_start_rep, count); + leaf.append_def_level_run(ctx.def_level - 2, count); + }); + }; + + let emit_empties = |child: &mut LevelInfoBuilder, count: usize| { + child.visit_leaves(|leaf| { + leaf.append_rep_level_run(list_start_rep, count); + leaf.append_def_level_run(ctx.def_level - 1, count); + }); + }; + + let emit_non_empty_run = |child: &mut LevelInfoBuilder, run_offsets: &[O]| { + debug_assert!(run_offsets.len() >= 2); + let values_start = run_offsets[0].as_usize(); + let values_end = run_offsets[run_offsets.len() - 1].as_usize(); + debug_assert!(values_end > values_start); + + // Write all leaf values in one batch. Since the child has no nested + // rep, this emits (values_end - values_start) rep_levels all equal + // to ctx.rep_level (= "continuation within list"). + child.write(values_start..values_end); + + // The first element of each list slot needs rep_level = + // list_start_rep to mark a new list boundary. Because there's a 1:1 + // mapping between child elements and rep_level entries, the position + // of each slot's first element is directly computable from offsets. + child.visit_leaves(|leaf| { + let rep_levels = leaf.rep_levels.materialize_mut().unwrap(); + let batch_len = values_end - values_start; + let batch_base = rep_levels.len() - batch_len; + + for slot_offset in run_offsets.iter().take(run_offsets.len() - 1) { + let list_start_pos = batch_base + (slot_offset.as_usize() - values_start); + rep_levels[list_start_pos] = list_start_rep; + } + }); + }; + + // Classify each slot, detect run boundaries, flush on transition. + #[derive(Clone, Copy, PartialEq)] + enum SlotKind { + Null, + Empty, + NonEmpty, + } + + let num_slots = offsets.len() - 1; + if num_slots == 0 { + return; + } + + macro_rules! classify { + ($i:expr, $nulls:expr) => { + if !$nulls.is_valid($i + null_offset) { + SlotKind::Null + } else if offsets[$i] == offsets[$i + 1] { + SlotKind::Empty + } else { + SlotKind::NonEmpty + } + }; + } + + macro_rules! flush_run { + ($kind:expr, $start:expr, $end:expr) => { + match $kind { + SlotKind::Null => emit_nulls(child, $end - $start), + SlotKind::Empty => emit_empties(child, $end - $start), + SlotKind::NonEmpty => emit_non_empty_run(child, &offsets[$start..$end + 1]), + } + }; + } + + match nulls { + Some(nulls) => { + let mut run_kind = classify!(0, nulls); + let mut run_start: usize = 0; + for i in 1..num_slots { + let kind = classify!(i, nulls); + if kind != run_kind { + flush_run!(run_kind, run_start, i); + run_kind = kind; + run_start = i; + } + } + flush_run!(run_kind, run_start, num_slots); + } + None => { + let mut run_kind = if offsets[0] == offsets[1] { + SlotKind::Empty + } else { + SlotKind::NonEmpty + }; + let mut run_start: usize = 0; + for i in 1..num_slots { + let kind = if offsets[i] == offsets[i + 1] { + SlotKind::Empty + } else { + SlotKind::NonEmpty + }; + if kind != run_kind { + flush_run!(run_kind, run_start, i); + run_kind = kind; + run_start = i; + } + } + flush_run!(run_kind, run_start, num_slots); + } + } + } + /// Write `range` elements from ListViewArray `array` fn write_list_view( child: &mut LevelInfoBuilder, @@ -734,8 +903,8 @@ impl LevelInfoBuilder { fn visit_leaves(&mut self, visit: impl Fn(&mut ArrayLevels) + Copy) { match self { LevelInfoBuilder::Primitive(info) => visit(info), - LevelInfoBuilder::List(c, _, _, _) - | LevelInfoBuilder::LargeList(c, _, _, _) + LevelInfoBuilder::List(c, _, _, _, _) + | LevelInfoBuilder::LargeList(c, _, _, _, _) | LevelInfoBuilder::FixedSizeList(c, _, _, _) | LevelInfoBuilder::ListView(c, _, _, _, _) | LevelInfoBuilder::LargeListView(c, _, _, _, _) => c.visit_leaves(visit),