-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Implement per column compression #3396
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -44,6 +44,7 @@ | |
| import org.apache.parquet.column.values.bloomfilter.BloomFilter; | ||
| import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; | ||
| import org.apache.parquet.column.values.bloomfilter.BloomFilterWriter; | ||
| import org.apache.parquet.compression.CompressionCodecFactory; | ||
| import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; | ||
| import org.apache.parquet.crypto.AesCipher; | ||
| import org.apache.parquet.crypto.InternalColumnEncryptionSetup; | ||
|
|
@@ -672,6 +673,83 @@ public ColumnChunkPageWriteStore( | |
| } | ||
| } | ||
|
|
||
| /** | ||
| * Construct a page write store with per-column compression support. | ||
| * Each column's compression codec is resolved from {@code props} via | ||
| * {@link ParquetProperties#getCompressionCodec(ColumnDescriptor)}. | ||
| * | ||
| * @param codecFactory factory to create compressors for each codec | ||
| * @param props properties containing per-column compression configuration | ||
| * @param schema the message schema | ||
| * @param allocator byte buffer allocator | ||
| * @param columnIndexTruncateLength truncate length for column indexes | ||
| * @param pageWriteChecksumEnabled whether to write page checksums | ||
| * @param fileEncryptor file encryptor (null if not encrypted) | ||
| * @param rowGroupOrdinal row group ordinal | ||
| */ | ||
| public ColumnChunkPageWriteStore( | ||
| CompressionCodecFactory codecFactory, | ||
| ParquetProperties props, | ||
| MessageType schema, | ||
| ByteBufferAllocator allocator, | ||
| int columnIndexTruncateLength, | ||
| boolean pageWriteChecksumEnabled, | ||
| InternalFileEncryptor fileEncryptor, | ||
| int rowGroupOrdinal) { | ||
| this.schema = schema; | ||
| if (null == fileEncryptor) { | ||
| for (ColumnDescriptor path : schema.getColumns()) { | ||
| BytesInputCompressor compressor = codecFactory.getCompressor(props.getCompressionCodec(path)); | ||
| writers.put( | ||
| path, | ||
| new ColumnChunkPageWriter( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this copied and pasted from the other constructors? I wonder if there is some refactoring that can be done to avoid the duplication. (I wonder if we should have a ColumnChunkPageWriterBuilder?)
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I actually opted for a different way of reducing the duplication; let me know if it's OK with you, or if the builder is preferable.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1 to adding a builder pattern, as long as we don't need to touch a lot of files.
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. |
||
| path, | ||
| compressor, | ||
| allocator, | ||
| columnIndexTruncateLength, | ||
| pageWriteChecksumEnabled, | ||
| null, | ||
| null, | ||
| null, | ||
| -1, | ||
| -1)); | ||
| } | ||
| return; | ||
| } | ||
|
|
||
| // Encrypted file | ||
| int columnOrdinal = -1; | ||
| byte[] fileAAD = fileEncryptor.getFileAAD(); | ||
| for (ColumnDescriptor path : schema.getColumns()) { | ||
| columnOrdinal++; | ||
| BlockCipher.Encryptor headerBlockEncryptor = null; | ||
| BlockCipher.Encryptor pageBlockEncryptor = null; | ||
| ColumnPath columnPath = ColumnPath.get(path.getPath()); | ||
|
|
||
| BytesInputCompressor compressor = codecFactory.getCompressor(props.getCompressionCodec(path)); | ||
|
|
||
| InternalColumnEncryptionSetup columnSetup = fileEncryptor.getColumnSetup(columnPath, true, columnOrdinal); | ||
| if (columnSetup.isEncrypted()) { | ||
| headerBlockEncryptor = columnSetup.getMetaDataEncryptor(); | ||
| pageBlockEncryptor = columnSetup.getDataEncryptor(); | ||
| } | ||
|
|
||
| writers.put( | ||
| path, | ||
| new ColumnChunkPageWriter( | ||
| path, | ||
| compressor, | ||
| allocator, | ||
| columnIndexTruncateLength, | ||
| pageWriteChecksumEnabled, | ||
| headerBlockEncryptor, | ||
| pageBlockEncryptor, | ||
| fileAAD, | ||
| rowGroupOrdinal, | ||
| columnOrdinal)); | ||
| } | ||
| } | ||
|
|
||
| @Override | ||
| public PageWriter getPageWriter(ColumnDescriptor path) { | ||
| return writers.get(path); | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -28,6 +28,7 @@ | |
| import org.apache.parquet.column.ColumnWriteStore; | ||
| import org.apache.parquet.column.ParquetProperties; | ||
| import org.apache.parquet.column.values.bloomfilter.BloomFilterWriteStore; | ||
| import org.apache.parquet.compression.CompressionCodecFactory; | ||
| import org.apache.parquet.compression.CompressionCodecFactory.BytesInputCompressor; | ||
| import org.apache.parquet.crypto.InternalFileEncryptor; | ||
| import org.apache.parquet.hadoop.api.WriteSupport; | ||
|
|
@@ -52,6 +53,7 @@ class InternalParquetRecordWriter<T> { | |
| private final int rowGroupRecordCountThreshold; | ||
| private long nextRowGroupSize; | ||
| private final BytesInputCompressor compressor; | ||
| private final CompressionCodecFactory codecFactory; | ||
| private final boolean validating; | ||
| private final ParquetProperties props; | ||
|
|
||
|
|
@@ -77,7 +79,9 @@ class InternalParquetRecordWriter<T> { | |
| * @param extraMetaData extra meta data to write in the footer of the file | ||
| * @param rowGroupSize the size of a block in the file (this will be approximate) | ||
| * @param compressor the codec used to compress | ||
| * @deprecated Use {@link #InternalParquetRecordWriter(ParquetFileWriter, WriteSupport, MessageType, Map, long, CompressionCodecFactory, boolean, ParquetProperties)} for per-column compression support | ||
| */ | ||
| @Deprecated | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would it be better not to deprecate this constructor? |
||
| public InternalParquetRecordWriter( | ||
| ParquetFileWriter parquetFileWriter, | ||
| WriteSupport<T> writeSupport, | ||
|
|
@@ -95,6 +99,41 @@ public InternalParquetRecordWriter( | |
| this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit(); | ||
| this.nextRowGroupSize = rowGroupSizeThreshold; | ||
| this.compressor = compressor; | ||
| this.codecFactory = null; | ||
| this.validating = validating; | ||
| this.props = props; | ||
| this.fileEncryptor = parquetFileWriter.getEncryptor(); | ||
| this.rowGroupOrdinal = 0; | ||
| initStore(); | ||
| recordCountForNextMemCheck = props.getMinRowCountForPageSizeCheck(); | ||
| } | ||
|
|
||
| /** | ||
| * @param parquetFileWriter the file to write to | ||
| * @param writeSupport the class to convert incoming records | ||
| * @param schema the schema of the records | ||
| * @param extraMetaData extra meta data to write in the footer of the file | ||
| * @param rowGroupSize the size of a block in the file (this will be approximate) | ||
| * @param codecFactory the codec factory for per-column compression | ||
| */ | ||
| public InternalParquetRecordWriter( | ||
| ParquetFileWriter parquetFileWriter, | ||
| WriteSupport<T> writeSupport, | ||
| MessageType schema, | ||
| Map<String, String> extraMetaData, | ||
| long rowGroupSize, | ||
| CompressionCodecFactory codecFactory, | ||
| boolean validating, | ||
| ParquetProperties props) { | ||
| this.parquetFileWriter = parquetFileWriter; | ||
| this.writeSupport = Objects.requireNonNull(writeSupport, "writeSupport cannot be null"); | ||
| this.schema = schema; | ||
| this.extraMetaData = extraMetaData; | ||
| this.rowGroupSizeThreshold = rowGroupSize; | ||
| this.rowGroupRecordCountThreshold = props.getRowGroupRowCountLimit(); | ||
| this.nextRowGroupSize = rowGroupSizeThreshold; | ||
| this.compressor = null; | ||
| this.codecFactory = codecFactory; | ||
| this.validating = validating; | ||
| this.props = props; | ||
| this.fileEncryptor = parquetFileWriter.getEncryptor(); | ||
|
|
@@ -108,14 +147,27 @@ public ParquetMetadata getFooter() { | |
| } | ||
|
|
||
| private void initStore() { | ||
| ColumnChunkPageWriteStore columnChunkPageWriteStore = new ColumnChunkPageWriteStore( | ||
| compressor, | ||
| schema, | ||
| props.getAllocator(), | ||
| props.getColumnIndexTruncateLength(), | ||
| props.getPageWriteChecksumEnabled(), | ||
| fileEncryptor, | ||
| rowGroupOrdinal); | ||
| ColumnChunkPageWriteStore columnChunkPageWriteStore; | ||
| if (codecFactory != null) { | ||
|
emkornfield marked this conversation as resolved.
|
||
| columnChunkPageWriteStore = new ColumnChunkPageWriteStore( | ||
| codecFactory, | ||
| props, | ||
| schema, | ||
| props.getAllocator(), | ||
| props.getColumnIndexTruncateLength(), | ||
| props.getPageWriteChecksumEnabled(), | ||
| fileEncryptor, | ||
| rowGroupOrdinal); | ||
| } else { | ||
| columnChunkPageWriteStore = new ColumnChunkPageWriteStore( | ||
| compressor, | ||
| schema, | ||
| props.getAllocator(), | ||
| props.getColumnIndexTruncateLength(), | ||
| props.getPageWriteChecksumEnabled(), | ||
| fileEncryptor, | ||
| rowGroupOrdinal); | ||
| } | ||
| pageStore = columnChunkPageWriteStore; | ||
| bloomFilterWriteStore = columnChunkPageWriteStore; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -201,15 +201,11 @@ public ParquetRecordWriter( | |
| MemoryManager memoryManager, | ||
| Configuration conf) { | ||
| this.codecFactory = new CodecFactory(conf, props.getPageSizeThreshold()); | ||
| // Ensure the default compression codec from ParquetOutputFormat is set in props | ||
| ParquetProperties propsWithCodec = | ||
| ParquetProperties.copy(props).withCompressionCodec(codec).build(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Does this risk overwriting an already-set compression codec?
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Addressed. |
||
| internalWriter = new InternalParquetRecordWriter<T>( | ||
| w, | ||
| writeSupport, | ||
| schema, | ||
| extraMetaData, | ||
| blockSize, | ||
| codecFactory.getCompressor(codec), | ||
| validating, | ||
| props); | ||
| w, writeSupport, schema, extraMetaData, blockSize, codecFactory, validating, propsWithCodec); | ||
| this.memoryManager = Objects.requireNonNull(memoryManager, "memoryManager cannot be null"); | ||
| memoryManager.addWriter(internalWriter, blockSize); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -395,7 +395,6 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport | |
| fileWriter.start(); | ||
|
|
||
| this.codecFactory = codecFactory; | ||
| CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(compressionCodecName); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
|
|
||
| final Map<String, String> extraMetadata; | ||
| if (encodingProps.getExtraMetaData() == null | ||
|
|
@@ -418,7 +417,7 @@ public ParquetWriter(Path file, Configuration conf, WriteSupport<T> writeSupport | |
| } | ||
|
|
||
| this.writer = new InternalParquetRecordWriter<T>( | ||
| fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, compressor, validating, encodingProps); | ||
| fileWriter, writeSupport, schema, extraMetadata, rowGroupSize, codecFactory, validating, encodingProps); | ||
| } | ||
|
|
||
| public void write(T object) throws IOException { | ||
|
|
@@ -559,13 +558,29 @@ public SELF withWriteMode(ParquetFileWriter.Mode mode) { | |
|
|
||
| /** | ||
| * Set the {@link CompressionCodecName compression codec} used by the | ||
| * constructed writer. | ||
| * constructed writer. This sets the default compression codec for all columns. | ||
| * Per-column overrides can be set with {@link #withCompressionCodec(String, CompressionCodecName)}. | ||
| * | ||
| * @param codecName a {@code CompressionCodecName} | ||
| * @return this builder for method chaining. | ||
| */ | ||
| public SELF withCompressionCodec(CompressionCodecName codecName) { | ||
| this.codecName = codecName; | ||
| encodingPropsBuilder.withCompressionCodec(codecName); | ||
| return self(); | ||
| } | ||
|
|
||
| /** | ||
| * Set the {@link CompressionCodecName compression codec} for a specific column. | ||
| * Columns not explicitly configured will use the default codec set by | ||
| * {@link #withCompressionCodec(CompressionCodecName)}. | ||
| * | ||
| * @param columnPath the path of the column (dot-string) | ||
| * @param codecName the compression codec to use for the column | ||
| * @return this builder for method chaining. | ||
| */ | ||
| public SELF withCompressionCodec(String columnPath, CompressionCodecName codecName) { | ||
| encodingPropsBuilder.withCompressionCodec(columnPath, codecName); | ||
| return self(); | ||
| } | ||
|
|
||
|
|
||

Uh oh!
There was an error while loading. Please reload this page.