Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 214 additions & 0 deletions arrow-select/src/interleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ pub fn interleave(
DataType::Struct(fields) => interleave_struct(fields, values, indices),
DataType::List(field) => interleave_list::<i32>(values, indices, field),
DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
DataType::FixedSizeList(field, size) => interleave_fixed_size_list(values, indices, field, *size),
DataType::Map(field, ordered) => interleave_map(values, indices, field, *ordered),
DataType::RunEndEncoded(r, _) => match r.data_type() {
DataType::Int16 => interleave_run_end::<Int16Type>(values, indices),
DataType::Int32 => interleave_run_end::<Int32Type>(values, indices),
Expand Down Expand Up @@ -419,6 +421,77 @@ fn interleave_list<O: OffsetSizeTrait>(
Ok(Arc::new(list_array))
}

/// Specialized [`interleave`] for [`FixedSizeListArray`].
///
/// Every selected `(array, row)` pair is expanded into the `size` consecutive
/// child positions `row * size .. row * size + size` of that array's child
/// values. The child values are then gathered with a recursive [`interleave`]
/// call and wrapped in a new [`FixedSizeListArray`] that reuses `field` and
/// `size`.
///
/// # Errors
///
/// Propagates any error returned by the recursive [`interleave`] call on the
/// child values.
fn interleave_fixed_size_list(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might reusing an optimization with #10025 . Otherwise it might get slower in some case

values: &[&dyn Array],
indices: &[(usize, usize)],
field: &FieldRef,
size: i32,
) -> Result<ArrayRef, ArrowError> {
// `Interleave` is defined elsewhere in this file; here it supplies the typed
// input arrays (`.arrays`) and the null buffer of the result (`.nulls`).
let interleaved = Interleave::<'_, FixedSizeListArray>::new(values, indices);
// NOTE(review): the cast assumes `size` is non-negative — confirm that the
// data type guarantees this before it reaches this function.
let value_length = size as usize;

// Expand each selected row into the positions of its `value_length` children.
// Validity is not consulted here, so the children of null rows are copied too.
let mut child_indices = Vec::with_capacity(indices.len() * value_length);
for &(array, row) in indices {
// NOTE(review): assumes the children of `row` start at `row * value_length`
// within `values()` (also for sliced inputs) — TODO confirm.
let offset = row * value_length;
child_indices.extend((offset..offset + value_length).map(|i| (array, i)));
}

// Child value arrays, in the same order as the inputs so that the array
// component of `child_indices` still addresses the right source.
let child_arrays: Vec<&dyn Array> = interleaved
.arrays
.iter()
.map(|a| a.values().as_ref())
.collect();

// Recurse: gather the child values with the generic kernel.
let interleaved_values = interleave(&child_arrays, &child_indices)?;

let array = FixedSizeListArray::new(field.clone(), size, interleaved_values, interleaved.nulls);
Ok(Arc::new(array))
}

/// Specialized [`interleave`] for [`MapArray`].
///
/// Works in two passes over `indices`. The first pass derives the output
/// offsets from the length of every selected map row; the second expands each
/// selected row into the positions of its entries. The entries are then
/// gathered through a recursive [`interleave`] call and reassembled into a
/// [`MapArray`] that reuses `field` and `ordered`.
///
/// # Errors
///
/// Returns [`ArrowError::OffsetOverflowError`] when the running total of
/// selected entries no longer fits in an `i32` offset, and propagates any
/// error from interleaving the entries.
fn interleave_map(
    values: &[&dyn Array],
    indices: &[(usize, usize)],
    field: &FieldRef,
    ordered: bool,
) -> Result<ArrayRef, ArrowError> {
    let interleaved = Interleave::<'_, MapArray>::new(values, indices);

    // Pass 1: accumulate the entry count row by row to obtain the new offsets.
    let mut total_entries = 0usize;
    let mut new_offsets = Vec::with_capacity(indices.len() + 1);
    new_offsets.push(0i32);
    for &(array_idx, row_idx) in indices {
        let source = interleaved.arrays[array_idx].value_offsets();
        let row_len = source[row_idx + 1] - source[row_idx];
        total_entries += row_len as usize;
        let end = i32::try_from(total_entries)
            .map_err(|_| ArrowError::OffsetOverflowError(total_entries))?;
        new_offsets.push(end);
    }

    // Pass 2: list the (array, entry) position of every entry to copy. The
    // total is known by now, so the vector is allocated exactly once.
    let mut entry_indices = Vec::with_capacity(total_entries);
    for &(array_idx, row_idx) in indices {
        let source = interleaved.arrays[array_idx].value_offsets();
        let first = source[row_idx] as usize;
        let last = source[row_idx + 1] as usize;
        for entry_idx in first..last {
            entry_indices.push((array_idx, entry_idx));
        }
    }

    // Gather the entries of all inputs with the generic kernel.
    let entry_arrays: Vec<&dyn Array> = interleaved
        .arrays
        .iter()
        .map(|source_map| source_map.entries() as &dyn Array)
        .collect();
    let gathered = interleave(&entry_arrays, &entry_indices)?;

    let offset_buffer = OffsetBuffer::new(new_offsets.into());
    // The inputs handed to `interleave` were all struct arrays taken from
    // `entries()`, so the result is downcast back to a `StructArray`.
    let entries = gathered
        .as_any()
        .downcast_ref::<StructArray>()
        .unwrap()
        .clone();

    let map_array = MapArray::new(
        field.clone(),
        offset_buffer,
        entries,
        interleaved.nulls,
        ordered,
    );
    Ok(Arc::new(map_array))
}

/// Specialized [`interleave`] for [`RunArray`].
fn interleave_run_end<R: RunEndIndexType>(
values: &[&dyn Array],
Expand Down Expand Up @@ -1847,4 +1920,145 @@ mod tests {
&[3]
);
}

#[test]
fn test_interleave_fixed_size_list() {
    let item = Arc::new(Field::new("item", DataType::Int32, false));
    // Builds a FixedSizeList<Int32, 2> without nulls from its flattened children.
    let make_list = |flat: Vec<i32>| {
        FixedSizeListArray::new(item.clone(), 2, Arc::new(Int32Array::from(flat)), None)
    };

    // first:  [[1, 2], [3, 4], [5, 6]]
    let first = make_list(vec![1, 2, 3, 4, 5, 6]);
    // second: [[7, 8], [9, 10]]
    let second = make_list(vec![7, 8, 9, 10]);

    let picks: [(usize, usize); 5] = [(0, 2), (1, 0), (0, 0), (1, 1), (0, 1)];
    let interleaved = interleave(&[&first, &second], &picks).unwrap();
    let list = interleaved.as_fixed_size_list();
    assert_eq!(list.len(), 5);
    assert_eq!(list.value_length(), 2);

    // Expected rows: [[5, 6], [7, 8], [1, 2], [9, 10], [3, 4]]
    let flattened = list.values().as_primitive::<Int32Type>();
    assert_eq!(flattened.values(), &[5, 6, 7, 8, 1, 2, 9, 10, 3, 4]);
}

#[test]
fn test_interleave_fixed_size_list_with_nulls() {
    let item = Arc::new(Field::new("item", DataType::Int32, true));

    // first: [[1, 2], null, [5, 6]]
    let first_children = Int32Array::from(vec![1, 2, 0, 0, 5, 6]);
    let first_validity = NullBuffer::from(&[true, false, true]);
    let first = FixedSizeListArray::new(
        item.clone(),
        2,
        Arc::new(first_children),
        Some(first_validity),
    );

    // second: [null, [9, 10]]
    let second_children = Int32Array::from(vec![0, 0, 9, 10]);
    let second_validity = NullBuffer::from(&[false, true]);
    let second = FixedSizeListArray::new(
        item.clone(),
        2,
        Arc::new(second_children),
        Some(second_validity),
    );

    let picks: [(usize, usize); 5] = [(0, 0), (0, 1), (1, 0), (1, 1), (0, 2)];
    let interleaved = interleave(&[&first, &second], &picks).unwrap();
    let list = interleaved.as_fixed_size_list();
    assert_eq!(list.len(), 5);

    // Row-level validity must follow the picked rows.
    let null_buffer = list.nulls().unwrap();
    let validity: Vec<bool> = null_buffer.iter().collect();
    assert_eq!(validity, &[true, false, false, true, true]);
}

#[test]
fn test_interleave_map() {
    use arrow_array::builder::MapBuilder;
    use arrow_array::builder::StringBuilder;

    // Builds a map array without null rows; each inner slice is one row of
    // (key, value) pairs.
    let build_map = |rows: &[&[(&str, i32)]]| {
        let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
        for row in rows {
            for &(key, value) in row.iter() {
                builder.keys().append_value(key);
                builder.values().append_value(value);
            }
            builder.append(true).unwrap();
        }
        builder.finish()
    };

    // first:  [{k1: 1, k2: 2}, {k3: 3}]
    let first = build_map(&[&[("k1", 1), ("k2", 2)], &[("k3", 3)]]);
    // second: [{k4: 4}, {k5: 5, k6: 6, k7: 7}]
    let second = build_map(&[&[("k4", 4)], &[("k5", 5), ("k6", 6), ("k7", 7)]]);

    let picks: [(usize, usize); 4] = [(1, 0), (0, 0), (0, 1), (1, 1)];
    let interleaved = interleave(&[&first, &second], &picks).unwrap();
    let maps = interleaved.as_map();
    assert_eq!(maps.len(), 4);

    // Output row 0 is second[0]: {k4: 4}
    let only_k4 = maps.value(0);
    assert_eq!(only_k4.len(), 1);
    assert_eq!(only_k4.column(0).as_string::<i32>().value(0), "k4");
    assert_eq!(only_k4.column(1).as_primitive::<Int32Type>().value(0), 4);

    // Output row 1 is first[0]: {k1: 1, k2: 2}
    let k1_k2 = maps.value(1);
    assert_eq!(k1_k2.len(), 2);

    // Output row 2 is first[1]: {k3: 3}
    let only_k3 = maps.value(2);
    assert_eq!(only_k3.len(), 1);
    assert_eq!(only_k3.column(0).as_string::<i32>().value(0), "k3");

    // Output row 3 is second[1]: {k5: 5, k6: 6, k7: 7}
    let k5_to_k7 = maps.value(3);
    assert_eq!(k5_to_k7.len(), 3);
}

#[test]
fn test_interleave_map_with_nulls() {
    use arrow_array::builder::MapBuilder;
    use arrow_array::builder::StringBuilder;

    // first: [{k1: 1}, null]
    let first = {
        let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
        builder.keys().append_value("k1");
        builder.values().append_value(1);
        builder.append(true).unwrap();
        builder.append(false).unwrap();
        builder.finish()
    };

    // second: [null, {k2: 2}]
    let second = {
        let mut builder = MapBuilder::new(None, StringBuilder::new(), Int32Builder::new());
        builder.append(false).unwrap();
        builder.keys().append_value("k2");
        builder.values().append_value(2);
        builder.append(true).unwrap();
        builder.finish()
    };

    let picks: [(usize, usize); 4] = [(0, 0), (1, 0), (0, 1), (1, 1)];
    let interleaved = interleave(&[&first, &second], &picks).unwrap();
    let maps = interleaved.as_map();
    assert_eq!(maps.len(), 4);

    // The two null rows end up in the middle of the output.
    let null_buffer = maps.nulls().unwrap();
    let validity: Vec<bool> = null_buffer.iter().collect();
    assert_eq!(validity, &[true, false, false, true]);

    // The valid rows keep their single entry.
    assert_eq!(maps.value(0).len(), 1);
    assert_eq!(maps.value(3).len(), 1);
}
}
10 changes: 10 additions & 0 deletions arrow/benches/interleave_kernels.rs
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure whether I should extract the benchmarks into a separate PR (containing this one and the concat benchmark).

Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ fn add_benchmark(c: &mut Criterion) {
)
};

let fsl_i64 = create_primitive_fixed_size_list_array::<Int64Type>(8192, 0.0, 0.0, 5);
let fsl_i64_nulls = create_primitive_fixed_size_list_array::<Int64Type>(8192, 0.1, 0.1, 5);

let map_str_i64 = create_string_map_array::<Int64Type>(8192, 0.0, 5, 8);
let map_str_i64_nulls = create_string_map_array::<Int64Type>(8192, 0.1, 5, 8);

let ree_run_ends = Int32Array::from_iter_values((1..=64).map(|i| i * 16));
let ree_i64_values = create_primitive_array::<Int64Type>(64, 0.0);
let ree_i64 = RunArray::<Int32Type>::try_new(&ree_run_ends, &ree_i64_values).unwrap();
Expand Down Expand Up @@ -183,6 +189,10 @@ fn add_benchmark(c: &mut Criterion) {
("list_view<i64>(0.1,0.1,20)", &list_view_i64),
("list_view<i64>(0.0,0.0,20)", &list_view_i64_no_nulls),
("list_view_overlapping<i64>(80x,20)", &list_view_overlapping),
("fixed_size_list<i64,5>(0.0,0.0)", &fsl_i64),
("fixed_size_list<i64,5>(0.1,0.1)", &fsl_i64_nulls),
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I can also support a large FixedSizeList (FSL), like vector<1024, f32>.

("map<utf8,i64>(0.0,5,8)", &map_str_i64),
("map<utf8,i64>(0.1,5,8)", &map_str_i64_nulls),
("ree_i32<i64>(64 runs)", &ree_i64),
("ree_i32<dict<u32,utf8>>(64 runs)", &ree_dict),
];
Expand Down
84 changes: 82 additions & 2 deletions arrow/src/util/bench_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,19 @@
use crate::array::*;
use crate::datatypes::*;
use crate::util::test_util::seedable_rng;
use arrow_buffer::{Buffer, IntervalMonthDayNano};
use arrow_buffer::{Buffer, IntervalMonthDayNano, NullBuffer};
use arrow_schema::Field;
use half::f16;
use rand::Rng;
use rand::SeedableRng;
use rand::distr::uniform::SampleUniform;
use rand::rng;
use rand::{
distr::{Alphanumeric, Distribution, StandardUniform},
distr::{Alphanumeric, Distribution, SampleString, StandardUniform},
prelude::StdRng,
};
use std::ops::Range;
use std::sync::Arc;

/// Creates an random (but fixed-seeded) array of a given size and null density
pub fn create_primitive_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
Expand Down Expand Up @@ -871,3 +873,81 @@ pub fn create_f64_array_with_seed(size: usize, nan_density: f32, seed: u64) -> F
})
.collect()
}

/// Create a FixedSizeList array of primitive values
///
/// Arguments:
/// - `size`: number of fixed-size lists in the array
/// - `null_density`: density of nulls in the fixed-size list array (row-level nulls)
/// - `value_null_density`: density of nulls in the primitive values inside each list
/// - `list_size`: fixed size of each list element
pub fn create_primitive_fixed_size_list_array<T>(
size: usize,
null_density: f32,
value_null_density: f32,
list_size: i32,
) -> FixedSizeListArray
where
T: ArrowPrimitiveType,
StandardUniform: Distribution<T::Native>,
{
let mut rng = seedable_rng();
let list_size_usize = list_size as usize;
let values: PrimitiveArray<T> = (0..size * list_size_usize)
.map(|_| {
if rng.random::<f32>() < value_null_density {
None
} else {
Some(rng.random())
}
})
.collect();
let field = Arc::new(Field::new("item", T::DATA_TYPE, value_null_density > 0.0));
let nulls = (null_density > 0.0).then(|| {
NullBuffer::new(arrow_buffer::BooleanBuffer::collect_bool(size, |_| {
rng.random::<f32>() >= null_density
}))
});
FixedSizeListArray::new(field, list_size, Arc::new(values), nulls)
}

/// Creates a random (but fixed-seeded) [`MapArray`] with string keys and
/// primitive values.
///
/// Arguments:
/// - `size`: number of rows (maps) in the array
/// - `null_density`: fraction of rows that are null
/// - `max_map_size`: upper bound on the key-value pairs of a single row; the
///   actual count of each valid row is drawn from `0..=max_map_size`
/// - `key_len`: length of every random alphanumeric key
pub fn create_string_map_array<T>(
    size: usize,
    null_density: f32,
    max_map_size: usize,
    key_len: usize,
) -> MapArray
where
    T: ArrowPrimitiveType,
    StandardUniform: Distribution<T::Native>,
{
    let mut rng = seedable_rng();
    let key_builder = builder::StringBuilder::new();
    let value_builder = builder::PrimitiveBuilder::<T>::new();
    let mut map_builder = builder::MapBuilder::new(None, key_builder, value_builder);

    for _ in 0..size {
        let is_null = rng.random::<f32>() < null_density;
        if !is_null {
            // Valid row: draw its entry count, then a key and a value per entry.
            let entry_count = rng.random_range(0..=max_map_size);
            for _ in 0..entry_count {
                let key = Alphanumeric.sample_string(&mut rng, key_len);
                map_builder.keys().append_value(key);
                map_builder.values().append_value(rng.random());
            }
        }
        // Closes the row; null rows carry no entries.
        map_builder.append(!is_null).unwrap();
    }

    map_builder.finish()
}
Loading