Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
3510939
feature: enable auto.evolve parameter
guilleov Mar 23, 2026
060a44d
fix: avoid query connection leak
guilleov Mar 25, 2026
fe8c6be
chore: move value to static field to make it more clear
guilleov Mar 25, 2026
90807e7
fix: create custom type inference exception
guilleov Mar 25, 2026
c95ecb0
fix: dont wrap connectTypeToClickHouseType in try-catch since method …
guilleov Mar 25, 2026
197cef2
fix: move Nullable wrapping into connectTypeToClickHouseType and alwa…
guilleov Mar 25, 2026
e62cba1
chore: remove sub batching since is not needed
guilleov Mar 25, 2026
48767ff
feat: add auto evolve DDL refresh retries configuration to ClickHouse…
guilleov Mar 25, 2026
8d4d49f
fix: update DDL refresh logic to throw RetriableException on timeout
guilleov Mar 25, 2026
e926ae1
fix: schema requirement for auto evolve
guilleov Mar 25, 2026
b458daf
refactor: optimize alterTableAddColumns method to use a single SQL st…
guilleov Mar 25, 2026
3c755bb
feat: add auto evolve struct to JSON configuration option to ClickHou…
guilleov Mar 25, 2026
670eec5
feat: enhance schema handling by adding union type detection and mapp…
guilleov Mar 25, 2026
80f0c4e
feat: add tests for auto-evolving schemas in ClickHouseSinkTask
guilleov Mar 26, 2026
f67be35
feat: improve auto-evolve functionality to handle mixed schema versions
guilleov Mar 26, 2026
e0d340a
fix: remove unused import
guilleov Mar 27, 2026
5b763ee
feature: enable auto.evolve parameter
guilleov Mar 23, 2026
2209243
feat: add auto evolve struct to JSON configuration option to ClickHou…
guilleov Mar 25, 2026
b1e616a
feat: enhance schema handling by adding union type detection and mapp…
guilleov Mar 25, 2026
cd28e13
feat: add tests for auto-evolving schemas in ClickHouseSinkTask
guilleov Mar 26, 2026
aec1935
fix: remove unused import
guilleov Mar 27, 2026
f8eacc5
fix: escape column names in ClickHouseWriter to prevent SQL injection…
guilleov Mar 27, 2026
bd9ba2d
refactor: remove duplicated code from merge
guilleov Mar 27, 2026
f2235e2
feat: add default expressions for Array and Map types
guilleov Mar 27, 2026
16c0a44
fix: handle Variant columns in mixed-schema batches with auto.evolve
guilleov Mar 27, 2026
95e330a
feat: optimize field extraction in auto-evolve by using IdentityHashM…
guilleov Mar 27, 2026
bcc0874
feat: forward clickhouse settings to ALTER TABLE DDL in auto-evolve
guilleov Mar 29, 2026
598171b
fix: handle Variant and union null serialization in RowBinary writer
guilleov Mar 29, 2026
d3a72a5
refactor: simplify auto-evolve to check only last record schema
guilleov Mar 29, 2026
8646bc0
fix: replace createTable with runQuery to match main convention
guilleov Apr 1, 2026
564b1f9
feature: enable auto.evolve parameter
guilleov Mar 23, 2026
f113745
fix: avoid query connection leak
guilleov Mar 25, 2026
352b9f9
chore: move value to static field to make it more clear
guilleov Mar 25, 2026
419bac5
fix: create custom type inference exception
guilleov Mar 25, 2026
00d652d
fix: dont wrap connectTypeToClickHouseType in try-catch since method …
guilleov Mar 25, 2026
bff79be
fix: move Nullable wrapping into connectTypeToClickHouseType and alwa…
guilleov Mar 25, 2026
f45995c
chore: remove sub batching since is not needed
guilleov Mar 25, 2026
b41ee1d
feat: add auto evolve DDL refresh retries configuration to ClickHouse…
guilleov Mar 25, 2026
b206080
fix: update DDL refresh logic to throw RetriableException on timeout
guilleov Mar 25, 2026
3923c86
fix: schema requirement for auto evolve
guilleov Mar 25, 2026
dde2b57
refactor: optimize alterTableAddColumns method to use a single SQL st…
guilleov Mar 25, 2026
5cf2e1c
feat: add auto evolve struct to JSON configuration option to ClickHou…
guilleov Mar 25, 2026
9ef332f
feat: enhance schema handling by adding union type detection and mapp…
guilleov Mar 25, 2026
79c91af
feat: add tests for auto-evolving schemas in ClickHouseSinkTask
guilleov Mar 26, 2026
3424a5e
feat: improve auto-evolve functionality to handle mixed schema versions
guilleov Mar 26, 2026
bfb5dad
fix: remove unused import
guilleov Mar 27, 2026
c3ee1a0
feature: enable auto.evolve parameter
guilleov Mar 23, 2026
2b45928
feat: add auto evolve struct to JSON configuration option to ClickHou…
guilleov Mar 25, 2026
4d67ee9
feat: enhance schema handling by adding union type detection and mapp…
guilleov Mar 25, 2026
7b8d2d7
feat: add tests for auto-evolving schemas in ClickHouseSinkTask
guilleov Mar 26, 2026
4050a55
fix: remove unused import
guilleov Mar 27, 2026
c07421e
fix: escape column names in ClickHouseWriter to prevent SQL injection…
guilleov Mar 27, 2026
34843af
refactor: remove duplicated code from merge
guilleov Mar 27, 2026
a24127b
feat: add default expressions for Array and Map types
guilleov Mar 27, 2026
4538336
fix: handle Variant columns in mixed-schema batches with auto.evolve
guilleov Mar 27, 2026
401fe21
feat: optimize field extraction in auto-evolve by using IdentityHashM…
guilleov Mar 27, 2026
4571045
feat: forward clickhouse settings to ALTER TABLE DDL in auto-evolve
guilleov Mar 29, 2026
a0a71e1
fix: handle Variant and union null serialization in RowBinary writer
guilleov Mar 29, 2026
c44c2f4
refactor: simplify auto-evolve to check only last record schema
guilleov Mar 29, 2026
65a802d
fix: replace createTable with runQuery to match main convention
guilleov Apr 1, 2026
471e2b6
Merge branch 'main' into feature/schema-evolution
guilleov Apr 29, 2026
183c1e0
Merge branch 'feature/schema-evolution' of github.com:guilleov/clickh…
guilleov Apr 29, 2026
2e56b81
fix: use existing test helpers for auto-evolve tests
guilleov Apr 29, 2026
714435f
test: align auto-evolve tests with last-record-only contract
guilleov Apr 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
# Unreleased
# 1.3.8 (unreleased)

## New Features
* Added `auto.evolve` configuration option for automatic table schema evolution. When enabled, the connector detects new fields in incoming records and issues `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` against ClickHouse. Disabled by default. (https://github.com/ClickHouse/clickhouse-kafka-connect/issues/277)

## Bug Fixes
* Fixed RowBinary serialization for Map columns with Nullable value types. The nullable marker byte was missing when writing map values, causing `CANNOT_READ_ALL_DATA` errors for `Map(K, Nullable(V))` columns.

## New Features
* Added `auto.evolve` configuration option for automatic table schema evolution. When enabled, the connector detects new fields in incoming records and issues `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` against ClickHouse. Disabled by default. (https://github.com/ClickHouse/clickhouse-kafka-connect/issues/277)

## Dependencies
* Updated clickhouse-java version from `0.9.4` to `0.9.5`

# 1.3.7, 2026-03-25
# 1.3.7, 2026-03-25

## Security
* Upgraded `com.fasterxml.jackson.core` dependencies to version with fix for https://github.com/advisories/GHSA-72hv-8253-57qq (https://github.com/ClickHouse/clickhouse-kafka-connect/pull/690).

## Improvements
* `Gson` replaced with `Jackson` for performance and better maintainability (https://github.com/ClickHouse/clickhouse-kafka-connect/pull/676).


# 1.3.6, 2026-03-18

## New Features
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ public class ClickHouseSinkConfig {
public static final String REPORT_INSERTED_OFFSETS = "reportInsertedOffsets";
public static final String ERROR_TOLERANCE_ALL = "all";
public static final String ERROR_TOLERANCE_NONE = "none";
public static final String AUTO_EVOLVE = "auto.evolve";
public static final String AUTO_EVOLVE_DDL_REFRESH_RETRIES = "auto.evolve.ddl.refresh.retries";
public static final String AUTO_EVOLVE_STRUCT_TO_JSON = "auto.evolve.struct.to.json";
public static final String CONNECTOR_RETRY_TIMEOUT = "errors.retry.timeout";

public static final long MINIMAL_RETRY_TIMEOUT_THR_WARN = TimeUnit.SECONDS.toMillis(10);
Expand Down Expand Up @@ -110,6 +113,9 @@ public class ClickHouseSinkConfig {
private final int bufferCount;
private final long bufferFlushTime;
private final boolean reportInsertedOffsets;
private final boolean autoEvolve;
private final int autoEvolveDdlRefreshRetries;
private final boolean autoEvolveStructToJson;
private final boolean binaryFormatWrtiteJsonAsString;
private final String sslSocketSni;

Expand Down Expand Up @@ -296,6 +302,10 @@ public ClickHouseSinkConfig(Map<String, String> props) {
LOGGER.info("Internal buffering enabled: bufferCount={}, bufferFlushTime={}ms", this.bufferCount, this.bufferFlushTime);
}

this.autoEvolve = Boolean.parseBoolean(props.getOrDefault(AUTO_EVOLVE, "false"));
this.autoEvolveDdlRefreshRetries = Integer.parseInt(props.getOrDefault(AUTO_EVOLVE_DDL_REFRESH_RETRIES, "3"));
this.autoEvolveStructToJson = Boolean.parseBoolean(props.getOrDefault(AUTO_EVOLVE_STRUCT_TO_JSON, "false"));

String jsonAsString = getClickhouseSettings().get("input_format_binary_read_json_as_string");
this.binaryFormatWrtiteJsonAsString = jsonAsString != null && (jsonAsString.equalsIgnoreCase("true") || jsonAsString.equals("1"));

Expand Down Expand Up @@ -692,6 +702,40 @@ private static ConfigDef createConfigDef() {
ConfigDef.Width.MEDIUM,
"SSL Socket SNI"
);

String ddlGroup = "DDL";
int ddlOrderInGroup = 0;
configDef.define(AUTO_EVOLVE,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.MEDIUM,
"Whether to automatically add columns to the destination table when a record contains fields not present in the table. default: false",
ddlGroup,
++ddlOrderInGroup,
ConfigDef.Width.SHORT,
"Auto evolve table schema."
);
configDef.define(AUTO_EVOLVE_DDL_REFRESH_RETRIES,
ConfigDef.Type.INT,
3,
ConfigDef.Range.atLeast(0),
ConfigDef.Importance.LOW,
"Number of retries when waiting for DDL changes to propagate after schema evolution. default: 3",
ddlGroup,
++ddlOrderInGroup,
ConfigDef.Width.SHORT,
"DDL refresh retries"
);
configDef.define(AUTO_EVOLVE_STRUCT_TO_JSON,
ConfigDef.Type.BOOLEAN,
false,
ConfigDef.Importance.MEDIUM,
"Whether to map Connect STRUCT fields to ClickHouse JSON columns during schema evolution. default: false",
ddlGroup,
++ddlOrderInGroup,
ConfigDef.Width.SHORT,
"Map STRUCT to JSON"
);
return configDef;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,8 +50,10 @@
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoField;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -207,6 +211,26 @@ public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporte
String database = first.getDatabase();
Table table = getTable(database, topic);
if (table == null) { return; }//We checked the error flag in getTable, so we don't need to check it again here

if (csc.isAutoEvolve()) {
// Check the last record's schema for new fields.
// Limitation: if a batch contains multiple schema versions, only the last one is checked.
// A full scan across all records can be implemented later if needed.
Record last = records.get(records.size() - 1);
Map<String, Schema> lastFields = new LinkedHashMap<>();
if (last.getFields() != null) {
for (Field f : last.getFields()) {
lastFields.put(f.name(), f.schema());
}
}
table = evolveTableSchema(table, lastFields);
}
Comment on lines +215 to +227
Copy link

Copilot AI Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says auto-evolve "only adds Nullable columns", but Column.connectTypeToClickHouseType explicitly returns non-nullable types for Array/Map/Variant (ClickHouse restriction). After evolving such a column, older records that don't contain the new field will hit the fieldExists=false path in doWriteCol(...) and currently throw (and for Map may NPE if the field exists-but-null). Consider adding DEFAULT expressions for newly added Array/Map/Variant columns (so RowBinaryWithDefaults can emit defaults), or explicitly treat missing fields for these types as empty values / Variant NULL discriminator during serialization.

Copilot uses AI. Check for mistakes.

doInsertBatch(records, table, queryId);
}

private void doInsertBatch(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
Record first = records.get(0);
LOGGER.debug("Trying to insert [{}] records to table name [{}] (QueryId: [{}])", records.size(), table.getName(), queryId.getQueryId());
switch (first.getSchemaType()) {
case SCHEMA:
Expand Down Expand Up @@ -234,7 +258,8 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
Type type = col.getType();
boolean isNullable = col.isNullable();
boolean hasDefault = col.hasDefault();
if (!isNullable && !hasDefault) {
// Variant has a native NULL discriminator (255) so it can accept missing values without Nullable or DEFAULT.
if (!isNullable && !hasDefault && type != Type.VARIANT) {
Map<String, Schema> schemaMap = record.getFields().stream().collect(Collectors.toMap(Field::name, Field::schema));
var objSchema = schemaMap.get(colName);
Data obj = record.getJsonMap().get(colName);
Expand Down Expand Up @@ -280,6 +305,9 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
continue;

if (colTypeName.equals("VARIANT") && dataTypeName.equals("STRUCT"))
continue;

if (INT_TYPES.contains(colTypeName)) {
continue;
}
Expand Down Expand Up @@ -494,6 +522,9 @@ protected void doWriteColValue(Column col, OutputStream stream, Data value, bool
mapTmp.forEach((key, mapValue) -> {
try {
doWritePrimitive(col.getMapKeyType(), value.getMapKeySchema().type(), stream, key, col);
if (col.getMapValueType() != null && col.getMapValueType().isNullable() && mapValue != null) {
BinaryStreamUtils.writeNonNull(stream);
}
doWriteColValue(col.getMapValueType(), stream, new Data(value.getNestedValueSchema(), mapValue), defaultsSupport);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -727,7 +758,7 @@ protected void doWritePrimitive(Type columnType, Schema.Type dataType, OutputStr
} else if (unionData.getObject() instanceof byte[]) {
BinaryStreamUtils.writeString(stream, (byte[]) unionData.getObject());
} else {
throw new DataException("Not implemented conversion from " + unionData.getObject().getClass() + " to String");
BinaryStreamUtils.writeString(stream, unionData.getObject().toString().getBytes(StandardCharsets.UTF_8));
}
break;
}
Expand Down Expand Up @@ -786,6 +817,10 @@ protected void doWriteCol(Data value, boolean fieldExists, Column col, OutputStr
return;//And we're done
} else if (colType == Type.ARRAY) {//If the column is an array
BinaryStreamUtils.writeNonNull(stream);//Then we send nonNull
} else if (colType == Type.VARIANT) {
BinaryStreamUtils.writeNonNull(stream);
BinaryStreamUtils.writeUnsignedInt8(stream, 255);
return;
} else {
throw new RuntimeException(String.format("An attempt to write null into not nullable column '%s'", name));
}
Expand All @@ -798,7 +833,10 @@ protected void doWriteCol(Data value, boolean fieldExists, Column col, OutputStr
if (!col.isNullable() && value.getObject() == null) {
if (colType == Type.ARRAY)
BinaryStreamUtils.writeNonNull(stream);
else
else if (colType == Type.VARIANT) {
BinaryStreamUtils.writeUnsignedInt8(stream, 255);
return;
} else
throw new RuntimeException(String.format("An attempt to write null into not nullable column '%s'", name));
}
}
Expand All @@ -813,6 +851,12 @@ protected void doWriteCol(Data value, boolean fieldExists, Column col, OutputStr
BinaryStreamUtils.writeNonNull(stream);
}
BinaryStreamUtils.writeNull(stream);
} else if (col.getType() == Type.VARIANT) {
// Variant has a native NULL discriminator (255) — no Nullable/DEFAULT needed.
if (defaultsSupport) {
BinaryStreamUtils.writeNonNull(stream);
}
BinaryStreamUtils.writeUnsignedInt8(stream, 255);
} else {
// no filled and not nullable
LOGGER.error("Column {} is not nullable and no value is provided", name);
Expand All @@ -821,6 +865,61 @@ protected void doWriteCol(Data value, boolean fieldExists, Column col, OutputStr
}
}

/**
 * Ensures the destination table contains a column for every incoming record field,
 * issuing a single ALTER TABLE for any fields that are missing.
 *
 * @param table     current cached table definition
 * @param allFields record field names mapped to their Connect schemas
 * @return the (possibly refreshed) table definition; unchanged when nothing was added
 * @throws InterruptedException if interrupted while waiting for the DDL to propagate
 */
protected Table evolveTableSchema(Table table, Map<String, Schema> allFields) throws InterruptedException {
    // Without a structured Connect schema there is nothing to derive column types from.
    if (allFields.isEmpty()) {
        throw new RuntimeException(
                "auto.evolve requires a Connect schema (Avro, Protobuf, or JSON Schema). " +
                "Schemaless or string records are not supported with auto.evolve=true.");
    }

    Set<String> missingColumns = table.getMissingColumns(allFields.keySet());
    if (missingColumns.isEmpty()) {
        // Fast path: the table already covers every field in the record.
        return table;
    }

    LOGGER.info("Detected {} new field(s) not present in table {}: {}", missingColumns.size(), table.getName(), missingColumns);

    // Build one "name type [DEFAULT expr]" definition per field whose schema we can resolve.
    List<String> columnDefs = new ArrayList<>();
    for (String fieldName : missingColumns) {
        Schema fieldSchema = allFields.get(fieldName);
        if (fieldSchema == null) {
            // No schema available for this field — we cannot infer a ClickHouse type.
            continue;
        }
        String clickHouseType = Column.connectTypeToClickHouseType(fieldSchema, csc.isAutoEvolveStructToJson());
        String defaultExpression = Column.defaultExpressionForType(clickHouseType);
        columnDefs.add(String.format("%s %s%s", Utils.escapeName(fieldName), clickHouseType, defaultExpression));
    }

    if (columnDefs.isEmpty()) {
        return table;
    }

    chc.alterTableAddColumns(table.getDatabase(), table.getCleanName(), columnDefs, csc.getClickhouseSettings());
    LOGGER.info("Schema evolution complete for table {}. Added columns: {}", table.getName(), columnDefs);
    // Re-read the table definition so the writer sees the newly added columns.
    return refreshTableAfterDDL(table, missingColumns);
}

// Delay between successive table-metadata refresh attempts after a DDL change.
private static final long DDL_REFRESH_BACKOFF_MS = 200;

/**
 * Re-reads the table definition until every newly added column is visible,
 * backing off between attempts.
 *
 * NOTE(review): auto.evolve.ddl.refresh.retries allows 0 (ConfigDef.Range.atLeast(0)),
 * in which case this loop performs no refresh attempt at all and always throws —
 * confirm that is the intended semantics for "0 retries".
 *
 * @param table              table definition prior to the DDL change
 * @param expectedNewColumns column names that must appear after the refresh
 * @return the refreshed table definition containing all expected columns
 * @throws RetriableException   if the columns are still not visible after all attempts,
 *                              so Kafka Connect redelivers the batch
 * @throws InterruptedException if interrupted while backing off
 */
private Table refreshTableAfterDDL(Table table, Set<String> expectedNewColumns) throws InterruptedException {
    int maxRetries = csc.getAutoEvolveDdlRefreshRetries();
    for (int attempt = 1; attempt <= maxRetries; attempt++) {
        Table refreshed = urgentTableUpdate(table);
        Set<String> stillMissing = refreshed.getMissingColumns(expectedNewColumns);
        if (stillMissing.isEmpty()) {
            return refreshed;
        }
        if (attempt < maxRetries) {
            LOGGER.warn("DDL refresh attempt {}/{}: columns {} not yet visible, retrying in {}ms",
                    attempt, maxRetries, stillMissing, DDL_REFRESH_BACKOFF_MS);
            Thread.sleep(DDL_REFRESH_BACKOFF_MS);
        } else {
            // Last attempt failed — don't burn another backoff period before throwing.
            LOGGER.warn("DDL refresh attempt {}/{}: columns {} still not visible, giving up",
                    attempt, maxRetries, stillMissing);
        }
    }
    throw new RetriableException(String.format(
            "DDL propagation timeout: columns not visible after %d retries", maxRetries));
}

protected void doInsertRawBinary(List<Record> records, Table table, QueryIdentifier queryId, boolean supportDefaults, boolean retry) throws IOException, ExecutionException, InterruptedException {
try {
if (chc.isUseClientV2()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseNodeSelector;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.client.api.Client;
import com.clickhouse.client.api.enums.ProxyType;
Expand Down Expand Up @@ -35,6 +36,7 @@
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;

public class ClickHouseHelperClient implements AutoCloseable {

Expand Down Expand Up @@ -469,6 +471,48 @@ public Table describeTableV2(String database, String tableName) {
return table;
}

/**
 * Adds the given column definitions to an existing table with a single
 * {@code ALTER TABLE ... ADD COLUMN IF NOT EXISTS ...} statement, dispatching
 * to the V1 or V2 client depending on configuration.
 *
 * NOTE(review): database/tableName are wrapped in backticks but backticks inside
 * the identifiers themselves are not escaped — confirm these values only come
 * from trusted configuration.
 *
 * @param database           target database name
 * @param tableName          target table name (unquoted)
 * @param columnDefs         column definitions, each of the form "name type [DEFAULT expr]"
 * @param clickhouseSettings additional server settings forwarded with the DDL
 */
public void alterTableAddColumns(String database, String tableName, List<String> columnDefs, Map<String, String> clickhouseSettings) {
    // Fold every definition into one ALTER so all columns are added in a single statement.
    StringBuilder addClauses = new StringBuilder();
    for (String columnDef : columnDefs) {
        if (addClauses.length() > 0) {
            addClauses.append(", ");
        }
        addClauses.append("ADD COLUMN IF NOT EXISTS ").append(columnDef);
    }
    String sql = String.format("ALTER TABLE `%s`.`%s` %s", database, tableName, addClauses);
    LOGGER.info("Executing DDL: {}", sql);
    if (useClientV2) {
        alterTableAddColumnV2(sql, clickhouseSettings);
    } else {
        alterTableAddColumnV1(sql, clickhouseSettings);
    }
}

/**
 * Executes a DDL statement through the legacy (V1) ClickHouse client.
 * {@code alter_sync=1} makes the server wait until the local replica has
 * applied the change before returning.
 *
 * @param sql                the full ALTER TABLE statement
 * @param clickhouseSettings additional server settings forwarded with the request
 * @throws RuntimeException wrapping any {@link ClickHouseException} from execution
 */
private void alterTableAddColumnV1(String sql, Map<String, String> clickhouseSettings) {
    try (ClickHouseClient ddlClient = ClickHouseClient.builder()
            .options(getDefaultClientOptions())
            .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
            .build()) {
        ClickHouseRequest<?> ddlRequest = ddlClient.read(server).query(sql).set("alter_sync", "1");
        for (Map.Entry<String, String> setting : clickhouseSettings.entrySet()) {
            ddlRequest.set(setting.getKey(), setting.getValue());
        }
        // Nothing to read from the response — closing it releases the connection.
        try (ClickHouseResponse ignored = ddlRequest.executeAndWait()) {
            // DDL executed; alter_sync=1 waits for the local replica to apply
        }
    } catch (ClickHouseException e) {
        throw new RuntimeException("Failed to execute ALTER TABLE: " + sql, e);
    }
}

/**
 * Executes a DDL statement through the V2 ClickHouse client.
 * {@code alter_sync=1} makes the server wait until the local replica has
 * applied the change before returning.
 *
 * @param sql                the full ALTER TABLE statement
 * @param clickhouseSettings additional server settings forwarded with the request
 * @throws RuntimeException wrapping any failure from execution
 */
private void alterTableAddColumnV2(String sql, Map<String, String> clickhouseSettings) {
    QuerySettings settings = new QuerySettings().serverSetting("alter_sync", "1");
    for (Map.Entry<String, String> entry : clickhouseSettings.entrySet()) {
        settings.serverSetting(entry.getKey(), entry.getValue());
    }
    try (QueryResponse response = client.query(sql, settings).get()) {
        // DDL executed; alter_sync=1 waits for the local replica to apply
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers up the stack can observe the interruption.
        Thread.currentThread().interrupt();
        throw new RuntimeException("Interrupted while executing ALTER TABLE: " + sql, e);
    } catch (Exception e) {
        throw new RuntimeException("Failed to execute ALTER TABLE: " + sql, e);
    }
}

public List<Table> extractTablesMapping(String database, Map<String, Table> cache) {
List<Table> tableList = new ArrayList<>();
for (Table table : showTables(database)) {
Expand Down
Loading
Loading