diff --git a/crates/iceberg/src/avro/schema.rs b/crates/iceberg/src/avro/schema.rs
index fdbc680977..1fc9802ceb 100644
--- a/crates/iceberg/src/avro/schema.rs
+++ b/crates/iceberg/src/avro/schema.rs
@@ -34,12 +34,75 @@ use crate::{Error, ErrorKind, Result, ensure_data_valid};
 
 const ELEMENT_ID: &str = "element-id";
 const FIELD_ID_PROP: &str = "field-id";
+// Custom attribute carrying the original Iceberg field name whenever the
+// Avro field name had to be sanitized.
+const ICEBERG_FIELD_NAME_PROP: &str = "iceberg-field-name";
 const KEY_ID: &str = "key-id";
 const VALUE_ID: &str = "value-id";
 const MAP_LOGICAL_TYPE: &str = "map";
 // This const may better to maintain in avro-rs.
 const LOGICAL_TYPE: &str = "logicalType";
 
+/// Returns `true` if `name` can be used verbatim as an Avro field name.
+///
+/// A valid name is non-empty, starts with an ASCII letter or `_`, and
+/// contains only ASCII letters, ASCII digits, or `_` afterwards.
+fn is_valid_avro_name(name: &str) -> bool {
+    let mut chars = name.chars();
+    match chars.next() {
+        None => false,
+        Some(first) => {
+            (first.is_ascii_alphabetic() || first == '_')
+                && chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
+        }
+    }
+}
+
+/// Sanitizes an Iceberg field name to a valid Avro field name.
+///
+/// Matches Java `AvroSchemaUtil.sanitize()` semantics, operating on UTF-16
+/// code units (to match Java's `String.charAt()`). Characters that are not
+/// ASCII letters, ASCII digits, or underscore are escaped as `_x<HEX>` where
+/// HEX is the uppercase hexadecimal representation of the UTF-16 code unit
+/// with no leading zeros.
+///
+/// Special handling for the first character:
+/// - ASCII digit: prefix with `_`, digit is preserved (e.g., `1foo` -> `_1foo`)
+/// - Non-letter, non-underscore: escaped as `_x<HEX>` (e.g., `.foo` -> `_x2Efoo`)
+///
+/// For supplementary characters (above U+FFFF), each surrogate half is escaped
+/// independently (e.g., U+1F600 -> `_xD83D_xDE00`), matching Java's behavior
+/// of iterating over `char` (UTF-16 code unit) values.
+///
+/// An empty `name` yields an empty string.
+fn sanitize_avro_name(name: &str) -> String {
+    use std::fmt::Write as _;
+
+    let mut result = String::with_capacity(name.len() + 16);
+
+    for (index, unit) in name.encode_utf16().enumerate() {
+        // Only code units in the ASCII range can ever be emitted verbatim.
+        match u8::try_from(unit) {
+            Ok(byte) if byte.is_ascii_alphabetic() || byte == b'_' => {
+                result.push(char::from(byte));
+            }
+            Ok(byte) if byte.is_ascii_digit() => {
+                // A leading digit is kept but must be prefixed with `_`.
+                if index == 0 {
+                    result.push('_');
+                }
+                result.push(char::from(byte));
+            }
+            _ => {
+                // Writing into a `String` cannot fail, so the result is ignored.
+                let _ = write!(result, "_x{:X}", unit);
+            }
+        }
+    }
+
+    result
+}
+
 struct SchemaToAvroSchema {
     schema: String,
 }
@@ -86,8 +149,14 @@ impl SchemaVisitor for SchemaToAvroSchema {
             None
         };
 
+        let (avro_name, original_name) = if is_valid_avro_name(&field.name) {
+            (field.name.clone(), None)
+        } else {
+            (sanitize_avro_name(&field.name), Some(field.name.clone()))
+        };
+
         let mut avro_record_field = AvroRecordField {
-            name: field.name.clone(),
+            name: avro_name,
             schema: field_schema,
             order: RecordFieldOrder::Ignore,
             position: 0,
@@ -102,6 +171,12 @@ impl SchemaVisitor for SchemaToAvroSchema {
             Value::Number(Number::from(field.id)),
         );
 
+        if let Some(name) = original_name {
+            avro_record_field
+                .custom_attributes
+                .insert(ICEBERG_FIELD_NAME_PROP.to_string(), Value::String(name));
+        }
+
         Ok(Either::Right(avro_record_field))
     }
 
@@ -442,8 +517,15 @@ impl AvroSchemaVisitor for AvroSchemaToSchema {
 
             let optional = is_avro_optional(&avro_field.schema);
 
-            let mut field =
-                NestedField::new(field_id, &avro_field.name, field_type.unwrap(), !optional);
+            // Prefer the original Iceberg field name stored during sanitization,
+            // falling back to the Avro field name if the property is absent.
+            let field_name = avro_field
+                .custom_attributes
+                .get(ICEBERG_FIELD_NAME_PROP)
+                .and_then(|v| v.as_str())
+                .unwrap_or(&avro_field.name);
+
+            let mut field = NestedField::new(field_id, field_name, field_type.unwrap(), !optional);
 
             if let Some(doc) = &avro_field.doc {
                 field = field.with_doc(doc);
@@ -1212,4 +1294,107 @@ mod tests {
             converter.primitive(&avro_schema).unwrap().unwrap()
         );
     }
+
+    #[test]
+    fn test_is_valid_avro_name() {
+        assert!(is_valid_avro_name("hello"));
+        assert!(is_valid_avro_name("_private"));
+        assert!(is_valid_avro_name("field_123"));
+        assert!(!is_valid_avro_name("123field"));
+        assert!(!is_valid_avro_name("field.name"));
+        assert!(!is_valid_avro_name("has space"));
+        assert!(!is_valid_avro_name(""));
+    }
+
+    #[test]
+    fn test_sanitize_avro_name() {
+        assert_eq!(sanitize_avro_name("valid_name"), "valid_name");
+        assert_eq!(sanitize_avro_name("123field"), "_123field");
+        assert_eq!(sanitize_avro_name("field.name"), "field_x2Ename");
+        assert_eq!(sanitize_avro_name("has space"), "has_x20space");
+        assert_eq!(sanitize_avro_name(".dotfirst"), "_x2Edotfirst");
+        assert_eq!(sanitize_avro_name("a-b"), "a_x2Db");
+    }
+
+    #[test]
+    fn test_sanitize_avro_name_unicode() {
+        // Non-ASCII BMP character: U+00E9 = 0xE9 in UTF-16
+        assert_eq!(sanitize_avro_name("a\u{00E9}"), "a_xE9");
+        // CJK character: U+4E2D = 0x4E2D in UTF-16
+        assert_eq!(sanitize_avro_name("\u{4E2D}"), "_x4E2D");
+        // Supplementary character U+1F600 = surrogate pair D83D, DE00
+        assert_eq!(sanitize_avro_name("a\u{1F600}b"), "a_xD83D_xDE00b");
+        // Supplementary character at start
+        assert_eq!(sanitize_avro_name("\u{1F600}"), "_xD83D_xDE00");
+    }
+
+    #[test]
+    fn test_sanitize_avro_name_empty() {
+        assert_eq!(sanitize_avro_name(""), "");
+    }
+
+    #[test]
+    fn test_sanitization_round_trip() {
+        let iceberg_schema = Schema::builder()
+            .with_fields(vec![
+                NestedField::required(1, "123column", PrimitiveType::String.into()).into(),
+                NestedField::required(2, "normal_field", PrimitiveType::Int.into()).into(),
+                NestedField::optional(3, "field.with.dots", PrimitiveType::Long.into()).into(),
+            ])
+            .build()
+            .unwrap();
+
+        let avro_schema = schema_to_avro_schema("test_schema", &iceberg_schema).unwrap();
+
+        if let AvroSchema::Record(record) = &avro_schema {
+            assert_eq!(record.fields[0].name, "_123column");
+            assert_eq!(
+                record.fields[0].custom_attributes.get("iceberg-field-name"),
+                Some(&Value::String("123column".to_string()))
+            );
+            assert_eq!(record.fields[1].name, "normal_field");
+            assert_eq!(
+                record.fields[1].custom_attributes.get("iceberg-field-name"),
+                None
+            );
+            assert_eq!(record.fields[2].name, "field_x2Ewith_x2Edots");
+            assert_eq!(
+                record.fields[2].custom_attributes.get("iceberg-field-name"),
+                Some(&Value::String("field.with.dots".to_string()))
+            );
+        } else {
+            panic!("Expected record schema");
+        }
+
+        let converted_back = avro_schema_to_schema(&avro_schema).unwrap();
+        assert_eq!(iceberg_schema, converted_back);
+    }
+
+    #[test]
+    fn test_avro_to_iceberg_uses_iceberg_field_name_property() {
+        // Simulate reading an Avro schema written by Java with sanitized names
+        let avro_json = r#"{
+            "type": "record",
+            "name": "test_schema",
+            "fields": [
+                {
+                    "name": "_123column",
+                    "type": "string",
+                    "field-id": 1,
+                    "iceberg-field-name": "123column"
+                },
+                {
+                    "name": "normal_field",
+                    "type": "int",
+                    "field-id": 2
+                }
+            ]
+        }"#;
+        let avro_schema = AvroSchema::parse_str(avro_json).unwrap();
+        let iceberg_schema = avro_schema_to_schema(&avro_schema).unwrap();
+
+        let fields = iceberg_schema.as_struct().fields();
+        assert_eq!(fields[0].name, "123column");
+        assert_eq!(fields[1].name, "normal_field");
+    }
 }