diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java index 4bb235b811d0..a56c4e0ca6ed 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java @@ -62,6 +62,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.StructLike; import org.apache.iceberg.Table; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SupportsNamespaces; @@ -450,6 +451,11 @@ void createIcebergTable(ObjectPath tablePath, ResolvedCatalogTable table, boolea } } + String comment = table.getComment(); + if (comment != null && !comment.isEmpty()) { + properties.put(TableProperties.COMMENT, comment); + } + try { icebergCatalog.createTable( toIdentifier(tablePath), icebergSchema, spec, location, properties.build()); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java index f7848a5d22ef..9abf68230800 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java @@ -48,6 +48,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; @@ -243,6 +244,24 @@ public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException { .containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps); } + @TestTemplate + public void testCreateTableWithTableComment() { + // create table with comment + sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'"); + assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment"); + } + + @TestTemplate + public void testAlterTableModifyTableComment() { + // create table with comment + sql("CREATE TABLE tl(id BIGINT) COMMENT 'table comment'"); + assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "table comment"); + + // alter table comment + sql("ALTER TABLE tl SET('comment' = 'new comment')"); + assertThat(table("tl").properties()).containsEntry(TableProperties.COMMENT, "new comment"); + } + @TestTemplate public void testCreateTableLocation() { assumeThat(isHadoopCatalog)