diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
index f5dc8b42ee1a..aa6811d43f9e 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
@@ -174,8 +174,8 @@ public UTF8String getUTF8String(int ordinal) {
   }
 
   private UTF8String getUTF8StringInternal(int ordinal) {
-    CharSequence seq = struct.get(ordinal, CharSequence.class);
-    return UTF8String.fromString(seq.toString());
+    Object value = struct.get(ordinal, Object.class);
+    return UTF8String.fromString(value.toString());
   }
 
   @Override
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
index 923bd9a37fa8..ccd2aa932b70 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
@@ -168,6 +168,41 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException
     checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
   }
 
+  @TestTemplate
+  public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException {
+    checkJoin("string_col", "STRING", "bucket(8, string_col)");
+  }
+
+  @TestTemplate
+  public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException {
+    String createTableStmt =
+        "CREATE TABLE %s (id BIGINT, dep STRING, user_id STRING)"
+            + "USING iceberg "
+            + "PARTITIONED BY (dep, bucket(8, user_id))"
+            + "TBLPROPERTIES (%s)";
+
+    sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
+    sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+    sql(
+        "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (3, 'software', 'user3')",
+        tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (4, 'software', 'user4')",
+        tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+        1, /* expected num of shuffles with SPJ */
+        3, /* expected num of shuffles without SPJ */
+        "SELECT t1.id, t1.dep, t1.user_id "
+            + "FROM %s t1 "
+            + "INNER JOIN %s t2 "
+            + "ON t1.id = t2.id AND t1.dep = t2.dep AND t1.user_id = t2.user_id "
+            + "ORDER BY t1.id, t1.dep, t1.user_id",
+        tableName,
+        tableName(OTHER_TABLE_NAME));
+  }
+
   @TestTemplate
   public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
     checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
index f5dc8b42ee1a..aa6811d43f9e 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
@@ -174,8 +174,8 @@ public UTF8String getUTF8String(int ordinal) {
   }
 
   private UTF8String getUTF8StringInternal(int ordinal) {
-    CharSequence seq = struct.get(ordinal, CharSequence.class);
-    return UTF8String.fromString(seq.toString());
+    Object value = struct.get(ordinal, Object.class);
+    return UTF8String.fromString(value.toString());
   }
 
   @Override
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
index 2515454401a3..1a4fe1fe43c0 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
@@ -168,6 +168,41 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException
     checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
   }
 
+  @TestTemplate
+  public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException {
+    checkJoin("string_col", "STRING", "bucket(8, string_col)");
+  }
+
+  @TestTemplate
+  public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException {
+    String createTableStmt =
+        "CREATE TABLE %s (id BIGINT, dep STRING, user_id STRING)"
+            + "USING iceberg "
+            + "PARTITIONED BY (dep, bucket(8, user_id))"
+            + "TBLPROPERTIES (%s)";
+
+    sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
+    sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+    sql(
+        "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (3, 'software', 'user3')",
+        tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (4, 'software', 'user4')",
+        tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+        1, /* expected num of shuffles with SPJ */
+        3, /* expected num of shuffles without SPJ */
+        "SELECT t1.id, t1.dep, t1.user_id "
+            + "FROM %s t1 "
+            + "INNER JOIN %s t2 "
+            + "ON t1.id = t2.id AND t1.dep = t2.dep AND t1.user_id = t2.user_id "
+            + "ORDER BY t1.id, t1.dep, t1.user_id",
+        tableName,
+        tableName(OTHER_TABLE_NAME));
+  }
+
   @TestTemplate
   public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
     checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
diff --git a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
index 2d3c917e58f9..cca6801d42fb 100644
--- a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
+++ b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
@@ -178,8 +178,8 @@ public UTF8String getUTF8String(int ordinal) {
   }
 
   private UTF8String getUTF8StringInternal(int ordinal) {
-    CharSequence seq = struct.get(ordinal, CharSequence.class);
-    return UTF8String.fromString(seq.toString());
+    Object value = struct.get(ordinal, Object.class);
+    return UTF8String.fromString(value.toString());
   }
 
   @Override
diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
index 8788dff15806..3382b36321b1 100644
--- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
+++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
@@ -175,6 +175,41 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException
     checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
   }
 
+  @TestTemplate
+  public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException {
+    checkJoin("string_col", "STRING", "bucket(8, string_col)");
+  }
+
+  @TestTemplate
+  public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException {
+    String createTableStmt =
+        "CREATE TABLE %s (id BIGINT, dep STRING, user_id STRING)"
+            + "USING iceberg "
+            + "PARTITIONED BY (dep, bucket(8, user_id))"
+            + "TBLPROPERTIES (%s)";
+
+    sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
+    sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+    sql(
+        "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (3, 'software', 'user3')",
+        tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (4, 'software', 'user4')",
+        tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+        1, /* expected num of shuffles with SPJ */
+        3, /* expected num of shuffles without SPJ */
+        "SELECT t1.id, t1.dep, t1.user_id "
+            + "FROM %s t1 "
+            + "INNER JOIN %s t2 "
+            + "ON t1.id = t2.id AND t1.dep = t2.dep AND t1.user_id = t2.user_id "
+            + "ORDER BY t1.id, t1.dep, t1.user_id",
+        tableName,
+        tableName(OTHER_TABLE_NAME));
+  }
+
   @TestTemplate
   public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
     checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
index 074f04d03468..7b9f8d702b46 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/StructInternalRow.java
@@ -180,8 +180,8 @@ public UTF8String getUTF8String(int ordinal) {
   }
 
   private UTF8String getUTF8StringInternal(int ordinal) {
-    CharSequence seq = struct.get(ordinal, CharSequence.class);
-    return UTF8String.fromString(seq.toString());
+    Object value = struct.get(ordinal, Object.class);
+    return UTF8String.fromString(value.toString());
   }
 
   @Override
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
index 8788dff15806..269458a78878 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestStoragePartitionedJoins.java
@@ -175,6 +175,44 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException
     checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
   }
 
+  @TestTemplate
+  public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException {
+    checkJoin("string_col", "STRING", "bucket(8, string_col)");
+  }
+
+  @TestTemplate
+  public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException {
+    // Regression test for GitHub issue #15349:
+    // bucket transform on a String column produces Integer partition values,
+    // but StructInternalRow.getUTF8StringInternal assumed CharSequence
+    String createTableStmt =
+        "CREATE TABLE %s (id BIGINT, dep STRING, user_id STRING)"
+            + "USING iceberg "
+            + "PARTITIONED BY (dep, bucket(8, user_id))"
+            + "TBLPROPERTIES (%s)";
+
+    sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
+    sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));
+
+    sql(
+        "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (3, 'software', 'user3')",
+        tableName);
+    sql(
+        "INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (4, 'software', 'user4')",
+        tableName(OTHER_TABLE_NAME));
+
+    assertPartitioningAwarePlan(
+        1, /* expected num of shuffles with SPJ */
+        3, /* expected num of shuffles without SPJ */
+        "SELECT t1.id, t1.dep, t1.user_id "
+            + "FROM %s t1 "
+            + "INNER JOIN %s t2 "
+            + "ON t1.id = t2.id AND t1.dep = t2.dep AND t1.user_id = t2.user_id "
+            + "ORDER BY t1.id, t1.dep, t1.user_id",
+        tableName,
+        tableName(OTHER_TABLE_NAME));
+  }
+
   @TestTemplate
   public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
     checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");