Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ public UTF8String getUTF8String(int ordinal) {
}

private UTF8String getUTF8StringInternal(int ordinal) {
CharSequence seq = struct.get(ordinal, CharSequence.class);
return UTF8String.fromString(seq.toString());
Object value = struct.get(ordinal, Object.class);
return UTF8String.fromString(value.toString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,41 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException
checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
}

@TestTemplate
public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException {
checkJoin("string_col", "STRING", "bucket(8, string_col)");
}

@TestTemplate
public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException {
String createTableStmt =
"CREATE TABLE %s (id BIGINT, dep STRING, user_id STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (dep, bucket(8, user_id))"
+ "TBLPROPERTIES (%s)";

sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));

sql(
"INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (3, 'software', 'user3')",
tableName);
sql(
"INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (4, 'software', 'user4')",
tableName(OTHER_TABLE_NAME));

assertPartitioningAwarePlan(
1, /* expected num of shuffles with SPJ */
3, /* expected num of shuffles without SPJ */
"SELECT t1.id, t1.dep, t1.user_id "
+ "FROM %s t1 "
+ "INNER JOIN %s t2 "
+ "ON t1.id = t2.id AND t1.dep = t2.dep AND t1.user_id = t2.user_id "
+ "ORDER BY t1.id, t1.dep, t1.user_id",
tableName,
tableName(OTHER_TABLE_NAME));
}

@TestTemplate
public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ public UTF8String getUTF8String(int ordinal) {
}

private UTF8String getUTF8StringInternal(int ordinal) {
CharSequence seq = struct.get(ordinal, CharSequence.class);
return UTF8String.fromString(seq.toString());
Object value = struct.get(ordinal, Object.class);
return UTF8String.fromString(value.toString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,41 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException
checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
}

@TestTemplate
public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException {
checkJoin("string_col", "STRING", "bucket(8, string_col)");
}

@TestTemplate
public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException {
String createTableStmt =
"CREATE TABLE %s (id BIGINT, dep STRING, user_id STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (dep, bucket(8, user_id))"
+ "TBLPROPERTIES (%s)";

sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));

sql(
"INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (3, 'software', 'user3')",
tableName);
sql(
"INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (4, 'software', 'user4')",
tableName(OTHER_TABLE_NAME));

assertPartitioningAwarePlan(
1, /* expected num of shuffles with SPJ */
3, /* expected num of shuffles without SPJ */
"SELECT t1.id, t1.dep, t1.user_id "
+ "FROM %s t1 "
+ "INNER JOIN %s t2 "
+ "ON t1.id = t2.id AND t1.dep = t2.dep AND t1.user_id = t2.user_id "
+ "ORDER BY t1.id, t1.dep, t1.user_id",
tableName,
tableName(OTHER_TABLE_NAME));
}

@TestTemplate
public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ public UTF8String getUTF8String(int ordinal) {
}

private UTF8String getUTF8StringInternal(int ordinal) {
CharSequence seq = struct.get(ordinal, CharSequence.class);
return UTF8String.fromString(seq.toString());
Object value = struct.get(ordinal, Object.class);
return UTF8String.fromString(value.toString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,41 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException
checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
}

@TestTemplate
public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException {
checkJoin("string_col", "STRING", "bucket(8, string_col)");
}

@TestTemplate
public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException {
String createTableStmt =
"CREATE TABLE %s (id BIGINT, dep STRING, user_id STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (dep, bucket(8, user_id))"
+ "TBLPROPERTIES (%s)";

sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));

sql(
"INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (3, 'software', 'user3')",
tableName);
sql(
"INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (4, 'software', 'user4')",
tableName(OTHER_TABLE_NAME));

assertPartitioningAwarePlan(
1, /* expected num of shuffles with SPJ */
3, /* expected num of shuffles without SPJ */
"SELECT t1.id, t1.dep, t1.user_id "
+ "FROM %s t1 "
+ "INNER JOIN %s t2 "
+ "ON t1.id = t2.id AND t1.dep = t2.dep AND t1.user_id = t2.user_id "
+ "ORDER BY t1.id, t1.dep, t1.user_id",
tableName,
tableName(OTHER_TABLE_NAME));
}

@TestTemplate
public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ public UTF8String getUTF8String(int ordinal) {
}

private UTF8String getUTF8StringInternal(int ordinal) {
CharSequence seq = struct.get(ordinal, CharSequence.class);
return UTF8String.fromString(seq.toString());
Object value = struct.get(ordinal, Object.class);
return UTF8String.fromString(value.toString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,44 @@ public void testJoinsWithBucketingOnDecimalColumn() throws NoSuchTableException
checkJoin("decimal_col", "DECIMAL(20, 2)", "bucket(8, decimal_col)");
}

@TestTemplate
public void testJoinsWithBucketingOnStringColumn() throws NoSuchTableException {
checkJoin("string_col", "STRING", "bucket(8, string_col)");
}

@TestTemplate
public void testJoinsWithIdentityAndBucketOnStringColumn() throws NoSuchTableException {
// Regression test for GitHub issue #15349:
// bucket transform on a String column produces Integer partition values,
// but StructInternalRow.getUTF8StringInternal assumed CharSequence
String createTableStmt =
"CREATE TABLE %s (id BIGINT, dep STRING, user_id STRING)"
+ "USING iceberg "
+ "PARTITIONED BY (dep, bucket(8, user_id))"
+ "TBLPROPERTIES (%s)";

sql(createTableStmt, tableName, tablePropsAsString(TABLE_PROPERTIES));
sql(createTableStmt, tableName(OTHER_TABLE_NAME), tablePropsAsString(TABLE_PROPERTIES));

sql(
"INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (3, 'software', 'user3')",
tableName);
sql(
"INSERT INTO %s VALUES (1, 'software', 'user1'), (2, 'hr', 'user2'), (4, 'software', 'user4')",
tableName(OTHER_TABLE_NAME));

assertPartitioningAwarePlan(
1, /* expected num of shuffles with SPJ */
3, /* expected num of shuffles without SPJ */
"SELECT t1.id, t1.dep, t1.user_id "
+ "FROM %s t1 "
+ "INNER JOIN %s t2 "
+ "ON t1.id = t2.id AND t1.dep = t2.dep AND t1.user_id = t2.user_id "
+ "ORDER BY t1.id, t1.dep, t1.user_id",
tableName,
tableName(OTHER_TABLE_NAME));
}

@TestTemplate
public void testJoinsWithBucketingOnBinaryColumn() throws NoSuchTableException {
checkJoin("binary_col", "BINARY", "bucket(8, binary_col)");
Expand Down