Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .palantir/revapi.yml
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,9 @@ acceptedBreaks:
old: "class org.apache.iceberg.encryption.EncryptingFileIO"
new: "class org.apache.iceberg.encryption.EncryptingFileIO"
justification: "New method for Manifest List reading"
- code: "java.method.addedToInterface"
new: "method void org.apache.iceberg.Transaction::abortTransaction()"
justification: "new API method with a default implementation"
org.apache.iceberg:iceberg-core:
- code: "java.class.defaultSerializationChanged"
old: "class org.apache.iceberg.avro.SupportsIndexProjection"
Expand Down
14 changes: 14 additions & 0 deletions api/src/main/java/org/apache/iceberg/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -183,4 +183,18 @@ default ManageSnapshots manageSnapshots() {
* @throws CommitFailedException If the updates cannot be committed due to conflicts.
*/
void commitTransaction();

/**
 * Roll back all pending changes and clean up any uncommitted files written by this transaction.
 *
 * <p>This is a best-effort operation intended to be called when a transaction will not be
 * committed (for example, in a {@code finally} or {@code catch} block of a failed atomic
 * create/replace operation). It deletes uncommitted manifest lists, manifests, and other files
 * staged by operations in this transaction. It does not throw if some files cannot be deleted.
 *
 * <p>After this method is called, the transaction must not be committed or reused. Calling this
 * method again on an already aborted transaction is expected to be harmless and must not throw.
 *
 * <p>The default implementation does nothing, so implementations that stage files need to
 * override this method for any clean-up to happen.
 */
default void abortTransaction() {}
}
5 changes: 5 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,11 @@ private void commitSimpleTransaction() {
}
}

/**
 * Aborts this transaction by delegating to {@link #cleanUp()}.
 *
 * <p>{@code cleanUp()} is the routine described for the failed-commit case, so aborting reuses
 * that path instead of duplicating it.
 */
@Override
public void abortTransaction() {
  cleanUp();
}

protected void cleanUp() {
// the commit failed and no files were committed. clean up each update.
cleanAllUpdates();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,9 @@ public void commitTransaction() {
wrapped.commitTransaction();
callback.run();
}

/**
 * Forwards the abort to the wrapped transaction.
 *
 * <p>Only the wrapped transaction is aborted; the callback that {@link #commitTransaction()} runs
 * after the wrapped commit returns is not invoked here.
 */
@Override
public void abortTransaction() {
  wrapped.abortTransaction();
}
}
24 changes: 24 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestCreateTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,28 @@ public void testCreateTransactionConflict() throws IOException {

assertThat(listManifestFiles(tableDir)).isEmpty();
}

/**
 * Verifies that aborting a create transaction deletes the manifest and manifest list written by
 * a staged append, leaves the table uncreated, and can safely be called a second time.
 */
@TestTemplate
public void testAbortTransactionCleansUpUncommittedFiles() {
  String tableName = "test_abort";
  Transaction createTxn = TestTables.beginCreate(tableDir, tableName, SCHEMA, SPEC);

  // stage an append so that one manifest and one manifest list get written for the transaction
  createTxn.newAppend().appendFile(FILE_A).appendFile(FILE_B).commit();

  // files now exist under the table directory, yet no table metadata has been committed
  assertThat(TestTables.readMetadata(tableName)).isNull();
  assertThat(listManifestFiles(tableDir)).hasSize(1);
  assertThat(listManifestLists(tableDir)).hasSize(1);

  // abort must remove everything the staged append wrote
  createTxn.abortTransaction();

  assertThat(listManifestFiles(tableDir)).isEmpty();
  assertThat(listManifestLists(tableDir)).isEmpty();
  assertThat(TestTables.readMetadata(tableName)).isNull();

  // a repeated abort has nothing left to delete and must complete without throwing
  createTxn.abortTransaction();
  assertThat(listManifestFiles(tableDir)).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public void commitStagedChanges() {

/** Abandons the staged changes by aborting the backing transaction. */
@Override
public void abortStagedChanges() {
  // clean-up of whatever the staged operations wrote is delegated to the transaction
  transaction.abortTransaction();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.source;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.net.URI;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.TestTemplate;

/**
 * Tests that aborting a staged table obtained from {@link SparkCatalog} removes the metadata files
 * written while the table was staged.
 */
public class TestStagedSparkTable extends CatalogTestBase {

  /**
   * Stages a table create, appends a data file through the staged table so that Avro metadata
   * files are written, aborts the staged changes, and checks that those files are gone and that
   * the table was never created in the catalog.
   *
   * @throws Exception if staging the table fails
   */
  @TestTemplate
  public void testAbortStagedChangesCleansUpUncommittedFiles() throws Exception {
    CatalogPlugin plugin = spark.sessionState().catalogManager().catalog(catalogName);
    // StagedSparkTable is only produced by SparkCatalog; the session catalog uses
    // RollbackStagedTable
    assumeThat(plugin).isInstanceOf(SparkCatalog.class);
    SparkCatalog catalog = (SparkCatalog) plugin;

    Identifier ident = Identifier.of(new String[] {"default"}, "abort_staged");
    StructType schema =
        new StructType()
            .add("id", DataTypes.LongType, false)
            .add("data", DataTypes.StringType, true);

    StagedTable staged = catalog.stageCreate(ident, schema, new Transform[0], ImmutableMap.of());

    // route an append into the staged (uncommitted) transaction so a manifest and a manifest list
    // are written to the table location without the table being committed to the catalog
    Table table = ((SparkTable) staged).table();
    DataFile dataFile =
        DataFiles.builder(table.spec())
            .withPath("/path/to/data-abort.parquet")
            .withFileSizeInBytes(10)
            .withRecordCount(1)
            .build();
    table.newAppend().appendFile(dataFile).commit();

    File metadataDir = new File(URI.create(table.location()).getPath(), "metadata");
    assertThat(listAvroFiles(metadataDir))
        .as("staged write should produce manifest and manifest-list files")
        .isNotNull()
        .isNotEmpty();

    staged.abortStagedChanges();

    // assert on the array itself so a failure reports which files were left behind
    assertThat(listAvroFiles(metadataDir))
        .as("abort should delete all uncommitted manifest and manifest-list files")
        .isNullOrEmpty();
    assertThat(catalog.tableExists(ident))
        .as("staged table must not be created in the catalog after abort")
        .isFalse();

    // abort is best-effort and idempotent: a second call must not throw
    staged.abortStagedChanges();
  }

  /** Lists the {@code .avro} files directly under {@code dir}; null if it cannot be listed. */
  private static File[] listAvroFiles(File dir) {
    return dir.listFiles((parent, name) -> name.endsWith(".avro"));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public void commitStagedChanges() {

/** Abandons the staged changes by aborting the backing transaction. */
@Override
public void abortStagedChanges() {
  // clean-up of whatever the staged operations wrote is delegated to the transaction
  transaction.abortTransaction();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.source;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.net.URI;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.TestTemplate;

/**
 * Tests that aborting a staged table obtained from {@link SparkCatalog} removes the metadata files
 * written while the table was staged.
 */
public class TestStagedSparkTable extends CatalogTestBase {

  /**
   * Stages a table create, appends a data file through the staged table so that Avro metadata
   * files are written, aborts the staged changes, and checks that those files are gone and that
   * the table was never created in the catalog.
   *
   * @throws Exception if staging the table fails
   */
  @TestTemplate
  public void testAbortStagedChangesCleansUpUncommittedFiles() throws Exception {
    CatalogPlugin plugin = spark.sessionState().catalogManager().catalog(catalogName);
    // StagedSparkTable is only produced by SparkCatalog; the session catalog uses
    // RollbackStagedTable
    assumeThat(plugin).isInstanceOf(SparkCatalog.class);
    SparkCatalog catalog = (SparkCatalog) plugin;

    Identifier ident = Identifier.of(new String[] {"default"}, "abort_staged");
    StructType schema =
        new StructType()
            .add("id", DataTypes.LongType, false)
            .add("data", DataTypes.StringType, true);

    StagedTable staged = catalog.stageCreate(ident, schema, new Transform[0], ImmutableMap.of());

    // route an append into the staged (uncommitted) transaction so a manifest and a manifest list
    // are written to the table location without the table being committed to the catalog
    Table table = ((SparkTable) staged).table();
    DataFile dataFile =
        DataFiles.builder(table.spec())
            .withPath("/path/to/data-abort.parquet")
            .withFileSizeInBytes(10)
            .withRecordCount(1)
            .build();
    table.newAppend().appendFile(dataFile).commit();

    File metadataDir = new File(URI.create(table.location()).getPath(), "metadata");
    assertThat(listAvroFiles(metadataDir))
        .as("staged write should produce manifest and manifest-list files")
        .isNotNull()
        .isNotEmpty();

    staged.abortStagedChanges();

    // assert on the array itself so a failure reports which files were left behind
    assertThat(listAvroFiles(metadataDir))
        .as("abort should delete all uncommitted manifest and manifest-list files")
        .isNullOrEmpty();
    assertThat(catalog.tableExists(ident))
        .as("staged table must not be created in the catalog after abort")
        .isFalse();

    // abort is best-effort and idempotent: a second call must not throw
    staged.abortStagedChanges();
  }

  /** Lists the {@code .avro} files directly under {@code dir}; null if it cannot be listed. */
  private static File[] listAvroFiles(File dir) {
    return dir.listFiles((parent, name) -> name.endsWith(".avro"));
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,6 @@ public void commitStagedChanges() {

/** Abandons the staged changes by aborting the backing transaction. */
@Override
public void abortStagedChanges() {
  // clean-up of whatever the staged operations wrote is delegated to the transaction
  transaction.abortTransaction();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg.spark.source;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

import java.io.File;
import java.net.URI;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.CatalogTestBase;
import org.apache.iceberg.spark.SparkCatalog;
import org.apache.spark.sql.connector.catalog.CatalogPlugin;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.StagedTable;
import org.apache.spark.sql.connector.expressions.Transform;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.junit.jupiter.api.TestTemplate;

/**
 * Tests that aborting a staged table obtained from {@link SparkCatalog} removes the metadata files
 * written while the table was staged.
 */
public class TestStagedSparkTable extends CatalogTestBase {

  /**
   * Stages a table create, appends a data file through the staged table so that Avro metadata
   * files are written, aborts the staged changes, and checks that those files are gone and that
   * the table was never created in the catalog.
   *
   * @throws Exception if staging the table fails
   */
  @TestTemplate
  public void testAbortStagedChangesCleansUpUncommittedFiles() throws Exception {
    CatalogPlugin plugin = spark.sessionState().catalogManager().catalog(catalogName);
    // StagedSparkTable is only produced by SparkCatalog; the session catalog uses
    // RollbackStagedTable
    assumeThat(plugin).isInstanceOf(SparkCatalog.class);
    SparkCatalog catalog = (SparkCatalog) plugin;

    Identifier ident = Identifier.of(new String[] {"default"}, "abort_staged");
    StructType schema =
        new StructType()
            .add("id", DataTypes.LongType, false)
            .add("data", DataTypes.StringType, true);

    StagedTable staged = catalog.stageCreate(ident, schema, new Transform[0], ImmutableMap.of());

    // route an append into the staged (uncommitted) transaction so a manifest and a manifest list
    // are written to the table location without the table being committed to the catalog
    Table table = ((SparkTable) staged).table();
    DataFile dataFile =
        DataFiles.builder(table.spec())
            .withPath("/path/to/data-abort.parquet")
            .withFileSizeInBytes(10)
            .withRecordCount(1)
            .build();
    table.newAppend().appendFile(dataFile).commit();

    File metadataDir = new File(URI.create(table.location()).getPath(), "metadata");
    assertThat(listAvroFiles(metadataDir))
        .as("staged write should produce manifest and manifest-list files")
        .isNotNull()
        .isNotEmpty();

    staged.abortStagedChanges();

    // assert on the array itself so a failure reports which files were left behind
    assertThat(listAvroFiles(metadataDir))
        .as("abort should delete all uncommitted manifest and manifest-list files")
        .isNullOrEmpty();
    assertThat(catalog.tableExists(ident))
        .as("staged table must not be created in the catalog after abort")
        .isFalse();

    // abort is best-effort and idempotent: a second call must not throw
    staged.abortStagedChanges();
  }

  /** Lists the {@code .avro} files directly under {@code dir}; null if it cannot be listed. */
  private static File[] listAvroFiles(File dir) {
    return dir.listFiles((parent, name) -> name.endsWith(".avro"));
  }
}