-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Convert JSON to VariantArray without copying (8 - 32% faster) #7911
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
fce19eb
236fb5e
23be455
99ea0c4
9920006
65714e5
f23e029
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,7 @@ | |
| use crate::VariantArray; | ||
| use arrow::array::{ArrayRef, BinaryViewArray, BinaryViewBuilder, NullBufferBuilder, StructArray}; | ||
| use arrow_schema::{DataType, Field, Fields}; | ||
| use parquet_variant::{Variant, VariantBuilder}; | ||
| use parquet_variant::{ListBuilder, ObjectBuilder, Variant, VariantBuilder, VariantBuilderExt}; | ||
| use std::sync::Arc; | ||
|
|
||
| /// A builder for [`VariantArray`] | ||
|
|
@@ -37,13 +37,13 @@ use std::sync::Arc; | |
| /// ## Example: | ||
| /// ``` | ||
| /// # use arrow::array::Array; | ||
| /// # use parquet_variant::{Variant, VariantBuilder}; | ||
| /// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt}; | ||
| /// # use parquet_variant_compute::VariantArrayBuilder; | ||
| /// // Create a new VariantArrayBuilder with a capacity of 100 rows | ||
| /// let mut builder = VariantArrayBuilder::new(100); | ||
| /// // append variant values | ||
| /// builder.append_variant(Variant::from(42)); | ||
| /// // append a null row | ||
| /// // append a null row (note not a Variant::Null) | ||
| /// builder.append_null(); | ||
| /// // append a pre-constructed metadata and value buffers | ||
| /// let (metadata, value) = { | ||
|
|
@@ -55,9 +55,14 @@ use std::sync::Arc; | |
| /// }; | ||
| /// builder.append_variant_buffers(&metadata, &value); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The new approach is more efficient. Do we still need to have Looks like the new
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It does seem preferable to have only one good way of doing things, rather than leaving a less efficient other way?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Agreed. removed in 99ea0c4 |
||
| /// | ||
| /// // Use `variant_builder` method to write values directly to the output array | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is the key new API -- a builder that can write directly to the correct output array location |
||
| /// let mut vb = builder.variant_builder(); | ||
| /// vb.append_value("Hello, World!"); | ||
| /// vb.finish(); // Note: call finish to write the variant to the buffers | ||
| /// | ||
| /// // create the final VariantArray | ||
| /// let variant_array = builder.build(); | ||
| /// assert_eq!(variant_array.len(), 3); | ||
| /// assert_eq!(variant_array.len(), 4); | ||
| /// // // Access the values | ||
| /// // row 1 is not null and is an integer | ||
| /// assert!(!variant_array.is_null(0)); | ||
|
|
@@ -67,6 +72,9 @@ use std::sync::Arc; | |
| /// // row 2 is not null and is an object | ||
| /// assert!(!variant_array.is_null(2)); | ||
| /// assert!(variant_array.value(2).as_object().is_some()); | ||
| /// // row 3 is a string | ||
| /// assert!(!variant_array.is_null(3)); | ||
| /// assert_eq!(variant_array.value(3), Variant::from("Hello, World!")); | ||
| /// ``` | ||
| #[derive(Debug)] | ||
| pub struct VariantArrayBuilder { | ||
|
|
@@ -147,11 +155,9 @@ impl VariantArrayBuilder { | |
|
|
||
| /// Append the [`Variant`] to the builder as the next row | ||
| pub fn append_variant(&mut self, variant: Variant) { | ||
| // TODO make this more efficient by avoiding the intermediate buffers | ||
| let mut variant_builder = VariantBuilder::new(); | ||
| variant_builder.append_value(variant); | ||
| let (metadata, value) = variant_builder.finish(); | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The whole point of this PR is to avoid this copy here and instead write directly into the output |
||
| self.append_variant_buffers(&metadata, &value); | ||
| let mut direct_builder = self.variant_builder(); | ||
| direct_builder.variant_builder.append_value(variant); | ||
| direct_builder.finish() | ||
| } | ||
|
|
||
| /// Append a metadata and values buffer to the builder | ||
|
|
@@ -168,7 +174,171 @@ impl VariantArrayBuilder { | |
| self.value_buffer.extend_from_slice(value); | ||
| } | ||
|
|
||
| // TODO: Return a Variant builder that will write to the underlying buffers (TODO) | ||
| /// Return a `VariantArrayVariantBuilder` that writes directly to the | ||
| /// buffers of this builder. | ||
| /// | ||
| /// You must call [`VariantArrayVariantBuilder::finish`] to complete the builder | ||
| /// | ||
| /// # Example | ||
| /// ``` | ||
| /// # use parquet_variant::{Variant, VariantBuilder, VariantBuilderExt}; | ||
| /// # use parquet_variant_compute::{VariantArray, VariantArrayBuilder}; | ||
| /// let mut array_builder = VariantArrayBuilder::new(10); | ||
| /// | ||
| /// // First row has a string | ||
| /// let mut variant_builder = array_builder.variant_builder(); | ||
| /// variant_builder.append_value("Hello, World!"); | ||
| /// // must call finish to write the variant to the buffers | ||
| /// variant_builder.finish(); | ||
| /// | ||
| /// // Second row is an object | ||
| /// let mut variant_builder = array_builder.variant_builder(); | ||
| /// variant_builder | ||
| /// .new_object() | ||
| /// .with_field("my_field", 42i64) | ||
| /// .finish() | ||
| /// .unwrap(); | ||
| /// variant_builder.finish(); | ||
| /// | ||
| /// // finalize the array | ||
| /// let variant_array: VariantArray = array_builder.build(); | ||
| /// | ||
| /// // verify what we wrote is still there | ||
| /// assert_eq!(variant_array.value(0), Variant::from("Hello, World!")); | ||
| /// assert!(variant_array.value(1).as_object().is_some()); | ||
| /// ``` | ||
| pub fn variant_builder(&mut self) -> VariantArrayVariantBuilder { | ||
| // append directly into the metadata and value buffers | ||
| let metadata_buffer = std::mem::take(&mut self.metadata_buffer); | ||
| let value_buffer = std::mem::take(&mut self.value_buffer); | ||
| VariantArrayVariantBuilder::new(self, metadata_buffer, value_buffer) | ||
| } | ||
| } | ||
|
|
||
| /// A `VariantBuilder` that writes directly to the buffers of a `VariantArrayBuilder`. | ||
|
alamb marked this conversation as resolved.
Outdated
|
||
| /// | ||
| /// Note this struct implements [`VariantBuilderExt`], so it can be used | ||
| /// as a drop-in replacement for [`VariantBuilder`] in most cases. | ||
|
alamb marked this conversation as resolved.
Outdated
|
||
| /// | ||
| /// If [`Self::finish`] is not called, any changes will be rolled back | ||
| /// | ||
| /// See [`VariantArrayBuilder::variant_builder`] for an example | ||
| pub struct VariantArrayVariantBuilder<'a> { | ||
| /// was finish called? | ||
| finished: bool, | ||
| /// starting offset in the variant_builder's `metadata` buffer | ||
| metadata_offset: usize, | ||
| /// starting offset in the variant_builder's `value` buffer | ||
| value_offset: usize, | ||
| /// Parent array builder that this variant builder writes to. Buffers | ||
| /// have been moved into the variant builder, and must be returned on | ||
| /// drop | ||
| array_builder: &'a mut VariantArrayBuilder, | ||
| /// Builder for the in progress variant value, temporarily owns the buffers | ||
| /// from `array_builder` | ||
| variant_builder: VariantBuilder, | ||
| } | ||
|
|
||
| impl<'a> VariantBuilderExt for VariantArrayVariantBuilder<'a> { | ||
| fn append_value<'m, 'v>(&mut self, value: impl Into<Variant<'m, 'v>>) { | ||
| self.variant_builder.append_value(value); | ||
| } | ||
|
|
||
| fn new_list(&mut self) -> ListBuilder { | ||
| self.variant_builder.new_list() | ||
| } | ||
|
|
||
| fn new_object(&mut self) -> ObjectBuilder { | ||
| self.variant_builder.new_object() | ||
| } | ||
| } | ||
|
|
||
| impl<'a> VariantArrayVariantBuilder<'a> { | ||
| /// Constructs a new VariantArrayVariantBuilder | ||
| /// | ||
| /// Note this is not public as this is a structure that is logically | ||
| /// part of the [`VariantArrayBuilder`] and relies on its internal structure | ||
| fn new( | ||
| array_builder: &'a mut VariantArrayBuilder, | ||
| metadata_buffer: Vec<u8>, | ||
| value_buffer: Vec<u8>, | ||
| ) -> Self { | ||
| let metadata_offset = metadata_buffer.len(); | ||
| let value_offset = value_buffer.len(); | ||
| VariantArrayVariantBuilder { | ||
| finished: false, | ||
| metadata_offset, | ||
| value_offset, | ||
| variant_builder: VariantBuilder::new_with_buffers(metadata_buffer, value_buffer), | ||
| array_builder, | ||
| } | ||
| } | ||
|
|
||
| /// Return a reference to the underlying `VariantBuilder` | ||
| pub fn inner(&self) -> &VariantBuilder { | ||
| &self.variant_builder | ||
| } | ||
|
|
||
| /// Return a mutable reference to the underlying `VariantBuilder` | ||
| pub fn inner_mut(&mut self) -> &mut VariantBuilder { | ||
| &mut self.variant_builder | ||
| } | ||
|
|
||
| /// Called to finish the in progress variant and write it to the underlying | ||
| /// buffers | ||
| /// | ||
| /// Note if you do not call finish, on drop any changes made to the | ||
| /// underlying buffers will be rolled back. | ||
| pub fn finish(mut self) { | ||
| self.finished = true; | ||
| // Note: buffers are returned and replaced in the drop impl | ||
| } | ||
| } | ||
|
|
||
| impl<'a> Drop for VariantArrayVariantBuilder<'a> { | ||
| /// If the builder was not finished, roll back any changes made to the | ||
| /// underlying buffers (by truncating them) | ||
| fn drop(&mut self) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I really like this approach. I was thinking over the weekend that we may want to rework the other builders to follow a similar approach:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That is an excellent point
That sounds like a great way to avoid the extra allocation
This is also a great idea 🤯
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As a follow up, @klion26 has a PR up to implement this:
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That other PR is nice improvement, but the In order to not shift bytes at all, we'd have to pre-allocate exactly the right number of header bytes before recursing into the field values. And then the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the only way to do this is add some API in the ObjectBuilder somehow to pre-allocate this space ( |
||
| let metadata_offset = self.metadata_offset; | ||
| let value_offset = self.value_offset; | ||
|
|
||
| // get the buffers back from the variant builder | ||
| let (mut metadata_buffer, mut value_buffer) = | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Cool, we can transfer the ownership with this way. |
||
| std::mem::take(&mut self.variant_builder).finish(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Recall the discussion in the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am on the fence about this. My natural inclination would also be to put finishing logic in As long as it's infallible, and gated by a finished check, I don't think it makes any meaningful difference? We need to track the finished flag either way, in order for drop to roll changes back correctly when finish wasn't called. Which was the problem previously -- unconditionally finishing on drop, even if the failure to
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One potential reason to move the finish logic out of Tho I guess if there were any observable side effects like that, the corresponding
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
@scovich 's rationale about the symmetry is what lead me to this approach. However I agree moving the code out of drop is a good idea and I will do it I looked briefly into doing this -- one challenge I found is that there is no API currently to get back the underlying buffer from a VariantBuilder (aka the equivalent of into_inner() or something). I can add this.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤔 I played around with it and I agree it was a good change Among other things this prevents the metadata builder from writing bytes into the metadata builder just to have to roll them back
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done in 65714e5 |
||
|
|
||
| // Sanity Check: if the buffers got smaller, something went wrong (previous data was lost) | ||
| let metadata_len = metadata_buffer | ||
| .len() | ||
| .checked_sub(metadata_offset) | ||
| .expect("metadata length decreased unexpectedly"); | ||
| let value_len = value_buffer | ||
| .len() | ||
| .checked_sub(value_offset) | ||
| .expect("value length decreased unexpectedly"); | ||
|
Comment on lines
+313
to
+328
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These assertions should be impossible to trigger, no? And if anyway the result is a panic, is the unchecked integer underflow panic somehow worse than a failed expect? (if we think these could actually trigger in practice, that seems like a reason to move the checks to a fallible
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. (meanwhile, we should probably move those inside the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, they should be "impossible" to trigger
I think an integer underflow only panics in debug builds on rust (they silently underflow in release builds) If the buffers have somehow been truncated prior to where they started, I think we should panic as soon as possible as something is seriously wrong / there is a serious bug somewhere. |
||
|
|
||
| if self.finished { | ||
| // if the object was finished, commit the changes by putting the | ||
| // offsets and lengths into the parent array builder. | ||
| self.array_builder | ||
| .metadata_locations | ||
| .push((metadata_offset, metadata_len)); | ||
| self.array_builder | ||
| .value_locations | ||
| .push((value_offset, value_len)); | ||
| self.array_builder.nulls.append_non_null(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm, these (sanity check, location updates) are finish-related operations, why don't put int |
||
| } else { | ||
| // if the object was not finished, truncate the buffers to the | ||
| // original offsets to roll back any changes. Note this is fast | ||
| // because truncate doesn't free any memory: it just has to drop | ||
| // elements (and u8 doesn't have a destructor) | ||
| metadata_buffer.truncate(metadata_offset); | ||
| value_buffer.truncate(value_offset); | ||
| } | ||
|
|
||
| // put the buffers back into the array builder | ||
| self.array_builder.metadata_buffer = metadata_buffer; | ||
| self.array_builder.value_buffer = value_buffer; | ||
| } | ||
| } | ||
|
|
||
| fn binary_view_array_from_buffers( | ||
|
|
@@ -220,4 +390,91 @@ mod test { | |
| ); | ||
| } | ||
| } | ||
|
|
||
| /// Test using sub builders to append variants | ||
| #[test] | ||
| fn test_variant_array_builder_variant_builder() { | ||
| let mut builder = VariantArrayBuilder::new(10); | ||
| builder.append_null(); // should not panic | ||
| builder.append_variant(Variant::from(42i32)); | ||
|
|
||
| // let's make a sub-object in the next row | ||
| let mut sub_builder = builder.variant_builder(); | ||
| sub_builder | ||
| .new_object() | ||
| .with_field("foo", "bar") | ||
| .finish() | ||
| .unwrap(); | ||
| sub_builder.finish(); // must call finish to write the variant to the buffers | ||
|
|
||
| // append a new list | ||
| let mut sub_builder = builder.variant_builder(); | ||
| sub_builder | ||
| .new_list() | ||
| .with_value(Variant::from(1i32)) | ||
| .with_value(Variant::from(2i32)) | ||
| .finish(); | ||
| sub_builder.finish(); | ||
| let variant_array = builder.build(); | ||
|
|
||
| assert_eq!(variant_array.len(), 4); | ||
| assert!(variant_array.is_null(0)); | ||
| assert!(!variant_array.is_null(1)); | ||
| assert_eq!(variant_array.value(1), Variant::from(42i32)); | ||
| assert!(!variant_array.is_null(2)); | ||
| let variant = variant_array.value(2); | ||
| let variant = variant.as_object().expect("variant to be an object"); | ||
| assert_eq!(variant.get("foo").unwrap(), Variant::from("bar")); | ||
| assert!(!variant_array.is_null(3)); | ||
| let variant = variant_array.value(3); | ||
| let list = variant.as_list().expect("variant to be a list"); | ||
| assert_eq!(list.len(), 2); | ||
| } | ||
|
|
||
| /// Test using non-finished sub builders to append variants | ||
| #[test] | ||
| fn test_variant_array_builder_variant_builder_reset() { | ||
| let mut builder = VariantArrayBuilder::new(10); | ||
|
|
||
| // make a sub-object in the first row | ||
| let mut sub_builder = builder.variant_builder(); | ||
| sub_builder | ||
| .new_object() | ||
| .with_field("foo", 1i32) | ||
| .finish() | ||
| .unwrap(); | ||
| sub_builder.finish(); // must call finish to write the variant to the buffers | ||
|
|
||
| // start appending an object but don't finish | ||
| let mut sub_builder = builder.variant_builder(); | ||
| sub_builder | ||
| .new_object() | ||
| .with_field("bar", 2i32) | ||
| .finish() | ||
| .unwrap(); | ||
| drop(sub_builder); // drop the sub builder without finishing it | ||
|
|
||
| // make a third sub-object (this should reset the previous unfinished object) | ||
| let mut sub_builder = builder.variant_builder(); | ||
| sub_builder | ||
| .new_object() | ||
| .with_field("baz", 3i32) | ||
| .finish() | ||
| .unwrap(); | ||
| sub_builder.finish(); // must call finish to write the variant to the buffers | ||
|
|
||
| let variant_array = builder.build(); | ||
|
|
||
| // only the two finished objects should be present | ||
| assert_eq!(variant_array.len(), 2); | ||
| assert!(!variant_array.is_null(0)); | ||
| let variant = variant_array.value(0); | ||
| let variant = variant.as_object().expect("variant to be an object"); | ||
| assert_eq!(variant.get("foo").unwrap(), Variant::from(1i32)); | ||
|
|
||
| assert!(!variant_array.is_null(1)); | ||
| let variant = variant_array.value(1); | ||
| let variant = variant.as_object().expect("variant to be an object"); | ||
| assert_eq!(variant.get("baz").unwrap(), Variant::from(3i32)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The whole point if this PR is to avoid this copy / append