Feature/schema evolution #687
Conversation
ClickHouse docs PR: ClickHouse/clickhouse-docs#5824

Good day, @guilleov! Thank you for the contribution!
    }
    return "Map(" + keyType + ", " + valType + ")";
case STRUCT:
    throw new RuntimeException(
Here we can add one more flag to allow creating JSON for structs.
Also, we need to handle unions, because that is a common case.

How should we handle unions?

guilleov@a9586e5 - not sure what you think of this way of handling it, @chernser, let me know.
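For context, a minimal sketch of one way a detected union could be mapped to a ClickHouse Variant type (the method name and the uniqueness/ordering choices here are assumptions, not the actual code in guilleov@a9586e5):

```java
import java.util.List;
import java.util.TreeSet;

public class UnionTypeSketch {
    // Map the ClickHouse types of a union's members to a single column type.
    // Variant member types must be distinct; sorting keeps the DDL deterministic.
    static String unionToClickHouseType(List<String> memberTypes) {
        TreeSet<String> unique = new TreeSet<>(memberTypes);
        if (unique.size() == 1) {
            return unique.first(); // a "union" with one effective type
        }
        return "Variant(" + String.join(", ", unique) + ")";
    }

    public static void main(String[] args) {
        // e.g. an Avro union ["string", "long"] detected in the Connect schema
        System.out.println(unionToClickHouseType(List.of("String", "Int64")));
        // prints: Variant(Int64, String)
    }
}
```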
private Table doInsertWithSchemaEvolution(List<Record> records, Table table, QueryIdentifier queryId) throws IOException, ExecutionException, InterruptedException {
    // Split records into sub-batches at schema boundaries (like the JDBC BufferedRecords.add pattern).
    // When the schema changes mid-batch, the current sub-batch is flushed, the table is evolved, and the insertion continues.
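For illustration, a minimal sketch of the sub-batching pattern this comment describes (a generic helper, not the actual method; the thread below later simplifies this away):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;

public class SubBatchSketch {
    // Split records into runs of identical schema; the caller flushes each run,
    // evolves the table at the boundary, then continues with the next run.
    static <R> List<List<R>> splitAtSchemaBoundaries(List<R> records, Function<R, Object> schemaOf) {
        List<List<R>> subBatches = new ArrayList<>();
        List<R> current = new ArrayList<>();
        Object currentSchema = null;
        for (R record : records) {
            Object schema = schemaOf.apply(record);
            if (!current.isEmpty() && !Objects.equals(currentSchema, schema)) {
                subBatches.add(current);      // flush at the schema boundary
                current = new ArrayList<>();
            }
            current.add(record);
            currentSchema = schema;
        }
        if (!current.isEmpty()) {
            subBatches.add(current);
        }
        return subBatches;
    }
}
```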
What problem does it solve?

I was thinking that if the schema changed mid-batch the insert would fail, because some records wouldn't have the correct schema, but I think that's not the case, right?
I simplified this section in commit guilleov@022f88e.
@guilleov
Thank you for the update! I will review today.

I haven't finished all the changes from the comments yet, @chernser; still working on some of them. Thanks for the comments!
if (csc.isAutoEvolve()) {
    // New columns are Nullable, so older records without the new fields insert with NULL.
    Record last = records.get(records.size() - 1);
How can we guarantee that the last record will have all fields?

- If Kafka topics are ordered, the last messages should have the newest schema.
- If ignorePartitionsWhenBatching=true, that might not be the case.
- If someone evolves the schema to V2 but another producer keeps sending V1 messages (since they are backward compatible), we could end up with a V1 -> V2 -> V1 insert order within a batch and fail to evolve.

We might need to check all records and look for the new fields. Since we don't support deletion, we could obtain a superset of all new fields. Imagine the schema changes from V1 -> V2 -> V3 -> V4 in one batch: we should keep the new fields from V2/V3/V4 and generate the DDL based on that, as if it were a single migration from V1 to a V4 with all fields.
This implementation is overkill; it iterates over all records again.
I would add a disclaimer about this situation in the docs, or implement it some other way.
Another way might be to let the insert fail when the schema changes (schemas shouldn't change often) and then retry; on the retry we would check all records to see what needs to be modified. (Letting a query fail just to retry it is bad design, I think, but it might work.)
Another option is moving the detection of schema changes up to ProxySinkTask.java, where we already iterate over all records.
Which option do you think is better for this new feature? @chernser @mzitnik
Yes, this is what I was thinking @guilleov; let's roll back to checking only the last record for now.
Add this as a limitation, and we will come back to it later.
Ok, I reverted it and noted the limitation both here and in the clickhouse-docs PR. Thanks @mzitnik
        assertEquals(event.getTime2().atDate(LocalDate.of(1970, 1, 1)).format(localFormatter), row.get("time2"));
    }
}
Can we expand our tests to insert 3 batches:
Batch 1 → Schema V1
Batch 2 → Schema V2
Batch 3 → Schema V3
Also, add one batch with 10 records:
Records 1–5 with Schema V1 (3 fields)
Records 6–7 with Schema V2 (8 fields)
Records 8–10 with Schema V3 (5 fields)
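A sketch of how the requested mixed-schema batch could be built with the Kafka Connect API (the recordsFor helper and the exact V1/V2/V3 field sets are illustrative, not the fixtures in SchemaTestData.java):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public class MixedSchemaBatchSketch {
    static final Schema V1 = SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA).build();
    static final Schema V2 = SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)
            .field("name", Schema.OPTIONAL_STRING_SCHEMA).build();
    static final Schema V3 = SchemaBuilder.struct()
            .field("id", Schema.INT64_SCHEMA)
            .field("name", Schema.OPTIONAL_STRING_SCHEMA)
            .field("score", Schema.OPTIONAL_FLOAT64_SCHEMA).build();

    // Build `count` records starting at `offset`, filling only the fields
    // that exist in the given schema version.
    static List<SinkRecord> recordsFor(String topic, Schema schema, long offset, int count) {
        List<SinkRecord> out = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            Struct value = new Struct(schema).put("id", offset + i);
            if (schema.field("name") != null) value.put("name", "name-" + (offset + i));
            if (schema.field("score") != null) value.put("score", (double) i);
            out.add(new SinkRecord(topic, 0, null, null, schema, value, offset + i));
        }
        return out;
    }

    // One 10-record batch spanning three schema versions:
    static List<SinkRecord> mixedBatch(String topic) {
        List<SinkRecord> batch = new ArrayList<>();
        batch.addAll(recordsFor(topic, V1, 0, 5)); // records 1-5, V1
        batch.addAll(recordsFor(topic, V2, 5, 2)); // records 6-7, V2
        batch.addAll(recordsFor(topic, V3, 7, 3)); // records 8-10, V3
        return batch;
    }
}
```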
Pull request overview
Implements auto.evolve schema evolution for the ClickHouse Kafka Connect sink connector, allowing the connector to automatically add new ClickHouse columns when upstream Kafka record schemas introduce new fields.
Changes:
- Added `auto.evolve` / DDL-refresh / struct-to-JSON configuration and wiring to trigger `ALTER TABLE ... ADD COLUMN IF NOT EXISTS`.
- Added Connect-schema → ClickHouse-type inference (including Confluent Avro/Protobuf union detection and Variant mapping) plus a new `SchemaTypeInferenceException`.
- Fixed RowBinary serialization for `Map(K, Nullable(V))` values and added extensive unit/integration-style tests for schema evolution scenarios.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
Summary per file:
| File | Description |
|---|---|
| src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java | Adds schema evolution flow (detect missing columns, run ALTER, refresh mapping) and fixes Map(…, Nullable(…)) RowBinary marker writing. |
| src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java | Adds helper to issue multi-column ALTER TABLE ADD COLUMN IF NOT EXISTS with alter_sync=1. |
| src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java | Introduces Connect schema type inference, union detection, and Variant type resolution for auto-evolve DDL. |
| src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Table.java | Adds getMissingColumns(...) to compute schema diffs against incoming fields. |
| src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/SchemaTypeInferenceException.java | New exception type for unsupported schema-to-ClickHouse mappings during auto-evolve. |
| src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java | Adds and documents new connector configs: auto.evolve, auto.evolve.ddl.refresh.retries, auto.evolve.struct.to.json. |
| src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java | Adds end-to-end tests covering auto-evolve behavior across many schema evolution scenarios. |
| src/test/java/com/clickhouse/kafka/connect/sink/db/mapping/ColumnTest.java | Adds focused unit tests for union detection and union→Variant/String mapping. |
| src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/SchemaTestData.java | Adds schema-versioned test data builders used by the new auto-evolve tests. |
| CHANGELOG.md | Documents the new feature and the Map nullable serialization bugfix. |
| VERSION | Bumps version to v1.3.7. |
import java.util.Collections;
import java.util.List;
Unused import java.util.Collections will fail compilation (unused imports are treated as errors in this project's build). Remove it or reference it in the new tests.
Suggested change:
- import java.util.Collections;
  import java.util.List;
}
String chType = Column.connectTypeToClickHouseType(fieldSchema, csc.isAutoEvolveStructToJson());
columnDefs.add(String.format("`%s` %s", fieldName, chType));
fieldName is interpolated directly into backticked DDL. Elsewhere (e.g., Utils.escapeName) backticks are stripped to prevent malformed SQL. Please sanitize/escape column names here as well (at minimum remove backticks) before building columnDefs.
Suggested change:
- columnDefs.add(String.format("`%s` %s", fieldName, chType));
+ String escapedFieldName = Utils.escapeName(fieldName);
+ columnDefs.add(String.format("`%s` %s", escapedFieldName, chType));
if (csc.isAutoEvolve()) {
    // Since auto-evolve only adds Nullable columns (never deletes), the superset is ok.
    Map<String, Schema> allFields = new LinkedHashMap<>();
    for (Record r : records) {
        if (r.getFields() != null) {
            for (Field f : r.getFields()) {
                allFields.putIfAbsent(f.name(), f.schema());
            }
        }
    }
    table = evolveTableSchema(table, allFields);
}
The comment says auto-evolve "only adds Nullable columns", but Column.connectTypeToClickHouseType explicitly returns non-nullable types for Array/Map/Variant (a ClickHouse restriction). After evolving such a column, older records that don't contain the new field will hit the fieldExists=false path in doWriteCol(...) and currently throw (and for Map may NPE if the field exists but is null). Consider adding DEFAULT expressions for newly added Array/Map/Variant columns (so RowBinaryWithDefaults can emit defaults), or explicitly treating missing fields for these types as empty values / a Variant NULL discriminator during serialization.
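For example, the DDL shape this suggestion implies would look roughly like this (table and column names are hypothetical):

```sql
-- Newly evolved non-nullable container columns with server-side defaults,
-- so RowBinaryWithDefaults can fall back to them for older records:
ALTER TABLE events
    ADD COLUMN IF NOT EXISTS tags  Array(String)       DEFAULT [],
    ADD COLUMN IF NOT EXISTS attrs Map(String, String) DEFAULT map();
```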
…ing for ClickHouse integration
…ap for deduplication
Any updates? Is there something else left to do? @chernser @mzitnik @kurnoolsaketh

@guilleov I have approved; you can just fix the conflicts.

@mzitnik conflicts resolved!
…ouse-kafka-connect into feature/schema-evolution
@guilleov is this passing for you locally? It seems like the tests are still failing.

@guilleov are you in sync with main? There are still errors in the tests.

Let me check, @mzitnik.
After merging main, the new auto-evolve tests called createProps() and createClient(props) which don't exist on the test class. Replace with the existing helpers getBaseProps() and ClickHouseTestHelpers.createClient(props) to fix the compileTestJava failures.
The auto-evolve detection was simplified in c44c2f4 to inspect only the last record's schema in a batch. Update the four mixed-schema tests that still asserted the previous full-batch-scan behavior:
- autoEvolveMixedSchemasTenRecordsInOneBatch: keep V3 last and assert only V3 columns are added; explicitly assert V2-only columns are NOT.
- autoEvolveMultiVersionUnionSemantics: assert only V4's unique field is added; V2/V3 unique fields are NOT.
- autoEvolveMixedBatchLastRecordOlderSchema: invert to document that no ALTER is issued when the last record has the older schema.
- autoEvolveCrossPartitionSchemaDrift: invert similarly for the cross-partition case.
@mzitnik sorry, some tests were outdated after my last change to check only the last record (and they also used a missing method that is not available on main). Could you re-run the CI tests?

I ran it locally now and it worked. Tests summary: 265 tests, 262 succeeded, 0 failed, 3 skipped.
Implement auto.evolve schema evolution (closes #277)

Summary

Adds automatic table schema evolution to the ClickHouse Kafka Connect sink connector. When `auto.evolve=true`, the connector detects new fields in incoming Kafka records and issues `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` statements against ClickHouse; no manual DDL is required when upstream schemas change. This mirrors the auto.evolve feature in the Confluent JDBC Sink Connector, adapted for ClickHouse.
Resolves: #277 - Support automatic table schema evolution
Why this feature
When a Kafka topic's schema changes (e.g. a new field is added to an Avro/Protobuf/JSON Schema definition), the connector currently either silently drops the new field (`input_format_skip_unknown_fields=1`) or fails. Users must manually issue `ALTER TABLE` statements to add columns before the connector can ingest the new fields. This is inconvenient and error-prone, especially in organizations with frequent domain model changes (original issue context).

Design decisions
Idempotent DDL
Issue #277 raised a concern about race conditions when multiple connector tasks run concurrently. ClickHouse has `ADD COLUMN IF NOT EXISTS`, which makes the DDL idempotent: if two tasks race to add the same column, both will succeed.
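For example, two racing tasks can both safely issue a statement like this (identifiers hypothetical):

```sql
-- The second task's statement becomes a no-op instead of an error:
ALTER TABLE events ADD COLUMN IF NOT EXISTS city Nullable(String);
```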
Async DDL propagation on replicated tables

For replicated tables, ALTER queries add instructions to ZooKeeper/Keeper and are applied asynchronously on replicas. Two mitigations (sketched after the list):
1. `alter_sync=1` on DDL statements: waits for the local replica to apply the change before returning. This eliminates most staleness without the risk of `alter_sync=2` (which blocks until ALL replicas apply and can hang if a replica is down).
2. `refreshTableAfterDDL()` verifies the expected new columns are visible. If not (e.g., when reading from a different replica), it retries up to `auto.evolve.ddl.refresh.retries` times (default 3) with a 200 ms backoff.
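A rough sketch of the retry behavior described in point 2 (a generic helper mirroring the description, not the actual refreshTableAfterDDL implementation):

```java
import java.util.function.Predicate;
import java.util.function.Supplier;

public class DdlRefreshSketch {
    // Re-read table metadata until the newly added columns are visible,
    // retrying up to `maxRetries` times (default 3) with a 200 ms backoff.
    static <T> T refreshAfterDdl(Supplier<T> describeTable, Predicate<T> hasExpectedColumns,
                                 int maxRetries) throws InterruptedException {
        T table = describeTable.get();
        for (int attempt = 0; attempt < maxRetries && !hasExpectedColumns.test(table); attempt++) {
            Thread.sleep(200);          // fixed backoff before re-reading metadata
            table = describeTable.get();
        }
        if (!hasExpectedColumns.test(table)) {
            throw new IllegalStateException("New columns not visible after DDL");
        }
        return table;
    }
}
```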
Type mapping

- New scalar columns are added as `Nullable(...)` so older records missing the field get NULL.
- Array and Map columns (which cannot be Nullable in ClickHouse) get a `DEFAULT` expression (`DEFAULT []` for Array, `DEFAULT map()` for Map). This allows `RowBinaryWithDefaults` to emit the "use server default" marker for older records that lack the field.
- Struct fields can be created as `JSON` columns via `auto.evolve.struct.to.json=true`.

Configuration
| Config | Default |
|---|---|
| `auto.evolve` | `false` |
| `auto.evolve.ddl.refresh.retries` | `3` |
| `auto.evolve.struct.to.json` | `false` |
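A sketch of a sink configuration enabling the feature (the connection settings shown are illustrative):

```properties
connector.class=com.clickhouse.kafka.connect.ClickHouseSinkConnector
topics=events
hostname=clickhouse.example.com
database=default
# schema evolution settings added by this PR
auto.evolve=true
auto.evolve.ddl.refresh.retries=3
auto.evolve.struct.to.json=false
```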
Backward compatibility

`auto.evolve` defaults to `false`; existing deployments are completely unaffected.

Tests
30 new auto-evolve tests in `ClickHouseSinkTaskWithSchemaTest.java`, covering auto-evolve behavior across many schema evolution scenarios.

Full test suite: 264 tests, 261 succeeded, 0 failed, 3 skipped.
Checklist

- `./gradlew test` (0 failures)