diff --git a/CHANGELOG.md b/CHANGELOG.md
index afda21d8..4cad2d47 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,9 +1,15 @@
-# Unreleased
+# 1.3.8 (unreleased)
+
+## New Features
+* Added `auto.evolve` configuration option for automatic table schema evolution. When enabled, the connector detects new fields in incoming records and issues `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` against ClickHouse. Disabled by default. (https://github.com/ClickHouse/clickhouse-kafka-connect/issues/277)
+
+## Bug Fixes
+* Fixed RowBinary serialization for Map columns with Nullable value types. The nullable marker byte was missing when writing map values, causing `CANNOT_READ_ALL_DATA` errors for `Map(K, Nullable(V))` columns.
 
 ## Dependencies
 * Updated clickhouse-java version from `0.9.4` to `0.9.5`
 
 # 1.3.7, 2026-03-25
 ## Security
 * Upgraded `com.fasterxml.jackson.core` dependencies to version with fix for https://github.com/advisories/GHSA-72hv-8253-57qq (https://github.com/ClickHouse/clickhouse-kafka-connect/pull/690).
 
@@ -11,7 +17,6 @@
 # Improvements
 * `Gson` replaced with `Jackson` for performance and better maintainability (https://github.com/ClickHouse/clickhouse-kafka-connect/pull/676).
 
-
 # 1.3.6, 2026-03-18
 
 ## New Features
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java
index da6fafdb..22b1aff6 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java
@@ -57,6 +57,9 @@ public class ClickHouseSinkConfig {
     public static final String REPORT_INSERTED_OFFSETS = "reportInsertedOffsets";
     public static final String ERROR_TOLERANCE_ALL = "all";
     public static final String ERROR_TOLERANCE_NONE = "none";
+    public static final String AUTO_EVOLVE = "auto.evolve";
+    public static final String AUTO_EVOLVE_DDL_REFRESH_RETRIES = "auto.evolve.ddl.refresh.retries";
+    public static final String AUTO_EVOLVE_STRUCT_TO_JSON = "auto.evolve.struct.to.json";
     public static final String CONNECTOR_RETRY_TIMEOUT = "errors.retry.timeout";
     public static final long MINIMAL_RETRY_TIMEOUT_THR_WARN = TimeUnit.SECONDS.toMillis(10);
 
@@ -110,6 +113,9 @@ public class ClickHouseSinkConfig {
     private final int bufferCount;
     private final long bufferFlushTime;
     private final boolean reportInsertedOffsets;
+    private final boolean autoEvolve;
+    private final int autoEvolveDdlRefreshRetries;
+    private final boolean autoEvolveStructToJson;
     private final boolean binaryFormatWrtiteJsonAsString;
     private final String sslSocketSni;
 
@@ -296,6 +302,10 @@ public ClickHouseSinkConfig(Map<String, String> props) {
             LOGGER.info("Internal buffering enabled: bufferCount={}, bufferFlushTime={}ms", this.bufferCount, this.bufferFlushTime);
         }
 
+        this.autoEvolve = Boolean.parseBoolean(props.getOrDefault(AUTO_EVOLVE, "false"));
+        this.autoEvolveDdlRefreshRetries = Integer.parseInt(props.getOrDefault(AUTO_EVOLVE_DDL_REFRESH_RETRIES, "3"));
+        this.autoEvolveStructToJson = Boolean.parseBoolean(props.getOrDefault(AUTO_EVOLVE_STRUCT_TO_JSON, "false"));
+
         String jsonAsString = 
getClickhouseSettings().get("input_format_binary_read_json_as_string"); this.binaryFormatWrtiteJsonAsString = jsonAsString != null && (jsonAsString.equalsIgnoreCase("true") || jsonAsString.equals("1")); @@ -692,6 +702,40 @@ private static ConfigDef createConfigDef() { ConfigDef.Width.MEDIUM, "SSL Socket SNI" ); + + String ddlGroup = "DDL"; + int ddlOrderInGroup = 0; + configDef.define(AUTO_EVOLVE, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.MEDIUM, + "Whether to automatically add columns to the destination table when a record contains fields not present in the table. default: false", + ddlGroup, + ++ddlOrderInGroup, + ConfigDef.Width.SHORT, + "Auto evolve table schema." + ); + configDef.define(AUTO_EVOLVE_DDL_REFRESH_RETRIES, + ConfigDef.Type.INT, + 3, + ConfigDef.Range.atLeast(0), + ConfigDef.Importance.LOW, + "Number of retries when waiting for DDL changes to propagate after schema evolution. default: 3", + ddlGroup, + ++ddlOrderInGroup, + ConfigDef.Width.SHORT, + "DDL refresh retries" + ); + configDef.define(AUTO_EVOLVE_STRUCT_TO_JSON, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.MEDIUM, + "Whether to map Connect STRUCT fields to ClickHouse JSON columns during schema evolution. default: false", + ddlGroup, + ++ddlOrderInGroup, + ConfigDef.Width.SHORT, + "Map STRUCT to JSON" + ); return configDef; } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index 9e569240..85c76719 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -33,6 +33,8 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.errors.RetriableException; +import org.apache.kafka.connect.sink.SinkRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,8 +50,10 @@ import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoField; +import java.util.ArrayList; import java.util.Date; import java.util.HashMap; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -207,6 +211,26 @@ public void doInsert(List records, QueryIdentifier queryId, ErrorReporte String database = first.getDatabase(); Table table = getTable(database, topic); if (table == null) { return; }//We checked the error flag in getTable, so we don't need to check it again here + + if (csc.isAutoEvolve()) { + // Check the last record's schema for new fields. + // Limitation: if a batch contains multiple schema versions, only the last one is checked. + // A full scan across all records can be implemented later if needed. 
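For reference, the full scan mentioned in the comment above is a small extension: union the fields of every record in the batch instead of reading only the last one. A minimal sketch (not part of this diff; it reuses the PR's own `Record`, `Field`, `Schema`, and `evolveTableSchema`):

```java
// Sketch only: evolve against the union of all schemas in the batch, so a
// mixed-schema batch (V1 + V2 + V3) adds every new column in one pass.
// First occurrence of a field name wins; later duplicates are ignored.
Map<String, Schema> allFields = new LinkedHashMap<>();
for (Record r : records) {
    if (r.getFields() != null) {
        for (Field f : r.getFields()) {
            allFields.putIfAbsent(f.name(), f.schema());
        }
    }
}
table = evolveTableSchema(table, allFields);
```

The trade-off is one extra pass over the batch; the `autoEvolveMixedSchemasTenRecordsInOneBatch` test below documents exactly what the last-record shortcut misses.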
+            Record last = records.get(records.size() - 1);
+            Map<String, Schema> lastFields = new LinkedHashMap<>();
+            if (last.getFields() != null) {
+                for (Field f : last.getFields()) {
+                    lastFields.put(f.name(), f.schema());
+                }
+            }
+            table = evolveTableSchema(table, lastFields);
+        }
+
+        doInsertBatch(records, table, queryId);
+    }
+
+    private void doInsertBatch(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
+        Record first = records.get(0);
         LOGGER.debug("Trying to insert [{}] records to table name [{}] (QueryId: [{}])", records.size(), table.getName(), queryId.getQueryId());
         switch (first.getSchemaType()) {
             case SCHEMA:
@@ -234,7 +258,8 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
         Type type = col.getType();
         boolean isNullable = col.isNullable();
         boolean hasDefault = col.hasDefault();
-        if (!isNullable && !hasDefault) {
+        // Variant has a native NULL discriminator (255) so it can accept missing values without Nullable or DEFAULT.
+        if (!isNullable && !hasDefault && type != Type.VARIANT) {
             Map<String, Schema> schemaMap = record.getFields().stream().collect(Collectors.toMap(Field::name, Field::schema));
             var objSchema = schemaMap.get(colName);
             Data obj = record.getJsonMap().get(colName);
@@ -280,6 +305,9 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
             if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
                 continue;
 
+            if (colTypeName.equals("VARIANT") && dataTypeName.equals("STRUCT"))
+                continue;
+
             if (INT_TYPES.contains(colTypeName)) {
                 continue;
             }
@@ -494,6 +522,9 @@ protected void doWriteColValue(Column col, OutputStream stream, Data value, bool
                 mapTmp.forEach((key, mapValue) -> {
                     try {
                         doWritePrimitive(col.getMapKeyType(), value.getMapKeySchema().type(), stream, key, col);
+                        if (col.getMapValueType() != null && col.getMapValueType().isNullable() && mapValue != null) {
+                            BinaryStreamUtils.writeNonNull(stream);
+                        }
                         doWriteColValue(col.getMapValueType(), stream, new Data(value.getNestedValueSchema(), mapValue), defaultsSupport);
                     } catch (IOException e) {
                         throw new RuntimeException(e);
@@ -727,7 +758,7 @@ protected void doWritePrimitive(Type columnType, Schema.Type dataType, OutputStr
                 } else if (unionData.getObject() instanceof byte[]) {
                     BinaryStreamUtils.writeString(stream, (byte[]) unionData.getObject());
                 } else {
-                    throw new DataException("Not implemented conversion from " + unionData.getObject().getClass() + " to String");
+                    BinaryStreamUtils.writeString(stream, unionData.getObject().toString().getBytes(StandardCharsets.UTF_8));
                 }
                 break;
             }
@@ -786,6 +817,10 @@ protected void doWriteCol(Data value, boolean fieldExists, Column col, OutputStr
                 return;//And we're done
             } else if (colType == Type.ARRAY) {//If the column is an array
                 BinaryStreamUtils.writeNonNull(stream);//Then we send nonNull
+            } else if (colType == Type.VARIANT) {
+                BinaryStreamUtils.writeNonNull(stream);
+                BinaryStreamUtils.writeUnsignedInt8(stream, 255);
+                return;
             } else {
                 throw new RuntimeException(String.format("An attempt to write null into not nullable column '%s'", name));
             }
@@ -798,7 +833,10 @@
         if (!col.isNullable() && value.getObject() == null) {
             if (colType == Type.ARRAY)
                 BinaryStreamUtils.writeNonNull(stream);
-            else
+            else if (colType == Type.VARIANT) {
+                BinaryStreamUtils.writeUnsignedInt8(stream, 255);
+                return;
+            } else
                 throw new RuntimeException(String.format("An attempt to write null into not nullable column '%s'", name));
         }
     }
@@ -813,6 +851,12 @@ protected void doWriteCol(Data value, boolean fieldExists, Column col, OutputStr
                 BinaryStreamUtils.writeNonNull(stream);
             }
             BinaryStreamUtils.writeNull(stream);
+        } else if (col.getType() == Type.VARIANT) {
+            // Variant has a native NULL discriminator (255), so no Nullable/DEFAULT is needed.
+            if (defaultsSupport) {
+                BinaryStreamUtils.writeNonNull(stream);
+            }
+            BinaryStreamUtils.writeUnsignedInt8(stream, 255);
         } else { // not filled and not nullable
             LOGGER.error("Column {} is not nullable and no value is provided", name);
@@ -821,6 +865,61 @@
         }
     }
 
+    protected Table evolveTableSchema(Table table, Map<String, Schema> allFields) throws InterruptedException {
+        if (allFields.isEmpty()) {
+            throw new RuntimeException(
+                    "auto.evolve requires a Connect schema (Avro, Protobuf, or JSON Schema). " +
+                    "Schemaless or string records are not supported with auto.evolve=true.");
+        }
+
+        Set<String> missingColumns = table.getMissingColumns(allFields.keySet());
+
+        if (missingColumns.isEmpty()) {
+            return table;
+        }
+
+        LOGGER.info("Detected {} new field(s) not present in table {}: {}", missingColumns.size(), table.getName(), missingColumns);
+
+        List<String> columnDefs = new ArrayList<>();
+
+        for (String fieldName : missingColumns) {
+            Schema fieldSchema = allFields.get(fieldName);
+            if (fieldSchema == null) {
+                continue;
+            }
+
+            String chType = Column.connectTypeToClickHouseType(fieldSchema, csc.isAutoEvolveStructToJson());
+            String defaultExpr = Column.defaultExpressionForType(chType);
+            columnDefs.add(String.format("%s %s%s", Utils.escapeName(fieldName), chType, defaultExpr));
+        }
+
+        if (!columnDefs.isEmpty()) {
+            chc.alterTableAddColumns(table.getDatabase(), table.getCleanName(), columnDefs, csc.getClickhouseSettings());
+            LOGGER.info("Schema evolution complete for table {}. Added columns: {}", table.getName(), columnDefs);
+            table = refreshTableAfterDDL(table, missingColumns);
+        }
+
+        return table;
+    }
+
+    private static final long DDL_REFRESH_BACKOFF_MS = 200;
+
+    private Table refreshTableAfterDDL(Table table, Set<String> expectedNewColumns) throws InterruptedException {
+        int maxRetries = csc.getAutoEvolveDdlRefreshRetries();
+        for (int attempt = 0; attempt < maxRetries; attempt++) {
+            Table refreshed = urgentTableUpdate(table);
+            Set<String> stillMissing = refreshed.getMissingColumns(expectedNewColumns);
+            if (stillMissing.isEmpty()) {
+                return refreshed;
+            }
+            LOGGER.warn("DDL refresh attempt {}/{}: columns {} not yet visible, retrying in {}ms",
+                    attempt + 1, maxRetries, stillMissing, DDL_REFRESH_BACKOFF_MS);
+            Thread.sleep(DDL_REFRESH_BACKOFF_MS);
+        }
+        throw new RetriableException(String.format(
+                "DDL propagation timeout: columns not visible after %d retries", maxRetries));
+    }
+
     protected void doInsertRawBinary(List<Record> records, Table table, QueryIdentifier queryId, boolean supportDefaults, boolean retry) throws IOException, ExecutionException, InterruptedException {
         try {
             if (chc.isUseClientV2()) {
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java
index f89a44fe..ea79d6a5 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java
@@ -5,6 +5,7 @@
 import com.clickhouse.client.ClickHouseNode;
 import com.clickhouse.client.ClickHouseNodeSelector;
 import com.clickhouse.client.ClickHouseProtocol;
+import com.clickhouse.client.ClickHouseRequest;
 import com.clickhouse.client.ClickHouseResponse;
 import com.clickhouse.client.api.Client;
 import com.clickhouse.client.api.enums.ProxyType;
@@ -35,6 +36,7 @@
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
 
 public class ClickHouseHelperClient implements AutoCloseable {
 
@@ -469,6 +471,48 @@ public Table describeTableV2(String database, String tableName) {
         return table;
     }
 
+    public void alterTableAddColumns(String database, String tableName, List<String> columnDefs, Map<String, String> clickhouseSettings) {
+        String addClauses = columnDefs.stream()
+                .map(colDef -> "ADD COLUMN IF NOT EXISTS " + colDef)
+                .collect(Collectors.joining(", "));
+        String sql = String.format("ALTER TABLE `%s`.`%s` %s", database, tableName, addClauses);
+        LOGGER.info("Executing DDL: {}", sql);
+        if (useClientV2) {
+            alterTableAddColumnV2(sql, clickhouseSettings);
+        } else {
+            alterTableAddColumnV1(sql, clickhouseSettings);
+        }
+    }
+
+    private void alterTableAddColumnV1(String sql, Map<String, String> clickhouseSettings) {
+        try (ClickHouseClient client = ClickHouseClient.builder()
+                .options(getDefaultClientOptions())
+                .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
+                .build()) {
+            ClickHouseRequest<?> request = client.read(server).query(sql).set("alter_sync", "1");
+            for (Map.Entry<String, String> entry : clickhouseSettings.entrySet()) {
+                request.set(entry.getKey(), entry.getValue());
+            }
+            try (ClickHouseResponse response = request.executeAndWait()) {
+                // DDL executed; alter_sync=1 waits for the local replica to apply
+            }
+        } catch (ClickHouseException e) {
+            throw new RuntimeException("Failed to execute ALTER TABLE: " + sql, e);
+        }
+    }
+
+    private void alterTableAddColumnV2(String sql, Map<String, String> clickhouseSettings) {
+        QuerySettings settings = new QuerySettings().serverSetting("alter_sync", "1");
+        for (Map.Entry<String, String> entry : clickhouseSettings.entrySet()) {
+            settings.serverSetting(entry.getKey(), entry.getValue());
+        }
+        try (QueryResponse response = client.query(sql, settings).get()) {
+            // DDL executed; alter_sync=1 waits for the local replica to apply
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to execute ALTER TABLE: " + sql, e);
+        }
+    }
+
     public List<Table> extractTablesMapping(String database, Map<String, Table> cache) {
         List<Table>
tableList = new ArrayList<>();
         for (Table table : showTables(database)) {
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java
index dab47414..fbf21e60 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java
@@ -6,6 +6,12 @@
 import lombok.Getter;
 import lombok.Setter;
 import lombok.experimental.Accessors;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import com.clickhouse.kafka.connect.util.reactor.function.Tuple2;
@@ -15,9 +21,11 @@
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -29,8 +37,16 @@ public class Column {
     private static final Pattern DECIMAL_TYPE_PATTERN = Pattern.compile("Decimal(?<size>\\d{2,3})?\\s*(\\((?<precision>\\d{1,}\\s*)?,*\\s*(?<scale>\\d{1,})?\\))?");
     private static final Pattern SIMPLE_AGGREGATE_FUNCTION_TYPE_PATTERN = Pattern.compile("^SimpleAggregateFunction\\s*\\([^,]+,\\s*(.+)\\)$");
+    private static final int DECIMAL128_MAX_PRECISION = 38;
 
     private static final Logger LOGGER = LoggerFactory.getLogger(Column.class);
+
+    // Confluent converter union schema markers
+    static final String AVRO_UNION_SCHEMA_NAME = "io.confluent.connect.avro.Union";
+    static final String PROTOBUF_UNION_SCHEMA_PREFIX = "io.confluent.connect.protobuf.Union";
+    static final String GENERALIZED_UNION_PREFIX = "connect_union_";
+    static final String CONNECT_UNION_PARAMETER = "org.apache.kafka.connect.data.Union";
+
     private String name;
     private Type type;
@@ -358,6 +374,148 @@ private static Map<String, Integer> extractEnumValues(String valueType) {
         return data;
     }
 
+    public static String connectTypeToClickHouseType(Schema connectSchema) {
+        return connectTypeToClickHouseType(connectSchema, false);
+    }
+
+    public static String connectTypeToClickHouseType(Schema connectSchema, boolean structToJson) {
+        String baseType = resolveBaseType(connectSchema, structToJson);
+
+        // ClickHouse forbids Nullable wrapping for Array, Map, and Variant types.
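To make the wrapping rule concrete, a few illustrative expectations (hypothetical JUnit assertions, not part of this PR's test suite; they exercise the mapping defined just below):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

// Scalars always land in a Nullable wrapper; the Connect schema's own
// optionality is deliberately ignored (see autoEvolveAddsNonNullableFieldAsNullable).
assertEquals("Nullable(Int32)", Column.connectTypeToClickHouseType(Schema.INT32_SCHEMA));
assertEquals("Nullable(Int32)", Column.connectTypeToClickHouseType(Schema.OPTIONAL_INT32_SCHEMA));

// Containers stay bare; nullability is pushed down to the element/value type.
assertEquals("Array(Nullable(String))",
        Column.connectTypeToClickHouseType(SchemaBuilder.array(Schema.STRING_SCHEMA).build()));
assertEquals("Map(String, Nullable(Int64))",
        Column.connectTypeToClickHouseType(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT64_SCHEMA).build()));
```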
+        if (connectSchema.type() == Schema.Type.ARRAY || connectSchema.type() == Schema.Type.MAP || baseType.startsWith("Variant(")) {
+            return baseType;
+        }
+
+        return "Nullable(" + baseType + ")";
+    }
+
+    private static String resolveBaseType(Schema connectSchema, boolean structToJson) {
+        // Check logical types first (same pattern as the JDBC connector)
+        if (connectSchema.name() != null) {
+            switch (connectSchema.name()) {
+                case Decimal.LOGICAL_NAME:
+                    int precision = DECIMAL128_MAX_PRECISION;
+                    int scale = 0;
+                    if (connectSchema.parameters() != null && connectSchema.parameters().containsKey("scale")) {
+                        scale = Integer.parseInt(connectSchema.parameters().get("scale"));
+                    }
+                    return String.format("Decimal(%d, %d)", precision, scale);
+                case Date.LOGICAL_NAME:
+                    return "Date32";
+                case Time.LOGICAL_NAME:
+                    return "Int64";
+                case Timestamp.LOGICAL_NAME:
+                    return "DateTime64(3)";
+            }
+        }
+
+        // Then check primitive types
+        switch (connectSchema.type()) {
+            case INT8:
+                return "Int8";
+            case INT16:
+                return "Int16";
+            case INT32:
+                return "Int32";
+            case INT64:
+                return "Int64";
+            case FLOAT32:
+                return "Float32";
+            case FLOAT64:
+                return "Float64";
+            case BOOLEAN:
+                return "Bool";
+            case STRING:
+                return "String";
+            case BYTES:
+                return "String";
+            case ARRAY:
+                if (connectSchema.valueSchema() == null) {
+                    return "Array(String)";
+                }
+                String elementType = connectTypeToClickHouseType(connectSchema.valueSchema(), structToJson);
+                return "Array(" + elementType + ")";
+            case MAP:
+                String keyType = resolveBaseType(connectSchema.keySchema(), structToJson);
+                String valType = connectTypeToClickHouseType(connectSchema.valueSchema(), structToJson);
+                return "Map(" + keyType + ", " + valType + ")";
+            case STRUCT:
+                if (isUnionSchema(connectSchema)) {
+                    return resolveUnionType(connectSchema, structToJson);
+                }
+                if (structToJson) {
+                    return "JSON";
+                }
+                throw new SchemaTypeInferenceException(
+                        "Cannot auto-evolve STRUCT fields to ClickHouse columns. " +
+                        "Set auto.evolve.struct.to.json=true to map STRUCT to JSON, " +
+                        "or manually create the column as Tuple, JSON, or Nested type.");
+            default:
+                throw new SchemaTypeInferenceException("Unsupported Connect type for auto-evolution: " + connectSchema.type());
+        }
+    }
+
+    // Type groups that ClickHouse considers suspicious when mixed inside a Variant.
+    // See: https://clickhouse.com/docs/sql-reference/data-types/variant
+    private static final Set<String> SUSPICIOUS_NUMERIC_TYPES = Set.of(
+            "Int8", "Int16", "Int32", "Int64",
+            "UInt8", "UInt16", "UInt32", "UInt64",
+            "Float32", "Float64"
+    );
+    private static final Set<String> SUSPICIOUS_DATE_TYPES = Set.of(
+            "Date32", "DateTime64(3)"
+    );
+
+    private static String resolveUnionType(Schema connectSchema, boolean structToJson) {
+        if (connectSchema.fields() == null || connectSchema.fields().isEmpty()) {
+            return "String";
+        }
+
+        LinkedHashSet<String> chTypes = new LinkedHashSet<>();
+        for (Field field : connectSchema.fields()) {
+            chTypes.add(resolveBaseType(field.schema(), structToJson));
+        }
+
+        // All branches resolve to the same ClickHouse type (e.g. union(string, bytes) → String)
+        if (chTypes.size() == 1) {
+            return chTypes.iterator().next();
+        }
+
+        // Check for suspicious similar types that ClickHouse rejects by default
+        if (hasSuspiciousSimilarTypes(chTypes)) {
+            return "String";
+        }
+
+        // Multiple distinct types map to Variant(T1, T2, ...), which requires ClickHouse 24.1+.
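Before the final `return`, it is worth tracing how a Confluent-style union flows through these branches. An illustrative sketch (assumption: schemas shaped the way the Confluent Avro converter emits unions, per the marker constants above):

```java
// An Avro union(string, int) arrives as a STRUCT named
// "io.confluent.connect.avro.Union" with one field per branch.
Schema union = SchemaBuilder.struct()
        .name("io.confluent.connect.avro.Union")
        .field("string", Schema.OPTIONAL_STRING_SCHEMA)
        .field("int", Schema.OPTIONAL_INT32_SCHEMA)
        .build();
Column.connectTypeToClickHouseType(union);  // -> "Variant(String, Int32)"

// union(string, bytes): both branches resolve to String, the set collapses
// to one entry, and the column becomes Nullable(String) instead.
// union(int, long): Int32 + Int64 is a suspicious numeric mix, so the
// mapping also falls back to Nullable(String) rather than a Variant.
```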
+ return "Variant(" + String.join(", ", chTypes) + ")"; + } + + private static boolean hasSuspiciousSimilarTypes(Set chTypes) { + int numericCount = 0; + int dateCount = 0; + for (String t : chTypes) { + if (SUSPICIOUS_NUMERIC_TYPES.contains(t)) numericCount++; + if (SUSPICIOUS_DATE_TYPES.contains(t)) dateCount++; + } + return numericCount > 1 || dateCount > 1; + } + + static boolean isUnionSchema(Schema connectSchema) { + if (connectSchema.type() != Schema.Type.STRUCT) { + return false; + } + String name = connectSchema.name(); + if (name != null + && (name.equals(AVRO_UNION_SCHEMA_NAME) + || name.startsWith(PROTOBUF_UNION_SCHEMA_PREFIX) + || name.startsWith(GENERALIZED_UNION_PREFIX))) { + return true; + } + return connectSchema.parameters() != null + && connectSchema.parameters().containsKey(CONNECT_UNION_PARAMETER); + } + + public Integer convertEnumValues(String value) { if ( this.enumValues != null ) { return enumValues.get(value); @@ -391,4 +549,14 @@ public String toString() { String.format(", variantTypes=%s", variantTypes.stream().map(Tuple2::getT2).collect(Collectors.joining(", ", "[", "]"))) ) + "}"; } + + // Returns a DEFAULT expression for non-Nullable types (Array, Map) so RowBinaryWithDefaults can handle missing fields. + public static String defaultExpressionForType(String chType) { + if (chType.startsWith("Array(")) { + return " DEFAULT []"; + } else if (chType.startsWith("Map(")) { + return " DEFAULT map()"; + } + return ""; + } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/SchemaTypeInferenceException.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/SchemaTypeInferenceException.java new file mode 100644 index 00000000..a718713a --- /dev/null +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/SchemaTypeInferenceException.java @@ -0,0 +1,7 @@ +package com.clickhouse.kafka.connect.sink.db.mapping; + +public class SchemaTypeInferenceException extends RuntimeException { + public SchemaTypeInferenceException(String message) { + super(message); + } +} diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java index 8bb1844f..747fc14e 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java @@ -8,9 +8,12 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -78,6 +81,16 @@ public void addColumn(Column column) { } } + public Set getMissingColumns(Collection fieldNames) { + Set missing = new LinkedHashSet<>(); + for (String fieldName : fieldNames) { + if (!rootColumnsMap.containsKey(fieldName)) { + missing.add(fieldName); + } + } + return missing; + } + private void handleNonRoot(Column column) { String parentName = column.getName().substring(0, column.getName().lastIndexOf(".")); Column parent = allColumnsMap.getOrDefault(parentName, null); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java index 5021d42b..874af162 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java +++ 
b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java @@ -1,6 +1,7 @@ package com.clickhouse.kafka.connect.sink; import com.clickhouse.client.api.ClientConfigProperties; +import com.clickhouse.client.api.query.Records; import com.clickhouse.kafka.connect.avro.test.Event; import com.clickhouse.kafka.connect.avro.test.Image; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; @@ -61,7 +62,9 @@ import java.util.stream.LongStream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @ExtendWith(FromVersionConditionExtension.class) @@ -1802,4 +1805,1174 @@ public void testAvroDateAndTimeTypes() throws Exception { assertEquals(event.getTime2().atDate(LocalDate.of(1970, 1, 1)).format(localFormatter), row.get("time2")); } } + + @Test + public void autoEvolveDisabledRejectsNewField() { + Map props = getBaseProps(); + // auto.evolve defaults to false + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_disabled_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Insert V1 records first (should succeed) + Collection srV1 = SchemaTestData.createSchemaV1(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV1); + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Insert V2 records with new field (should succeed because input_format_skip_unknown_fields=1) + // But the new column should NOT be added to the table + Collection srV2 = SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 10); + chst.put(srV2); + chst.stop(); + + // Rows inserted but new column should not exist + assertEquals(20, ClickHouseTestHelpers.countRows(chc, topic)); + } + + @Test + public void autoEvolveAddsNullableColumn() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_nullable_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Insert V1 records + Collection srV1 = SchemaTestData.createSchemaV1(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV1); + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Insert V2 records with new nullable field -> should trigger ALTER TABLE ADD COLUMN + Collection srV2 = SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 10); + chst.put(srV2); + chst.stop(); + + assertEquals(20, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify the new column exists + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("new_string_field"), + "New column 'new_string_field' should have been added by auto.evolve"); + } + + @Test + public void autoEvolveAddsNonNullableFieldAsNullable() { + Map props = getBaseProps(); + 
props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_non_nullable_as_nullable_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + Collection srV2 = SchemaTestData.createSchemaV2WithNewNonNullableField(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV2); + chst.stop(); + + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Non-nullable fields are created as Nullable columns - mandatory fields always have a value, + // and Nullable allows old records (without this field) to insert with NULL + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("non_nullable_field"), + "New column 'non_nullable_field' should have been added by auto.evolve"); + } + + @Test + public void autoEvolveMultipleNewColumns() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_multi_cols_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Insert V1 first + Collection srV1 = SchemaTestData.createSchemaV1(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV1); + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Insert V2 with multiple new nullable fields + Collection srV2 = SchemaTestData.createSchemaV2WithMultipleNewNullableFields(topic, 1, 10); + chst.put(srV2); + chst.stop(); + + assertEquals(20, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify all new columns exist + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("new_string_field"), + "Column 'new_string_field' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("new_int32_field"), + "Column 'new_int32_field' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("new_float64_field"), + "Column 'new_float64_field' should exist"); + } + + @Test + public void autoEvolveCachesSchemaAfterDDL() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_cache_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + + // First batch triggers DDL + Collection srV2batch1 = SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 10); + chst.put(srV2batch1); + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Second batch with same schema should not re-trigger DDL (just insert) + // Use partition 2 to avoid offset deduplication + Collection srV2batch2 = SchemaTestData.createSchemaV2WithNewNullableField(topic, 2, 10); + chst.put(srV2batch2); 
+ chst.stop(); + + assertEquals(20, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify column exists (only one 'new_string_field' column) + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + long count = described.getRootColumnsList().stream() + .filter(c -> c.getName().equals("new_string_field")) + .count(); + assertEquals(1, count, "Should have exactly one 'new_string_field' column"); + } + + @Test + public void autoEvolveMixedSchemaInSingleBatch() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_mixed_batch_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Build a single batch with V1 records followed by V2 records (mixed schemas) + List mixedBatch = new ArrayList<>(); + mixedBatch.addAll(SchemaTestData.createSchemaV1(topic, 1, 5)); + mixedBatch.addAll(SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 5)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(mixedBatch); + chst.stop(); + + // All 10 records should be inserted + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // The new column should exist (evolved from V2 records in same batch) + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("new_string_field"), + "Column 'new_string_field' should be added even when schema changes mid-batch"); + } + + @Test + public void autoEvolveMixedSchemaOlderRecordsGetNull() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_older_records_null_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // V1 records (no new_string_field) followed by V2 records (has new_string_field) + // Schema is evolved using last record (V2), then entire batch is inserted. + // V1 records should get NULL for the new column. 
+ List mixedBatch = new ArrayList<>(); + mixedBatch.addAll(SchemaTestData.createSchemaV1(topic, 1, 5)); + mixedBatch.addAll(SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 5)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(mixedBatch); + chst.stop(); + + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // V1 records should have NULL for the new column + String nullCountQuery = String.format( + "SELECT COUNT(*) FROM `%s` WHERE `new_string_field` IS NULL SETTINGS select_sequential_consistency = 1", topic); + try { + Records nullRecords = chc.getClient().queryRecords(nullCountQuery).get(); + int nullCount = Integer.parseInt(nullRecords.iterator().next().getString(1)); + assertEquals(5, nullCount, "V1 records should have NULL for new_string_field"); + } catch (Exception e) { + throw new RuntimeException(e); + } + + // V2 records should have non-NULL values + String nonNullQuery = String.format( + "SELECT COUNT(*) FROM `%s` WHERE `new_string_field` IS NOT NULL SETTINGS select_sequential_consistency = 1", topic); + try { + Records nonNullRecords = chc.getClient().queryRecords(nonNullQuery).get(); + int nonNullCount = Integer.parseInt(nonNullRecords.iterator().next().getString(1)); + assertEquals(5, nonNullCount, "V2 records should have non-NULL values for new_string_field"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void autoEvolveLogicalTypes() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_logical_types_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Insert V1 first + Collection srV1 = SchemaTestData.createSchemaV1(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV1); + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Insert V2 with logical type fields (Decimal, Date, Timestamp) + Collection srV2 = SchemaTestData.createSchemaV2WithLogicalTypes(topic, 1, 10); + chst.put(srV2); + chst.stop(); + + assertEquals(20, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify the logical type columns were created with correct ClickHouse types + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("new_decimal_field"), + "Column 'new_decimal_field' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("new_date_field"), + "Column 'new_date_field' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("new_timestamp_field"), + "Column 'new_timestamp_field' should exist"); + + // Verify types + com.clickhouse.kafka.connect.sink.db.mapping.Column decimalCol = described.getRootColumnsMap().get("new_decimal_field"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.Decimal, decimalCol.getType(), + "Decimal logical type should map to ClickHouse Decimal"); + + com.clickhouse.kafka.connect.sink.db.mapping.Column dateCol = described.getRootColumnsMap().get("new_date_field"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.Date32, dateCol.getType(), + "Date logical type should map to ClickHouse Date32"); + + 
com.clickhouse.kafka.connect.sink.db.mapping.Column tsCol = described.getRootColumnsMap().get("new_timestamp_field"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.DateTime64, tsCol.getType(), + "Timestamp logical type should map to ClickHouse DateTime64"); + } + + @Test + public void autoEvolveRejectsStructField() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_struct_reject_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + Collection srV2 = SchemaTestData.createSchemaV2WithStructField(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + + try { + chst.put(srV2); + assertTrue(false, "Expected exception for STRUCT field auto-evolution"); + } catch (RuntimeException e) { + Throwable t = e; + boolean found = false; + while (t != null) { + if (t.getMessage() != null && t.getMessage().contains("Cannot auto-evolve STRUCT")) { + found = true; + break; + } + t = t.getCause(); + } + assertTrue(found, "Should reject STRUCT field with appropriate message, got: " + e.getMessage()); + } finally { + chst.stop(); + } + } + + // STRUCT field auto-evolved as JSON column when auto.evolve.struct.to.json=true + @Test + public void autoEvolveStructToJsonCreatesJsonColumn() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE_STRUCT_TO_JSON, "true"); + props.put(ClickHouseSinkConfig.CLICKHOUSE_SETTINGS, "input_format_binary_read_json_as_string=1"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_struct_to_json_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + Collection srV2 = SchemaTestData.createSchemaV2WithStructField(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV2); + chst.stop(); + + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify the new column was created as JSON type + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("new_struct_field"), + "Column 'new_struct_field' should exist"); + com.clickhouse.kafka.connect.sink.db.mapping.Column col = described.getRootColumnsMap().get("new_struct_field"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.JSON, col.getType(), + "Column 'new_struct_field' should be JSON type"); + } + + // V1 records (no struct) inserted first, then V2 records (with struct) trigger JSON column creation. 
+ @Test + public void autoEvolveStructToJsonMixedBatchOlderRecordsGetDefault() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE_STRUCT_TO_JSON, "true"); + props.put(ClickHouseSinkConfig.CLICKHOUSE_SETTINGS, "input_format_binary_read_json_as_string=1"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_struct_json_mixed_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Insert V1 records (no struct field) + Collection srV1 = SchemaTestData.createSchemaV1(topic, 1, 5); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV1); + assertEquals(5, ClickHouseTestHelpers.countRows(chc, topic)); + + // Insert V2 records (with struct field) - triggers JSON column creation + Collection srV2 = SchemaTestData.createSchemaV2WithStructField(topic, 1, 5); + chst.put(srV2); + chst.stop(); + + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify JSON column exists + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + com.clickhouse.kafka.connect.sink.db.mapping.Column col = described.getRootColumnsMap().get("new_struct_field"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.JSON, col.getType(), + "Column 'new_struct_field' should be JSON type"); + } + + // STRUCT field with auto.evolve.struct.to.json explicitly false rejects with helpful error message + @Test + public void autoEvolveStructToJsonExplicitlyFalseRejectsStruct() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE_STRUCT_TO_JSON, "false"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_struct_json_false_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + Collection srV2 = SchemaTestData.createSchemaV2WithStructField(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + + try { + chst.put(srV2); + assertTrue(false, "Expected exception for STRUCT field when struct.to.json is false"); + } catch (RuntimeException e) { + Throwable t = e; + boolean found = false; + while (t != null) { + if (t.getMessage() != null && t.getMessage().contains("auto.evolve.struct.to.json=true")) { + found = true; + break; + } + t = t.getCause(); + } + assertTrue(found, "Error message should suggest auto.evolve.struct.to.json=true, got: " + e.getMessage()); + } finally { + chst.stop(); + } + } + + + @Test + public void autoEvolveArrayAndMapFields() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_array_map_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Insert V1 first + Collection srV1 = SchemaTestData.createSchemaV1(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + 
chst.put(srV1); + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Insert V2 with Array and Map fields + Collection srV2 = SchemaTestData.createSchemaV2WithArrayAndMapFields(topic, 1, 10); + chst.put(srV2); + chst.stop(); + + assertEquals(20, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify array and map columns were created + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("new_array_field"), + "Column 'new_array_field' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("new_map_field"), + "Column 'new_map_field' should exist"); + } + + @Test + public void autoEvolveTripleSchemaInOneBatch() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_triple_schema_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Build a single batch with V1 + V2 + V3 records + List combined = new ArrayList<>(); + combined.addAll(SchemaTestData.createSchemaV1(topic, 1, 5)); + combined.addAll(SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 5)); + combined.addAll(SchemaTestData.createSchemaV3WithExtraField(topic, 1, 5)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(combined); + chst.stop(); + + assertEquals(15, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify columns from both V2 and V3 exist + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("new_string_field"), + "V2 column 'new_string_field' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("v3_bool_field"), + "V3 column 'v3_bool_field' should exist"); + } + + @Test + public void autoEvolveThreeSeparateBatches() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_three_batches_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + + // Batch 1: Schema V1 (3 fields: off16, p_int64, name) + List batch1 = new ArrayList<>(SchemaTestData.createRichSchemaV1(topic, 1, 5, 0)); + chst.put(batch1); + + // Batch 2: Schema V2 (8 fields: off16, p_int64, name, email, age, score, active, city) + List batch2 = new ArrayList<>(SchemaTestData.createRichSchemaV2(topic, 1, 5, 5)); + chst.put(batch2); + + // Batch 3: Schema V3 (5 fields: off16, p_int64, name, email, country) + List batch3 = new ArrayList<>(SchemaTestData.createRichSchemaV3(topic, 1, 5, 10)); + chst.put(batch3); + + chst.stop(); + + assertEquals(15, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify all evolved columns exist + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("name"), "V1 column 'name' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("email"), 
"V2 column 'email' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("age"), "V2 column 'age' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("score"), "V2 column 'score' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("active"), "V2 column 'active' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("city"), "V2 column 'city' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("country"), "V3 column 'country' should exist"); + + // V1 records should have NULL for V2/V3 columns + String nullEmailQuery = String.format( + "SELECT COUNT(*) FROM `%s` WHERE `email` IS NULL SETTINGS select_sequential_consistency = 1", topic); + // V3 records don't include age/score/active/city — those should be NULL + String nullAgeQuery = String.format( + "SELECT COUNT(*) FROM `%s` WHERE `age` IS NULL SETTINGS select_sequential_consistency = 1", topic); + try { + Records emailNulls = chc.getClient().queryRecords(nullEmailQuery).get(); + int emailNullCount = Integer.parseInt(emailNulls.iterator().next().getString(1)); + assertEquals(5, emailNullCount, "V1 records should have NULL for email"); + + Records ageNulls = chc.getClient().queryRecords(nullAgeQuery).get(); + int ageNullCount = Integer.parseInt(ageNulls.iterator().next().getString(1)); + assertEquals(10, ageNullCount, "V1 + V3 records (10) should have NULL for age"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void autoEvolveMixedSchemasTenRecordsInOneBatch() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_mixed_ten_records_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Single batch with 10 records spanning 3 schema versions, ordered so the + // newest schema (V3) is LAST. Auto-evolve only inspects the last record's + // schema, so only fields present in V3 will be added (name, email, country). + // V2-only fields (age, score, active, city) will NOT be evolved. + // Records 1–5: Schema V1 (3 fields: off16, p_int64, name) + // Records 6–7: Schema V2 (8 fields: off16, p_int64, name, email, age, score, active, city) + // Records 8–10: Schema V3 (5 fields: off16, p_int64, name, email, country) <-- LAST + List batch = new ArrayList<>(); + batch.addAll(SchemaTestData.createRichSchemaV1(topic, 1, 5, 0)); + batch.addAll(SchemaTestData.createRichSchemaV2(topic, 1, 2, 5)); + batch.addAll(SchemaTestData.createRichSchemaV3(topic, 1, 3, 7)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(batch); + chst.stop(); + + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Only columns present in the LAST record's schema (V3) should be added. + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("name"), "V3 column 'name' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("email"), "V3 column 'email' should exist"); + assertTrue(described.getRootColumnsMap().containsKey("country"), "V3 column 'country' should exist"); + + // V2-only fields are NOT added because the last record (V3) does not include them. 
+ // This documents the trade-off of last-record-only schema detection. + assertFalse(described.getRootColumnsMap().containsKey("age"), + "V2-only column 'age' should NOT be added (last record is V3)"); + assertFalse(described.getRootColumnsMap().containsKey("score"), + "V2-only column 'score' should NOT be added (last record is V3)"); + assertFalse(described.getRootColumnsMap().containsKey("active"), + "V2-only column 'active' should NOT be added (last record is V3)"); + assertFalse(described.getRootColumnsMap().containsKey("city"), + "V2-only column 'city' should NOT be added (last record is V3)"); + + // Verify NULL distribution for the columns that DO exist: + // name: all 10 records have it -> 0 NULLs + // email: V1 records (5) lack it -> 5 NULLs + // country: only V3 records (3) have it -> 7 NULLs + try { + String nameNullQuery = String.format( + "SELECT COUNT(*) FROM `%s` WHERE `name` IS NULL SETTINGS select_sequential_consistency = 1", topic); + Records nameNulls = chc.getClient().queryRecords(nameNullQuery).get(); + assertEquals(0, Integer.parseInt(nameNulls.iterator().next().getString(1)), + "All records have name, so 0 NULLs expected"); + + String emailNullQuery = String.format( + "SELECT COUNT(*) FROM `%s` WHERE `email` IS NULL SETTINGS select_sequential_consistency = 1", topic); + Records emailNulls = chc.getClient().queryRecords(emailNullQuery).get(); + assertEquals(5, Integer.parseInt(emailNulls.iterator().next().getString(1)), + "V1 records (5) should have NULL for email"); + + String countryNullQuery = String.format( + "SELECT COUNT(*) FROM `%s` WHERE `country` IS NULL SETTINGS select_sequential_consistency = 1", topic); + Records countryNulls = chc.getClient().queryRecords(countryNullQuery).get(); + assertEquals(7, Integer.parseInt(countryNulls.iterator().next().getString(1)), + "V1 (5) + V2 (2) records should have NULL for country"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // auto-evolve adds columns for every supported primitive + logical type in a single batch + @Test + public void autoEvolveAllPrimitiveAndLogicalTypes() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_all_types_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Insert V1 first to ensure existing rows get NULL for new columns + Collection srV1 = SchemaTestData.createSchemaV1(topic, 1, 5); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV1); + assertEquals(5, ClickHouseTestHelpers.countRows(chc, topic)); + + // Insert V2 with all primitive + logical type fields + Collection srV2 = SchemaTestData.createSchemaV2WithAllPrimitiveTypes(topic, 1, 5); + chst.put(srV2); + chst.stop(); + + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify all columns were created with correct types + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + Map cols = described.getRootColumnsMap(); + + // Primitive types + assertTrue(cols.containsKey("new_int8"), "Column 'new_int8' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.INT8, cols.get("new_int8").getType()); + assertTrue(cols.containsKey("new_int16"), "Column 'new_int16' should exist"); + 
assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.INT16, cols.get("new_int16").getType()); + assertTrue(cols.containsKey("new_int32"), "Column 'new_int32' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.INT32, cols.get("new_int32").getType()); + assertTrue(cols.containsKey("new_int64"), "Column 'new_int64' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.INT64, cols.get("new_int64").getType()); + assertTrue(cols.containsKey("new_float32"), "Column 'new_float32' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.FLOAT32, cols.get("new_float32").getType()); + assertTrue(cols.containsKey("new_float64"), "Column 'new_float64' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.FLOAT64, cols.get("new_float64").getType()); + assertTrue(cols.containsKey("new_bool"), "Column 'new_bool' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.BOOLEAN, cols.get("new_bool").getType()); + assertTrue(cols.containsKey("new_string"), "Column 'new_string' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.STRING, cols.get("new_string").getType()); + assertTrue(cols.containsKey("new_bytes"), "Column 'new_bytes' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.STRING, cols.get("new_bytes").getType()); + + // Logical types + assertTrue(cols.containsKey("new_decimal"), "Column 'new_decimal' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.Decimal, cols.get("new_decimal").getType()); + assertTrue(cols.containsKey("new_date"), "Column 'new_date' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.Date32, cols.get("new_date").getType()); + assertTrue(cols.containsKey("new_timestamp"), "Column 'new_timestamp' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.DateTime64, cols.get("new_timestamp").getType()); + } + + // auto-evolve creates Array columns with different element types (Int32, Float64, Bool, String) + @Test + public void autoEvolveTypedArrayColumns() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_typed_arrays_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + Collection srV2 = SchemaTestData.createSchemaV2WithTypedArrays(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV2); + chst.stop(); + + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify all array columns were created + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + Map cols = described.getRootColumnsMap(); + + assertTrue(cols.containsKey("arr_int32"), "Column 'arr_int32' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.ARRAY, cols.get("arr_int32").getType()); + assertTrue(cols.containsKey("arr_float64"), "Column 'arr_float64' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.ARRAY, cols.get("arr_float64").getType()); + assertTrue(cols.containsKey("arr_bool"), "Column 'arr_bool' should exist"); + 
assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.ARRAY, cols.get("arr_bool").getType()); + assertTrue(cols.containsKey("arr_string"), "Column 'arr_string' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.ARRAY, cols.get("arr_string").getType()); + } + + // DDL refresh timeout - retries set to 0 so refresh loop never runs, throws RetriableException + @Test + public void autoEvolveDdlRefreshTimeoutThrowsRetriable() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE_DDL_REFRESH_RETRIES, "0"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_ddl_timeout_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // V2 schema with a new field - DDL will succeed but refresh will timeout with 0 retries + Collection srV2 = SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 5); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + + try { + chst.put(srV2); + assertTrue(false, "Expected RetriableException due to DDL refresh timeout with 0 retries"); + } catch (RuntimeException e) { + // Processing layer may wrap the RetriableException - walk the cause chain + Throwable t = e; + boolean found = false; + while (t != null) { + if (t instanceof org.apache.kafka.connect.errors.RetriableException + && t.getMessage() != null && t.getMessage().contains("DDL propagation timeout")) { + found = true; + break; + } + t = t.getCause(); + } + assertTrue(found, "Should contain RetriableException with DDL propagation timeout in cause chain, got: " + e); + } finally { + chst.stop(); + } + } + + // ALTER TABLE itself fails (table dropped externally after cache populated) + @Test + public void autoEvolveDdlExecutionFailureThrowsRuntimeException() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_ddl_exec_failure_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Insert V1 records to populate the connector's internal table mapping cache + Collection srV1 = SchemaTestData.createSchemaV1(topic, 1, 5); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV1); + assertEquals(5, ClickHouseTestHelpers.countRows(chc, topic)); + + // Drop the table externally - the connector still has it cached in memory + ClickHouseTestHelpers.dropTable(chc, topic); + + // V2 schema with a new field - ALTER TABLE will fail because the table no longer exists + Collection srV2 = SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 5); + try { + chst.put(srV2); + assertTrue(false, "Expected RuntimeException due to ALTER TABLE on dropped table"); + } catch (RuntimeException e) { + // Processing layer wraps exceptions - walk the cause chain for the DDL failure + Throwable t = e; + boolean found = false; + while (t != null) { + if (t.getMessage() != null && (t.getMessage().contains("ALTER TABLE") || t.getMessage().contains("UNKNOWN_TABLE"))) { + found = true; + break; + } + t = t.getCause(); + } + assertTrue(found, "Should indicate DDL failure in cause 
chain, got: " + e.getClass().getName() + ": " + e.getMessage()); + } finally { + chst.stop(); + } + } + + // STRUCT field without struct-to-json flag throws SchemaTypeInferenceException with helpful message + @Test + public void autoEvolveUnsupportedStructTypeThrowsError() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + // auto.evolve.struct.to.json is false by default + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_unsupported_struct_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + Collection srV2 = SchemaTestData.createSchemaV2WithStructField(topic, 1, 5); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + + try { + chst.put(srV2); + assertTrue(false, "Expected SchemaTypeInferenceException for unsupported STRUCT type"); + } catch (RuntimeException e) { + Throwable t = e; + boolean foundInference = false; + while (t != null) { + if (t instanceof com.clickhouse.kafka.connect.sink.db.mapping.SchemaTypeInferenceException) { + foundInference = true; + break; + } + t = t.getCause(); + } + assertTrue(foundInference, + "Should throw SchemaTypeInferenceException for unsupported STRUCT, got: " + e.getClass().getName() + ": " + e.getMessage()); + } finally { + chst.stop(); + } + } + + // Avro-style union(string, bytes) STRUCT collapses to Nullable(String), not JSON + @Test + public void autoEvolveStringBytesUnionCollapsesToString() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_union_string_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + Collection srV2 = SchemaTestData.createSchemaV2WithStringBytesUnionField(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(srV2); + chst.stop(); + + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify the union field was created as String (not JSON) + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + com.clickhouse.kafka.connect.sink.db.mapping.Column col = described.getRootColumnsMap().get("new_union_field"); + assertNotNull(col, "Column 'new_union_field' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.STRING, col.getType(), + "Union(string, bytes) should collapse to String, not JSON"); + } + + // Avro union(string, int) auto-evolved as Variant(String, Int32) column + @Test + @SinceClickHouseVersion("24.1") + public void autoEvolveMixedUnionCreatesVariantColumn() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + props.put(ClickHouseSinkConfig.CLICKHOUSE_SETTINGS, "allow_experimental_variant_type=1"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = createTopicName("auto_evolve_variant_test"); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + Collection records = 
SchemaTestData.createSchemaV2WithMixedTypeUnionField(topic, 1, 10); + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(records); + chst.stop(); + + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify the union field was created as Variant + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + com.clickhouse.kafka.connect.sink.db.mapping.Column col = described.getRootColumnsMap().get("mixed_union"); + assertNotNull(col, "Column 'mixed_union' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.VARIANT, col.getType(), + "union(string, int) should map to Variant, not String or JSON"); + } + + @Test + public void autoEvolveSchemalessRecordsThrowError() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_schemaless_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Create schemaless (string) records. No valueSchema. + List schemaless = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + String json = String.format("{\"off16\": %d, \"p_int64\": %d}", i, (long) i); + schemaless.add(new SinkRecord( + topic, 1, null, null, null, json, + i, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + + try { + chst.put(schemaless); + assertTrue(false, "Expected exception for schemaless records with auto.evolve=true"); + } catch (RuntimeException e) { + Throwable t = e; + boolean found = false; + while (t != null) { + if (t.getMessage() != null && t.getMessage().contains("auto.evolve requires a Connect schema")) { + found = true; + break; + } + t = t.getCause(); + } + assertTrue(found, "Expected error about schemaless records in cause chain, got: " + e.getMessage()); + } finally { + chst.stop(); + } + } + + // Avro union(string, bytes) fields auto-evolved as Nullable(String) columns + @Test + public void autoEvolveAvroUnionStringBytesCreatesStringColumn() throws Exception { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = "auto_evolve_avro_union_test"; + ClickHouseTestHelpers.dropTable(chc, topic); + // Table starts with only "name" - union fields "content" and "description" will be auto-evolved + ClickHouseTestHelpers.runQuery(chc, String.format( + "CREATE TABLE `%s` (`name` String) Engine = MergeTree ORDER BY name", topic)); + + Image image1 = Image.newBuilder() + .setName("image1") + .setContent("content1") + .build(); + Image image2 = Image.newBuilder() + .setName("image2") + .setContent(ByteBuffer.wrap("content2".getBytes())) + .setDescription("desc2") + .build(); + + List records = SchemaTestData.convertAvroToSinkRecord( + topic, new AvroSchema(Image.getClassSchema()), Arrays.asList(image1, image2)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(records); + chst.stop(); + + assertEquals(2, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify that union columns were created as String (not JSON) + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); 
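+ // describeTable() re-reads the table definition from ClickHouse, so the column
+ // map below reflects the post-ALTER state, including the auto-evolved columns.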
+ Map cols = described.getRootColumnsMap(); + + assertTrue(cols.containsKey("content"), "Column 'content' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.STRING, cols.get("content").getType(), + "union(string, bytes) should map to String, not JSON"); + + assertTrue(cols.containsKey("description"), "Column 'description' should exist"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.STRING, cols.get("description").getType(), + "union(null, string, bytes) should map to Nullable(String), not JSON"); + + // Verify data was inserted correctly + List rows = ClickHouseTestHelpers.getAllRowsAsJson(chc, topic); + if (rows.size() == 0) { + rows = ClickHouseTestHelpers.getAllRowsAsJson(chc, topic); + } + assertEquals(2, rows.size()); + } + + @Test + public void autoEvolveMixedBatchLastRecordOlderSchema() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = createTopicName("auto_evolve_last_record_older_test"); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Build batch where V2 records come first and V1 (older) records are LAST. + // Auto-evolve only inspects the last record's schema, so V1 (which has no + // new fields) means no ALTER TABLE is issued. This test documents that + // trade-off: producers SHOULD ensure the newest schema is the last record + // in a batch when relying on auto-evolve. + List batch = new ArrayList<>(); + batch.addAll(SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 3)); + batch.addAll(SchemaTestData.createSchemaV1(topic, 1, 2)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(batch); + chst.stop(); + + // All 5 rows are still inserted (V2 extra field is dropped because the + // table doesn't have the column and input_format_skip_unknown_fields=1). + assertEquals(5, ClickHouseTestHelpers.countRows(chc, topic)); + + // The new column is NOT added because the LAST record (V1) does not include it. + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertFalse(described.getRootColumnsMap().containsKey("new_string_field"), + "Column 'new_string_field' should NOT be added when the last record is V1 (older schema)"); + } + + @Test + public void autoEvolveMultiVersionUnionSemantics() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = createTopicName("auto_evolve_union_semantics_test"); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Batch with V1, V2, V3, V4 in order. Auto-evolve only inspects the LAST + // record's schema (V4), which only adds `v4_float_field`. The unique + // fields from V2 (`new_string_field`) and V3 (`v3_bool_field`) are NOT + // added because they are absent from V4. 
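+ // A minimal sketch of that detection step (hypothetical helper names, not the
+ // writer's actual code):
+ //   Schema last = records.get(records.size() - 1).valueSchema();
+ //   List<Field> missing = fieldsMissingFromTable(last, table);      // assumed helper
+ //   if (!missing.isEmpty()) alterTableAddColumns(table, missing);   // assumed helper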
+ List batch = new ArrayList<>(); + batch.addAll(SchemaTestData.createSchemaV1(topic, 1, 2)); + batch.addAll(SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 2)); + batch.addAll(SchemaTestData.createSchemaV3WithExtraField(topic, 1, 2)); + batch.addAll(SchemaTestData.createSchemaV4WithUniqueField(topic, 1, 2)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(batch); + chst.stop(); + + assertEquals(8, ClickHouseTestHelpers.countRows(chc, topic)); + + // Only V4's unique field is added (V4 is the last record's schema). + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("v4_float_field"), + "V4 column 'v4_float_field' should exist (V4 is last record)"); + + // V2 and V3 unique fields are NOT added because the last record (V4) does not include them. + // This documents the trade-off of last-record-only schema detection. + assertFalse(described.getRootColumnsMap().containsKey("new_string_field"), + "V2-only column 'new_string_field' should NOT be added (last record is V4)"); + assertFalse(described.getRootColumnsMap().containsKey("v3_bool_field"), + "V3-only column 'v3_bool_field' should NOT be added (last record is V4)"); + } + + @Test + public void autoEvolveInterleavedSchemaVersions() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = createTopicName("auto_evolve_non_monotonic_test"); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Interleaved schema versions [V1, V3, V2, V1, V3] + List batch = new ArrayList<>(); + batch.addAll(SchemaTestData.createSchemaV1(topic, 1, 1)); + batch.addAll(SchemaTestData.createSchemaV3WithExtraField(topic, 1, 1)); + batch.addAll(SchemaTestData.createSchemaV2WithNewNullableField(topic, 1, 1)); + batch.addAll(SchemaTestData.createSchemaV1(topic, 1, 1)); + batch.addAll(SchemaTestData.createSchemaV3WithExtraField(topic, 1, 1)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(batch); + chst.stop(); + + assertEquals(5, ClickHouseTestHelpers.countRows(chc, topic)); + + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("new_string_field"), + "V2 column 'new_string_field' should exist despite non-monotonic order"); + assertTrue(described.getRootColumnsMap().containsKey("v3_bool_field"), + "V3 column 'v3_bool_field' should exist despite non-monotonic order"); + } + + @Test + public void autoEvolveCrossPartitionSchemaDrift() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + props.put(ClickHouseSinkConfig.IGNORE_PARTITIONS_WHEN_BATCHING, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = createTopicName("auto_evolve_cross_partition_test"); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format("CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Records from different partitions with different schema versions. + // With ignorePartitionsWhenBatching=true, they are merged into a single batch. 
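+ // The merged order is simply the order the records are appended to the batch
+ // below, so the "last" record is a V1 record from partition 1.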
+ // Partition 0: V2 records (has new_string_field) + // Partition 1: V1 records (no new_string_field) - these are last in the merged batch + // Auto-evolve only inspects the last record's schema (V1), so the new + // column is NOT added. This documents the trade-off when schema drifts + // across partitions: the producer of the last partition's records + // determines which schema is used for evolution. + List batch = new ArrayList<>(); + batch.addAll(SchemaTestData.createSchemaV2WithNewNullableField(topic, 0, 3)); + batch.addAll(SchemaTestData.createSchemaV1(topic, 1, 3)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(batch); + chst.stop(); + + assertEquals(6, ClickHouseTestHelpers.countRows(chc, topic)); + + // The new column from V2 is NOT added because the last record in the + // merged cross-partition batch is V1 (older schema). + com.clickhouse.kafka.connect.sink.db.mapping.Table described = chc.describeTable(chc.getDatabase(), topic); + assertFalse(described.getRootColumnsMap().containsKey("new_string_field"), + "Column 'new_string_field' should NOT be added when last record across partitions is V1"); + } + + // Mixed batch where older records lack auto-evolved Array/Map columns. + @Test + public void autoEvolveMixedBatchArrayMapFieldsMissingInOlderRecords() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = createTopicName("auto_evolve_mixed_array_map_test"); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format( + "CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // Build a single batch: V1 records (no array/map) followed by V2 records (with array/map) + List mixedBatch = new ArrayList<>(); + mixedBatch.addAll(SchemaTestData.createSchemaV1(topic, 1, 5)); + mixedBatch.addAll(SchemaTestData.createSchemaV2WithArrayAndMapFields(topic, 1, 5)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(mixedBatch); + chst.stop(); + + // All 10 records should be inserted + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // The new array and map columns should exist + com.clickhouse.kafka.connect.sink.db.mapping.Table described = + chc.describeTable(chc.getDatabase(), topic); + assertTrue(described.getRootColumnsMap().containsKey("new_array_field"), + "Column 'new_array_field' should exist after auto-evolve"); + assertTrue(described.getRootColumnsMap().containsKey("new_map_field"), + "Column 'new_map_field' should exist after auto-evolve"); + } + + // Mixed batch where older records lack an auto-evolved Variant column. 
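+ // (Variant is experimental in ClickHouse and gated behind
+ // allow_experimental_variant_type, which this test enables via CLICKHOUSE_SETTINGS.)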
+ @Test + public void autoEvolveMixedBatchVariantFieldMissingInOlderRecords() { + Map props = getBaseProps(); + props.put(ClickHouseSinkConfig.AUTO_EVOLVE, "true"); + props.put(ClickHouseSinkConfig.CLICKHOUSE_SETTINGS, "allow_experimental_variant_type=1"); + ClickHouseHelperClient chc = ClickHouseTestHelpers.createClient(props); + + String topic = createTopicName("auto_evolve_mixed_variant_test"); + ClickHouseTestHelpers.dropTable(chc, topic); + ClickHouseTestHelpers.runQuery(chc, String.format( + "CREATE TABLE %s ( `off16` Int16, `p_int64` Int64 ) Engine = MergeTree ORDER BY off16", topic)); + + // V1 records (off16 + p_int64 only) followed by V2 records (off16 + p_int64 + mixed_union Variant) + List mixedBatch = new ArrayList<>(); + mixedBatch.addAll(SchemaTestData.createSchemaV1(topic, 1, 5)); + mixedBatch.addAll(SchemaTestData.createSchemaV2WithMixedTypeUnionField(topic, 1, 5)); + + ClickHouseSinkTask chst = new ClickHouseSinkTask(); + chst.start(props); + chst.put(mixedBatch); + chst.stop(); + + assertEquals(10, ClickHouseTestHelpers.countRows(chc, topic)); + + // Verify the Variant column was created + com.clickhouse.kafka.connect.sink.db.mapping.Table described = + chc.describeTable(chc.getDatabase(), topic); + com.clickhouse.kafka.connect.sink.db.mapping.Column col = described.getRootColumnsMap().get("mixed_union"); + assertNotNull(col, "Column 'mixed_union' should exist after auto-evolve"); + assertEquals(com.clickhouse.kafka.connect.sink.db.mapping.Type.VARIANT, col.getType(), + "mixed_union should be Variant type"); + } } diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/db/mapping/ColumnTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/db/mapping/ColumnTest.java index 22296e8c..0fba5cc6 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/db/mapping/ColumnTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/db/mapping/ColumnTest.java @@ -1,5 +1,7 @@ package com.clickhouse.kafka.connect.sink.db.mapping; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; import org.junit.jupiter.api.Test; import java.util.List; @@ -177,5 +179,179 @@ public void extractEnumOfPrimitives() { assertTrue(col.getEnumValues().containsKey("a, valid")); assertTrue(col.getEnumValues().containsKey("b")); } + + // --- isUnionSchema detection tests --- + + @Test + public void isUnionSchema_avroUnion() { + Schema union = SchemaBuilder.struct() + .name(Column.AVRO_UNION_SCHEMA_NAME) + .field("string", Schema.OPTIONAL_STRING_SCHEMA) + .field("int", Schema.OPTIONAL_INT32_SCHEMA) + .optional() + .build(); + assertTrue(Column.isUnionSchema(union)); + } + + @Test + public void isUnionSchema_protobufOneof() { + Schema union = SchemaBuilder.struct() + .name("io.confluent.connect.protobuf.Union.content") + .field("user_info", Schema.OPTIONAL_STRING_SCHEMA) + .field("product_info", Schema.OPTIONAL_STRING_SCHEMA) + .optional() + .build(); + assertTrue(Column.isUnionSchema(union)); + } + + @Test + public void isUnionSchema_generalizedUnion() { + Schema union = SchemaBuilder.struct() + .name("connect_union_0") + .parameter(Column.CONNECT_UNION_PARAMETER, "connect_union_0") + .field("string", Schema.OPTIONAL_STRING_SCHEMA) + .field("int", Schema.OPTIONAL_INT32_SCHEMA) + .optional() + .build(); + assertTrue(Column.isUnionSchema(union)); + } + + @Test + public void isUnionSchema_connectUnionParameter() { + Schema union = SchemaBuilder.struct() + .parameter(Column.CONNECT_UNION_PARAMETER, "some_annotation") + .field("string", 
Schema.OPTIONAL_STRING_SCHEMA) + .field("int", Schema.OPTIONAL_INT32_SCHEMA) + .optional() + .build(); + assertTrue(Column.isUnionSchema(union)); + } + + @Test + public void isUnionSchema_realStruct_notDetected() { + Schema struct = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.INT32_SCHEMA) + .build(); + assertFalse(Column.isUnionSchema(struct)); + } + + @Test + public void isUnionSchema_primitiveType_notDetected() { + assertFalse(Column.isUnionSchema(Schema.STRING_SCHEMA)); + } + + // --- connectTypeToClickHouseType union mapping tests --- + + @Test + public void unionStringBytes_collapsesToNullableString() { + Schema union = SchemaBuilder.struct() + .name(Column.AVRO_UNION_SCHEMA_NAME) + .field("string", Schema.OPTIONAL_STRING_SCHEMA) + .field("bytes", Schema.OPTIONAL_BYTES_SCHEMA) + .optional() + .build(); + assertEquals("Nullable(String)", Column.connectTypeToClickHouseType(union)); + } + + @Test + public void unionStringInt_mapsToVariant() { + Schema union = SchemaBuilder.struct() + .name(Column.AVRO_UNION_SCHEMA_NAME) + .field("string", Schema.OPTIONAL_STRING_SCHEMA) + .field("int", Schema.OPTIONAL_INT32_SCHEMA) + .optional() + .build(); + assertEquals("Variant(String, Int32)", Column.connectTypeToClickHouseType(union)); + } + + @Test + public void unionStringIntBoolean_mapsToVariant() { + Schema union = SchemaBuilder.struct() + .name(Column.AVRO_UNION_SCHEMA_NAME) + .field("string", Schema.OPTIONAL_STRING_SCHEMA) + .field("int", Schema.OPTIONAL_INT32_SCHEMA) + .field("boolean", Schema.OPTIONAL_BOOLEAN_SCHEMA) + .optional() + .build(); + assertEquals("Variant(String, Int32, Bool)", Column.connectTypeToClickHouseType(union)); + } + + @Test + public void unionSuspiciousNumericTypes_fallsBackToString() { + // Variant(Int32, Int64) is rejected by ClickHouse unless allow_suspicious_variant_types + Schema union = SchemaBuilder.struct() + .name(Column.AVRO_UNION_SCHEMA_NAME) + .field("int", Schema.OPTIONAL_INT32_SCHEMA) + .field("long", Schema.OPTIONAL_INT64_SCHEMA) + .optional() + .build(); + assertEquals("Nullable(String)", Column.connectTypeToClickHouseType(union)); + } + + @Test + public void unionSuspiciousNumericWithString_fallsBackToString() { + Schema union = SchemaBuilder.struct() + .name(Column.AVRO_UNION_SCHEMA_NAME) + .field("string", Schema.OPTIONAL_STRING_SCHEMA) + .field("int", Schema.OPTIONAL_INT32_SCHEMA) + .field("long", Schema.OPTIONAL_INT64_SCHEMA) + .optional() + .build(); + assertEquals("Nullable(String)", Column.connectTypeToClickHouseType(union)); + } + + @Test + public void protobufOneof_stringBytes_collapsesToNullableString() { + // Protobuf oneof { string url = 1; bytes raw = 2; } — field names differ from Avro + Schema union = SchemaBuilder.struct() + .name("io.confluent.connect.protobuf.Union.image") + .field("url", Schema.OPTIONAL_STRING_SCHEMA) + .field("raw", Schema.OPTIONAL_BYTES_SCHEMA) + .optional() + .build(); + assertEquals("Nullable(String)", Column.connectTypeToClickHouseType(union)); + } + + @Test + public void realStruct_withStructToJson_mapsToJSON() { + Schema struct = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.INT32_SCHEMA) + .build(); + assertEquals("Nullable(JSON)", Column.connectTypeToClickHouseType(struct, true)); + } + + @Test + public void realStruct_withoutFlag_throws() { + Schema struct = SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", Schema.INT32_SCHEMA) + .build(); + assertThrows(SchemaTypeInferenceException.class, + () -> 
Column.connectTypeToClickHouseType(struct, false)); + } + + @Test + public void unionEmptyFields_fallsBackToNullableString() { + Schema union = SchemaBuilder.struct() + .name(Column.AVRO_UNION_SCHEMA_NAME) + .optional() + .build(); + assertEquals("Nullable(String)", Column.connectTypeToClickHouseType(union)); + } + + @Test + public void unionVariant_notWrappedInNullable() { + Schema union = SchemaBuilder.struct() + .name(Column.AVRO_UNION_SCHEMA_NAME) + .field("string", Schema.OPTIONAL_STRING_SCHEMA) + .field("boolean", Schema.OPTIONAL_BOOLEAN_SCHEMA) + .optional() + .build(); + String result = Column.connectTypeToClickHouseType(union); + assertEquals("Variant(String, Bool)", result); + assertFalse(result.startsWith("Nullable(")); + } } diff --git a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java index 39bea1b5..f159d046 100644 --- a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java +++ b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java @@ -1585,4 +1585,566 @@ public static List convertAvroToSinkRecord(String topic, ParsedSchem schemaAndValues.add(new SinkRecord(topic, 0, null, null, schemaAndValue.schema(), schemaAndValue.value(), schemaAndValues.size())), ArrayList::addAll); } + + public static Collection createSchemaV1(String topic, int partition) { + return createSchemaV1(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV1(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema SCHEMA_V1 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .build(); + + LongStream.range(0, totalRecords).forEachOrdered(n -> { + Struct value_struct = new Struct(SCHEMA_V1) + .put("off16", (short) n) + .put("p_int64", n); + + array.add(new SinkRecord( + topic, + partition, + null, + null, SCHEMA_V1, + value_struct, + n, + System.currentTimeMillis(), + TimestampType.CREATE_TIME + )); + }); + return array; + } + + public static Collection createSchemaV2WithNewNullableField(String topic, int partition) { + return createSchemaV2WithNewNullableField(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV2WithNewNullableField(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema SCHEMA_V2 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("new_string_field", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + long offset = totalRecords; // continue offsets from V1 + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA_V2) + .put("off16", (short) n) + .put("p_int64", n) + .put("new_string_field", "value_" + n); + + array.add(new SinkRecord( + topic, + partition, + null, + null, SCHEMA_V2, + value_struct, + offset + n, + System.currentTimeMillis(), + TimestampType.CREATE_TIME + )); + } + return array; + } + + public static Collection createSchemaV2WithNewNonNullableField(String topic, int partition) { + return createSchemaV2WithNewNonNullableField(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV2WithNewNonNullableField(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema SCHEMA_V2 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", 
Schema.INT64_SCHEMA) + .field("non_nullable_field", Schema.STRING_SCHEMA) + .build(); + + long offset = totalRecords; + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA_V2) + .put("off16", (short) n) + .put("p_int64", n) + .put("non_nullable_field", "required_" + n); + + array.add(new SinkRecord( + topic, + partition, + null, + null, SCHEMA_V2, + value_struct, + offset + n, + System.currentTimeMillis(), + TimestampType.CREATE_TIME + )); + } + return array; + } + + public static Collection createSchemaV2WithMultipleNewNullableFields(String topic, int partition) { + return createSchemaV2WithMultipleNewNullableFields(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV2WithMultipleNewNullableFields(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema SCHEMA_V2 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("new_string_field", Schema.OPTIONAL_STRING_SCHEMA) + .field("new_int32_field", SchemaBuilder.int32().optional().build()) + .field("new_float64_field", SchemaBuilder.float64().optional().build()) + .build(); + + long offset = totalRecords; + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA_V2) + .put("off16", (short) n) + .put("p_int64", n) + .put("new_string_field", "val_" + n) + .put("new_int32_field", (int) n) + .put("new_float64_field", n * 1.5); + + array.add(new SinkRecord( + topic, + partition, + null, + null, SCHEMA_V2, + value_struct, + offset + n, + System.currentTimeMillis(), + TimestampType.CREATE_TIME + )); + } + return array; + } + + public static Collection createSchemaV2WithStructField(String topic, int partition) { + return createSchemaV2WithStructField(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV2WithStructField(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema INNER_SCHEMA = SchemaBuilder.struct() + .field("nested_str", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + Schema SCHEMA_V2 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("new_struct_field", SchemaBuilder.struct() + .field("nested_str", Schema.OPTIONAL_STRING_SCHEMA) + .optional() + .build()) + .build(); + + long offset = totalRecords; + for (long n = 0; n < totalRecords; n++) { + Struct nested = new Struct(SCHEMA_V2.field("new_struct_field").schema()) + .put("nested_str", "nested_" + n); + Struct value_struct = new Struct(SCHEMA_V2) + .put("off16", (short) n) + .put("p_int64", n) + .put("new_struct_field", nested); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA_V2, value_struct, + offset + n, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } + + public static Collection createSchemaV2WithArrayAndMapFields(String topic, int partition) { + return createSchemaV2WithArrayAndMapFields(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV2WithArrayAndMapFields(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema SCHEMA_V2 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("new_array_field", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build()) + .field("new_map_field", SchemaBuilder.map(Schema.STRING_SCHEMA, 
SchemaBuilder.int32().optional().build()).optional().build()) + .build(); + + long offset = totalRecords; + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA_V2) + .put("off16", (short) n) + .put("p_int64", n) + .put("new_array_field", Arrays.asList("a_" + n, "b_" + n)) + .put("new_map_field", Collections.singletonMap("key_" + n, (int) n)); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA_V2, value_struct, + offset + n, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } + + public static Collection createSchemaV3WithExtraField(String topic, int partition) { + return createSchemaV3WithExtraField(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV3WithExtraField(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema SCHEMA_V3 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("new_string_field", Schema.OPTIONAL_STRING_SCHEMA) + .field("v3_bool_field", SchemaBuilder.bool().optional().build()) + .build(); + + long offset = totalRecords * 2L; // after V1 and V2 offsets + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA_V3) + .put("off16", (short) n) + .put("p_int64", n) + .put("new_string_field", "v3_" + n) + .put("v3_bool_field", n % 2 == 0); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA_V3, value_struct, + offset + n, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } + + /** + * Schema V4 with a unique field (v4_float_field) not in V2 or V3. + * Fields: off16, p_int64, v4_float_field. + */ + public static Collection createSchemaV4WithUniqueField(String topic, int partition) { + return createSchemaV4WithUniqueField(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV4WithUniqueField(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema SCHEMA_V4 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("v4_float_field", SchemaBuilder.float64().optional().build()) + .build(); + + long offset = totalRecords * 3L; // after V1, V2, V3 offsets + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA_V4) + .put("off16", (short) n) + .put("p_int64", n) + .put("v4_float_field", (double) n * 1.5); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA_V4, value_struct, + offset + n, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } + + /** + * Rich V1 schema with 3 fields: off16, p_int64, name. + */ + public static Collection createRichSchemaV1(String topic, int partition, int totalRecords, long startOffset) { + List array = new ArrayList<>(); + + Schema SCHEMA = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("name", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA) + .put("off16", (short) n) + .put("p_int64", n) + .put("name", "user_" + n); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA, value_struct, + startOffset + n, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } + + /** + * Rich V2 schema with 8 fields: off16, p_int64, name, email, age, score, active, city. 
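+ * Exercised by the multi-version NULL-distribution test above: when a V3
+ * record is last in the batch, the V2-only fields (age, score, active, city)
+ * are expected NOT to be auto-evolved.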
+ */ + public static Collection createRichSchemaV2(String topic, int partition, int totalRecords, long startOffset) { + List array = new ArrayList<>(); + + Schema SCHEMA = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("name", Schema.OPTIONAL_STRING_SCHEMA) + .field("email", Schema.OPTIONAL_STRING_SCHEMA) + .field("age", SchemaBuilder.int32().optional().build()) + .field("score", SchemaBuilder.float64().optional().build()) + .field("active", SchemaBuilder.bool().optional().build()) + .field("city", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA) + .put("off16", (short) n) + .put("p_int64", n) + .put("name", "user_" + n) + .put("email", "user_" + n + "@example.com") + .put("age", 20 + (int) n) + .put("score", n * 1.5) + .put("active", n % 2 == 0) + .put("city", "city_" + n); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA, value_struct, + startOffset + n, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } + + /** + * Rich V3 schema with 5 fields: off16, p_int64, name, email, country. + * Drops some V2 fields (age, score, active, city) and adds country. + */ + public static Collection createRichSchemaV3(String topic, int partition, int totalRecords, long startOffset) { + List array = new ArrayList<>(); + + Schema SCHEMA = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("name", Schema.OPTIONAL_STRING_SCHEMA) + .field("email", Schema.OPTIONAL_STRING_SCHEMA) + .field("country", Schema.OPTIONAL_STRING_SCHEMA) + .build(); + + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA) + .put("off16", (short) n) + .put("p_int64", n) + .put("name", "user_" + n) + .put("email", "user_" + n + "@example.com") + .put("country", "country_" + n); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA, value_struct, + startOffset + n, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } + + + public static Collection createSchemaV2WithLogicalTypes(String topic, int partition) { + return createSchemaV2WithLogicalTypes(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV2WithLogicalTypes(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema SCHEMA_V2 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("new_decimal_field", Decimal.builder(2).optional().build()) + .field("new_date_field", org.apache.kafka.connect.data.Date.builder().optional().build()) + .field("new_timestamp_field", Timestamp.builder().optional().build()) + .build(); + + long offset = totalRecords; + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA_V2) + .put("off16", (short) n) + .put("p_int64", n) + .put("new_decimal_field", BigDecimal.valueOf(n * 100 + 50, 2)) + .put("new_date_field", Date.from(java.time.Instant.ofEpochMilli(n * 86400000L))) + .put("new_timestamp_field", Date.from(java.time.Instant.ofEpochMilli(System.currentTimeMillis()))); + + array.add(new SinkRecord( + topic, + partition, + null, + null, SCHEMA_V2, + value_struct, + offset + n, + System.currentTimeMillis(), + TimestampType.CREATE_TIME + )); + } + return array; + } + + // Schema with all supported primitive types + logical types as new columns + public static 
Collection createSchemaV2WithAllPrimitiveTypes(String topic, int partition) { + return createSchemaV2WithAllPrimitiveTypes(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV2WithAllPrimitiveTypes(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema SCHEMA_V2 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("new_int8", SchemaBuilder.int8().optional().build()) + .field("new_int16", SchemaBuilder.int16().optional().build()) + .field("new_int32", SchemaBuilder.int32().optional().build()) + .field("new_int64", SchemaBuilder.int64().optional().build()) + .field("new_float32", SchemaBuilder.float32().optional().build()) + .field("new_float64", SchemaBuilder.float64().optional().build()) + .field("new_bool", SchemaBuilder.bool().optional().build()) + .field("new_string", Schema.OPTIONAL_STRING_SCHEMA) + .field("new_bytes", Schema.OPTIONAL_BYTES_SCHEMA) + .field("new_decimal", Decimal.builder(4).optional().build()) + .field("new_date", org.apache.kafka.connect.data.Date.builder().optional().build()) + .field("new_timestamp", Timestamp.builder().optional().build()) + .build(); + + long offset = totalRecords; + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA_V2) + .put("off16", (short) n) + .put("p_int64", n) + .put("new_int8", (byte) n) + .put("new_int16", (short) n) + .put("new_int32", (int) n) + .put("new_int64", n * 100L) + .put("new_float32", (float) n * 1.1f) + .put("new_float64", n * 2.2) + .put("new_bool", n % 2 == 0) + .put("new_string", "str_" + n) + .put("new_bytes", ("bytes_" + n).getBytes()) + .put("new_decimal", java.math.BigDecimal.valueOf(n * 100 + 50, 4)) + .put("new_date", Date.from(java.time.Instant.ofEpochMilli(n * 86400000L))) + .put("new_timestamp", Date.from(java.time.Instant.ofEpochMilli(System.currentTimeMillis()))); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA_V2, value_struct, + offset + n, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } + + // Schema with arrays of different element types + public static Collection createSchemaV2WithTypedArrays(String topic, int partition) { + return createSchemaV2WithTypedArrays(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV2WithTypedArrays(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema SCHEMA_V2 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("arr_int32", SchemaBuilder.array(SchemaBuilder.int32().optional().build()).optional().build()) + .field("arr_float64", SchemaBuilder.array(SchemaBuilder.float64().optional().build()).optional().build()) + .field("arr_bool", SchemaBuilder.array(SchemaBuilder.bool().optional().build()).optional().build()) + .field("arr_string", SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build()) + .build(); + + long offset = totalRecords; + for (long n = 0; n < totalRecords; n++) { + Struct value_struct = new Struct(SCHEMA_V2) + .put("off16", (short) n) + .put("p_int64", n) + .put("arr_int32", Arrays.asList((int) n, (int) n + 1)) + .put("arr_float64", Arrays.asList(n * 1.1, n * 2.2)) + .put("arr_bool", Arrays.asList(n % 2 == 0, n % 2 != 0)) + .put("arr_string", Arrays.asList("a_" + n, "b_" + n)); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA_V2, value_struct, + offset + n, 
System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } + + // Schema with an Avro-style union STRUCT (all STRING/BYTES fields) that should collapse to String + public static Collection createSchemaV2WithStringBytesUnionField(String topic, int partition) { + return createSchemaV2WithStringBytesUnionField(topic, partition, DEFAULT_TOTAL_RECORDS); + } + + public static Collection createSchemaV2WithStringBytesUnionField(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + // Simulates how Confluent AvroConverter represents union(string, bytes) + // AvroConverter sets schema.name() to "io.confluent.connect.avro.Union" + Schema UNION_SCHEMA = SchemaBuilder.struct() + .name("io.confluent.connect.avro.Union") + .field("string", Schema.OPTIONAL_STRING_SCHEMA) + .field("bytes", Schema.OPTIONAL_BYTES_SCHEMA) + .optional() + .build(); + + Schema SCHEMA_V2 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("new_union_field", UNION_SCHEMA) + .build(); + + long offset = totalRecords; + for (long n = 0; n < totalRecords; n++) { + Struct unionValue = new Struct(UNION_SCHEMA) + .put("string", "union_str_" + n); + Struct value_struct = new Struct(SCHEMA_V2) + .put("off16", (short) n) + .put("p_int64", n) + .put("new_union_field", unionValue); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA_V2, value_struct, + offset + n, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } + + // Schema with an Avro-style union STRUCT (string + int) that should map to Variant(String, Int32) + public static Collection createSchemaV2WithMixedTypeUnionField(String topic, int partition, int totalRecords) { + List array = new ArrayList<>(); + + Schema UNION_SCHEMA = SchemaBuilder.struct() + .name("io.confluent.connect.avro.Union") + .field("string", Schema.OPTIONAL_STRING_SCHEMA) + .field("int", Schema.OPTIONAL_INT32_SCHEMA) + .optional() + .build(); + + Schema SCHEMA_V2 = SchemaBuilder.struct() + .field("off16", Schema.INT16_SCHEMA) + .field("p_int64", Schema.INT64_SCHEMA) + .field("mixed_union", UNION_SCHEMA) + .build(); + + long offset = totalRecords; + for (long n = 0; n < totalRecords; n++) { + Struct unionValue; + if (n % 2 == 0) { + unionValue = new Struct(UNION_SCHEMA).put("string", "val_" + n); + } else { + unionValue = new Struct(UNION_SCHEMA).put("int", (int) n); + } + Struct value_struct = new Struct(SCHEMA_V2) + .put("off16", (short) n) + .put("p_int64", n) + .put("mixed_union", unionValue); + + array.add(new SinkRecord( + topic, partition, null, null, SCHEMA_V2, value_struct, + offset + n, System.currentTimeMillis(), TimestampType.CREATE_TIME + )); + } + return array; + } }
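+
+// Illustrative connector configuration for the feature exercised above (a
+// sketch, not part of this patch; property names match the ClickHouseSinkConfig
+// constants used in these tests, and connector.class is an assumption):
+//   connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
+//   topics=auto_evolve_demo
+//   auto.evolve=true
+//   auto.evolve.ddl.refresh.retries=3
+//   auto.evolve.struct.to.json=false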