From 70ebfde2cbc2076f80364e6ce3f54038cbb3a2a8 Mon Sep 17 00:00:00 2001 From: Marig_Weizhi Date: Tue, 19 May 2026 17:41:26 +0800 Subject: [PATCH] Spark: Fix RewriteTablePathSparkAction failing on retry when rewriting position delete files. --- .../actions/RewriteTablePathSparkAction.java | 4 ++ .../actions/TestRewriteTablePathsAction.java | 47 +++++++++++++++++++ .../actions/RewriteTablePathSparkAction.java | 4 ++ .../actions/TestRewriteTablePathsAction.java | 47 +++++++++++++++++++ .../actions/RewriteTablePathSparkAction.java | 4 ++ .../actions/TestRewriteTablePathsAction.java | 47 +++++++++++++++++++ .../actions/RewriteTablePathSparkAction.java | 4 ++ .../actions/TestRewriteTablePathsAction.java | 47 +++++++++++++++++++ 8 files changed, 204 insertions(+) diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index aedb25e4a4a6..625ecc384fff 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -768,12 +768,14 @@ private static PositionDeleteWriter positionDeletesWriter( format, EncryptedFiles.plainAsEncryptedOutput(outputFile)) .partition(partition) .spec(spec) + .overwrite() .build(); } else { return switch (format) { case AVRO -> Avro.writeDeletes(outputFile) .createWriterFunc(DataWriter::create) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) @@ -781,6 +783,7 @@ private static PositionDeleteWriter positionDeletesWriter( case PARQUET -> Parquet.writeDeletes(outputFile) .createWriterFunc(GenericParquetWriter::create) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) @@ -788,6 +791,7 @@ private static PositionDeleteWriter positionDeletesWriter( case ORC -> ORC.writeDeletes(outputFile) 
.createWriterFunc(GenericOrcWriter::buildWriter) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java index dae721b1d73d..7b8c0555da60 100644 --- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -426,6 +426,53 @@ public void testPositionDeletesOrc() throws Exception { runPositionDeletesTest("orc"); } + @TestTemplate + public void testRerunWithPositionDeletesReusingStagingLocation() throws Exception { + assumeThat(formatVersion).isEqualTo(2); + + Table tableWithPosDeletes = + createTableWithSnapshots( + tableDir.toFile().toURI().toString().concat("tableWithPosDeletesRerun"), + 2, + Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, "parquet")); + + List<Pair<CharSequence, Long>> deletes = + Lists.newArrayList( + Pair.of( + SnapshotChanges.builderFor(tableWithPosDeletes) + .build() + .addedDataFiles() + .iterator() + .next() + .location(), + 0L)); + File file = + new File( + removePrefix(tableWithPosDeletes.location() + "/data/deeply/nested/deletes.parquet")); + DeleteFile positionDeletes = + FileHelpers.writeDeleteFile( + tableWithPosDeletes, + tableWithPosDeletes.io().newOutputFile(file.toURI().toString()), + deletes, + formatVersion) + .first(); + tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit(); + + actions() + .rewriteTablePath(tableWithPosDeletes) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation()) 
+ .execute(); + assertThat(rerun.rewrittenDeleteFilePathsCount()).isEqualTo(1); + } + private void runPositionDeletesTest(String fileFormat) throws Exception { Table tableWithPosDeletes = createTableWithSnapshots( diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index aedb25e4a4a6..625ecc384fff 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -768,12 +768,14 @@ private static PositionDeleteWriter positionDeletesWriter( format, EncryptedFiles.plainAsEncryptedOutput(outputFile)) .partition(partition) .spec(spec) + .overwrite() .build(); } else { return switch (format) { case AVRO -> Avro.writeDeletes(outputFile) .createWriterFunc(DataWriter::create) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) @@ -781,6 +783,7 @@ private static PositionDeleteWriter positionDeletesWriter( case PARQUET -> Parquet.writeDeletes(outputFile) .createWriterFunc(GenericParquetWriter::create) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) @@ -788,6 +791,7 @@ private static PositionDeleteWriter positionDeletesWriter( case ORC -> ORC.writeDeletes(outputFile) .createWriterFunc(GenericOrcWriter::buildWriter) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java index dae721b1d73d..7b8c0555da60 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java +++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -426,6 +426,53 @@ public void testPositionDeletesOrc() throws Exception { runPositionDeletesTest("orc"); } + @TestTemplate + public void testRerunWithPositionDeletesReusingStagingLocation() throws Exception { + assumeThat(formatVersion).isEqualTo(2); + + Table tableWithPosDeletes = + createTableWithSnapshots( + tableDir.toFile().toURI().toString().concat("tableWithPosDeletesRerun"), + 2, + Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, "parquet")); + + List<Pair<CharSequence, Long>> deletes = + Lists.newArrayList( + Pair.of( + SnapshotChanges.builderFor(tableWithPosDeletes) + .build() + .addedDataFiles() + .iterator() + .next() + .location(), + 0L)); + File file = + new File( + removePrefix(tableWithPosDeletes.location() + "/data/deeply/nested/deletes.parquet")); + DeleteFile positionDeletes = + FileHelpers.writeDeleteFile( + tableWithPosDeletes, + tableWithPosDeletes.io().newOutputFile(file.toURI().toString()), + deletes, + formatVersion) + .first(); + tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit(); + + actions() + .rewriteTablePath(tableWithPosDeletes) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation()) + .execute(); + + RewriteTablePath.Result rerun = + actions() + .rewriteTablePath(tableWithPosDeletes) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation()) + .execute(); + assertThat(rerun.rewrittenDeleteFilePathsCount()).isEqualTo(1); + } + private void runPositionDeletesTest(String fileFormat) throws Exception { Table tableWithPosDeletes = createTableWithSnapshots( diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index aedb25e4a4a6..625ecc384fff 100644 --- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -768,12 +768,14 @@ private static PositionDeleteWriter positionDeletesWriter( format, EncryptedFiles.plainAsEncryptedOutput(outputFile)) .partition(partition) .spec(spec) + .overwrite() .build(); } else { return switch (format) { case AVRO -> Avro.writeDeletes(outputFile) .createWriterFunc(DataWriter::create) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) @@ -781,6 +783,7 @@ private static PositionDeleteWriter positionDeletesWriter( case PARQUET -> Parquet.writeDeletes(outputFile) .createWriterFunc(GenericParquetWriter::create) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) @@ -788,6 +791,7 @@ private static PositionDeleteWriter positionDeletesWriter( case ORC -> ORC.writeDeletes(outputFile) .createWriterFunc(GenericOrcWriter::buildWriter) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java index dae721b1d73d..7b8c0555da60 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -426,6 +426,53 @@ public void testPositionDeletesOrc() throws Exception { runPositionDeletesTest("orc"); } + @TestTemplate + public void testRerunWithPositionDeletesReusingStagingLocation() throws Exception { + assumeThat(formatVersion).isEqualTo(2); + + Table tableWithPosDeletes = + createTableWithSnapshots( + tableDir.toFile().toURI().toString().concat("tableWithPosDeletesRerun"), + 2, + Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, 
"parquet")); + + List<Pair<CharSequence, Long>> deletes = + Lists.newArrayList( + Pair.of( + SnapshotChanges.builderFor(tableWithPosDeletes) + .build() + .addedDataFiles() + .iterator() + .next() + .location(), + 0L)); + File file = + new File( + removePrefix(tableWithPosDeletes.location() + "/data/deeply/nested/deletes.parquet")); + DeleteFile positionDeletes = + FileHelpers.writeDeleteFile( + tableWithPosDeletes, + tableWithPosDeletes.io().newOutputFile(file.toURI().toString()), + deletes, + formatVersion) + .first(); + tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit(); + + actions() + .rewriteTablePath(tableWithPosDeletes) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation()) + .execute(); + + RewriteTablePath.Result rerun = + actions() + .rewriteTablePath(tableWithPosDeletes) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation()) + .execute(); + assertThat(rerun.rewrittenDeleteFilePathsCount()).isEqualTo(1); + } + private void runPositionDeletesTest(String fileFormat) throws Exception { Table tableWithPosDeletes = createTableWithSnapshots( diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java index aedb25e4a4a6..625ecc384fff 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteTablePathSparkAction.java @@ -768,12 +768,14 @@ private static PositionDeleteWriter positionDeletesWriter( format, EncryptedFiles.plainAsEncryptedOutput(outputFile)) .partition(partition) .spec(spec) + .overwrite() .build(); } else { return switch (format) { case AVRO -> Avro.writeDeletes(outputFile) .createWriterFunc(DataWriter::create) + .overwrite() .withPartition(partition) 
.rowSchema(rowSchema) .withSpec(spec) @@ -781,6 +783,7 @@ private static PositionDeleteWriter positionDeletesWriter( case PARQUET -> Parquet.writeDeletes(outputFile) .createWriterFunc(GenericParquetWriter::create) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) @@ -788,6 +791,7 @@ private static PositionDeleteWriter positionDeletesWriter( case ORC -> ORC.writeDeletes(outputFile) .createWriterFunc(GenericOrcWriter::buildWriter) + .overwrite() .withPartition(partition) .rowSchema(rowSchema) .withSpec(spec) diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java index dae721b1d73d..7b8c0555da60 100644 --- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteTablePathsAction.java @@ -426,6 +426,53 @@ public void testPositionDeletesOrc() throws Exception { runPositionDeletesTest("orc"); } + @TestTemplate + public void testRerunWithPositionDeletesReusingStagingLocation() throws Exception { + assumeThat(formatVersion).isEqualTo(2); + + Table tableWithPosDeletes = + createTableWithSnapshots( + tableDir.toFile().toURI().toString().concat("tableWithPosDeletesRerun"), + 2, + Map.of(TableProperties.DELETE_DEFAULT_FILE_FORMAT, "parquet")); + + List<Pair<CharSequence, Long>> deletes = + Lists.newArrayList( + Pair.of( + SnapshotChanges.builderFor(tableWithPosDeletes) + .build() + .addedDataFiles() + .iterator() + .next() + .location(), + 0L)); + File file = + new File( + removePrefix(tableWithPosDeletes.location() + "/data/deeply/nested/deletes.parquet")); + DeleteFile positionDeletes = + FileHelpers.writeDeleteFile( + tableWithPosDeletes, + tableWithPosDeletes.io().newOutputFile(file.toURI().toString()), + deletes, + formatVersion) + .first(); + 
tableWithPosDeletes.newRowDelta().addDeletes(positionDeletes).commit(); + + actions() + .rewriteTablePath(tableWithPosDeletes) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation()) + .execute(); + + RewriteTablePath.Result rerun = + actions() + .rewriteTablePath(tableWithPosDeletes) + .stagingLocation(stagingLocation()) + .rewriteLocationPrefix(tableWithPosDeletes.location(), targetTableLocation()) + .execute(); + assertThat(rerun.rewrittenDeleteFilePathsCount()).isEqualTo(1); + } + private void runPositionDeletesTest(String fileFormat) throws Exception { Table tableWithPosDeletes = createTableWithSnapshots(