From 3aac60b0cd488b6ee296564f3495e89f92fe3d48 Mon Sep 17 00:00:00 2001 From: pchintar <89355405+pchintar@users.noreply.github.com> Date: Fri, 22 May 2026 11:35:51 +0100 Subject: [PATCH] perf(parquet): Improve cache locality in BYTE_STREAM_SPLIT decoding --- .../decoding/byte_stream_split_decoder.rs | 45 ++++++++++++++----- 1 file changed, 35 insertions(+), 10 deletions(-) diff --git a/parquet/src/encodings/decoding/byte_stream_split_decoder.rs b/parquet/src/encodings/decoding/byte_stream_split_decoder.rs index b72ae8f62c34..68cbed3c6cd7 100644 --- a/parquet/src/encodings/decoding/byte_stream_split_decoder.rs +++ b/parquet/src/encodings/decoding/byte_stream_split_decoder.rs @@ -47,18 +47,32 @@ impl ByteStreamSplitDecoder { // Here we assume src contains the full data (which it must, since we // can only know where to split the streams once all data is collected), // but dst can be just a slice starting from the given index. -// We iterate over the output bytes and fill them in from their strided -// input byte locations. fn join_streams_const( src: &[u8], dst: &mut [u8], stride: usize, values_decoded: usize, ) { - let sub_src = &src[values_decoded..]; - for i in 0..dst.len() / TYPE_SIZE { - for j in 0..TYPE_SIZE { - dst[i * TYPE_SIZE + j] = sub_src[i + j * stride]; + // Process values in blocks to improve cache locality when rebuilding + // values from the split byte streams. + const BLOCK: usize = 32; + + let values = dst.len() / TYPE_SIZE; + let src = &src[values_decoded..]; + + for base in (0..values).step_by(BLOCK) { + // Handle the final partial block without branching in the inner loop. + let len = (values - base).min(BLOCK); + + for byte_idx in 0..TYPE_SIZE { + let src_start = byte_idx * stride + base; + let src_block = &src[src_start..src_start + len]; + + // Read contiguous bytes from each byte stream and write them + // back into the original value layout. 
+ for (idx, value) in src_block.iter().copied().enumerate() { + dst[(base + idx) * TYPE_SIZE + byte_idx] = value; + } } } } @@ -71,10 +85,21 @@ fn join_streams_variable( type_size: usize, values_decoded: usize, ) { - let sub_src = &src[values_decoded..]; - for i in 0..dst.len() / type_size { - for j in 0..type_size { - dst[i * type_size + j] = sub_src[i + j * stride]; + const BLOCK: usize = 32; + + let values = dst.len() / type_size; + let src = &src[values_decoded..]; + + for base in (0..values).step_by(BLOCK) { + let len = (values - base).min(BLOCK); + + for byte_idx in 0..type_size { + let src_start = byte_idx * stride + base; + let src_block = &src[src_start..src_start + len]; + + for (idx, value) in src_block.iter().copied().enumerate() { + dst[(base + idx) * type_size + byte_idx] = value; + } } } }