Feature/schema evolution #687
Changes from 30 commits

```diff
@@ -33,6 +33,8 @@
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.errors.RetriableException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
```

```diff
@@ -48,8 +50,10 @@
 import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.time.temporal.ChronoField;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
```

```diff
@@ -207,6 +211,26 @@ public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporte
         String database = first.getDatabase();
         Table table = getTable(database, topic);
         if (table == null) { return; }//We checked the error flag in getTable, so we don't need to check it again here
+
+        if (csc.isAutoEvolve()) {
+            // Check the last record's schema for new fields.
+            // Limitation: if a batch contains multiple schema versions, only the last one is checked.
+            // A full scan across all records can be implemented later if needed.
+            Record last = records.get(records.size() - 1);
+            Map<String, Schema> lastFields = new LinkedHashMap<>();
+            if (last.getFields() != null) {
+                for (Field f : last.getFields()) {
+                    lastFields.put(f.name(), f.schema());
+                }
+            }
+            table = evolveTableSchema(table, lastFields);
+        }
+
+        doInsertBatch(records, table, queryId);
+    }
+
+    private void doInsertBatch(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
+        Record first = records.get(0);
         LOGGER.debug("Trying to insert [{}] records to table name [{}] (QueryId: [{}])", records.size(), table.getName(), queryId.getQueryId());
         switch (first.getSchemaType()) {
             case SCHEMA:
```

A review thread (in the Conversations section below) discusses lines +215 to +227, the auto-evolve block above.

```diff
@@ -234,7 +258,8 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
         Type type = col.getType();
         boolean isNullable = col.isNullable();
         boolean hasDefault = col.hasDefault();
-        if (!isNullable && !hasDefault) {
+        // Variant has a native NULL discriminator (255) so it can accept missing values without Nullable or DEFAULT.
+        if (!isNullable && !hasDefault && type != Type.VARIANT) {
             Map<String, Schema> schemaMap = record.getFields().stream().collect(Collectors.toMap(Field::name, Field::schema));
             var objSchema = schemaMap.get(colName);
             Data obj = record.getJsonMap().get(colName);
```

```diff
@@ -280,6 +305,9 @@ protected boolean validateDataSchema(Table table, Record record, boolean onlyFie
             if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
                 continue;
 
+            if (colTypeName.equals("VARIANT") && dataTypeName.equals("STRUCT"))
+                continue;
+
             if (INT_TYPES.contains(colTypeName)) {
                 continue;
             }
```

```diff
@@ -494,6 +522,9 @@ protected void doWriteColValue(Column col, OutputStream stream, Data value, bool
                 mapTmp.forEach((key, mapValue) -> {
                     try {
                         doWritePrimitive(col.getMapKeyType(), value.getMapKeySchema().type(), stream, key, col);
+                        if (col.getMapValueType() != null && col.getMapValueType().isNullable() && mapValue != null) {
+                            BinaryStreamUtils.writeNonNull(stream);
+                        }
                         doWriteColValue(col.getMapValueType(), stream, new Data(value.getNestedValueSchema(), mapValue), defaultsSupport);
                     } catch (IOException e) {
                         throw new RuntimeException(e);
```

```diff
@@ -727,7 +758,7 @@ protected void doWritePrimitive(Type columnType, Schema.Type dataType, OutputStr
                 } else if (unionData.getObject() instanceof byte[]) {
                     BinaryStreamUtils.writeString(stream, (byte[]) unionData.getObject());
                 } else {
-                    throw new DataException("Not implemented conversion from " + unionData.getObject().getClass() + " to String");
+                    BinaryStreamUtils.writeString(stream, unionData.getObject().toString().getBytes(StandardCharsets.UTF_8));
                 }
                 break;
             }
```

```diff
@@ -786,6 +817,10 @@ protected void doWriteCol(Data value, boolean fieldExists, Column col, OutputStr
                 return;//And we're done
             } else if (colType == Type.ARRAY) {//If the column is an array
                 BinaryStreamUtils.writeNonNull(stream);//Then we send nonNull
+            } else if (colType == Type.VARIANT) {
+                BinaryStreamUtils.writeNonNull(stream);
+                BinaryStreamUtils.writeUnsignedInt8(stream, 255);
+                return;
             } else {
                 throw new RuntimeException(String.format("An attempt to write null into not nullable column '%s'", name));
             }
```

```diff
@@ -798,7 +833,10 @@ protected void doWriteCol(Data value, boolean fieldExists, Column col, OutputStr
             if (!col.isNullable() && value.getObject() == null) {
                 if (colType == Type.ARRAY)
                     BinaryStreamUtils.writeNonNull(stream);
-                else
+                else if (colType == Type.VARIANT) {
+                    BinaryStreamUtils.writeUnsignedInt8(stream, 255);
+                    return;
+                } else
                     throw new RuntimeException(String.format("An attempt to write null into not nullable column '%s'", name));
             }
         }
```

```diff
@@ -813,6 +851,12 @@ protected void doWriteCol(Data value, boolean fieldExists, Column col, OutputStr
                 BinaryStreamUtils.writeNonNull(stream);
             }
             BinaryStreamUtils.writeNull(stream);
+        } else if (col.getType() == Type.VARIANT) {
+            // Variant has a native NULL discriminator (255) — no Nullable/DEFAULT needed.
+            if (defaultsSupport) {
+                BinaryStreamUtils.writeNonNull(stream);
+            }
+            BinaryStreamUtils.writeUnsignedInt8(stream, 255);
         } else {
             // no filled and not nullable
             LOGGER.error("Column {} is not nullable and no value is provided", name);
```

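For context, the `255` written in the hunks above is ClickHouse's Variant NULL discriminator in the RowBinary format: each Variant value is prefixed with a discriminator byte selecting the variant type, and `0xFF` means NULL. A minimal, self-contained sketch of that encoding, using a plain `OutputStream` instead of the connector's `BinaryStreamUtils` (which is assumed here to write the same single unsigned byte):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

class VariantNullSketch {
    // A Variant value starts with one discriminator byte; 255 (0xFF) means NULL,
    // which is why no Nullable wrapper or DEFAULT expression is needed on the column.
    static void writeVariantNull(OutputStream out) throws IOException {
        out.write(0xFF);
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeVariantNull(buf);
        System.out.println(buf.size()); // 1: a NULL Variant is a single byte
    }
}
```
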
```diff
@@ -821,6 +865,61 @@ protected void doWriteCol(Data value, boolean fieldExists, Column col, OutputStr
         }
     }
 
+    protected Table evolveTableSchema(Table table, Map<String, Schema> allFields) throws InterruptedException {
+        if (allFields.isEmpty()) {
+            throw new RuntimeException(
+                    "auto.evolve requires a Connect schema (Avro, Protobuf, or JSON Schema). " +
+                    "Schemaless or string records are not supported with auto.evolve=true.");
+        }
+
+        Set<String> missingColumns = table.getMissingColumns(allFields.keySet());
+
+        if (missingColumns.isEmpty()) {
+            return table;
+        }
+
+        LOGGER.info("Detected {} new field(s) not present in table {}: {}", missingColumns.size(), table.getName(), missingColumns);
+
+        List<String> columnDefs = new ArrayList<>();
+
+        for (String fieldName : missingColumns) {
+            Schema fieldSchema = allFields.get(fieldName);
+            if (fieldSchema == null) {
+                continue;
+            }
+
+            String chType = Column.connectTypeToClickHouseType(fieldSchema, csc.isAutoEvolveStructToJson());
+            String defaultExpr = Column.defaultExpressionForType(chType);
+            columnDefs.add(String.format("%s %s%s", Utils.escapeName(fieldName), chType, defaultExpr));
+        }
+
+        if (!columnDefs.isEmpty()) {
+            chc.alterTableAddColumns(table.getDatabase(), table.getCleanName(), columnDefs, csc.getClickhouseSettings());
+            LOGGER.info("Schema evolution complete for table {}. Added columns: {}", table.getName(), columnDefs);
+            table = refreshTableAfterDDL(table, missingColumns);
+        }
+
+        return table;
+    }
+
+    private static final long DDL_REFRESH_BACKOFF_MS = 200;
+
+    private Table refreshTableAfterDDL(Table table, Set<String> expectedNewColumns) throws InterruptedException {
+        int maxRetries = csc.getAutoEvolveDdlRefreshRetries();
+        for (int attempt = 0; attempt < maxRetries; attempt++) {
+            Table refreshed = urgentTableUpdate(table);
+            Set<String> stillMissing = refreshed.getMissingColumns(expectedNewColumns);
+            if (stillMissing.isEmpty()) {
+                return refreshed;
+            }
+            LOGGER.warn("DDL refresh attempt {}/{}: columns {} not yet visible, retrying in {}ms",
+                    attempt + 1, maxRetries, stillMissing, DDL_REFRESH_BACKOFF_MS);
+            Thread.sleep(DDL_REFRESH_BACKOFF_MS);
+        }
+        throw new RetriableException(String.format(
+                "DDL propagation timeout: columns not visible after %d retries", maxRetries));
+    }
+
     protected void doInsertRawBinary(List<Record> records, Table table, QueryIdentifier queryId, boolean supportDefaults, boolean retry) throws IOException, ExecutionException, InterruptedException {
         try {
             if (chc.isUseClientV2()) {
```

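The `refreshTableAfterDDL` loop above is a plain fixed-backoff polling pattern: re-read the table's columns until all expected ones are visible, or give up after a bounded number of attempts. A decoupled sketch of the same idea, where `fetchColumns` is a hypothetical stand-in for the connector's `urgentTableUpdate`:

```java
import java.util.Set;
import java.util.function.Supplier;

class DdlRefreshSketch {
    // Poll until every expected column is visible, sleeping a fixed backoff
    // between attempts; fail after maxRetries so the caller can retry the batch.
    static Set<String> awaitColumns(Supplier<Set<String>> fetchColumns,
                                    Set<String> expected,
                                    int maxRetries,
                                    long backoffMs) throws InterruptedException {
        for (int attempt = 0; attempt < maxRetries; attempt++) {
            Set<String> current = fetchColumns.get();
            if (current.containsAll(expected)) {
                return current;
            }
            Thread.sleep(backoffMs);
        }
        throw new IllegalStateException(
                "columns not visible after " + maxRetries + " retries");
    }
}
```

Throwing a `RetriableException`, as the PR does, lets the Connect framework redeliver the batch instead of failing the task outright.
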
Conversations

How can we guarantee that the last record will have all fields?

We might need to check all records and look for the new fields. Since we don't support deletion, we could obtain a superset of all new fields. Imagine schema changes from V1 -> V2 -> V3 -> V4 within one batch: we should keep the new fields from V2/V3/V4 and generate the DDL based on that, as if it were a single migration from V1 to a V4 that has all the fields.

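For illustration, a minimal sketch of that superset approach (the `Record` stub is a hypothetical stand-in for the connector's record type; `Field` and `Schema` are the Connect types used in the diff above):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

class SupersetSketch {
    // Hypothetical stand-in for the connector's Record type.
    interface Record {
        List<Field> getFields();
    }

    // Union of every record's fields: with V1 -> V2 -> V3 -> V4 in one batch,
    // the result contains all fields added in V2/V3/V4, as if migrating V1 -> V4.
    static Map<String, Schema> supersetFields(List<Record> records) {
        Map<String, Schema> union = new LinkedHashMap<>();
        for (Record r : records) {
            if (r.getFields() == null) {
                continue;
            }
            for (Field f : r.getFields()) {
                // Later occurrences win; since deletion isn't supported, fields only accumulate.
                union.put(f.name(), f.schema());
            }
        }
        return union;
    }
}
```

The trade-off, raised below, is the extra full pass over every record in the batch.
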
Check guilleov@859e162 @mzitnik
This implementation is overkill; iterating over all the records again is expensive. I would either add a disclaimer about this situation in the docs or implement it some other way.

Another way might be to let the insert fail when the schema changes (schemas shouldn't change often), then retry, and on the retry check all records to see what needs to be modified (letting a query fail just to trigger a retry seems like bad design, but it might work).

Another option is to move the detection of schema changes up to ProxySinkTask.java, where we already iterate over all records.

Which option do you think is better for this new feature? @chernser @mzitnik

Yes, this is what I was thinking @guilleov; let's roll back to checking only the last record for now. Add this as a limitation, and we will come back to it later.

OK, I reverted it and noted the limitation both here and in the clickhouse-docs PR. Thanks @mzitnik

guilleov@4ad5f17