Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
033f47f
feat: Implement support for VARIANT type in RecordConverter with conv…
seokyun-ha-toss Feb 10, 2026
a53af92
test: implement unit test for RecordConverter for Variant
seokyun-ha-toss Feb 10, 2026
73ea656
chore: lint spotlessApply
seokyun-ha-toss Feb 10, 2026
2b3fcb1
fix: there's no already Variant value, and use `Lists.of()`
seokyun-ha-toss Feb 11, 2026
47f6fe5
fix: handle `BigDecimal` type in `numberToVariantValue()`
seokyun-ha-toss Mar 12, 2026
e8f57f1
fix: explicit error when unknown numeric type
seokyun-ha-toss Mar 12, 2026
b57e0d8
refactor: optimize field name collection in RecordConverter to use a …
seokyun-ha-toss Mar 12, 2026
0117174
fix: ensure keys in map are non-null strings before processing in Rec…
seokyun-ha-toss Mar 12, 2026
be09349
refactor: loop entrySet once
seokyun-ha-toss Mar 12, 2026
75ec7ce
test: enhance unit tests for Variant conversion with additional cases…
seokyun-ha-toss Mar 13, 2026
c6e558a
lint: apply gh actions auto-review and lints, split Cyclomatic Comple…
seokyun-ha-toss Mar 13, 2026
787663d
lint: spotlessApply
seokyun-ha-toss Mar 13, 2026
75dc8a2
feat: support `kafka Struct` type, and align if checking ordering
seokyun-ha-toss Mar 17, 2026
d9c8721
test: implement unit test for Variant for Struct
seokyun-ha-toss Mar 17, 2026
e517884
lint: gradlew spotlessApply
seokyun-ha-toss Mar 17, 2026
8bb8740
feat: add support for BigInteger conversion to BigDecimal in RecordCo…
seokyun-ha-toss Mar 27, 2026
aa2bcce
feat: support for Date conversion to Variants
seokyun-ha-toss Mar 27, 2026
5be9fa4
feat: support additional Date, Datetime, time types
seokyun-ha-toss Mar 27, 2026
179b07a
lint: spotlessApply
seokyun-ha-toss Mar 27, 2026
c1d0be7
Fix java.util.Date variant conversion losing precision for Timestamp …
brandonstanleyappfolio Apr 1, 2026
465fa92
Merge pull request #2 from brandonstanleyappfolio/variant-date-fix
seokyun-ha-toss Apr 3, 2026
06b7afe
fix: explicitly throw an exception when a non-string key is used
seokyun-ha-toss Apr 3, 2026
a4c12b8
lint: spotlessApply
seokyun-ha-toss Apr 3, 2026
568f7b1
test: remove duplicated
seokyun-ha-toss Apr 3, 2026
10bff3d
test: integrate test convert variant value from list, map, struct
seokyun-ha-toss Apr 4, 2026
b8772ba
test: left only one unittest on `VariantValueFromStructWith` Date family
seokyun-ha-toss Apr 4, 2026
a99262e
test: add unittest for variant with timestamp family
seokyun-ha-toss Apr 4, 2026
ca47edf
Merge branch 'apache:main' into support-variant-for-sink-connector
seokyun-ha-toss Apr 4, 2026
7530597
fix: [JavaUtilDate] Date has a bad API that leads to bugs
seokyun-ha-toss Apr 4, 2026
a312eb0
fix: resolve Cyclomatic Complexity
seokyun-ha-toss Apr 4, 2026
818a515
fix: throw invalid key type
seokyun-ha-toss Apr 9, 2026
b1f379e
lint: spotlessApply
seokyun-ha-toss Apr 9, 2026
1b849b7
fix: remove key null checking & use JLS pattern variable
seokyun-ha-toss Apr 21, 2026
2412a6b
fix: remove Variant.from() ByteBuffer type, and pass-through already…
seokyun-ha-toss Apr 21, 2026
f55df63
fix: change collect field names recursively from top-down to bottom-up
seokyun-ha-toss Apr 22, 2026
546dfca
fix: resolve pattern-matching instanceof
seokyun-ha-toss Apr 22, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.time.format.DateTimeParseException;
import java.time.temporal.Temporal;
import java.util.Base64;
import java.util.Collection;
import java.util.Date;
import java.util.List;
import java.util.Locale;
Expand All @@ -52,6 +53,7 @@
import org.apache.iceberg.mapping.NameMapping;
import org.apache.iceberg.mapping.NameMappingParser;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Type.PrimitiveType;
Expand All @@ -64,6 +66,12 @@
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.DateTimeUtil;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.ShreddedObject;
import org.apache.iceberg.variants.ValueArray;
import org.apache.iceberg.variants.Variant;
import org.apache.iceberg.variants.VariantMetadata;
import org.apache.iceberg.variants.VariantValue;
import org.apache.iceberg.variants.Variants;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;

Expand Down Expand Up @@ -142,6 +150,8 @@ private Object convertValue(
return convertTimeValue(value);
case TIMESTAMP:
return convertTimestampValue(value, (TimestampType) type);
case VARIANT:
return convertVariantValue(value);
}
throw new UnsupportedOperationException("Unsupported type: " + type.typeId());
}
Expand Down Expand Up @@ -464,6 +474,119 @@ protected Temporal convertTimestampValue(Object value, TimestampType type) {
return convertLocalDateTime(value);
}

protected Variant convertVariantValue(Object value) {
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Comment thread
seokyun-ha-toss marked this conversation as resolved.
if (value instanceof Variant) {
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Outdated
return (Variant) value;
} else if (value instanceof ByteBuffer) {
Comment thread
danielcweeks marked this conversation as resolved.
Outdated
return Variant.from((ByteBuffer) value);
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Outdated
}

List<String> allFieldNames =
collectFieldNames(value).stream().distinct().sorted().collect(Collectors.toList());
VariantMetadata metadata =
allFieldNames.isEmpty() ? Variants.emptyMetadata() : Variants.metadata(allFieldNames);
VariantValue variantValue = objectToVariantValue(value, metadata);
return Variant.of(metadata, variantValue);
}

/**
* Collects all field names (map keys) from the entire object tree. Used to build a single
* VariantMetadata for the whole Variant (required for nested maps).
*/
private static List<String> collectFieldNames(Object value) {
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Outdated
if (value == null) {
return Lists.newArrayList();
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Outdated
}
if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
List<String> names = Lists.newArrayList();
map.keySet().stream().map(Object::toString).forEach(names::add);
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Outdated
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Outdated
for (Object v : map.values()) {
names.addAll(collectFieldNames(v));
}
return names;
}
if (value instanceof Collection) {
List<String> names = Lists.newArrayList();
for (Object element : (Collection<?>) value) {
names.addAll(collectFieldNames(element));
}
return names;
}
return Lists.newArrayList();
}

/**
* Recursively converts a Java object to a VariantValue using the given shared metadata for all
* nested maps. Handles primitives, List (array), and Map (object); map keys become field names.
*/
private static VariantValue objectToVariantValue(Object value, VariantMetadata metadata) {
if (value == null) {
return Variants.ofNull();
}
if (value instanceof Boolean) {
return Variants.of((Boolean) value);
}
if (value instanceof Number) {
return numberToVariantValue((Number) value);
}
if (value instanceof String) {
return Variants.of((String) value);
}
if (value instanceof ByteBuffer) {
return Variants.of((ByteBuffer) value);
}
if (value instanceof byte[]) {
return Variants.of(ByteBuffer.wrap((byte[]) value));
}
if (value instanceof BigDecimal) {
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Outdated
return Variants.of((BigDecimal) value);
}
if (value instanceof UUID) {
return Variants.ofUUID((UUID) value);
}
if (value instanceof Collection) {
ValueArray array = Variants.array();
for (Object element : (Collection<?>) value) {
array.add(objectToVariantValue(element, metadata));
}
return array;
}
if (value instanceof Map) {
Map<?, ?> map = (Map<?, ?>) value;
ShreddedObject object = Variants.object(metadata);
map.forEach((k, v) -> object.put(k.toString(), objectToVariantValue(v, metadata)));
return object;
}
throw new IllegalArgumentException("Cannot convert to variant: " + value.getClass().getName());
}

private static VariantValue numberToVariantValue(Number value) {
if (value instanceof Integer) {
return Variants.of((Integer) value);
}
if (value instanceof Long) {
return Variants.of((Long) value);
}
if (value instanceof Float) {
return Variants.of((Float) value);
}
if (value instanceof Double) {
return Variants.of((Double) value);
}
if (value instanceof Byte) {
return Variants.of((Byte) value);
}
if (value instanceof Short) {
return Variants.of((Short) value);
}
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Number num = value;
if (num.doubleValue() == num.longValue()) {
return Variants.of(num.longValue());
}
return Variants.of(num.doubleValue());
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Outdated
}

@SuppressWarnings("JavaUtilDate")
private OffsetDateTime convertOffsetDateTime(Object value) {
if (value instanceof Number) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,10 @@
import org.apache.iceberg.types.Types.TimeType;
import org.apache.iceberg.types.Types.TimestampType;
import org.apache.iceberg.types.Types.UUIDType;
import org.apache.iceberg.types.Types.VariantType;
import org.apache.iceberg.util.UUIDUtil;
import org.apache.iceberg.variants.PhysicalType;
import org.apache.iceberg.variants.Variant;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
Expand Down Expand Up @@ -152,6 +155,9 @@ public class TestRecordConverter {
NestedField.required(
100, "stma", MapType.ofRequired(101, 102, StringType.get(), ID_SCHEMA.asStruct())));

// Iceberg schema with a single required VARIANT column named "v" (field id 1); the variant
// conversion tests use it as the mocked table schema.
private static final org.apache.iceberg.Schema VARIANT_SCHEMA =
new org.apache.iceberg.Schema(NestedField.required(1, "v", VariantType.get()));

private static final Schema CONNECT_SCHEMA =
SchemaBuilder.struct()
.field("i", Schema.INT32_SCHEMA)
Expand Down Expand Up @@ -881,6 +887,102 @@ public void testEvolveTypeDetectionStructNested() {
assertThat(updateMap.get("st.ff").type()).isInstanceOf(DoubleType.class);
}

@Test
Comment thread
seokyun-ha-toss marked this conversation as resolved.
Outdated
public void testConvertVariantFromPrimitiveString() {
Table table = mock(Table.class);
when(table.schema()).thenReturn(VARIANT_SCHEMA);
RecordConverter converter = new RecordConverter(table, config);

Record record = converter.convert(ImmutableMap.of("v", "hello"));
Variant variant = (Variant) record.getField("v");

assertThat(variant).isNotNull();
assertThat(variant.metadata()).isNotNull();
assertThat(variant.metadata().dictionarySize()).isEqualTo(0);
assertThat(variant.value().type()).isEqualTo(PhysicalType.STRING);
assertThat(variant.value().asPrimitive().get()).isEqualTo("hello");
}

@Test
public void testConvertVariantFromPrimitiveNumber() {
  // A Java int becomes an INT32 variant; with no map keys the dictionary stays empty.
  Table variantTable = mock(Table.class);
  when(variantTable.schema()).thenReturn(VARIANT_SCHEMA);

  Record converted = new RecordConverter(variantTable, config).convert(ImmutableMap.of("v", 123));
  Variant result = (Variant) converted.getField("v");

  assertThat(result).isNotNull();
  assertThat(result.metadata()).isNotNull();
  assertThat(result.metadata().dictionarySize()).isZero();
  assertThat(result.value().type()).isEqualTo(PhysicalType.INT32);
  assertThat(result.value().asPrimitive().get()).isEqualTo(123);
}

@Test
public void testConvertVariantFromList() {
  // A list becomes an ARRAY variant whose elements keep their own primitive types.
  Table variantTable = mock(Table.class);
  when(variantTable.schema()).thenReturn(VARIANT_SCHEMA);

  Record converted =
      new RecordConverter(variantTable, config)
          .convert(ImmutableMap.of("v", ImmutableList.of("hello", 1)));
  Variant result = (Variant) converted.getField("v");

  assertThat(result).isNotNull();
  assertThat(result.metadata()).isNotNull();
  assertThat(result.metadata().dictionarySize()).isZero();
  assertThat(result.value().type()).isEqualTo(PhysicalType.ARRAY);
  assertThat(result.value().asArray().numElements()).isEqualTo(2);

  // First element: the string.
  assertThat(result.value().asArray().get(0).type()).isEqualTo(PhysicalType.STRING);
  assertThat(result.value().asArray().get(0).asPrimitive().get()).isEqualTo("hello");

  // Second element: the int.
  assertThat(result.value().asArray().get(1).type()).isEqualTo(PhysicalType.INT32);
  assertThat(result.value().asArray().get(1).asPrimitive().get()).isEqualTo(1);
}

@Test
public void testConvertVariantFromMap() {
  // A map becomes an OBJECT variant and its key is the only dictionary entry.
  Table variantTable = mock(Table.class);
  when(variantTable.schema()).thenReturn(VARIANT_SCHEMA);

  Record converted =
      new RecordConverter(variantTable, config)
          .convert(ImmutableMap.of("v", ImmutableMap.of("hello", 1)));
  Variant result = (Variant) converted.getField("v");

  assertThat(result).isNotNull();

  // Metadata dictionary.
  assertThat(result.metadata()).isNotNull();
  assertThat(result.metadata().dictionarySize()).isEqualTo(1);
  assertThat(result.metadata().get(0)).isEqualTo("hello");

  // Object value and its single field.
  assertThat(result.value().type()).isEqualTo(PhysicalType.OBJECT);
  assertThat(result.value().asObject().numFields()).isEqualTo(1);
  assertThat(result.value().asObject().get("hello").type()).isEqualTo(PhysicalType.INT32);
  assertThat(result.value().asObject().get("hello").asPrimitive().get()).isEqualTo(1);
}

@Test
public void testConvertVariantFromMapNested() {
  // Nested maps share one metadata dictionary, so both the outer and inner keys must be in it.
  Table table = mock(Table.class);
  when(table.schema()).thenReturn(VARIANT_SCHEMA);
  RecordConverter converter = new RecordConverter(table, config);

  Record record =
      converter.convert(
          ImmutableMap.of("v", ImmutableMap.of("hello", ImmutableMap.of("world", 1))));
  Variant variant = (Variant) record.getField("v");

  assertThat(variant).isNotNull();
  assertThat(variant.value().type()).isEqualTo(PhysicalType.OBJECT);
  assertThat(variant.metadata()).isNotNull();
  assertThat(variant.metadata().dictionarySize()).isEqualTo(2);
  // The converter sorts field names before building the dictionary: "hello" precedes "world".
  assertThat(variant.metadata().get(0)).isEqualTo("hello");
  assertThat(variant.metadata().get(1)).isEqualTo("world");
  assertThat(variant.value().asObject().numFields()).isEqualTo(1);
  assertThat(variant.value().asObject().get("hello").type()).isEqualTo(PhysicalType.OBJECT);
  assertThat(variant.value().asObject().get("hello").asObject().numFields()).isEqualTo(1);
  assertThat(variant.value().asObject().get("hello").asObject().get("world").type())
      .isEqualTo(PhysicalType.INT32);
  assertThat(variant.value().asObject().get("hello").asObject().get("world").asPrimitive().get())
      .isEqualTo(1);
}

public static Map<String, Object> createMapData() {
return ImmutableMap.<String, Object>builder()
.put("i", 1)
Expand Down