-
Notifications
You must be signed in to change notification settings - Fork 476
fix: sanitize Avro field names on write, respect iceberg-field-name on read #2540
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,12 +34,83 @@ use crate::{Error, ErrorKind, Result, ensure_data_valid}; | |
|
|
||
| const ELEMENT_ID: &str = "element-id"; | ||
| const FIELD_ID_PROP: &str = "field-id"; | ||
| const ICEBERG_FIELD_NAME_PROP: &str = "iceberg-field-name"; | ||
| const KEY_ID: &str = "key-id"; | ||
| const VALUE_ID: &str = "value-id"; | ||
| const MAP_LOGICAL_TYPE: &str = "map"; | ||
| // This constant would be better maintained in avro-rs. | ||
| const LOGICAL_TYPE: &str = "logicalType"; | ||
|
|
||
/// Reports whether `name` can be used verbatim as an Avro field name.
///
/// A name qualifies when it is non-empty, starts with an ASCII letter or `_`,
/// and every following character is an ASCII letter, an ASCII digit, or `_`.
fn is_valid_avro_name(name: &str) -> bool {
    let mut rest = name.chars();

    // An empty name is never valid.
    let Some(head) = rest.next() else {
        return false;
    };

    // Digits are allowed everywhere except in the leading position.
    if head != '_' && !head.is_ascii_alphabetic() {
        return false;
    }

    rest.all(|ch| ch == '_' || ch.is_ascii_alphanumeric())
}
|
|
||
| /// Sanitizes an Iceberg field name to a valid Avro field name. | ||
| /// | ||
| /// Matches Java `AvroSchemaUtil.sanitize()` semantics, operating on UTF-16 | ||
| /// code units (to match Java's `String.charAt()`). Characters that are not | ||
| /// ASCII letters, ASCII digits, or underscore are escaped as `_x<HEX>` where | ||
| /// HEX is the uppercase hexadecimal representation of the UTF-16 code unit | ||
| /// with no leading zeros. | ||
| /// | ||
| /// Special handling for the first character: | ||
| /// - ASCII digit: prefix with `_`, digit is preserved (e.g., `1foo` -> `_1foo`) | ||
| /// - Non-letter, non-underscore: escaped as `_x<HEX>` (e.g., `.foo` -> `_x2Efoo`) | ||
| /// | ||
| /// For supplementary characters (above U+FFFF), each surrogate half is escaped | ||
| /// independently (e.g., U+1F600 -> `_xD83D_xDE00`), matching Java's behavior | ||
| /// of iterating over `char` (UTF-16 code unit) values. | ||
| fn sanitize_avro_name(name: &str) -> String { | ||
| let utf16_units: Vec<u16> = name.encode_utf16().collect(); | ||
| if utf16_units.is_empty() { | ||
| return String::new(); | ||
| } | ||
|
|
||
| let mut result = String::with_capacity(name.len() + 16); | ||
|
|
||
| let first = utf16_units[0]; | ||
| if is_ascii_alpha_u16(first) || first == b'_' as u16 { | ||
| result.push(first as u8 as char); | ||
| } else if is_ascii_digit_u16(first) { | ||
| result.push('_'); | ||
| result.push(first as u8 as char); | ||
| } else { | ||
| result.push_str(&format!("_x{:X}", first)); | ||
| } | ||
|
|
||
| for &unit in &utf16_units[1..] { | ||
| if is_ascii_alphanum_u16(unit) || unit == b'_' as u16 { | ||
| result.push(unit as u8 as char); | ||
| } else { | ||
| result.push_str(&format!("_x{:X}", unit)); | ||
| } | ||
| } | ||
|
|
||
| result | ||
| } | ||
|
|
||
/// Returns `true` when the UTF-16 code unit `c` is an ASCII letter
/// (`A`-`Z` or `a`-`z`).
#[inline]
fn is_ascii_alpha_u16(c: u16) -> bool {
    // Code units that do not fit in a byte cannot be ASCII.
    u8::try_from(c).is_ok_and(|byte| byte.is_ascii_alphabetic())
}
|
|
||
/// Returns `true` when the UTF-16 code unit `c` is an ASCII digit (`0`-`9`).
#[inline]
fn is_ascii_digit_u16(c: u16) -> bool {
    (u16::from(b'0')..=u16::from(b'9')).contains(&c)
}
|
Comment on lines
+105
to
+107
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. /**
* Determines if the specified character is a digit.
* <p>
* A character is a digit if its general category type, provided
* by {@code Character.getType(ch)}, is
* {@code DECIMAL_DIGIT_NUMBER}.
* <p>
* Some Unicode character ranges that contain digits:
* <ul>
* <li>{@code '\u005Cu0030'} through {@code '\u005Cu0039'},
* ISO-LATIN-1 digits ({@code '0'} through {@code '9'})
* <li>{@code '\u005Cu0660'} through {@code '\u005Cu0669'},
* Arabic-Indic digits
* <li>{@code '\u005Cu06F0'} through {@code '\u005Cu06F9'},
* Extended Arabic-Indic digits
* <li>{@code '\u005Cu0966'} through {@code '\u005Cu096F'},
* Devanagari digits
* <li>{@code '\u005CuFF10'} through {@code '\u005CuFF19'},
* Fullwidth digits
* </ul>
*
* Many other character ranges contain digits as well.
*
* <p><b>Note:</b> This method cannot handle <a
* href="#supplementary"> supplementary characters</a>. To support
* all Unicode characters, including supplementary characters, use
* the {@link #isDigit(int)} method.
*
* @param ch the character to be tested.
* @return {@code true} if the character is a digit;
* {@code false} otherwise.
* @see Character#digit(char, int)
* @see Character#forDigit(int, int)
* @see Character#getType(char)
*/
public static boolean isDigit(char ch) {
return isDigit((int)ch);
}
https://github.com/apache/iceberg/blob/main/core/src/main/java/org/apache/iceberg/avro/AvroSchemaUtil.java#L551
Reasoning through this, I don't think it matters for interop whether the two sanitized representations are identical, because both implementations will correctly underscore-escape the field and restore the original name from the map. Can you check my reasoning here? |
||
|
|
||
| #[inline] | ||
| fn is_ascii_alphanum_u16(c: u16) -> bool { | ||
| is_ascii_alpha_u16(c) || is_ascii_digit_u16(c) | ||
| } | ||
|
|
||
/// Type on which the `SchemaVisitor` implementation that produces Avro record
/// fields is defined.
struct SchemaToAvroSchema {
    // NOTE(review): presumably the name given to the generated top-level Avro
    // record — confirm against the code that constructs this visitor.
    schema: String,
}
|
|
@@ -86,8 +157,14 @@ impl SchemaVisitor for SchemaToAvroSchema { | |
| None | ||
| }; | ||
|
|
||
| let (avro_name, original_name) = if is_valid_avro_name(&field.name) { | ||
| (field.name.clone(), None) | ||
| } else { | ||
| (sanitize_avro_name(&field.name), Some(field.name.clone())) | ||
| }; | ||
|
|
||
| let mut avro_record_field = AvroRecordField { | ||
| name: field.name.clone(), | ||
| name: avro_name, | ||
| schema: field_schema, | ||
| order: RecordFieldOrder::Ignore, | ||
| position: 0, | ||
|
|
@@ -102,6 +179,12 @@ impl SchemaVisitor for SchemaToAvroSchema { | |
| Value::Number(Number::from(field.id)), | ||
| ); | ||
|
|
||
| if let Some(name) = original_name { | ||
| avro_record_field | ||
| .custom_attributes | ||
| .insert(ICEBERG_FIELD_NAME_PROP.to_string(), Value::String(name)); | ||
| } | ||
|
|
||
| Ok(Either::Right(avro_record_field)) | ||
| } | ||
|
|
||
|
|
@@ -442,8 +525,15 @@ impl AvroSchemaVisitor for AvroSchemaToSchema { | |
|
|
||
| let optional = is_avro_optional(&avro_field.schema); | ||
|
|
||
| let mut field = | ||
| NestedField::new(field_id, &avro_field.name, field_type.unwrap(), !optional); | ||
| // Prefer the original Iceberg field name stored during sanitization, | ||
| // falling back to the Avro field name if the property is absent. | ||
| let field_name = avro_field | ||
| .custom_attributes | ||
| .get(ICEBERG_FIELD_NAME_PROP) | ||
| .and_then(|v| v.as_str()) | ||
| .unwrap_or(&avro_field.name); | ||
|
|
||
| let mut field = NestedField::new(field_id, field_name, field_type.unwrap(), !optional); | ||
|
|
||
| if let Some(doc) = &avro_field.doc { | ||
| field = field.with_doc(doc); | ||
|
|
@@ -1212,4 +1302,107 @@ mod tests { | |
| converter.primitive(&avro_schema).unwrap().unwrap() | ||
| ); | ||
| } | ||
|
|
||
/// Checks which names `is_valid_avro_name` accepts as-is and which it rejects.
#[test]
fn test_is_valid_avro_name() {
    for accepted in ["hello", "_private", "field_123"] {
        assert!(is_valid_avro_name(accepted), "{accepted:?} should be valid");
    }

    // Leading digit, punctuation, whitespace and the empty string are invalid.
    for rejected in ["123field", "field.name", "has space", ""] {
        assert!(!is_valid_avro_name(rejected), "{rejected:?} should be invalid");
    }
}
|
|
||
/// Exercises `sanitize_avro_name` on ASCII-only inputs.
#[test]
fn test_sanitize_avro_name() {
    let cases = [
        ("valid_name", "valid_name"),
        ("123field", "_123field"),
        ("field.name", "field_x2Ename"),
        ("has space", "has_x20space"),
        (".dotfirst", "_x2Edotfirst"),
        ("a-b", "a_x2Db"),
    ];

    for (input, expected) in cases {
        assert_eq!(sanitize_avro_name(input), expected);
    }
}
|
|
||
/// Exercises `sanitize_avro_name` on inputs containing non-ASCII characters.
#[test]
fn test_sanitize_avro_name_unicode() {
    let cases = [
        // Non-ASCII BMP character: U+00E9 is the single UTF-16 unit 0xE9.
        ("a\u{00E9}", "a_xE9"),
        // CJK character: U+4E2D is the single UTF-16 unit 0x4E2D.
        ("\u{4E2D}", "_x4E2D"),
        // Supplementary character U+1F600 is the surrogate pair D83D, DE00.
        ("a\u{1F600}b", "a_xD83D_xDE00b"),
        // Supplementary character in the leading position.
        ("\u{1F600}", "_xD83D_xDE00"),
    ];

    for (input, expected) in cases {
        assert_eq!(sanitize_avro_name(input), expected);
    }
}
|
|
||
/// An empty name is returned unchanged (as an empty string).
#[test]
fn test_sanitize_avro_name_empty() {
    let sanitized = sanitize_avro_name("");
    assert_eq!(sanitized, "");
}
|
|
||
/// Converts an Iceberg schema with Avro-invalid field names to Avro and back,
/// checking both the sanitized Avro names and that the original schema is
/// recovered.
#[test]
fn test_sanitization_round_trip() {
    let iceberg_schema = Schema::builder()
        .with_fields(vec![
            NestedField::required(1, "123column", PrimitiveType::String.into()).into(),
            NestedField::required(2, "normal_field", PrimitiveType::Int.into()).into(),
            NestedField::optional(3, "field.with.dots", PrimitiveType::Long.into()).into(),
        ])
        .build()
        .unwrap();

    let avro_schema = schema_to_avro_schema("test_schema", &iceberg_schema).unwrap();

    let AvroSchema::Record(record) = &avro_schema else {
        panic!("Expected record schema");
    };

    // (sanitized Avro name, original name expected in `iceberg-field-name`)
    let expectations = [
        ("_123column", Some("123column")),
        ("normal_field", None),
        ("field_x2Ewith_x2Edots", Some("field.with.dots")),
    ];

    for (position, (expected_name, expected_original)) in expectations.into_iter().enumerate() {
        // Indexing (rather than zipping) keeps the test failing loudly if the
        // record has fewer fields than expected.
        let avro_field = &record.fields[position];
        let expected_attribute = expected_original.map(|name| Value::String(name.to_string()));

        assert_eq!(avro_field.name, expected_name);
        assert_eq!(
            avro_field.custom_attributes.get("iceberg-field-name"),
            expected_attribute.as_ref()
        );
    }

    let converted_back = avro_schema_to_schema(&avro_schema).unwrap();
    assert_eq!(iceberg_schema, converted_back);
}
|
|
||
/// Reading an Avro schema whose field carries `iceberg-field-name` must yield
/// that original name; fields without the property keep their Avro name.
#[test]
fn test_avro_to_iceberg_uses_iceberg_field_name_property() {
    // Simulate reading an Avro schema written by Java with sanitized names
    let avro_json = r#"{
        "type": "record",
        "name": "test_schema",
        "fields": [
            {
                "name": "_123column",
                "type": "string",
                "field-id": 1,
                "iceberg-field-name": "123column"
            },
            {
                "name": "normal_field",
                "type": "int",
                "field-id": 2
            }
        ]
    }"#;

    let avro_schema = AvroSchema::parse_str(avro_json).unwrap();
    let iceberg_schema = avro_schema_to_schema(&avro_schema).unwrap();

    let leading_names: Vec<&str> = iceberg_schema
        .as_struct()
        .fields()
        .iter()
        .take(2)
        .map(|field| field.name.as_str())
        .collect();

    assert_eq!(leading_names, ["123column", "normal_field"]);
}
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be nice if we could use something from https://github.com/apache/avro-rs/blob/4edb1ce1ae1ab5bd3fafb08ca3f622946c01c9fd/avro/src/validator.rs#L4 but it looks like
`validate_record_field_name` is `pub(crate)`. Is it worth filing an issue upstream to see if we could expose something that would allow us to validate against their implementation?