diff --git a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java index 1a57a6444870..51f64a9d4b05 100644 --- a/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java +++ b/kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/data/RecordConverter.java @@ -22,23 +22,29 @@ import java.io.IOException; import java.io.UncheckedIOException; import java.math.BigDecimal; +import java.math.BigInteger; import java.math.RoundingMode; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; +import java.time.Instant; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatterBuilder; import java.time.format.DateTimeParseException; import java.time.temporal.Temporal; import java.util.Base64; +import java.util.Collection; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.stream.Collectors; import org.apache.iceberg.FileFormat; @@ -53,6 +59,7 @@ import org.apache.iceberg.mapping.NameMappingParser; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Type.PrimitiveType; import org.apache.iceberg.types.Types.DecimalType; @@ -64,6 +71,13 @@ import org.apache.iceberg.util.ByteBuffers; import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.ShreddedObject; +import org.apache.iceberg.variants.ValueArray; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantMetadata; +import org.apache.iceberg.variants.VariantValue; +import org.apache.iceberg.variants.Variants; +import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; @@ -142,6 +156,8 @@ private Object convertValue( return convertTimeValue(value); case TIMESTAMP: return convertTimestampValue(value, (TimestampType) type); + case VARIANT: + return convertVariantValue(value); } throw new UnsupportedOperationException("Unsupported type: " + type.typeId()); } @@ -464,6 +480,234 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) { return convertLocalDateTime(value); } + protected Variant convertVariantValue(Object value) { + if (value instanceof Variant variant) { + return variant; + } + + List sortedFieldNames = + collectFieldNames(value).stream().sorted().collect(Collectors.toList()); + VariantMetadata metadata = Variants.metadata(sortedFieldNames); + return Variant.of(metadata, objectToVariantValue(value, metadata, null)); + } + + /** + * Recursively collects field names from collections, maps, and structs. Returns an empty set for + * null, scalar values, and empty maps, lists, or structs. Map keys must be strings; non-string + * keys cause IllegalArgumentException. + */ + private static Set collectFieldNames(Object value) { + if (value == null) { + return Collections.emptySet(); + } + if (value instanceof Collection collection) { + if (collection.isEmpty()) { + return Collections.emptySet(); + } + Set names = Sets.newHashSet(); + collection.forEach(element -> names.addAll(collectFieldNames(element))); + return names; + } else if (value instanceof Map map) { + if (map.isEmpty()) { + return Collections.emptySet(); + } + Set names = Sets.newHashSet(); + map.forEach( + (key, val) -> { + if (key instanceof String keyStr) { + names.add(keyStr); + names.addAll(collectFieldNames(val)); + } else { + throw new IllegalArgumentException( + "Cannot convert map to variant: keys must be non-null strings, was: " + + (key == null ? "null" : key.getClass().getName())); + } + }); + return names; + } else if (value instanceof Struct struct) { + List fields = struct.schema().fields(); + if (fields.isEmpty()) { + return Collections.emptySet(); + } + Set names = Sets.newHashSet(); + fields.forEach( + field -> { + names.add(field.name()); + names.addAll(collectFieldNames(struct.get(field))); + }); + return names; + } + return Collections.emptySet(); + } + + /** + * Recursively converts a Java object to a VariantValue using the given shared metadata for all + * nested maps. Handles primitives, List (array), and Map (object); map keys become field names. + */ + private static VariantValue objectToVariantValue( + Object value, VariantMetadata metadata, org.apache.kafka.connect.data.Schema schema) { + if (value == null) { + return Variants.ofNull(); + } + VariantValue primitive = primitiveToVariantValue(value, schema); + if (primitive != null) { + return primitive; + } + if (value instanceof Collection collection) { + ValueArray array = Variants.array(); + org.apache.kafka.connect.data.Schema elementSchema = + schema != null ? schema.valueSchema() : null; + for (Object element : collection) { + array.add(objectToVariantValue(element, metadata, elementSchema)); + } + return array; + } + if (value instanceof Map map) { + return mapToVariantValue(map, metadata, schema); + } + if (value instanceof Struct struct) { + ShreddedObject object = Variants.object(metadata); + for (Field field : struct.schema().fields()) { + object.put(field.name(), objectToVariantValue(struct.get(field), metadata, field.schema())); + } + return object; + } + throw new IllegalArgumentException("Cannot convert to variant: " + value.getClass().getName()); + } + + /** Converts a Map to VariantValue; throw IllegalArgumentException if the key is not a string. */ + private static VariantValue mapToVariantValue( + Map map, VariantMetadata metadata, org.apache.kafka.connect.data.Schema schema) { + ShreddedObject object = Variants.object(metadata); + org.apache.kafka.connect.data.Schema mapValueSchema = + schema != null ? schema.valueSchema() : null; + map.forEach( + (key, val) -> { + if (key instanceof String keyStr) { + object.put(keyStr, objectToVariantValue(val, metadata, mapValueSchema)); + } else { + throw new IllegalArgumentException( + "Cannot convert map to variant: keys must be non-null strings, was: " + + (key == null ? "null" : key.getClass().getName())); + } + }); + return object; + } + + /** + * Converts a primitive or primitive-like value to VariantValue; returns null if not supported. + * The optional schema is used to disambiguate java.util.Date which Kafka Connect uses for Date, + * Time, and Timestamp logical types. + */ + private static VariantValue primitiveToVariantValue( + Object value, org.apache.kafka.connect.data.Schema schema) { + if (value instanceof Boolean booleanValue) { + return Variants.of(booleanValue); + } + VariantValue temporal = temporalObjectToVariantValue(value, schema); + if (temporal != null) { + return temporal; + } + if (value instanceof Number number) { + return numberToVariantValue(number); + } + if (value instanceof String stringValue) { + return Variants.of(stringValue); + } + if (value instanceof ByteBuffer byteBuffer) { + return Variants.of(byteBuffer); + } + if (value instanceof byte[] byteArray) { + return Variants.of(ByteBuffer.wrap(byteArray)); + } + if (value instanceof UUID uuid) { + return Variants.ofUUID(uuid); + } + return null; + } + + /** + * Converts java.time values and java.util.Date (with Connect logical type from the optional + * schema) to VariantValue; returns null if the value is not a supported temporal representation. + */ + private static VariantValue temporalObjectToVariantValue( + Object value, org.apache.kafka.connect.data.Schema schema) { + if (value instanceof Instant instant) { + return Variants.ofTimestamptz(DateTimeUtil.microsFromInstant(instant)); + } + if (value instanceof OffsetDateTime offsetDateTime) { + return Variants.ofTimestamptz(DateTimeUtil.microsFromTimestamptz(offsetDateTime)); + } + if (value instanceof ZonedDateTime zonedDateTime) { + return Variants.ofTimestamptz( + DateTimeUtil.microsFromTimestamptz(zonedDateTime.toOffsetDateTime())); + } + if (value instanceof LocalDateTime localDateTime) { + return Variants.ofTimestampntz(DateTimeUtil.microsFromTimestamp(localDateTime)); + } + if (value instanceof LocalDate localDate) { + return Variants.ofDate(DateTimeUtil.daysFromDate(localDate)); + } + if (value instanceof LocalTime localTime) { + return Variants.ofTime(DateTimeUtil.microsFromTime(localTime)); + } + if (value instanceof Date date) { + String logicalName = schema != null ? schema.name() : null; + // Connect represents Timestamp, Time, and Date logical types as java.util.Date at runtime; + // normalize to Instant once, then interpret using the schema logical type name. + Instant connectInstant = date.toInstant(); + if (org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME.equals(logicalName)) { + return Variants.ofTimestamptz(DateTimeUtil.microsFromInstant(connectInstant)); + } + if (org.apache.kafka.connect.data.Time.LOGICAL_NAME.equals(logicalName)) { + LocalTime utcTime = connectInstant.atZone(ZoneOffset.UTC).toLocalTime(); + return Variants.ofTime(DateTimeUtil.microsFromTime(utcTime)); + } + if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(logicalName)) { + return Variants.ofDate(DateTimeUtil.daysFromInstant(connectInstant)); + } + throw new IllegalArgumentException( + "Cannot convert java.util.Date to variant without a recognized logical type schema" + + " (expected Timestamp, Time, or Date but got: " + + logicalName + + ")"); + } + return null; + } + + /** + * Converts a Number to VariantValue; throw IllegalArgumentException if the value is not a + * supported number representation. + */ + private static VariantValue numberToVariantValue(Number number) { + if (number instanceof BigDecimal bigDecimal) { + return Variants.of(bigDecimal); + } + if (number instanceof BigInteger bigInteger) { + return Variants.of(new BigDecimal(bigInteger)); + } + if (number instanceof Integer integer) { + return Variants.of(integer); + } + if (number instanceof Long longValue) { + return Variants.of(longValue); + } + if (number instanceof Float floatValue) { + return Variants.of(floatValue); + } + if (number instanceof Double doubleValue) { + return Variants.of(doubleValue); + } + if (number instanceof Byte byteValue) { + return Variants.of(byteValue); + } + if (number instanceof Short shortValue) { + return Variants.of(shortValue); + } + throw new IllegalArgumentException( + "Cannot convert Number to variant (unknown type): " + number.getClass().getName()); + } + @SuppressWarnings("JavaUtilDate") private OffsetDateTime convertOffsetDateTime(Object value) { if (value instanceof Number) { diff --git a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java index 45d07f69591b..56a9b6e100ac 100644 --- a/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java +++ b/kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/data/TestRecordConverter.java @@ -32,6 +32,7 @@ import java.time.LocalTime; import java.time.OffsetDateTime; import java.time.ZoneOffset; +import java.time.ZonedDateTime; import java.time.temporal.Temporal; import java.util.Base64; import java.util.Collection; @@ -74,7 +75,12 @@ import org.apache.iceberg.types.Types.TimeType; import org.apache.iceberg.types.Types.TimestampType; import org.apache.iceberg.types.Types.UUIDType; +import org.apache.iceberg.types.Types.VariantType; +import org.apache.iceberg.util.DateTimeUtil; import org.apache.iceberg.util.UUIDUtil; +import org.apache.iceberg.variants.PhysicalType; +import org.apache.iceberg.variants.Variant; +import org.apache.iceberg.variants.VariantValue; import org.apache.kafka.connect.data.Decimal; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; @@ -152,6 +158,9 @@ public class TestRecordConverter { NestedField.required( 100, "stma", MapType.ofRequired(101, 102, StringType.get(), ID_SCHEMA.asStruct()))); + private static final org.apache.iceberg.Schema VARIANT_SCHEMA = + new org.apache.iceberg.Schema(NestedField.required(1, "v", VariantType.get())); + private static final Schema CONNECT_SCHEMA = SchemaBuilder.struct() .field("i", Schema.INT32_SCHEMA) @@ -881,6 +890,364 @@ public void testEvolveTypeDetectionStructNested() { assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class); } + private RecordConverter variantConverter() { + Table table = mock(Table.class); + when(table.schema()).thenReturn(VARIANT_SCHEMA); + return new RecordConverter(table, config); + } + + @Test + public void testConvertVariantValueFromNull() { + Variant variant = variantConverter().convertVariantValue(null); + assertThat(variant).isNotNull(); + assertThat(variant.value().type()).isEqualTo(PhysicalType.NULL); + } + + @Test + public void testConvertVariantValuePassThrough() { + Variant original = variantConverter().convertVariantValue("hello"); + assertThat(variantConverter().convertVariantValue(original)).isSameAs(original); + } + + @Test + public void testConvertVariantValueFromPrimitiveString() { + Variant variant = variantConverter().convertVariantValue("hello"); + assertThat(variant).isNotNull(); + assertThat(variant.metadata()).isNotNull(); + assertThat(variant.metadata().dictionarySize()).isEqualTo(0); + assertThat(variant.value().type()).isEqualTo(PhysicalType.STRING); + assertThat(variant.value().asPrimitive().get()).isEqualTo("hello"); + } + + @Test + public void testConvertVariantValueFromPrimitiveNumber() { + Variant variant = variantConverter().convertVariantValue(123); + assertThat(variant).isNotNull(); + assertThat(variant.metadata().dictionarySize()).isEqualTo(0); + assertThat(variant.value().type()).isEqualTo(PhysicalType.INT32); + assertThat(variant.value().asPrimitive().get()).isEqualTo(123); + } + + @Test + public void testConvertVariantValueFromBoolean() { + Variant variant = variantConverter().convertVariantValue(true); + assertThat(variant).isNotNull(); + assertThat(variant.value().type()).isEqualTo(PhysicalType.BOOLEAN_TRUE); + assertThat(variant.value().asPrimitive().get()).isEqualTo(true); + } + + @Test + public void testConvertVariantValueFromInstant() { + Instant instant = Instant.parse("2025-04-04T12:34:56.789Z"); + Variant variant = variantConverter().convertVariantValue(instant); + assertThat(variant).isNotNull(); + assertThat(variant.metadata().dictionarySize()).isEqualTo(0); + assertThat(variant.value().type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(variant.value().asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromInstant(instant)); + } + + @Test + public void testConvertVariantValueFromOffsetDateTime() { + OffsetDateTime odt = OffsetDateTime.parse("2025-04-04T12:34:56.789+09:00"); + Variant variant = variantConverter().convertVariantValue(odt); + assertThat(variant).isNotNull(); + assertThat(variant.metadata().dictionarySize()).isEqualTo(0); + assertThat(variant.value().type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(variant.value().asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTimestamptz(odt)); + } + + @Test + public void testConvertVariantValueFromZonedDateTime() { + ZonedDateTime zdt = ZonedDateTime.parse("2025-04-04T12:34:56.789-05:00[America/New_York]"); + Variant variant = variantConverter().convertVariantValue(zdt); + assertThat(variant).isNotNull(); + assertThat(variant.metadata().dictionarySize()).isEqualTo(0); + assertThat(variant.value().type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(variant.value().asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTimestamptz(zdt.toOffsetDateTime())); + } + + @Test + public void testConvertVariantValueFromLocalDateTime() { + LocalDateTime ldt = LocalDateTime.parse("2025-04-04T12:34:56.789"); + Variant variant = variantConverter().convertVariantValue(ldt); + assertThat(variant).isNotNull(); + assertThat(variant.metadata().dictionarySize()).isEqualTo(0); + assertThat(variant.value().type()).isEqualTo(PhysicalType.TIMESTAMPNTZ); + assertThat(variant.value().asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTimestamp(ldt)); + } + + @Test + public void testConvertVariantValueFromLocalDate() { + LocalDate date = LocalDate.of(2025, 4, 4); + Variant variant = variantConverter().convertVariantValue(date); + assertThat(variant).isNotNull(); + assertThat(variant.metadata().dictionarySize()).isEqualTo(0); + assertThat(variant.value().type()).isEqualTo(PhysicalType.DATE); + assertThat(variant.value().asPrimitive().get()).isEqualTo(DateTimeUtil.daysFromDate(date)); + } + + @Test + public void testConvertVariantValueFromLocalTime() { + LocalTime time = LocalTime.of(12, 34, 56, 789_000_000); + Variant variant = variantConverter().convertVariantValue(time); + assertThat(variant).isNotNull(); + assertThat(variant.metadata().dictionarySize()).isEqualTo(0); + assertThat(variant.value().type()).isEqualTo(PhysicalType.TIME); + assertThat(variant.value().asPrimitive().get()).isEqualTo(DateTimeUtil.microsFromTime(time)); + } + + @Test + public void testConvertVariantValueFromList() { + // array with heterogeneous element types (string, int, boolean, double, null, nested array/map, + // java.time primitives). Note: java.util.Date is not supported without Connect logical schema. + Instant instant = Instant.parse("2025-04-04T12:34:56.789Z"); + OffsetDateTime offsetTs = OffsetDateTime.parse("2025-04-04T12:34:56.789+09:00"); + ZonedDateTime zonedTs = ZonedDateTime.parse("2025-04-04T12:34:56.789-05:00[America/New_York]"); + LocalDateTime localTs = LocalDateTime.parse("2025-04-04T12:34:56.789"); + LocalDate localDate = LocalDate.of(2025, 4, 4); + LocalTime localTime = LocalTime.of(12, 34, 56, 789_000_000); + + List input = + Lists.newArrayList( + "a", + 1, + true, + 2.5, + null, + ImmutableList.of("a", "b"), + ImmutableMap.of("key1", "value1", "key2", "value2"), + instant, + offsetTs, + zonedTs, + localTs, + localDate, + localTime); + Variant variant = variantConverter().convertVariantValue(input); + + assertThat(variant).isNotNull(); + assertThat(variant.value().type()).isEqualTo(PhysicalType.ARRAY); + assertThat(variant.value().asArray().numElements()).isEqualTo(13); + + assertThat(variant.value().asArray().get(0).type()).isEqualTo(PhysicalType.STRING); + assertThat(variant.value().asArray().get(0).asPrimitive().get()).isEqualTo("a"); + + assertThat(variant.value().asArray().get(1).type()).isEqualTo(PhysicalType.INT32); + assertThat(variant.value().asArray().get(1).asPrimitive().get()).isEqualTo(1); + + assertThat(variant.value().asArray().get(2).type()).isEqualTo(PhysicalType.BOOLEAN_TRUE); + assertThat(variant.value().asArray().get(2).asPrimitive().get()).isEqualTo(true); + + assertThat(variant.value().asArray().get(3).type()).isEqualTo(PhysicalType.DOUBLE); + assertThat(variant.value().asArray().get(3).asPrimitive().get()).isEqualTo(2.5); + + assertThat(variant.value().asArray().get(4).type()).isEqualTo(PhysicalType.NULL); + + assertThat(variant.value().asArray().get(5).type()).isEqualTo(PhysicalType.ARRAY); + assertThat(variant.value().asArray().get(5).asArray().numElements()).isEqualTo(2); + assertThat(variant.value().asArray().get(5).asArray().get(0).asPrimitive().get()) + .isEqualTo("a"); + assertThat(variant.value().asArray().get(5).asArray().get(1).asPrimitive().get()) + .isEqualTo("b"); + + assertThat(variant.value().asArray().get(6).type()).isEqualTo(PhysicalType.OBJECT); + assertThat(variant.value().asArray().get(6).asObject().numFields()).isEqualTo(2); + assertThat(variant.value().asArray().get(6).asObject().get("key1").asPrimitive().get()) + .isEqualTo("value1"); + assertThat(variant.value().asArray().get(6).asObject().get("key2").asPrimitive().get()) + .isEqualTo("value2"); + + assertThat(variant.value().asArray().get(7).type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(variant.value().asArray().get(7).asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromInstant(instant)); + + assertThat(variant.value().asArray().get(8).type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(variant.value().asArray().get(8).asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTimestamptz(offsetTs)); + + assertThat(variant.value().asArray().get(9).type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(variant.value().asArray().get(9).asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTimestamptz(zonedTs.toOffsetDateTime())); + + assertThat(variant.value().asArray().get(10).type()).isEqualTo(PhysicalType.TIMESTAMPNTZ); + assertThat(variant.value().asArray().get(10).asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTimestamp(localTs)); + + assertThat(variant.value().asArray().get(11).type()).isEqualTo(PhysicalType.DATE); + assertThat(variant.value().asArray().get(11).asPrimitive().get()) + .isEqualTo(DateTimeUtil.daysFromDate(localDate)); + + assertThat(variant.value().asArray().get(12).type()).isEqualTo(PhysicalType.TIME); + assertThat(variant.value().asArray().get(12).asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTime(localTime)); + } + + @Test + public void testConvertVariantValueFromMap() { + // heterogeneous top-level values, nested map, java.time primitives; + // metadata shares one sorted dictionary for the whole tree + Instant instant = Instant.parse("2025-04-04T12:34:56.789Z"); + OffsetDateTime offsetTs = OffsetDateTime.parse("2025-04-04T12:34:56.789+09:00"); + ZonedDateTime zonedTs = ZonedDateTime.parse("2025-04-04T12:34:56.789-05:00[America/New_York]"); + LocalDateTime localTs = LocalDateTime.parse("2025-04-04T12:34:56.789"); + LocalDate localDate = LocalDate.of(2025, 4, 4); + LocalTime localTime = LocalTime.of(12, 34, 56, 789_000_000); + + Map input = Maps.newLinkedHashMap(); + input.put("s", "text"); + input.put("i", 1); + input.put("bool", true); + input.put("d", 2.5); + input.put("n", null); + input.put("hello", ImmutableMap.of("world", 1)); + input.put("tags", ImmutableList.of("a", "b")); + input.put("instant", instant); + input.put("odt", offsetTs); + input.put("zdt", zonedTs); + input.put("ldt", localTs); + input.put("ldate", localDate); + input.put("ltime", localTime); + + Variant variant = variantConverter().convertVariantValue(input); + + assertThat(variant).isNotNull(); + assertThat(variant.metadata().dictionarySize()).isEqualTo(14); + assertThat(variant.metadata().get(0)).isEqualTo("bool"); + assertThat(variant.metadata().get(1)).isEqualTo("d"); + assertThat(variant.metadata().get(2)).isEqualTo("hello"); + assertThat(variant.metadata().get(3)).isEqualTo("i"); + assertThat(variant.metadata().get(4)).isEqualTo("instant"); + assertThat(variant.metadata().get(5)).isEqualTo("ldate"); + assertThat(variant.metadata().get(6)).isEqualTo("ldt"); + assertThat(variant.metadata().get(7)).isEqualTo("ltime"); + assertThat(variant.metadata().get(8)).isEqualTo("n"); + assertThat(variant.metadata().get(9)).isEqualTo("odt"); + assertThat(variant.metadata().get(10)).isEqualTo("s"); + assertThat(variant.metadata().get(11)).isEqualTo("tags"); + assertThat(variant.metadata().get(12)).isEqualTo("world"); + assertThat(variant.metadata().get(13)).isEqualTo("zdt"); + + assertThat(variant.value().type()).isEqualTo(PhysicalType.OBJECT); + assertThat(variant.value().asObject().numFields()).isEqualTo(13); + + assertThat(variant.value().asObject().get("bool").type()).isEqualTo(PhysicalType.BOOLEAN_TRUE); + assertThat(variant.value().asObject().get("bool").asPrimitive().get()).isEqualTo(true); + + assertThat(variant.value().asObject().get("d").type()).isEqualTo(PhysicalType.DOUBLE); + assertThat(variant.value().asObject().get("d").asPrimitive().get()).isEqualTo(2.5); + + assertThat(variant.value().asObject().get("i").type()).isEqualTo(PhysicalType.INT32); + assertThat(variant.value().asObject().get("i").asPrimitive().get()).isEqualTo(1); + + assertThat(variant.value().asObject().get("n").type()).isEqualTo(PhysicalType.NULL); + + assertThat(variant.value().asObject().get("s").type()).isEqualTo(PhysicalType.STRING); + assertThat(variant.value().asObject().get("s").asPrimitive().get()).isEqualTo("text"); + + VariantValue tags = variant.value().asObject().get("tags"); + assertThat(tags.type()).isEqualTo(PhysicalType.ARRAY); + assertThat(tags.asArray().numElements()).isEqualTo(2); + assertThat(tags.asArray().get(0).asPrimitive().get()).isEqualTo("a"); + assertThat(tags.asArray().get(1).asPrimitive().get()).isEqualTo("b"); + + assertThat(variant.value().asObject().get("instant").type()) + .isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(variant.value().asObject().get("instant").asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromInstant(instant)); + + assertThat(variant.value().asObject().get("odt").type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(variant.value().asObject().get("odt").asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTimestamptz(offsetTs)); + + assertThat(variant.value().asObject().get("zdt").type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(variant.value().asObject().get("zdt").asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTimestamptz(zonedTs.toOffsetDateTime())); + + assertThat(variant.value().asObject().get("ldt").type()).isEqualTo(PhysicalType.TIMESTAMPNTZ); + assertThat(variant.value().asObject().get("ldt").asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTimestamp(localTs)); + + assertThat(variant.value().asObject().get("ldate").type()).isEqualTo(PhysicalType.DATE); + assertThat(variant.value().asObject().get("ldate").asPrimitive().get()) + .isEqualTo(DateTimeUtil.daysFromDate(localDate)); + + assertThat(variant.value().asObject().get("ltime").type()).isEqualTo(PhysicalType.TIME); + assertThat(variant.value().asObject().get("ltime").asPrimitive().get()) + .isEqualTo(DateTimeUtil.microsFromTime(localTime)); + + VariantValue nested = variant.value().asObject().get("hello"); + assertThat(nested.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(nested.asObject().get("world").asPrimitive().get()).isEqualTo(1); + } + + @Test + public void testConvertVariantValueFromStruct() { + // Nested Connect struct: primitives, array, and Timestamp / Time / Date (java.util.Date + + // logical types) + // 2025-04-04 12:34:56.789 UTC (aligned with java.time variant tests) + long tsMillis = 1743770096789L; + long timeMillis = 45296789L; + long dateMillis = 20182L * 86_400_000; + + Schema innerSchema = + SchemaBuilder.struct() + .field("i", Schema.INT32_SCHEMA) + .field("str", Schema.STRING_SCHEMA) + .field("tags", SchemaBuilder.array(Schema.STRING_SCHEMA).build()) + .field("ts", Timestamp.SCHEMA) + .field("t", Time.SCHEMA) + .field("d", org.apache.kafka.connect.data.Date.SCHEMA) + .build(); + Schema outerSchema = + SchemaBuilder.struct().field("inner", innerSchema).field("id", Schema.INT64_SCHEMA).build(); + Struct inner = + new Struct(innerSchema) + .put("i", 1) + .put("str", "world") + .put("tags", ImmutableList.of("a", "b")) + .put("ts", new Date(tsMillis)) + .put("t", new Date(timeMillis)) + .put("d", new Date(dateMillis)); + Struct outer = new Struct(outerSchema).put("inner", inner).put("id", 100L); + + Variant variant = variantConverter().convertVariantValue(outer); + + assertThat(variant).isNotNull(); + assertThat(variant.metadata().dictionarySize()).isEqualTo(8); + assertThat(variant.metadata().get(0)).isEqualTo("d"); + assertThat(variant.metadata().get(1)).isEqualTo("i"); + assertThat(variant.metadata().get(2)).isEqualTo("id"); + assertThat(variant.metadata().get(3)).isEqualTo("inner"); + assertThat(variant.metadata().get(4)).isEqualTo("str"); + assertThat(variant.metadata().get(5)).isEqualTo("t"); + assertThat(variant.metadata().get(6)).isEqualTo("tags"); + assertThat(variant.metadata().get(7)).isEqualTo("ts"); + + assertThat(variant.value().type()).isEqualTo(PhysicalType.OBJECT); + assertThat(variant.value().asObject().get("id").asPrimitive().get()).isEqualTo(100L); + + VariantValue innerVal = variant.value().asObject().get("inner"); + assertThat(innerVal.type()).isEqualTo(PhysicalType.OBJECT); + assertThat(innerVal.asObject().get("i").asPrimitive().get()).isEqualTo(1); + assertThat(innerVal.asObject().get("str").asPrimitive().get()).isEqualTo("world"); + assertThat(innerVal.asObject().get("tags").type()).isEqualTo(PhysicalType.ARRAY); + assertThat(innerVal.asObject().get("tags").asArray().numElements()).isEqualTo(2); + assertThat(innerVal.asObject().get("tags").asArray().get(0).asPrimitive().get()).isEqualTo("a"); + assertThat(innerVal.asObject().get("tags").asArray().get(1).asPrimitive().get()).isEqualTo("b"); + + assertThat(innerVal.asObject().get("ts").type()).isEqualTo(PhysicalType.TIMESTAMPTZ); + assertThat(innerVal.asObject().get("ts").asPrimitive().get()).isEqualTo(tsMillis * 1000); + + assertThat(innerVal.asObject().get("t").type()).isEqualTo(PhysicalType.TIME); + assertThat(innerVal.asObject().get("t").asPrimitive().get()).isEqualTo(timeMillis * 1000); + + assertThat(innerVal.asObject().get("d").type()).isEqualTo(PhysicalType.DATE); + assertThat(innerVal.asObject().get("d").asPrimitive().get()).isEqualTo(20182); + } + public static Map createMapData() { return ImmutableMap.builder() .put("i", 1)