# docs: add docs for kafka sink auto evolve option #5824
@@ -122,6 +122,7 @@
| `bufferCount` (since v1.3.6) | Number of records to buffer in memory before flushing to ClickHouse. `0` disables internal buffering. Buffering is not supported with `exactlyOnce=true`. | `"0"` |
| `bufferFlushTime` (since v1.3.6) | Maximum time in milliseconds to buffer records before flushing when `exactlyOnce=false`. `0` disables time-based flushing. Only effective when `bufferCount > 0`. | `"0"` |
| `reportInsertedOffsets` (since v1.3.6) | Enables returning only successfully inserted offsets from `preCommit` (instead of `currentOffsets`) when `exactlyOnce=false`. Does not apply when `ignorePartitionsWhenBatching=true`, where `currentOffsets` are still returned. | `"false"` |
| `auto.evolve` (since v1.3.7) | Automatically add columns to the ClickHouse table when incoming records contain new fields not present in the table. See [Schema Evolution](#schema-evolution). | `"false"` |
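Since `auto.evolve` defaults to `"false"`, it must be enabled explicitly. A minimal sink configuration sketch enabling it might look like the following; the host, topic, and credential values are placeholders, and the remaining property names follow the connector's standard sink configuration:

```json
{
  "name": "clickhouse-sink",
  "config": {
    "connector.class": "com.clickhouse.kafka.connect.ClickHouseSinkConnector",
    "tasks.max": "1",
    "topics": "events",
    "hostname": "my-clickhouse-host",
    "port": "8443",
    "database": "default",
    "username": "default",
    "password": "<password>",
    "auto.evolve": "true"
  }
}
```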
### Target tables {#target-tables}
@@ -327,6 +328,57 @@
}
```
### Schema Evolution {#schema-evolution}
The connector supports automatic table schema evolution when `auto.evolve=true`. When incoming Kafka records contain fields not present in the destination ClickHouse table, the connector automatically issues `ALTER TABLE ... ADD COLUMN IF NOT EXISTS` statements to add the missing columns.
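For example, if records for a table `events` start carrying a new optional string field `user_agent` (both names are hypothetical placeholders), the connector would issue DDL along these lines:

```sql
ALTER TABLE events ADD COLUMN IF NOT EXISTS `user_agent` Nullable(String);
```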
This mirrors the [`auto.evolve` feature](https://docs.confluent.io/kafka-connectors/jdbc/current/sink-connector/sink_config_options.html#auto-evolution) in the Confluent JDBC Sink Connector.
#### How it works {#how-schema-evolution-works}
1. For each batch of records, the connector compares the record schema against the table's column list.
2. If new fields are detected, it maps the Kafka Connect types to ClickHouse types and issues DDL.
3. If multiple schema versions appear in a single batch, the batch is split at schema boundaries - each sub-batch is flushed and the table is evolved before continuing.

> **Contributor:** This is risky - what if this split produces small batches and we end up with too many parts as a result?
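The batch-splitting behavior in step 3 can be sketched as follows. This is an illustrative Python sketch, not the connector's actual implementation; `schema_id` stands in for whatever schema identity the connector compares:

```python
def split_at_schema_boundaries(records):
    """Split a batch into sub-batches of consecutive records that
    share the same schema, preserving record order. Each sub-batch
    would be flushed (and the table evolved) before the next one."""
    batches = []
    current = []
    current_schema = None
    for record in records:
        schema = record["schema_id"]
        if current and schema != current_schema:
            batches.append(current)  # schema boundary: close the sub-batch
            current = []
        current.append(record)
        current_schema = schema
    if current:
        batches.append(current)
    return batches
```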
#### Type mapping for new columns {#schema-evolution-type-mapping}
When creating new columns, the connector maps Connect types to ClickHouse types as follows:
| Kafka Connect Type | ClickHouse Type | Notes |
|---|---|---|
| `org.apache.kafka.connect.data.Decimal` | `Decimal(38, S)` | Scale `S` from schema parameters |
| `org.apache.kafka.connect.data.Date` | `Date32` | |
| `org.apache.kafka.connect.data.Time` | `Int64` | |
| `org.apache.kafka.connect.data.Timestamp` | `DateTime64(3)` | |
| `INT8` .. `INT64` | `Int8` .. `Int64` | |
| `FLOAT32` / `FLOAT64` | `Float32` / `Float64` | |
| `BOOLEAN` | `Bool` | |
| `STRING` / `BYTES` | `String` | |
| `ARRAY` | `Array(<element_type>)` | Element type mapped recursively |
| `MAP` | `Map(<key_type>, <value_type>)` | Key and value types mapped recursively |
| `STRUCT` | Not supported | Throws an error |

> **Contributor** (on nullability): Adding non-nullable columns will break backward compatibility, and only records with non-null fields will be inserted. Please make it clear if …
>
> **Contributor** (on `Time`): This actually depends on the version - the latest versions of ClickHouse support `Time`.
>
> **Contributor** (on `STRUCT`): This is supported by our connector and should not throw an error. Besides …
Optional (nullable) fields are wrapped in `Nullable(...)`, except for `ARRAY` and `MAP` types, which [cannot be Nullable in ClickHouse](/sql-reference/data-types/nullable). Elements and values inside composite types can still be Nullable.

> **Contributor:** An optional field may have some default value, but as stated before we have to create Nullable columns to not break inserts of older records.
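The scalar mapping and the Nullable wrapping rule can be sketched together like this. This is an illustrative Python sketch, not the connector's actual code; Connect types are simplified to strings:

```python
# Simplified scalar Connect-type -> ClickHouse-type mapping (see the table above).
SCALARS = {
    "INT8": "Int8", "INT16": "Int16", "INT32": "Int32", "INT64": "Int64",
    "FLOAT32": "Float32", "FLOAT64": "Float64",
    "BOOLEAN": "Bool", "STRING": "String", "BYTES": "String",
}

def map_type(connect_type, optional=False, element=None, key=None, value=None):
    """Map a Connect type to a ClickHouse column type string.
    Optional fields become Nullable(...), except ARRAY and MAP,
    which cannot be Nullable in ClickHouse (their inner types can)."""
    if connect_type == "ARRAY":
        return f"Array({map_type(**element)})"  # recursive; never wrapped in Nullable
    if connect_type == "MAP":
        return f"Map({map_type(**key)}, {map_type(**value)})"
    if connect_type == "STRUCT":
        raise ValueError("STRUCT fields are not supported for auto-evolution")
    ch_type = SCALARS[connect_type]
    return f"Nullable({ch_type})" if optional else ch_type
```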
#### Guards {#schema-evolution-guards}
The connector rejects schema evolution in the following cases with a clear error message:
- **Non-nullable field without a default value** - ClickHouse requires new columns to be either `Nullable` or have a `DEFAULT`.

  > **Contributor:** Please describe how it should be configured: …
- **STRUCT fields** - Mapping Connect STRUCT to ClickHouse is non-trivial (it could be Tuple, JSON, or Nested). Not supported for auto-evolution.

  > **Contributor:** In most cases JSON is used - and it is easy to add as an option.
- **Schemaless or string records** - No Connect schema is available to derive ClickHouse types. Evolution is skipped with a warning.

  > **Contributor:** This should throw an error, and the configuration doc should state clearly that evolution is available only with a schema.
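The guard checks above amount to a pre-flight validation of each new field. A sketch, assuming hypothetical field attributes (`optional`, `default`, `has_schema`) that stand in for what the connector actually inspects:

```python
def validate_new_field(name, connect_type, optional, default, has_schema):
    """Reject fields that cannot be safely auto-evolved,
    mirroring the guards described above. Raises ValueError
    with a descriptive message; returns None if the field is OK."""
    if not has_schema:
        raise ValueError(f"cannot evolve '{name}': record has no Connect schema")
    if connect_type == "STRUCT":
        raise ValueError(f"cannot evolve '{name}': STRUCT fields are not supported")
    if not optional and default is None:
        raise ValueError(
            f"cannot evolve '{name}': new column must be Nullable or have a DEFAULT"
        )
```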
#### Concurrent tasks {#schema-evolution-concurrent-tasks}
Schema evolution is safe to use with multiple connector tasks. `ADD COLUMN IF NOT EXISTS` is idempotent - if two tasks race to add the same column, both succeed silently. DDL statements are executed with [`alter_sync=1`](/sql-reference/statements/alter#synchronicity-of-alter-queries) to wait for the local replica to apply the change. A retry loop on `DESCRIBE TABLE` (5 retries, 200ms backoff) handles propagation to other replicas.
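The replica-propagation retry loop can be sketched as follows; `describe_columns` is a hypothetical callable returning the current column list from `DESCRIBE TABLE`:

```python
import time

def wait_for_column(describe_columns, column, retries=5, backoff=0.2):
    """Poll DESCRIBE TABLE results until the new column becomes
    visible on this replica, retrying with a fixed backoff.
    Returns True once seen, False if retries are exhausted."""
    for _ in range(retries):
        if column in describe_columns():
            return True
        time.sleep(backoff)
    return False
```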
#### Limitations {#schema-evolution-limitations}

> **Contributor:** This should be at the top. As you can see, I've already asked questions that are covered here.
- **Add-only** - Columns are only added, never removed or modified. This is the same behavior as the JDBC connector. Stale nullable columns accumulate harmlessly.
- **Schema required** - Evolution only works with schema-based data (Avro, Protobuf, JSON Schema). Schemaless JSON and string records are not evolved.
- **STRUCT not supported** - Individual STRUCT fields cannot be auto-evolved. The top-level value must be a STRUCT (i.e., a row), but nested STRUCT fields are rejected.
- **Schema Registry recommended** - For best results, use a [Schema Registry](https://docs.confluent.io/platform/current/schema-registry/index.html) with BACKWARD or FULL compatibility mode. This ensures new fields are always optional (nullable), which is the safest mode for auto-evolution.
### Logging {#logging}
Logging is automatically provided by the Kafka Connect platform.
> **Contributor:** Please mention the schema to make it clearer what is evolving. Maybe even use a name like `schema.auto.column_creation`, because in the future we will have column alteration and will need to configure them separately.