From d3eb8ca6f3e5570183671657aae337ccd488b0bc Mon Sep 17 00:00:00 2001 From: Vova Kolmakov Date: Mon, 18 May 2026 11:00:49 +0700 Subject: [PATCH] Core, Spark: Clean up uncommitted files when a staged table is aborted StagedSparkTable.abortStagedChanges() was an empty `// TODO: clean up`, so when an atomic CTAS/RTAS (or the snapshot/migrate actions) failed after the staged write but before commitStagedChanges(), the manifest list and manifests already written into the uncommitted transaction were leaked. For a staged CREATE the table is never registered, so those orphans are unreachable by removeOrphanFiles and leak permanently. This adds a best-effort `default void abortTransaction()` to the Transaction API, overridden in BaseTransaction to run the existing cleanUp() (cleanAllUpdates() + deleteUncommittedFiles()) and delegated by CommitCallbackTransaction. StagedSparkTable.abortStagedChanges() now calls it in the Spark 3.5, 4.0, and 4.1 modules touched by this patch. The default is a no-op rather than throwing because abort runs from catch/finally blocks where a secondary exception would mask the original failure; the underlying deletion is already best-effort and idempotent, so calling abort after a failed commit is safe. Out of scope (unchanged, pre-existing): executor-written data files in the write-succeeds-then-commit-fails path, which match Iceberg's existing create-transaction behavior and are handled by SparkWrite.abort() on write-job failure. Tests: a new abort case in core TestCreateTransaction, and a byte-identical TestStagedSparkTable added to each of those three Spark version trees (v3.5, v4.0, v4.1). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- .palantir/revapi.yml | 3 + .../java/org/apache/iceberg/Transaction.java | 14 +++ .../org/apache/iceberg/BaseTransaction.java | 5 ++ .../iceberg/CommitCallbackTransaction.java | 5 ++ .../apache/iceberg/TestCreateTransaction.java | 24 +++++ .../spark/source/StagedSparkTable.java | 2 +- .../spark/source/TestStagedSparkTable.java | 89 +++++++++++++++++++ .../spark/source/StagedSparkTable.java | 2 +- .../spark/source/TestStagedSparkTable.java | 89 +++++++++++++++++++ .../spark/source/StagedSparkTable.java | 2 +- .../spark/source/TestStagedSparkTable.java | 89 +++++++++++++++++++ 11 files changed, 321 insertions(+), 3 deletions(-) create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java create mode 100644 spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java create mode 100644 spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index 80fa8f15f168..0a53abf723a7 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -410,6 +410,9 @@ acceptedBreaks: old: "class org.apache.iceberg.encryption.EncryptingFileIO" new: "class org.apache.iceberg.encryption.EncryptingFileIO" justification: "New method for Manifest List reading" + - code: "java.method.addedToInterface" + new: "method void org.apache.iceberg.Transaction::abortTransaction()" + justification: "new API method with a default implementation" org.apache.iceberg:iceberg-core: - code: "java.class.defaultSerializationChanged" old: "class org.apache.iceberg.avro.SupportsIndexProjection" diff --git a/api/src/main/java/org/apache/iceberg/Transaction.java b/api/src/main/java/org/apache/iceberg/Transaction.java index 77e19e45e451..078f83d48ba3 100644 --- a/api/src/main/java/org/apache/iceberg/Transaction.java +++ b/api/src/main/java/org/apache/iceberg/Transaction.java @@ -183,4 +183,18 @@ default 
ManageSnapshots manageSnapshots() { * @throws CommitFailedException If the updates cannot be committed due to conflicts. */ void commitTransaction(); + + /** + * Roll back all pending changes and clean up any uncommitted files written by this transaction. + * + *

<p>This is a best-effort operation intended to be called when a transaction will not be + * committed (for example, in a {@code finally} or {@code catch} block of a failed atomic + * create/replace operation). It deletes uncommitted manifest lists, manifests, and other files + * staged by operations in this transaction. It does not throw if some files cannot be deleted. + * + *

<p>After this method is called, the transaction must not be committed or reused. + * + *

<p>The default implementation does nothing. + */ + default void abortTransaction() {} } diff --git a/core/src/main/java/org/apache/iceberg/BaseTransaction.java b/core/src/main/java/org/apache/iceberg/BaseTransaction.java index 9884ac297079..b8d9e49fa2e6 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTransaction.java +++ b/core/src/main/java/org/apache/iceberg/BaseTransaction.java @@ -417,6 +417,11 @@ private void commitSimpleTransaction() { } } + @Override + public void abortTransaction() { + cleanUp(); + } + protected void cleanUp() { // the commit failed and no files were committed. clean up each update. cleanAllUpdates(); diff --git a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java index e80f6bcdac95..564ebb5fb0f5 100644 --- a/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java +++ b/core/src/main/java/org/apache/iceberg/CommitCallbackTransaction.java @@ -126,4 +126,9 @@ public void commitTransaction() { wrapped.commitTransaction(); callback.run(); } + + @Override + public void abortTransaction() { + wrapped.abortTransaction(); + } } diff --git a/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java b/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java index 79b99f1f61c8..e4bfd26b69cc 100644 --- a/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java +++ b/core/src/test/java/org/apache/iceberg/TestCreateTransaction.java @@ -253,4 +253,28 @@ public void testCreateTransactionConflict() throws IOException { assertThat(listManifestFiles(tableDir)).isEmpty(); } + + @TestTemplate + public void testAbortTransactionCleansUpUncommittedFiles() { + Transaction txn = TestTables.beginCreate(tableDir, "test_abort", SCHEMA, SPEC); + + // append in the transaction to ensure a manifest and a manifest list are written + txn.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit(); + + // the staged write produced files on disk, but 
the table itself is not created yet + assertThat(TestTables.readMetadata("test_abort")).isNull(); + assertThat(listManifestFiles(tableDir)).hasSize(1); + assertThat(listManifestLists(tableDir)).hasSize(1); + + // aborting the transaction removes the uncommitted manifests and manifest lists + txn.abortTransaction(); + + assertThat(listManifestFiles(tableDir)).isEmpty(); + assertThat(listManifestLists(tableDir)).isEmpty(); + assertThat(TestTables.readMetadata("test_abort")).isNull(); + + // abort is best-effort and idempotent: a second call must not throw + txn.abortTransaction(); + assertThat(listManifestFiles(tableDir)).isEmpty(); + } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java index b92c02d2b536..78405788bac2 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java @@ -36,6 +36,6 @@ public void commitStagedChanges() { @Override public void abortStagedChanges() { - // TODO: clean up + transaction.abortTransaction(); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java new file mode 100644 index 000000000000..f2d31a2eb65d --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.io.File; +import java.net.URI; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.StagedTable; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.TestTemplate; + +public class TestStagedSparkTable extends CatalogTestBase { + + @TestTemplate + public void testAbortStagedChangesCleansUpUncommittedFiles() throws Exception { + CatalogPlugin plugin = spark.sessionState().catalogManager().catalog(catalogName); + // StagedSparkTable is only produced by SparkCatalog; the session catalog uses + // RollbackStagedTable + assumeThat(plugin).isInstanceOf(SparkCatalog.class); + SparkCatalog catalog = (SparkCatalog) plugin; + + Identifier ident = Identifier.of(new String[] {"default"}, "abort_staged"); + StructType schema = + new StructType() + .add("id", DataTypes.LongType, false) + .add("data", DataTypes.StringType, true); + + StagedTable staged = 
catalog.stageCreate(ident, schema, new Transform[0], ImmutableMap.of()); + + // route an append into the staged (uncommitted) transaction so a manifest and a manifest list + // are written to the table location without the table being committed to the catalog + Table table = ((SparkTable) staged).table(); + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath("/path/to/data-abort.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + table.newAppend().appendFile(dataFile).commit(); + + File metadataDir = new File(URI.create(table.location()).getPath(), "metadata"); + File[] stagedFiles = metadataDir.listFiles((dir, name) -> name.endsWith(".avro")); + assertThat(stagedFiles) + .as("staged write should produce manifest and manifest-list files") + .isNotNull() + .isNotEmpty(); + + staged.abortStagedChanges(); + + File[] remaining = metadataDir.listFiles((dir, name) -> name.endsWith(".avro")); + assertThat(remaining == null || remaining.length == 0) + .as("abort should delete all uncommitted manifest and manifest-list files") + .isTrue(); + assertThat(catalog.tableExists(ident)) + .as("staged table must not be created in the catalog after abort") + .isFalse(); + + // abort is best-effort and idempotent: a second call must not throw + staged.abortStagedChanges(); + } +} diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java index b92c02d2b536..78405788bac2 100644 --- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java +++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java @@ -36,6 +36,6 @@ public void commitStagedChanges() { @Override public void abortStagedChanges() { - // TODO: clean up + transaction.abortTransaction(); } } diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java 
b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java new file mode 100644 index 000000000000..f2d31a2eb65d --- /dev/null +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.io.File; +import java.net.URI; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.StagedTable; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.TestTemplate; + +public class TestStagedSparkTable extends CatalogTestBase { + + @TestTemplate + public void testAbortStagedChangesCleansUpUncommittedFiles() throws Exception { + CatalogPlugin plugin = spark.sessionState().catalogManager().catalog(catalogName); + // StagedSparkTable is only produced by SparkCatalog; the session catalog uses + // RollbackStagedTable + assumeThat(plugin).isInstanceOf(SparkCatalog.class); + SparkCatalog catalog = (SparkCatalog) plugin; + + Identifier ident = Identifier.of(new String[] {"default"}, "abort_staged"); + StructType schema = + new StructType() + .add("id", DataTypes.LongType, false) + .add("data", DataTypes.StringType, true); + + StagedTable staged = catalog.stageCreate(ident, schema, new Transform[0], ImmutableMap.of()); + + // route an append into the staged (uncommitted) transaction so a manifest and a manifest list + // are written to the table location without the table being committed to the catalog + Table table = ((SparkTable) staged).table(); + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath("/path/to/data-abort.parquet") + .withFileSizeInBytes(10) + 
.withRecordCount(1) + .build(); + table.newAppend().appendFile(dataFile).commit(); + + File metadataDir = new File(URI.create(table.location()).getPath(), "metadata"); + File[] stagedFiles = metadataDir.listFiles((dir, name) -> name.endsWith(".avro")); + assertThat(stagedFiles) + .as("staged write should produce manifest and manifest-list files") + .isNotNull() + .isNotEmpty(); + + staged.abortStagedChanges(); + + File[] remaining = metadataDir.listFiles((dir, name) -> name.endsWith(".avro")); + assertThat(remaining == null || remaining.length == 0) + .as("abort should delete all uncommitted manifest and manifest-list files") + .isTrue(); + assertThat(catalog.tableExists(ident)) + .as("staged table must not be created in the catalog after abort") + .isFalse(); + + // abort is best-effort and idempotent: a second call must not throw + staged.abortStagedChanges(); + } +} diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java index d78f83a51140..52f61b852bc8 100644 --- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java +++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StagedSparkTable.java @@ -36,6 +36,6 @@ public void commitStagedChanges() { @Override public void abortStagedChanges() { - // TODO: clean up + transaction.abortTransaction(); } } diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java new file mode 100644 index 000000000000..f2d31a2eb65d --- /dev/null +++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/source/TestStagedSparkTable.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assumptions.assumeThat; + +import java.io.File; +import java.net.URI; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.Table; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.spark.CatalogTestBase; +import org.apache.iceberg.spark.SparkCatalog; +import org.apache.spark.sql.connector.catalog.CatalogPlugin; +import org.apache.spark.sql.connector.catalog.Identifier; +import org.apache.spark.sql.connector.catalog.StagedTable; +import org.apache.spark.sql.connector.expressions.Transform; +import org.apache.spark.sql.types.DataTypes; +import org.apache.spark.sql.types.StructType; +import org.junit.jupiter.api.TestTemplate; + +public class TestStagedSparkTable extends CatalogTestBase { + + @TestTemplate + public void testAbortStagedChangesCleansUpUncommittedFiles() throws Exception { + CatalogPlugin plugin = spark.sessionState().catalogManager().catalog(catalogName); + // StagedSparkTable is only produced by SparkCatalog; the session catalog uses + // RollbackStagedTable + assumeThat(plugin).isInstanceOf(SparkCatalog.class); + 
SparkCatalog catalog = (SparkCatalog) plugin; + + Identifier ident = Identifier.of(new String[] {"default"}, "abort_staged"); + StructType schema = + new StructType() + .add("id", DataTypes.LongType, false) + .add("data", DataTypes.StringType, true); + + StagedTable staged = catalog.stageCreate(ident, schema, new Transform[0], ImmutableMap.of()); + + // route an append into the staged (uncommitted) transaction so a manifest and a manifest list + // are written to the table location without the table being committed to the catalog + Table table = ((SparkTable) staged).table(); + DataFile dataFile = + DataFiles.builder(table.spec()) + .withPath("/path/to/data-abort.parquet") + .withFileSizeInBytes(10) + .withRecordCount(1) + .build(); + table.newAppend().appendFile(dataFile).commit(); + + File metadataDir = new File(URI.create(table.location()).getPath(), "metadata"); + File[] stagedFiles = metadataDir.listFiles((dir, name) -> name.endsWith(".avro")); + assertThat(stagedFiles) + .as("staged write should produce manifest and manifest-list files") + .isNotNull() + .isNotEmpty(); + + staged.abortStagedChanges(); + + File[] remaining = metadataDir.listFiles((dir, name) -> name.endsWith(".avro")); + assertThat(remaining == null || remaining.length == 0) + .as("abort should delete all uncommitted manifest and manifest-list files") + .isTrue(); + assertThat(catalog.tableExists(ident)) + .as("staged table must not be created in the catalog after abort") + .isFalse(); + + // abort is best-effort and idempotent: a second call must not throw + staged.abortStagedChanges(); + } +}