-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Data: Enable Parquet variant shredding for Record writes #16370
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
67bfc1a
ce90bc0
03ff0d2
788a74c
ef24bfa
4f6cd1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,89 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.data; | ||
|
|
||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.parquet.VariantShreddingAnalyzer; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.types.Types.NestedField; | ||
| import org.apache.iceberg.variants.Variant; | ||
| import org.apache.iceberg.variants.VariantValue; | ||
| import org.apache.parquet.schema.Type; | ||
|
|
||
| /** | ||
| * Variant shredding analyzer for generic {@link Record} types. | ||
| * | ||
| * <p>This analyzer extracts {@link Variant} values from {@link Record} objects and determines | ||
| * optimal shredding schemas by analyzing data distributions across buffered rows. The analyzer is | ||
| * used by Kafka Connect and other tools that work with generic Record types to enable automatic | ||
| * variant shredding for Parquet writes. | ||
| * | ||
| * <p>Shredding extracts frequently-occurring fields from variant data into typed Parquet columns | ||
| * for improved query performance while maintaining the full variant data in the raw value field. | ||
| */ | ||
| class RecordVariantShreddingAnalyzer extends VariantShreddingAnalyzer<Record, Void> { | ||
|
|
||
| /** | ||
| * For generic {@link Record} rows, top-level field order matches {@link Schema#columns()}. {@link | ||
| * #resolveColumnIndex} is unused ({@code Void} engine schema); using it always produced {@code | ||
| * -1}, so variant columns were never analyzed and Parquet shredding never activated for Kafka | ||
| * Connect and other Record-based writers. | ||
| */ | ||
| @Override | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This javadoc describes pre-fix state ("is unused", "always produced -1", "never activated") rather than what the method
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This override duplicates the loop from |
||
| public Map<Integer, Type> analyzeVariantColumns( | ||
| List<Record> bufferedRows, Schema icebergSchema, Void engineSchema) { | ||
| Map<Integer, Type> shreddedTypes = Maps.newHashMap(); | ||
| List<NestedField> cols = icebergSchema.columns(); | ||
| for (int rowIndex = 0; rowIndex < cols.size(); rowIndex++) { | ||
| NestedField col = cols.get(rowIndex); | ||
| if (col.type().isVariantType()) { | ||
| Type typed = analyzeAndCreateSchema(bufferedRows, rowIndex); | ||
| if (typed != null) { | ||
| shreddedTypes.put(col.fieldId(), typed); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| return shreddedTypes; | ||
| } | ||
|
|
||
| @Override | ||
| protected List<VariantValue> extractVariantValues( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| List<Record> bufferedRows, int variantFieldIndex) { | ||
| List<VariantValue> values = Lists.newArrayList(); | ||
| for (Record record : bufferedRows) { | ||
| Object fieldValue = record.get(variantFieldIndex); | ||
| if (fieldValue instanceof Variant) { | ||
| Variant variant = (Variant) fieldValue; | ||
| values.add(variant.value()); | ||
| } | ||
| } | ||
| return values; | ||
| } | ||
|
|
||
| @Override | ||
| protected int resolveColumnIndex(Void engineSchema, String columnName) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this getting called? |
||
| // For Record types, schema resolution happens at the Iceberg level, not engine level | ||
| // Column indices are managed by the Record structure itself | ||
| return -1; | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,193 @@ | ||
| /* | ||
| * Licensed to the Apache Software Foundation (ASF) under one | ||
| * or more contributor license agreements. See the NOTICE file | ||
| * distributed with this work for additional information | ||
| * regarding copyright ownership. The ASF licenses this file | ||
| * to you under the Apache License, Version 2.0 (the | ||
| * "License"); you may not use this file except in compliance | ||
| * with the License. You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.apache.iceberg.data; | ||
|
|
||
| import static org.assertj.core.api.Assertions.assertThat; | ||
|
|
||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.file.Path; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import org.apache.iceberg.FileFormat; | ||
| import org.apache.iceberg.Files; | ||
| import org.apache.iceberg.InternalTestHelpers; | ||
| import org.apache.iceberg.PartitionSpec; | ||
| import org.apache.iceberg.Schema; | ||
| import org.apache.iceberg.TableProperties; | ||
| import org.apache.iceberg.encryption.EncryptedFiles; | ||
| import org.apache.iceberg.encryption.EncryptedOutputFile; | ||
| import org.apache.iceberg.formats.FileWriterBuilder; | ||
| import org.apache.iceberg.formats.FormatModelRegistry; | ||
| import org.apache.iceberg.io.CloseableIterable; | ||
| import org.apache.iceberg.io.DataWriter; | ||
| import org.apache.iceberg.io.OutputFile; | ||
| import org.apache.iceberg.parquet.Parquet; | ||
| import org.apache.iceberg.parquet.ParquetFileTestUtils; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Lists; | ||
| import org.apache.iceberg.types.Types; | ||
| import org.apache.iceberg.variants.Variant; | ||
| import org.apache.iceberg.variants.VariantMetadata; | ||
| import org.apache.iceberg.variants.VariantTestUtil; | ||
| import org.apache.iceberg.variants.Variants; | ||
| import org.apache.parquet.example.data.Group; | ||
| import org.apache.parquet.hadoop.ParquetFileReader; | ||
| import org.apache.parquet.hadoop.ParquetReader; | ||
| import org.apache.parquet.hadoop.example.GroupReadSupport; | ||
| import org.apache.parquet.schema.GroupType; | ||
| import org.apache.parquet.schema.MessageType; | ||
| import org.apache.parquet.schema.Type; | ||
| import org.junit.jupiter.api.BeforeEach; | ||
| import org.junit.jupiter.api.Test; | ||
| import org.junit.jupiter.api.io.TempDir; | ||
|
|
||
| public class TestRecordVariantShreddingAnalyzer { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. No test covers a record with a |
||
|
|
||
| private static final Schema VARIANT_AFTER_ID_SCHEMA = | ||
| new Schema( | ||
| Types.NestedField.required(1, "id", Types.LongType.get()), | ||
| Types.NestedField.optional(2, "v", Types.VariantType.get())); | ||
|
|
||
| private static final Schema VARIANT_BEFORE_ID_SCHEMA = | ||
| new Schema( | ||
| Types.NestedField.optional(1, "v", Types.VariantType.get()), | ||
| Types.NestedField.required(2, "id", Types.LongType.get())); | ||
|
|
||
| private Variant variant; | ||
| private List<Record> records; | ||
|
|
||
| @TempDir private Path temp; | ||
|
|
||
| @BeforeEach | ||
| public void before() { | ||
| ByteBuffer metadataBuffer = VariantTestUtil.createMetadata(ImmutableList.of("a", "b"), true); | ||
| VariantMetadata metadata = Variants.metadata(metadataBuffer); | ||
| ByteBuffer objectBuffer = | ||
| VariantTestUtil.createObject( | ||
| metadataBuffer, | ||
| ImmutableMap.of( | ||
| "a", Variants.of(42), | ||
| "b", Variants.of("hello"))); | ||
| variant = Variant.of(metadata, Variants.value(metadata, objectBuffer)); | ||
|
|
||
| GenericRecord record = GenericRecord.create(VARIANT_AFTER_ID_SCHEMA); | ||
| records = | ||
| ImmutableList.of( | ||
| record.copy(ImmutableMap.of("id", 1L, "v", variant)), | ||
| record.copy(ImmutableMap.of("id", 2L, "v", variant)), | ||
| record.copy(ImmutableMap.of("id", 3L, "v", variant))); | ||
| } | ||
|
|
||
| @Test | ||
| public void testAnalyzeVariantColumnsUsesIcebergColumnOrder() { | ||
| RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); | ||
|
|
||
| Map<Integer, Type> shreddedTypes = | ||
| analyzer.analyzeVariantColumns(records, VARIANT_AFTER_ID_SCHEMA, null); | ||
|
|
||
| assertThat(shreddedTypes).containsOnlyKeys(2); | ||
| GroupType typedValue = shreddedTypes.get(2).asGroupType(); | ||
| assertThat(typedValue.getName()).isEqualTo("typed_value"); | ||
| assertThat(typedValue.containsField("a")).isTrue(); | ||
| assertThat(typedValue.containsField("b")).isTrue(); | ||
| } | ||
|
|
||
| @Test | ||
| public void testAnalyzeVariantColumnsWhenVariantIsFirstColumn() { | ||
| GenericRecord record = GenericRecord.create(VARIANT_BEFORE_ID_SCHEMA); | ||
| List<Record> variantFirstRecords = | ||
| ImmutableList.of( | ||
| record.copy(ImmutableMap.of("v", variant, "id", 1L)), | ||
| record.copy(ImmutableMap.of("v", variant, "id", 2L))); | ||
|
|
||
| RecordVariantShreddingAnalyzer analyzer = new RecordVariantShreddingAnalyzer(); | ||
| Map<Integer, Type> shreddedTypes = | ||
| analyzer.analyzeVariantColumns(variantFirstRecords, VARIANT_BEFORE_ID_SCHEMA, null); | ||
|
|
||
| assertThat(shreddedTypes).containsOnlyKeys(1); | ||
| assertThat(shreddedTypes.get(1).asGroupType().containsField("a")).isTrue(); | ||
| } | ||
|
|
||
| @Test | ||
| public void testFormatModelRegistryShreddingRoundTrip() throws IOException { | ||
| OutputFile outputFile = Files.localOutput(temp.resolve("variant-shredded.parquet").toFile()); | ||
| EncryptedOutputFile encryptedOutputFile = EncryptedFiles.plainAsEncryptedOutput(outputFile); | ||
|
|
||
| FileWriterBuilder<DataWriter<Record>, Object> writeBuilder = | ||
| FormatModelRegistry.dataWriteBuilder(FileFormat.PARQUET, Record.class, encryptedOutputFile); | ||
|
|
||
| try (DataWriter<Record> writer = | ||
| writeBuilder | ||
| .schema(VARIANT_AFTER_ID_SCHEMA) | ||
| .spec(PartitionSpec.unpartitioned()) | ||
| .setAll( | ||
| ImmutableMap.of( | ||
| TableProperties.PARQUET_SHRED_VARIANTS, "true", | ||
| TableProperties.PARQUET_VARIANT_BUFFER_SIZE, "2")) | ||
| .build()) { | ||
| for (Record rec : records) { | ||
| writer.write(rec); | ||
| } | ||
| } | ||
|
|
||
| try (ParquetFileReader reader = | ||
| ParquetFileReader.open(ParquetFileTestUtils.file(outputFile.toInputFile()))) { | ||
| MessageType parquetSchema = reader.getFooter().getFileMetaData().getSchema(); | ||
| GroupType variantGroup = parquetSchema.getType("v").asGroupType(); | ||
| assertThat(variantGroup.containsField("typed_value")).isTrue(); | ||
|
|
||
| GroupType typedValue = variantGroup.getType("typed_value").asGroupType(); | ||
| assertThat(typedValue.containsField("a")).isTrue(); | ||
| assertThat(typedValue.containsField("b")).isTrue(); | ||
| } | ||
|
|
||
| try (ParquetReader<Group> rawReader = | ||
| ParquetReader.builder( | ||
| new GroupReadSupport(), new org.apache.hadoop.fs.Path(outputFile.location())) | ||
| .build()) { | ||
| Group row = rawReader.read(); | ||
| Group variantData = row.getGroup("v", 0); | ||
| assertThat(variantData.getFieldRepetitionCount("value")).isEqualTo(0); | ||
|
|
||
| Group typedValue = variantData.getGroup("typed_value", 0); | ||
| assertThat(typedValue.getGroup("a", 0).getInteger("typed_value", 0)).isEqualTo(42); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Two things on this round-trip test. The high-level read-back via More importantly: there's no test that goes through |
||
| assertThat(typedValue.getGroup("b", 0).getString("typed_value", 0)).isEqualTo("hello"); | ||
| } | ||
|
|
||
| List<Record> writtenRecords; | ||
| try (CloseableIterable<Record> reader = | ||
| Parquet.read(outputFile.toInputFile()) | ||
| .project(VARIANT_AFTER_ID_SCHEMA) | ||
| .createReaderFunc( | ||
| fileSchema -> | ||
| org.apache.iceberg.data.parquet.GenericParquetReaders.buildReader( | ||
| VARIANT_AFTER_ID_SCHEMA, fileSchema)) | ||
| .build()) { | ||
| writtenRecords = Lists.newArrayList(reader); | ||
| } | ||
|
|
||
| assertThat(writtenRecords).hasSameSizeAs(records); | ||
| for (int i = 0; i < records.size(); i++) { | ||
| InternalTestHelpers.assertEquals( | ||
| VARIANT_AFTER_ID_SCHEMA.asStruct(), records.get(i), writtenRecords.get(i)); | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few issues to note in the doc string here:
VariantShreddingAnalyzer, not the Record-specific subclass. I don't think this is needed here.Record.get(int)).SparkVariantShreddingAnalyzerThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks a lot for reviewing this. I’ll make the necessary changes.