diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java index 2bbc9cf208fe..35ada6106fd2 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java @@ -124,20 +124,7 @@ public static void applySchemaChanges( UpdateSchema pendingUpdate, List schemaChanges) { for (TableChange change : schemaChanges) { if (change instanceof TableChange.AddColumn) { - TableChange.AddColumn addColumn = (TableChange.AddColumn) change; - Column flinkColumn = addColumn.getColumn(); - Preconditions.checkArgument( - FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn), - "Unsupported table change: Adding computed column %s.", - flinkColumn.getName()); - Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType()); - if (flinkColumn.getDataType().getLogicalType().isNullable()) { - pendingUpdate.addColumn( - flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null)); - } else { - pendingUpdate.addRequiredColumn( - flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null)); - } + applyAddColumn(pendingUpdate, (TableChange.AddColumn) change); } else if (change instanceof TableChange.ModifyColumn) { TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change; applyModifyColumn(pendingUpdate, modifyColumn); @@ -164,6 +151,31 @@ public static void applySchemaChanges( } } + private static void applyAddColumn(UpdateSchema pendingUpdate, TableChange.AddColumn addColumn) { + Column flinkColumn = addColumn.getColumn(); + Preconditions.checkArgument( + FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn), + "Unsupported table change: Adding computed column %s.", + flinkColumn.getName()); + + Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType()); + + if (flinkColumn.getDataType().getLogicalType().isNullable()) { + pendingUpdate.addColumn( + flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null)); + } else { + pendingUpdate.addRequiredColumn( + flinkColumn.getName(), icebergType, flinkColumn.getComment().orElse(null)); + } + + if (addColumn.getPosition() != null) { + TableChange.ColumnPosition position = addColumn.getPosition(); + TableChange.ModifyColumnPosition modifyColumnPosition = + new TableChange.ModifyColumnPosition(addColumn.getColumn(), position); + applyModifyColumnPosition(pendingUpdate, modifyColumnPosition); + } + } + /** * Applies a list of Flink table property changes to an {@link UpdateProperties} operation. * diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index f7848a5d22ef..091a7b67b4ba 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -404,6 +404,31 @@ public void testAlterTableAddColumn() { .hasMessageContaining("Try to add a column `id` which already exists in the table."); } + @TestTemplate + public void testAlterTableAddColumnPosition() { + sql("CREATE TABLE tl(id BIGINT, name STRING)"); + Schema schemaBefore = table("tl").schema(); + assertThat(schemaBefore.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())) + .asStruct()); + + sql("ALTER TABLE tl ADD (col1 STRING FIRST)"); + sql("ALTER TABLE tl ADD (col2 INT AFTER id)"); + + Schema schemaAfter = table("tl").schema(); + assertThat(schemaAfter.asStruct()) + .isEqualTo( + new Schema( + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(4, "col2", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get())) + .asStruct()); + } + @TestTemplate public void testAlterTableDropColumn() { sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)");