Skip to content
Merged
46 changes: 46 additions & 0 deletions parquet-variant-compute/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
name = "parquet-variant-compute"
# This package is still in development and thus the version does
# not follow the versions of the rest of the crates in this repo.
version = "0.1.0"
license = { workspace = true }
description = "Apache Parquet Variant Batch Processing"
homepage = { workspace = true }
repository = { workspace = true }
authors = { workspace = true }
keywords = ["arrow", "parquet", "variant"]
readme = "README.md"
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb Let me know if I should create a readme.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a placeholder that says "part of arrow-rs" and what it contains is probably enough

I would basically follow along the model of other crates in the repo

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just removed the readme line like some other crates in the repo.

edition = { workspace = true }
# needs a newer version than workspace due to
# rror: `Option::<T>::unwrap` is not yet stable as a const fn
rust-version = "1.83"
Comment thread
harshmotw-db marked this conversation as resolved.


[dependencies]
arrow = { workspace = true }
arrow-schema = { workspace = true }
parquet-variant = { path = "../parquet-variant" }
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please make this "workspace=true" (otherwise cargo publish gets angsty)?

Suggested change
parquet-variant = { path = "../parquet-variant" }
parquet-variant = { workspace = true }

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

error: failed to parse manifest at `/Users/harsh.motwani/arrow-rs/parquet-variant-compute/Cargo.toml`

Caused by:
  error inheriting `parquet-variant` from workspace root manifest's `workspace.dependencies.parquet-variant`

Caused by:
  `dependency.parquet-variant` was not found in `workspace.dependencies`

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding parquet-variant = { path = "./parquet-variant" } to the root Cargo.toml gets it working, so that's what I will do for now.



[lib]
name = "parquet_variant_compute"
bench = false

[dev-dependencies]
136 changes: 136 additions & 0 deletions parquet-variant-compute/src/from_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use arrow::array::{
Array, ArrayRef, BinaryBuilder, BooleanBufferBuilder, StringArray, StructArray,
};
use arrow::buffer::NullBuffer;
use arrow::datatypes::{DataType, Field};
use arrow_schema::ArrowError;
use parquet_variant::{json_to_variant, VariantBuilder};

fn variant_arrow_repr() -> DataType {
let metadata_field = Field::new("metadata", DataType::Binary, true);
let value_field = Field::new("value", DataType::Binary, true);
let fields = vec![metadata_field, value_field];
DataType::Struct(fields.into())
}

pub fn batch_json_string_to_variant(input: &ArrayRef) -> Result<StructArray, ArrowError> {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should definitely add some docs / example to this kernel

I also might suggest calling it cast_to_variant but that is more of a personal preference

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parse_json is a name we could consider for this. It's the name of the function that does this (json string to variant) in spark/databricks.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagine we'll eventually want a top-level cast_to_variant that converts strong types to variant? And passing a string array to such a function should produce a variant column full of string (or short-string) variant values?

This method here is json-parsing strings and casting the result to variant, not casting strings directly to variant?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I agree with @scovich. As for parse_json, it makes sense to name it that way if it is a well defined SQL function. However, from a library point-of-view, I think parse_json is too vague.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well technically, it would be parquet_variant_compute::parse_json, which is a little clearer? Maybe we should add some nested module structure like arrow-compute does, so we get e.g. parquet_variant_compute::variant::parse_json. People would use that as variant::parse_json or parse_json if they prefer parsimony?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can call it cast_json_to_variant and cast_variant_to_json 🤔

let input_string_array = match input.as_any().downcast_ref::<StringArray>() {
Some(string_array) => Ok(string_array),
None => Err(ArrowError::CastError(
"Expected reference to StringArray as input".into(),
)),
}?;
Comment on lines +42 to +47
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not use ok_or_else?

Suggested change
let input_string_array = match input.as_any().downcast_ref::<StringArray>() {
Some(string_array) => Ok(string_array),
None => Err(ArrowError::CastError(
"Expected reference to StringArray as input".into(),
)),
}?;
let input_string_array = input
.as_any()
.downcast_ref::<StringArray>()
.ok_or_else(|| ArrowError::CastError("Expected a StringArray as input".into()))?;


let mut metadata_builder = BinaryBuilder::new();
let mut value_builder = BinaryBuilder::new();
let mut validity = BooleanBufferBuilder::new(input.len());
for i in 0..input.len() {
if input.is_null(i) {
metadata_builder.append_null();
value_builder.append_null();
validity.append(false);
} else {
let mut vb = VariantBuilder::new();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will pattern will cause the variant values to be copied twice -- once into the builder's buffers and then once into the output binary builder, which is probably ok for the first version;

With some care I think we will be able to avoid copying the values, though it will take using the lower level APIs (and building offsets directly)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, so just to be clear the two copies you are referring to are metadata_builder.append_value(&metadata); and metadata_builder.finish() right? If so, I'll take care of it in this PR itself.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I followed that one -- maybe it's a second issue?

I was just referring to all the try_new_with_metadata calls. Today VariantMetadata is Copy, so it's easy to forget that each such call is quietly copying an ~80 byte object. That cost could perhaps add up when iterating over hundreds/thousands of child objects? Or maybe the compiler is really good at making it ~free, since really only one meaningfully exists at a time?

Copy link
Copy Markdown
Contributor Author

@harshmotw-db harshmotw-db Jul 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, would this resolve the issue you're talking about:

pub fn try_new(metadata: &'m [u8], value: &'v [u8]) -> Result<Self, ArrowError> {
      // let metadata = VariantMetadata::try_new(metadata)?;
      Self::try_new_with_metadata(VariantMetadata::try_new(metadata)?, value)
  }

Edit: Oh no never mind. There are more copies downstream. I suppose that is more of a library issue that can potentially be fixed separately

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have reduced copying in this function by manually constructing binary buffers.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above would definitely not help, because each VariantMetadata::try_new would re-validate the same byte buffer!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, so just to be clear the two copies you are referring to are metadata_builder.append_value(&metadata); and metadata_builder.finish() right? If so, I'll take care of it in this PR itself.

What I was really imagining was updating VariantBuilder so it could take a pre-existing buffer (Vec) and append to it, rather than writing into a new buffer and then copying that into the output bytes.

json_to_variant(input_string_array.value(i), &mut vb)?;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please add not a happy-path test, for example, with invalid json, so this function should fail and return some well-known message to a user?

let (metadata, value) = vb.finish();
metadata_builder.append_value(&metadata);
value_builder.append_value(&value);
validity.append(true);
}
}
let struct_fields: Vec<ArrayRef> = vec![
Arc::new(metadata_builder.finish()),
Arc::new(value_builder.finish()),
];
let variant_fields = match variant_arrow_repr() {
DataType::Struct(fields) => fields,
_ => unreachable!("variant_arrow_repr is hard-coded and must match the expected schema"),
};
Comment on lines +108 to +111
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should variant_arrow_repr just return Fields instead of DataType? Then we don't have to unpack it.

It's also a 4-line method with a single call site (3 lines if we don't convert it to a DataType, so we might also consider removing the method entirely? Could also make the relationship with the StructArray more explicit by something like:

Suggested change
let variant_fields = match variant_arrow_repr() {
DataType::Struct(fields) => fields,
_ => unreachable!("variant_arrow_repr is hard-coded and must match the expected schema"),
};
let metadata_field = Field::new("metadata", metadata_array.data_type(), false);
let value_field = Field::new("value", value_array.data_type(), false);
let variant_fields = vec![metadata_field, value_field].into();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should variant_arrow_repr just return Fields instead of DataType? Then we don't have to unpack it.

Yes, please

let null_buffer = NullBuffer::new(validity.finish());
Ok(StructArray::new(
variant_fields,
struct_fields,
Some(null_buffer),
))
}

#[cfg(test)]
mod test {
use crate::batch_json_string_to_variant;
use arrow::array::{Array, ArrayRef, BinaryArray, StringArray};
use arrow_schema::ArrowError;
use parquet_variant::{Variant, VariantBuilder};
use std::sync::Arc;

#[test]
fn test_batch_json_string_to_variant() -> Result<(), ArrowError> {
let input = StringArray::from(vec![
Some("1"),
None,
Some("{\"a\": 32}"),
Some("null"),
None,
]);
Comment on lines +130 to +136
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we please cover more cases here?
I mean all the types that JSON supports. I see you already added int, nulls, and simple dict here with string and int.

If it's test for happy-path can we please add:

  • default values (e.g, 0 for int because some engines can represent NULL as a default value).
  • booleans (both true/false)
  • more nested json, not only 1-level nested json, e.g. "{{{{true: false}, "-1": "+1"}, 0: 1}}"
  • some long strings to ensure we don't have any string size-based logic

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default values

Not quite sure what you mean by "default values" or how an engine's NULL handling relates to string (json) -> variant parsing?

"{{{{true: false}, "-1": "+1"}, 0: 1}}"

I'm pretty sure JSON objects requires string field names?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gotocoding-DB The batch functions in this PR just run some underlying scalar functions on a whole batch of data. The underlying scalar functions have been validated on all sorts of inputs (this PR). I don't think the logical breadth of JSON test cases needs to be tested again.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree full coverage here is redundant -- maybe we can add a comment that says "full json parsing coverage is handled by the tests for json_to_variant"

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@harshmotw-db You're right. If the internally used parsing function is tested, we don't need to duplicate the tests here. I'm personally using tests as another example of function usage (if it's not fully covered in function docs).

let array_ref: ArrayRef = Arc::new(input);
let output = batch_json_string_to_variant(&array_ref).unwrap();

let struct_array = &output;
let metadata_array = struct_array
.column(0)
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();
let value_array = struct_array
.column(1)
.as_any()
.downcast_ref::<BinaryArray>()
.unwrap();

assert_eq!(struct_array.is_null(0), false);
assert_eq!(struct_array.is_null(1), true);
assert_eq!(struct_array.is_null(2), false);
assert_eq!(struct_array.is_null(3), false);
assert_eq!(struct_array.is_null(4), true);

assert_eq!(metadata_array.value(0), &[1, 0, 0]);
assert_eq!(value_array.value(0), &[12, 1]);

{
let mut vb = VariantBuilder::new();
let mut ob = vb.new_object();
ob.insert("a", Variant::Int8(32));
ob.finish()?;
let (object_metadata, object_value) = vb.finish();
assert_eq!(metadata_array.value(2), &object_metadata);
assert_eq!(value_array.value(2), &object_value);
}

assert_eq!(metadata_array.value(3), &[1, 0, 0]);
assert_eq!(value_array.value(3), &[0]);

assert!(metadata_array.is_null(1));
assert!(value_array.is_null(1));
assert!(metadata_array.is_null(4));
assert!(value_array.is_null(4));
Ok(())
}
}
27 changes: 27 additions & 0 deletions parquet-variant-compute/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

mod from_json;
mod to_json;

/// Parse a batch of JSON strings into a batch of Variants represented as
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Typically these comments would go on the function itself, not its pub use

/// STRUCT<metadata: BINARY, value: BINARY> where nulls are preserved. The JSON strings in the input
/// must be valid.
pub use from_json::batch_json_string_to_variant;
/// Transform a batch of Variant represented as STRUCT<metadata: BINARY, value: BINARY> to a batch
/// of JSON strings where nulls are preserved. The JSON strings in the input must be valid.
pub use to_json::batch_variant_to_json_string;
178 changes: 178 additions & 0 deletions parquet-variant-compute/src/to_json.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Module for transforming a batch of Variants represented as
//! STRUCT<metadata: BINARY, value: BINARY> into a batch of JSON strings.

use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray};
use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::DataType;
use arrow_schema::ArrowError;
use parquet_variant::{variant_to_json, Variant};

pub fn batch_variant_to_json_string(input: &ArrayRef) -> Result<StringArray, ArrowError> {
let struct_array = input
.as_any()
.downcast_ref::<StructArray>()
.ok_or_else(|| ArrowError::CastError("Expected StructArray as input".into()))?;

// Validate field types
let data_type = struct_array.data_type();
match data_type {
DataType::Struct(inner_fields) => {
if inner_fields.len() != 2
|| inner_fields[0].data_type() != &DataType::Binary
|| inner_fields[1].data_type() != &DataType::Binary
{
return Err(ArrowError::CastError(
"Expected struct with two binary fields".into(),
));
}
}
_ => {
return Err(ArrowError::CastError(
"Expected StructArray with known fields".into(),
))
}
}

let metadata_array = struct_array
.column(0)
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'metadata'".into()))?;

let value_array = struct_array
.column(1)
.as_any()
.downcast_ref::<BinaryArray>()
.ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'value'".into()))?;

// Zero-copy builder
// The size per JSON string is assumed to be 128 bytes. If this holds true, resizing could be
// minimized to improve performance.
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb I've tried to minimize copying here. Took help from AI figuring this out. Please see if this looks good to you.

let mut json_buffer: Vec<u8> = Vec::with_capacity(struct_array.len() * 128);
let mut offsets: Vec<i32> = Vec::with_capacity(struct_array.len() + 1);
let mut validity = BooleanBufferBuilder::new(struct_array.len());
let mut current_offset: i32 = 0;
offsets.push(current_offset);

for i in 0..struct_array.len() {
if struct_array.is_null(i) {
validity.append(false);
offsets.push(current_offset);
} else {
let metadata = metadata_array.value(i);
let value = value_array.value(i);
let variant = Variant::new(metadata, value);
let start_len = json_buffer.len();
variant_to_json(&mut json_buffer, &variant)?;
let written = (json_buffer.len() - start_len) as i32;
current_offset += written;
offsets.push(current_offset);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks good to me

validity.append(true);
}
}

let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
let value_buffer = Buffer::from_vec(json_buffer);
let null_buffer = NullBuffer::new(validity.finish());

Ok(StringArray::new(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think if possible we should consider using StringViewArray here instead of StringArray

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know about that. According to this documentation, the string must be stored in the offsets itself if it is short (<= 12 bytes). But how would we know that in advance? I think StringArray can more readily be implemented in a zero-copy fashion.

offsets_buffer,
value_buffer,
Some(null_buffer),
))
}

#[cfg(test)]
mod test {
use crate::batch_variant_to_json_string;
use arrow::array::{Array, ArrayRef, BinaryBuilder, BooleanBufferBuilder, StructArray};
use arrow::buffer::NullBuffer;
use arrow::datatypes::DataType;
use arrow::datatypes::Field;
use arrow_schema::Fields;
use std::sync::Arc;

#[test]
fn test_batch_variant_to_json_string() {
let mut metadata_builder = BinaryBuilder::new();
let mut value_builder = BinaryBuilder::new();

// Row 0: [1, 0, 0], [12, 0]
metadata_builder.append_value(&[1, 0, 0]);
value_builder.append_value(&[12, 0]);

// Row 1: null
metadata_builder.append_null();
value_builder.append_null();

// Row 2: [1, 1, 0, 1, 97], [2, 1, 0, 0, 1, 32]
metadata_builder.append_value(&[1, 1, 0, 1, 97]);
value_builder.append_value(&[2, 1, 0, 0, 2, 12, 32]);

// Row 3: [1, 0, 0], [0]
metadata_builder.append_value(&[1, 0, 0]);
value_builder.append_value(&[0]);

// Row 4: null
metadata_builder.append_null();
value_builder.append_null();

let metadata_array = Arc::new(metadata_builder.finish()) as ArrayRef;
let value_array = Arc::new(value_builder.finish()) as ArrayRef;

let fields: Fields = vec![
Field::new("metadata", DataType::Binary, true),
Field::new("value", DataType::Binary, true),
]
.into();

let mut validity = BooleanBufferBuilder::new(value_array.len());
for i in 0..value_array.len() {
let is_valid = value_array.is_valid(i) && metadata_array.is_valid(i);
validity.append(is_valid);
}
let null_buffer = NullBuffer::new(validity.finish());

let struct_array = StructArray::new(
fields,
vec![metadata_array.clone(), value_array.clone()],
Some(null_buffer), // Null bitmap (let Arrow infer from children)
);

let input = Arc::new(struct_array) as ArrayRef;

let result = batch_variant_to_json_string(&input).unwrap();

// Expected output: ["0", null, "{\"a\":32}", "null", null]
let expected = vec![Some("0"), None, Some("{\"a\":32}"), Some("null"), None];

let result_vec: Vec<Option<&str>> = (0..result.len())
.map(|i| {
if result.is_null(i) {
None
} else {
Some(result.value(i))
}
})
.collect();

assert_eq!(result_vec, expected);
}
}
Loading