-
Notifications
You must be signed in to change notification settings - Fork 3.2k
Kafka Connect: Support VARIANT when record convert #15283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 30 commits
033f47f
a53af92
73ea656
2b3fcb1
47f6fe5
e8f57f1
b57e0d8
0117174
be09349
75ec7ce
c6e558a
787663d
75dc8a2
d9c8721
e517884
8bb8740
aa2bcce
5be9fa4
179b07a
c1d0be7
465fa92
06b7afe
a4c12b8
568f7b1
10bff3d
b8772ba
a99262e
ca47edf
7530597
a312eb0
818a515
b1f379e
1b849b7
2412a6b
f55df63
546dfca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -22,23 +22,28 @@ | |
| import java.io.IOException; | ||
| import java.io.UncheckedIOException; | ||
| import java.math.BigDecimal; | ||
| import java.math.BigInteger; | ||
| import java.math.RoundingMode; | ||
| import java.nio.ByteBuffer; | ||
| import java.nio.charset.StandardCharsets; | ||
| import java.time.Instant; | ||
| import java.time.LocalDate; | ||
| import java.time.LocalDateTime; | ||
| import java.time.LocalTime; | ||
| import java.time.OffsetDateTime; | ||
| import java.time.ZoneOffset; | ||
| import java.time.ZonedDateTime; | ||
| import java.time.format.DateTimeFormatter; | ||
| import java.time.format.DateTimeFormatterBuilder; | ||
| import java.time.format.DateTimeParseException; | ||
| import java.time.temporal.Temporal; | ||
| import java.util.Base64; | ||
| import java.util.Collection; | ||
| import java.util.Date; | ||
| import java.util.List; | ||
| import java.util.Locale; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
| import java.util.stream.Collectors; | ||
| import org.apache.iceberg.FileFormat; | ||
|
|
@@ -53,6 +58,7 @@ | |
| import org.apache.iceberg.mapping.NameMappingParser; | ||
| import org.apache.iceberg.relocated.com.google.common.base.Preconditions; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Maps; | ||
| import org.apache.iceberg.relocated.com.google.common.collect.Sets; | ||
| import org.apache.iceberg.types.Type; | ||
| import org.apache.iceberg.types.Type.PrimitiveType; | ||
| import org.apache.iceberg.types.Types.DecimalType; | ||
|
|
@@ -64,6 +70,13 @@ | |
| import org.apache.iceberg.util.ByteBuffers; | ||
| import org.apache.iceberg.util.DateTimeUtil; | ||
| import org.apache.iceberg.util.UUIDUtil; | ||
| import org.apache.iceberg.variants.ShreddedObject; | ||
| import org.apache.iceberg.variants.ValueArray; | ||
| import org.apache.iceberg.variants.Variant; | ||
| import org.apache.iceberg.variants.VariantMetadata; | ||
| import org.apache.iceberg.variants.VariantValue; | ||
| import org.apache.iceberg.variants.Variants; | ||
| import org.apache.kafka.connect.data.Field; | ||
| import org.apache.kafka.connect.data.Struct; | ||
| import org.apache.kafka.connect.errors.ConnectException; | ||
|
|
||
|
|
@@ -142,6 +155,8 @@ private Object convertValue( | |
| return convertTimeValue(value); | ||
| case TIMESTAMP: | ||
| return convertTimestampValue(value, (TimestampType) type); | ||
| case VARIANT: | ||
| return convertVariantValue(value); | ||
| } | ||
| throw new UnsupportedOperationException("Unsupported type: " + type.typeId()); | ||
| } | ||
|
|
@@ -464,6 +479,218 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) { | |
| return convertLocalDateTime(value); | ||
| } | ||
|
|
||
| /** | ||
| * Converts an incoming value to a Variant. A ByteBuffer is passed directly to Variant.from; | ||
| * for any other value, all field names in the value tree are collected and sorted to build a | ||
| * single VariantMetadata, and the value is then converted against that shared metadata. | ||
| * | ||
| * @param value the value to convert | ||
| * @return the resulting Variant | ||
| */ | ||
| protected Variant convertVariantValue(Object value) { | ||
|
seokyun-ha-toss marked this conversation as resolved.
|
||
| if (value instanceof ByteBuffer) { | ||
| return Variant.from((ByteBuffer) value); | ||
|
seokyun-ha-toss marked this conversation as resolved.
Outdated
|
||
| } | ||
|
|
||
| Set<String> fieldNames = Sets.newHashSet(); | ||
| collectFieldNames(value, fieldNames); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not have
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is mainly a design choice. Passing a I’m not good at Java internals, but my intuition is that repeated set merging could be more expensive than simply adding elements into a single top-level set. That said, I’m open to changing the approach—would appreciate your thoughts.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is not typical convention. It's better practice to not modify parameters, so I would push this into the method and return the list of field names. Also this can be combined with the collect/sort below.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I agree with you. I’ve updated the implementation to return a Set from For now, I’m using a Set to naturally handle deduplication, and applying sorting separately. |
||
| List<String> allFieldNames = fieldNames.stream().sorted().collect(Collectors.toList()); | ||
| VariantMetadata metadata = Variants.metadata(allFieldNames); | ||
| // No Connect schema is passed for the top-level value, hence the null schema argument. | ||
| VariantValue variantValue = objectToVariantValue(value, metadata, null); | ||
| return Variant.of(metadata, variantValue); | ||
| } | ||
|
|
||
| /** | ||
| * Collects every field name reachable from the given value into the given set: String keys of | ||
| * maps and field names of Structs, descending through collection elements, map values, and | ||
| * Struct field values. Used to build a single VariantMetadata for the whole Variant (required | ||
| * for nested maps). | ||
| * | ||
| * <p>Map entries whose key is not a String are skipped here, together with their values. | ||
| * | ||
| * @param value the value to scan; null and any other unhandled value contributes nothing | ||
| * @param names the set that receives the collected names; it is modified in place | ||
| */ | ||
| private static void collectFieldNames(Object value, Set<String> names) { | ||
| if (value == null) { | ||
| return; | ||
| } | ||
| if (value instanceof Collection) { | ||
| for (Object element : (Collection<?>) value) { | ||
| collectFieldNames(element, names); | ||
| } | ||
| return; | ||
| } | ||
| if (value instanceof Map) { | ||
| Map<?, ?> map = (Map<?, ?>) value; | ||
| for (Map.Entry<?, ?> entry : map.entrySet()) { | ||
| Object key = entry.getKey(); | ||
| // instanceof is false for null, so no separate null check is needed. | ||
| if (key instanceof String) { | ||
|
seokyun-ha-toss marked this conversation as resolved.
Outdated
|
||
| names.add((String) key); | ||
| collectFieldNames(entry.getValue(), names); | ||
| } | ||
|
seokyun-ha-toss marked this conversation as resolved.
Outdated
|
||
| } | ||
| return; | ||
| } | ||
| if (value instanceof Struct) { | ||
| Struct struct = (Struct) value; | ||
|
seokyun-ha-toss marked this conversation as resolved.
Outdated
|
||
| for (Field field : struct.schema().fields()) { | ||
| names.add(field.name()); | ||
| collectFieldNames(struct.get(field), names); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Recursively converts a Java object to a VariantValue using the given shared metadata for all | ||
| * nested objects. Handles null, primitive-like values, Collection (array), Map (object), and | ||
| * Struct (object); map keys and Struct field names become field names. | ||
| * | ||
| * @param value the value to convert; null becomes a variant null | ||
| * @param metadata the metadata shared by every object created for this value tree | ||
| * @param schema the Connect schema for the value, or null when none is available | ||
| * @return the converted value | ||
| * @throws IllegalArgumentException if the value's class is not one of the handled kinds | ||
| */ | ||
| private static VariantValue objectToVariantValue( | ||
| Object value, VariantMetadata metadata, org.apache.kafka.connect.data.Schema schema) { | ||
| if (value == null) { | ||
| return Variants.ofNull(); | ||
| } | ||
| VariantValue primitive = primitiveToVariantValue(value, schema); | ||
| if (primitive != null) { | ||
| return primitive; | ||
| } | ||
| if (value instanceof Collection) { | ||
| ValueArray array = Variants.array(); | ||
| org.apache.kafka.connect.data.Schema elementSchema = | ||
| schema != null ? schema.valueSchema() : null; | ||
| for (Object element : (Collection<?>) value) { | ||
| array.add(objectToVariantValue(element, metadata, elementSchema)); | ||
| } | ||
| return array; | ||
| } | ||
| if (value instanceof Map) { | ||
| return mapToVariantValue(value, metadata, schema); | ||
| } | ||
| if (value instanceof Struct) { | ||
| Struct struct = (Struct) value; | ||
|
seokyun-ha-toss marked this conversation as resolved.
Outdated
|
||
| ShreddedObject object = Variants.object(metadata); | ||
| for (Field field : struct.schema().fields()) { | ||
| object.put(field.name(), objectToVariantValue(struct.get(field), metadata, field.schema())); | ||
| } | ||
| return object; | ||
| } | ||
| throw new IllegalArgumentException("Cannot convert to variant: " + value.getClass().getName()); | ||
| } | ||
|
|
||
| /** | ||
| * Converts a Map to a variant object backed by the given shared metadata. Every key must be a | ||
| * non-null String; each entry value is converted recursively using the map schema's value | ||
| * schema, or no schema when none is given. | ||
| * | ||
| * @param value the Map to convert | ||
| * @param metadata the metadata shared by every object created for this value tree | ||
| * @param schema the Connect schema of the map, or null when none is available | ||
| * @return the variant object holding one field per map entry | ||
| * @throws IllegalArgumentException if a key is null or not a String | ||
| */ | ||
| private static VariantValue mapToVariantValue( | ||
| Object value, VariantMetadata metadata, org.apache.kafka.connect.data.Schema schema) { | ||
| Map<?, ?> map = (Map<?, ?>) value; | ||
| ShreddedObject object = Variants.object(metadata); | ||
| org.apache.kafka.connect.data.Schema mapValueSchema = | ||
| schema != null ? schema.valueSchema() : null; | ||
| map.forEach( | ||
| (key, val) -> { | ||
| // instanceof is false for null, so this single check also rejects null keys. | ||
| if (key instanceof String) { | ||
|
seokyun-ha-toss marked this conversation as resolved.
Outdated
|
||
| object.put((String) key, objectToVariantValue(val, metadata, mapValueSchema)); | ||
| } else { | ||
| throw new IllegalArgumentException( | ||
| "Cannot convert map to variant: keys must be non-null strings, was: " | ||
| + (key == null ? "null" : key.getClass().getName())); | ||
| } | ||
| }); | ||
| return object; | ||
| } | ||
|
|
||
| /** | ||
| * Maps a primitive or primitive-like value onto a VariantValue, yielding null when the value is | ||
| * not one of the supported kinds. The optional schema is forwarded to the temporal conversion, | ||
| * where it disambiguates java.util.Date, which Kafka Connect uses for the Date, Time, and | ||
| * Timestamp logical types. | ||
| */ | ||
| private static VariantValue primitiveToVariantValue( | ||
| Object value, org.apache.kafka.connect.data.Schema schema) { | ||
| if (value instanceof Boolean) { | ||
| return Variants.of((Boolean) value); | ||
| } | ||
| // Temporal representations are tried before the remaining scalar checks. | ||
| VariantValue fromTemporal = temporalObjectToVariantValue(value, schema); | ||
| if (fromTemporal != null) { | ||
| return fromTemporal; | ||
| } | ||
| if (value instanceof Number) { | ||
| return numberToVariantValue((Number) value); | ||
| } else if (value instanceof String) { | ||
| return Variants.of((String) value); | ||
| } else if (value instanceof ByteBuffer) { | ||
| return Variants.of((ByteBuffer) value); | ||
| } else if (value instanceof byte[]) { | ||
| ByteBuffer wrapped = ByteBuffer.wrap((byte[]) value); | ||
| return Variants.of(wrapped); | ||
| } else if (value instanceof UUID) { | ||
| return Variants.ofUUID((UUID) value); | ||
| } else { | ||
| // Not a primitive-like value. | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * Converts java.time values and java.util.Date (with Connect logical type from the optional | ||
| * schema) to VariantValue; returns null if the value is not a supported temporal representation. | ||
| * | ||
| * <p>Instant, OffsetDateTime, and ZonedDateTime become timestamptz values; LocalDateTime becomes | ||
| * a timestampntz value; LocalDate and LocalTime become date and time values. A java.util.Date | ||
| * is interpreted according to the schema name: Connect Timestamp (timestamptz), Time (the UTC | ||
| * time of day), or Date. | ||
| * | ||
| * @param value the value to convert | ||
| * @param schema the Connect schema for the value, or null when none is available | ||
| * @return the converted value, or null if the value is not a supported temporal type | ||
| * @throws IllegalArgumentException if the value is a java.util.Date and the schema is null or | ||
| * its name is not the Connect Timestamp, Time, or Date logical name | ||
| */ | ||
| private static VariantValue temporalObjectToVariantValue( | ||
| Object value, org.apache.kafka.connect.data.Schema schema) { | ||
| if (value instanceof Instant) { | ||
| return Variants.ofTimestamptz(DateTimeUtil.microsFromInstant((Instant) value)); | ||
| } | ||
| if (value instanceof OffsetDateTime) { | ||
| return Variants.ofTimestamptz(DateTimeUtil.microsFromTimestamptz((OffsetDateTime) value)); | ||
| } | ||
| if (value instanceof ZonedDateTime) { | ||
| return Variants.ofTimestamptz( | ||
| DateTimeUtil.microsFromTimestamptz(((ZonedDateTime) value).toOffsetDateTime())); | ||
| } | ||
| if (value instanceof LocalDateTime) { | ||
| return Variants.ofTimestampntz(DateTimeUtil.microsFromTimestamp((LocalDateTime) value)); | ||
|
seokyun-ha-toss marked this conversation as resolved.
Outdated
|
||
| } | ||
| if (value instanceof LocalDate) { | ||
| return Variants.ofDate(DateTimeUtil.daysFromDate((LocalDate) value)); | ||
| } | ||
| if (value instanceof LocalTime) { | ||
| return Variants.ofTime(DateTimeUtil.microsFromTime((LocalTime) value)); | ||
| } | ||
| if (value instanceof Date) { | ||
| String logicalName = schema != null ? schema.name() : null; | ||
| // Connect represents Timestamp, Time, and Date logical types as java.util.Date at runtime; | ||
| // normalize to Instant once, then interpret using the schema logical type name. | ||
| Instant connectInstant = ((Date) value).toInstant(); | ||
| if (org.apache.kafka.connect.data.Timestamp.LOGICAL_NAME.equals(logicalName)) { | ||
| return Variants.ofTimestamptz(DateTimeUtil.microsFromInstant(connectInstant)); | ||
| } | ||
| if (org.apache.kafka.connect.data.Time.LOGICAL_NAME.equals(logicalName)) { | ||
| LocalTime utcTime = connectInstant.atZone(ZoneOffset.UTC).toLocalTime(); | ||
| return Variants.ofTime(DateTimeUtil.microsFromTime(utcTime)); | ||
| } | ||
| if (org.apache.kafka.connect.data.Date.LOGICAL_NAME.equals(logicalName)) { | ||
| return Variants.ofDate(DateTimeUtil.daysFromInstant(connectInstant)); | ||
| } | ||
| throw new IllegalArgumentException( | ||
| "Cannot convert java.util.Date to variant without a recognized logical type schema" | ||
| + " (expected Timestamp, Time, or Date but got: " | ||
| + logicalName | ||
| + ")"); | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| /** | ||
| * Converts a Number to a VariantValue. BigDecimal, Integer, Long, Float, Double, Byte, and Short | ||
| * values are passed to Variants.of after a cast to their own type; a BigInteger is first | ||
| * wrapped in a BigDecimal. | ||
| * | ||
| * @param value the number to convert | ||
| * @return the converted value | ||
| * @throws IllegalArgumentException if the Number is of any other class | ||
| */ | ||
| private static VariantValue numberToVariantValue(Number value) { | ||
| if (value instanceof BigDecimal) { | ||
| return Variants.of((BigDecimal) value); | ||
| } | ||
| if (value instanceof BigInteger) { | ||
| return Variants.of(new BigDecimal((BigInteger) value)); | ||
| } | ||
| if (value instanceof Integer) { | ||
| return Variants.of((Integer) value); | ||
| } | ||
| if (value instanceof Long) { | ||
| return Variants.of((Long) value); | ||
| } | ||
| if (value instanceof Float) { | ||
| return Variants.of((Float) value); | ||
| } | ||
| if (value instanceof Double) { | ||
| return Variants.of((Double) value); | ||
| } | ||
| if (value instanceof Byte) { | ||
| return Variants.of((Byte) value); | ||
| } | ||
| if (value instanceof Short) { | ||
| return Variants.of((Short) value); | ||
| } | ||
|
seokyun-ha-toss marked this conversation as resolved.
seokyun-ha-toss marked this conversation as resolved.
|
||
| throw new IllegalArgumentException( | ||
| "Cannot convert Number to variant (unknown type): " + value.getClass().getName()); | ||
| } | ||
|
|
||
| @SuppressWarnings("JavaUtilDate") | ||
| private OffsetDateTime convertOffsetDateTime(Object value) { | ||
| if (value instanceof Number) { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.