Datalake schema registry context (#30132)
Pull request overview
Adds per-topic Schema Registry context support for datalake/Iceberg translation, ensuring schema resolution and caching are isolated by SR context.
Changes:
- Introduces a new topic property `redpanda.schema.registry.context`, including validation, alter-config handling, and config reporting.
- Wires the datalake translator/coordinator to resolve schema IDs within the topic's configured SR context and isolates in-memory caches by `(context, schema_id)`.
- Adds unit + ducktape E2E tests covering non-default context resolution, strict no-fallback behavior, and cache isolation.
Reviewed changes
Copilot reviewed 31 out of 31 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| tools/offline_log_viewer/controller.py | Extends topic-properties decoding to include schema_registry_context for newer serde versions. |
| tests/rptest/tests/datalake/schema_registry_context_test.py | New ducktape E2E coverage for context isolation and DLQ behavior when resolving in the wrong context. |
| tests/rptest/clients/types.py | Adds TopicSpec constant for the new topic property name. |
| src/v/pandaproxy/schema_registry/types.h / types.cc | Adds validate_context() helper for context format validation. |
| src/v/kafka/server/handlers/topics/{types.h,types.cc,validators.h} | Declares the topic property and validates it on CreateTopics. |
| src/v/kafka/server/handlers/{alter_configs.cc,incremental_alter_configs.cc} | Supports altering the new property via (incremental) AlterConfigs. |
| src/v/kafka/server/handlers/configs/{config_utils.h,config_response_utils.cc,storage_mode_properties.h} | Adds validation and DescribeConfigs/reporting support for the new context type/property. |
| src/v/datalake/{schema_identifier.h,record_schema_resolver.h,record_schema_resolver.cc,datalake_manager.cc} | Makes schema/type caches context-aware and threads context through resolvers used by translators. |
| src/v/datalake/coordinator/coordinator.cc | Resolves identifiers using the topic’s current configured context at resolution time. |
| src/v/datalake/tests/{test_utils.cc,record_schema_resolver_test.cc} | Updates and adds unit tests for context-aware resolution and cache isolation. |
| src/v/cluster/{topic_properties.h,topic_properties.cc,types.h,types.cc,topic_table.cc} | Persists and propagates the new topic property through cluster topic configuration/update plumbing. |
| src/v/cluster/tests/topic_properties_generator.h | Generates randomized topic properties including non-default contexts for tests. |
| src/v/cluster_link/utils/topic_properties_utils.cc | Propagates the new property through cluster-link update parsing. |
| src/v/{kafka/server/BUILD,cluster/BUILD,cluster_link/utils/BUILD} | Adds build deps for schema registry types where needed. |
nvartolomei left a comment:
lgtm at high level - will take a break and review carefully
```cpp
#include "model/fundamental.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "pandaproxy/schema_registry/types.h"
```
that's a relatively unrelated dependency with quite a few additional transitive dependencies to pull in
not worth imho
split validation into a separate file?
also string in - optional error out/std::expected<void, string>?
also return type should be context_invalid and not subject_invalid - it does exist already
the throwing variant can be built on top if SR needs it
not duplicating sounds like a nice idea, but you are duplicating the rules anyway, albeit in a text-only version
```cpp
return fmt::format(
  "redpanda.schema.registry.context `{}' is invalid: must start "
  "with '.', must not contain ':', and must not be the reserved "
  "'.__GLOBAL' context",
```
Refactored as suggested.
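A minimal Python sketch of the API shape suggested above — string in, optional error out (the real helper is C++ `validate_context()`; the Python names here and the exact error wording are illustrative):

```python
RESERVED_GLOBAL = ".__GLOBAL"

def validate_context(ctx: str):
    """Return None if ctx is a valid SR context, else an error string.

    Rules as stated in the PR: must start with '.', must not contain
    ':', and must not be the reserved '.__GLOBAL' context.
    """
    if not ctx.startswith("."):
        return f"context '{ctx}' is invalid: must start with '.'"
    if ":" in ctx:
        return f"context '{ctx}' is invalid: must not contain ':'"
    if ctx == RESERVED_GLOBAL:
        return f"context '{ctx}' is invalid: '.__GLOBAL' is reserved"
    return None
```

A throwing variant, if SR needs one, is a one-line wrapper that raises when the return value is not `None`.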
```cpp
  .error_code,
  kafka::error_code::none);

// Changing schema_registry_context while translation is enabled must fail.
```
what happens when a user sets both during i.e. topic creation? do we sequence them correctly?
also, by claude
alter_configs full-replace can silently strip the context while iceberg is enabled.
- src/v/kafka/server/handlers/alter_configs.cc:454-477 rejects an explicit set of redpanda.schema.registry.context while iceberg_mode != disabled. But alter_configs is full-replace: at
line 98, std::apply(apply_op(op_t::remove), update.properties.serde_fields()) initializes every property's op to remove. If the user sends an alter request that omits the
schema_registry_context key, the property is removed (reset to default), and no branch in the loop fires the iceberg-state check. The coordinator will then resolve schema ids against
the default context for in-flight entries committed under a non-default context — the exact poisoning scenario the author tries to prevent. Fix: after the loop, if
update.properties.schema_registry_context.op == remove AND it would actually change the value AND iceberg is currently enabled, return invalid_config. Or model with op_t::none default
(precedent at lines 125-127 for remote_*/iceberg_mode).
> what happens when a user sets both during i.e. topic creation? do we sequence them correctly?
Added test applying both to show it works.
> model with op_t::none default (precedent at lines 125-127 for remote_*/iceberg_mode)
Done, with test.
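A toy Python model of the full-replace pitfall described above — with a default op of `remove`, a request that simply omits a key silently resets it, while a default of `none` leaves it alone (op names borrowed from the C++ `op_t`; everything else here is illustrative):

```python
from enum import Enum

class Op(Enum):
    NONE = 0    # leave the current value untouched
    SET = 1     # set to the requested value
    REMOVE = 2  # reset to default

def apply_full_replace(current, request, default_op):
    """Simulate alter_configs full-replace: every property starts at
    default_op, then ops are applied for keys present in the request."""
    ops = {k: default_op for k in current}
    for k, (op, _) in request.items():
        ops[k] = op
    result = {}
    for k, old in current.items():
        if ops[k] is Op.REMOVE:
            result[k] = None  # reset to default
        elif ops[k] is Op.SET:
            result[k] = request[k][1]
        else:
            result[k] = old
    return result

current = {"schema_registry_context": ".prod", "iceberg_mode": "enabled"}
# The request omits schema_registry_context entirely.
req = {"iceberg_mode": (Op.SET, "enabled")}
stripped = apply_full_replace(current, req, Op.REMOVE)  # context silently reset
kept = apply_full_replace(current, req, Op.NONE)        # context preserved
```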
```cpp
auto topic_md = topic_table_.get_topic_metadata_ref(
  model::topic_namespace_view{model::kafka_namespace, topic});
auto sr_ctx = topic_md ? topic_md->get()
                           .get_configuration()
                           .properties.schema_registry_context.value_or(
                             pandaproxy::schema_registry::default_context)
                       : pandaproxy::schema_registry::default_context;
```
Did you consider having the translators pass the context to the coordinator in the RPC? It's a bit surprising to me that that isn't the case, particularly because the RPC request contains other topic + schema information already. I guess we would expect them to always be the same...
Yeah, but the coordinator already has access to the topic config so that it can read the context info. There's an invariant that context doesn't change while translation is active, so there can't be a mismatch between what the coordinator sees as the context and what the translator did.
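The lookup-with-fallback logic in the snippet above can be sketched in a few lines of Python (dict-based stand-in for the topic table; names hypothetical):

```python
DEFAULT_CONTEXT = "."

def context_for(topic_table: dict, topic: str) -> str:
    """Read the topic's configured SR context at resolution time,
    falling back to the default context when the topic is unknown
    or the property is unset."""
    props = topic_table.get(topic)
    if props is None:
        return DEFAULT_CONTEXT
    return props.get("schema_registry_context") or DEFAULT_CONTEXT
```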
```diff
@@ -30,6 +30,24 @@ struct schema_identifier
 bool operator==(const schema_identifier&) const = default;
```
Based on the commit message, there might be a misunderstanding about what is persisted -- at least, I'm under the impression that the schema_identifer is not persisted and that it's only serialized over RPC.
Yeah that is confusing. "Persisted" there is referring to the wire compatibility implications of changing schema_identifier... not that it's persisted :). Will update the description.
```python
def _make_confluent_record(self, schema_id, schema_dict, record):
    """Build a Confluent wire-format payload: magic byte + 4-byte
    schema ID + Avro binary-encoded record."""
    parsed = avro.schema.parse(json.dumps(schema_dict))
    buf = io.BytesIO()
    buf.write(struct.pack(">bI", 0, schema_id))
    encoder = avro.io.BinaryEncoder(buf)
    writer = avro.io.DatumWriter(parsed)
    writer.write(record, encoder)
```
This seems off to me, but I'm not an expert in confluent kafka python. Is this actually the correct way to write Avro with a schema in a given context? I would have expected this is all handled by the library. If this isn't supported by the library or something, please add a comment explaining why we need to create the bytes manually
I'll have the clanker rewrite it to form the messages legit.
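For reference, the 5-byte prefix the test builds by hand is the standard Confluent wire format: magic byte 0 followed by a big-endian 4-byte schema ID, then the Avro-encoded body. A dependency-free sketch of packing and unpacking just that header (the Avro body is omitted):

```python
import struct

MAGIC = 0  # Confluent wire-format magic byte

def pack_header(schema_id: int) -> bytes:
    # ">bI": big-endian signed byte + unsigned 32-bit schema id = 5 bytes
    return struct.pack(">bI", MAGIC, schema_id)

def unpack_header(payload: bytes) -> int:
    magic, schema_id = struct.unpack(">bI", payload[:5])
    if magic != MAGIC:
        raise ValueError(f"bad magic byte {magic}")
    return schema_id
```

Note the header carries only the numeric schema ID; the SR context is not encoded in the payload at all, which is why the broker must resolve the ID within the topic's configured context.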
The incremental topic update reader stopped at reader_version=8,
silently dropping fields added in later serde versions:
v9: message_timestamp_before_max_ms, message_timestamp_after_max_ms
(added in 98fc4e2)
v10: remote_label, storage_mode
(added in 03678e1)
Bump the reader to version=10 and decode the missing fields.
Add a new topic property `redpanda.schema.registry.context` that binds
a topic to a specific Schema Registry context for schema id resolution.
This lets the in-broker Iceberg translator (and future schema id
validation) look up schemas in the correct SR context namespace.
The property is stored as `std::optional<context>`; nullopt means the
default SR context ("."). Validation rejects values that don't start
with '.', that contain ':', or that match the reserved '.__GLOBAL'
context name.
Validation logic lives in a shared `validate_context()` helper in
pandaproxy/schema_registry/types.h.
Pure plumbing: the property is visible and settable via create-topic,
alter-configs, incremental-alter-configs, and describe-configs, but has
no runtime effect yet (wired to the datalake resolver in the next
commit). Also plumbed through cluster-link property propagation and the
offline log viewer.
Wire the new `schema_registry_context` topic property into the datalake translator's schema resolution path. Both `record_schema_resolver` and `latest_subject_schema_resolver` now accept a context parameter and use it instead of the hardcoded `default_context` when calling the Schema Registry.

Extend the shared schema and resolved-type caches with context-aware keys (`context_schema_cache_key` and `context_schema_identifier`) so that topics bound to different SR contexts on the same shard don't poison each other's cache entries.

A topic's context can't be changed while translation is enabled. This prevents races in translation and commit.
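The cache-isolation idea above reduces to keying the shared cache by `(context, schema_id)` instead of the bare schema ID. A small Python sketch (class and names illustrative, not the actual C++ types):

```python
class ContextSchemaCache:
    """Per-shard schema cache keyed by (context, schema_id) so topics
    bound to different SR contexts never see each other's entries."""

    def __init__(self):
        self._entries = {}

    def get(self, context: str, schema_id: int):
        return self._entries.get((context, schema_id))

    def put(self, context: str, schema_id: int, schema) -> None:
        self._entries[(context, schema_id)] = schema

cache = ContextSchemaCache()
cache.put(".ctx_a", 1, "schema-A")
cache.put(".ctx_b", 1, "schema-B")  # same numeric id, different context
```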
Add a ducktape integration test verifying the full end-to-end path for the `redpanda.schema.registry.context` topic property: SR schema registration in contexts, topic property configuration, translator schema resolution, and typed Iceberg columns.

- test_context_isolation: two topics bound to different SR contexts resolve different schemas from the same numeric schema ID, producing different Iceberg table column layouts.
- test_wrong_context_dlq: a schema ID not present in the configured context sends records to the dead-letter-queue table.
Add per-topic Schema Registry context support for datalake/Iceberg translation.
Design note
The SR context is not persisted alongside the schema_identifier in the coordinator STM. The coordinator reads the context from the topic's current configuration at resolution time. To safely change a topic's context mid-stream: disable translation, let the coordinator commit pending entries, change the context, then re-enable.
Backports Required
Release Notes
Features