diff --git a/src/v/cluster/BUILD b/src/v/cluster/BUILD index 4bb7aebbf1d96..5e40dba111966 100644 --- a/src/v/cluster/BUILD +++ b/src/v/cluster/BUILD @@ -238,6 +238,7 @@ redpanda_cc_library( "//src/v/cloud_storage:remote_label", "//src/v/model", "//src/v/pandaproxy/schema_registry:subject_name_strategy", + "//src/v/pandaproxy/schema_registry:types", "//src/v/reflection:adl", "//src/v/serde", "//src/v/serde:chrono", @@ -777,6 +778,7 @@ redpanda_cc_library( "//src/v/container:contiguous_range_map", "//src/v/model", "//src/v/pandaproxy/schema_registry:subject_name_strategy", + "//src/v/pandaproxy/schema_registry:types", "//src/v/raft", "//src/v/security", "//src/v/security:license", @@ -2037,6 +2039,7 @@ redpanda_cc_library( "//src/v/base", "//src/v/config", "//src/v/metrics", + "//src/v/pandaproxy/schema_registry:types", "//src/v/storage", "//src/v/utils:uuid", "@seastar", @@ -2447,6 +2450,7 @@ redpanda_cc_library( "//src/v/net:types", "//src/v/pandaproxy/schema_registry:config", "//src/v/pandaproxy/schema_registry:subject_name_strategy", + "//src/v/pandaproxy/schema_registry:types", "//src/v/raft", "//src/v/random:generators", "//src/v/random:time_jitter", diff --git a/src/v/cluster/tests/topic_properties_generator.h b/src/v/cluster/tests/topic_properties_generator.h index 6cf2bb17a8011..b9925a0f875d1 100644 --- a/src/v/cluster/tests/topic_properties_generator.h +++ b/src/v/cluster/tests/topic_properties_generator.h @@ -11,6 +11,7 @@ #include "base/units.h" #include "cluster/types.h" #include "model/tests/randoms.h" +#include "pandaproxy/schema_registry/types.h" #include "random/generators.h" #include "test_utils/randoms.h" @@ -79,5 +80,12 @@ inline cluster::topic_properties random_topic_properties() { model::redpanda_storage_mode::tiered, model::redpanda_storage_mode::cloud}); + // Mix of default context and randomly generated non-default contexts. + if (tests::random_bool()) { + properties.schema_registry_context + = pandaproxy::schema_registry::context{ + "." + random_generators::gen_alphanum_string(8)}; + } + return properties; } diff --git a/src/v/cluster/topic_properties.cc b/src/v/cluster/topic_properties.cc index 6a8630b100ecb..7ce60fa827734 100644 --- a/src/v/cluster/topic_properties.cc +++ b/src/v/cluster/topic_properties.cc @@ -27,6 +27,7 @@ fmt::iterator topic_properties::format_to(fmt::iterator it) const { "remote_topic_allow_gaps: {}, " "batch_max_bytes: {}, retention_local_target_bytes: {}, " "retention_local_target_ms: {}, remote_delete: {}, segment_ms: {}, " + "schema_registry_context: {}, " "record_key_schema_id_validation: {}, " "record_key_schema_id_validation_compat: {}, " "record_key_subject_name_strategy: {}, " @@ -73,6 +74,7 @@ fmt::iterator topic_properties::format_to(fmt::iterator it) const { retention_local_target_ms, remote_delete, segment_ms, + schema_registry_context, record_key_schema_id_validation, record_key_schema_id_validation_compat, record_key_subject_name_strategy, @@ -165,7 +167,8 @@ bool topic_properties::has_overrides() const { || remote_topic_allow_gaps.has_value() || message_timestamp_before_max_ms.has_value() || message_timestamp_after_max_ms.has_value() - || storage_mode != storage::ntp_config::default_storage_mode; + || storage_mode != storage::ntp_config::default_storage_mode + || schema_registry_context.has_value(); return overrides; } diff --git a/src/v/cluster/topic_properties.h b/src/v/cluster/topic_properties.h index b36eb38dd8658..ba615f0fe1dec 100644 --- a/src/v/cluster/topic_properties.h +++ b/src/v/cluster/topic_properties.h @@ -17,6 +17,7 @@ #include "model/metadata.h" #include "model/timestamp.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" +#include "pandaproxy/schema_registry/types.h" #include "reflection/adl.h" #include "serde/rw/chrono.h" #include "serde/rw/envelope.h" @@ -34,7 +35,7 @@ namespace cluster { */ struct topic_properties : serde:: - envelope, serde::compat_version<0>> { + envelope, serde::compat_version<0>> { topic_properties() noexcept = default; topic_properties( std::optional compression, @@ -176,6 +177,16 @@ struct topic_properties tristate segment_ms{std::nullopt}; + // Schema Registry context — the namespace within which schema ids and + // subjects resolve for this topic. Contexts are a Schema Registry + // namespacing mechanism: schema ids are unique within a context but not + // across them. Consumed by the in-broker Iceberg translator (today) and + // intended to also cover record_{key,value}_schema_id_validation below + // in the future, since a given schema id on a given record can only + // resolve to one schema. std::nullopt means the SR default context ("."); + // has_overrides/describe treat nullopt as unset. + std::optional schema_registry_context; + std::optional record_key_schema_id_validation; std::optional record_key_schema_id_validation_compat; std::optional @@ -317,7 +328,8 @@ struct topic_properties max_compaction_lag_ms, message_timestamp_before_max_ms, message_timestamp_after_max_ms, - storage_mode); + storage_mode, + schema_registry_context); } friend bool diff --git a/src/v/cluster/topic_table.cc b/src/v/cluster/topic_table.cc index 4472efff1280e..c0d31299a82cb 100644 --- a/src/v/cluster/topic_table.cc +++ b/src/v/cluster/topic_table.cc @@ -23,6 +23,7 @@ #include "container/chunked_hash_map.h" #include "model/fundamental.h" #include "model/metadata.h" +#include "pandaproxy/schema_registry/types.h" #include "storage/ntp_config.h" #include @@ -1231,6 +1232,9 @@ topic_properties topic_table::update_topic_properties( updated_properties.storage_mode, overrides.storage_mode, storage::ntp_config::default_storage_mode); + incremental_update( + updated_properties.schema_registry_context, + overrides.schema_registry_context); return updated_properties; } diff --git a/src/v/cluster/types.cc b/src/v/cluster/types.cc index fc783bbe39d12..6fe3f3767dcd3 100644 --- a/src/v/cluster/types.cc +++ b/src/v/cluster/types.cc @@ -325,6 +325,7 @@ fmt::iterator incremental_topic_updates::format_to(fmt::iterator it) const { "segment_size: {} retention_bytes: {} retention_duration: {} " "shadow_indexing: {}, batch_max_bytes: {}, retention_local_target_bytes: " "{}, retention_local_target_ms: {}, remote_delete: {}, segment_ms: {}, " + "schema_registry_context: {}, " "record_key_schema_id_validation: {}" "record_key_schema_id_validation_compat: {}" "record_key_subject_name_strategy: {}" @@ -355,6 +356,7 @@ fmt::iterator incremental_topic_updates::format_to(fmt::iterator it) const { retention_local_target_ms, remote_delete, segment_ms, + schema_registry_context, record_key_schema_id_validation, record_key_schema_id_validation_compat, record_key_subject_name_strategy, diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 1883ed99ce5e1..e3d0f5d82f3e6 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -35,6 +35,7 @@ #include "model/timeout_clock.h" #include "model/transform.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" +#include "pandaproxy/schema_registry/types.h" #include "raft/errc.h" #include "raft/fwd.h" #include "raft/transfer_leadership.h" @@ -591,7 +592,7 @@ struct property_update> struct incremental_topic_updates : serde::envelope< incremental_topic_updates, - serde::version<10>, + serde::version<11>, serde::compat_version<0>> { static constexpr int8_t version_with_data_policy = -1; static constexpr int8_t version_with_shadow_indexing = -3; @@ -683,6 +684,9 @@ struct incremental_topic_updates message_timestamp_after_max_ms; property_update> storage_mode; + property_update> + schema_registry_context; + // Not a regular topic property. Used to assign topic UUIDs to pre-25-2 // topics that were created without one. property_update> topic_id; @@ -745,7 +749,8 @@ struct incremental_topic_updates message_timestamp_before_max_ms, message_timestamp_after_max_ms, remote_label, - storage_mode); + storage_mode, + schema_registry_context); } fmt::iterator format_to(fmt::iterator it) const; diff --git a/src/v/cluster_link/utils/BUILD b/src/v/cluster_link/utils/BUILD index 87ff172aaadf8..ca28278c7f1da 100644 --- a/src/v/cluster_link/utils/BUILD +++ b/src/v/cluster_link/utils/BUILD @@ -13,6 +13,7 @@ redpanda_cc_library( "//src/v/base", "//src/v/cluster:types", "//src/v/kafka/server:topic_config_utils", + "//src/v/pandaproxy/schema_registry:types", "//src/v/reflection:type_traits", ], ) diff --git a/src/v/cluster_link/utils/topic_properties_utils.cc b/src/v/cluster_link/utils/topic_properties_utils.cc index d6f1c9baa75c2..43d8733922607 100644 --- a/src/v/cluster_link/utils/topic_properties_utils.cc +++ b/src/v/cluster_link/utils/topic_properties_utils.cc @@ -13,6 +13,7 @@ #include "base/type_traits.h" #include "kafka/server/handlers/configs/config_utils.h" +#include "pandaproxy/schema_registry/types.h" #include "reflection/type_traits.h" namespace { @@ -289,6 +290,21 @@ bool maybe_append_update( topic_config.properties.iceberg_target_lag_ms, kafka::iceberg_target_lag_ms_validator); } + if (config_name == kafka::topic_property_schema_registry_context) { + // context is named_type, not ss::sstring, so we need an + // explicit parse function. + kafka::parse_and_set_property( + topic_config.tp_ns, + update.properties.schema_registry_context, + config_value, + kafka::config_resource_operation::set, + kafka::schema_registry_context_validator{}, + [](const ss::sstring& s) { + return pandaproxy::schema_registry::context{s}; + }); + return update.properties.schema_registry_context.value + != topic_config.properties.schema_registry_context; + } if (config_name == kafka::topic_property_min_cleanable_dirty_ratio) { return parse_and_set( diff --git a/src/v/datalake/coordinator/coordinator.cc b/src/v/datalake/coordinator/coordinator.cc index 3d04a32c2ed5d..cfdea30572267 100644 --- a/src/v/datalake/coordinator/coordinator.cc +++ b/src/v/datalake/coordinator/coordinator.cc @@ -17,6 +17,7 @@ #include "datalake/coordinator/state_update.h" #include "datalake/logger.h" #include "datalake/partition_spec_parser.h" +#include "datalake/record_schema_resolver.h" #include "datalake/record_translator.h" #include "datalake/table_id_provider.h" #include "model/fundamental.h" @@ -401,8 +402,10 @@ coordinator::do_ensure_table_exists( struct coordinator::main_table_schema_provider : public coordinator::table_schema_provider { - explicit main_table_schema_provider(coordinator& parent) - : parent(parent) {} + main_table_schema_provider( + coordinator& parent, pandaproxy::schema_registry::context ctx) + : parent(parent) + , ctx_(std::move(ctx)) {} iceberg::table_identifier get_table_id(const model::topic& topic) const final { @@ -414,7 +417,7 @@ struct coordinator::main_table_schema_provider std::optional val_type; if (comps.val_identifier) { auto type_res = co_await parent.type_resolver_.resolve_identifier( - comps.val_identifier.value()); + comps.val_identifier.value(), ctx_); if (type_res.has_error()) { co_return errc::failed; } @@ -432,6 +435,7 @@ struct coordinator::main_table_schema_provider } const coordinator& parent; + pandaproxy::schema_registry::context ctx_; }; ss::future> @@ -447,12 +451,20 @@ coordinator::sync_ensure_table_exists( if (waiter_fut.has_value()) { co_return co_await std::move(*waiter_fut); } + auto topic_md = topic_table_.get_topic_metadata_ref( + model::topic_namespace_view{model::kafka_namespace, topic}); + auto sr_ctx = topic_md ? topic_md->get() + .get_configuration() + .properties.schema_registry_context.value_or( + pandaproxy::schema_registry::default_context) + : pandaproxy::schema_registry::default_context; + auto res_fut = co_await ss::coroutine::as_future(do_ensure_table_exists( topic, topic_revision, std::move(comps), "sync_ensure_table_exists", - main_table_schema_provider{*this})); + main_table_schema_provider{*this, std::move(sr_ctx)})); if (res_fut.failed()) { // NOTE: we don't expect any exceptions given we're using result types, diff --git a/src/v/datalake/datalake_manager.cc b/src/v/datalake/datalake_manager.cc index d081be86d9e75..4daad3bd56e47 100644 --- a/src/v/datalake/datalake_manager.cc +++ b/src/v/datalake/datalake_manager.cc @@ -39,6 +39,7 @@ namespace { static std::unique_ptr make_type_resolver( const model::iceberg_mode& mode, model::topic_view topic_name, + pandaproxy::schema_registry::context sr_context, schema::registry& sr, schema_cache& cache, resolved_type_cache& type_cache) { @@ -50,7 +51,8 @@ static std::unique_ptr make_type_resolver( case model::iceberg_mode::variant::key_value: return std::make_unique(); case model::iceberg_mode::variant::value_schema_id_prefix: - return std::make_unique(sr, cache, type_cache); + return std::make_unique( + sr, cache, type_cache, std::move(sr_context)); case model::iceberg_mode::variant::value_schema_latest: auto subject = pandaproxy::schema_registry::subject( fmt::format("{}-value", topic_name)); @@ -59,6 +61,7 @@ static std::unique_ptr make_type_resolver( } return std::make_unique( sr, + std::move(sr_context), subject, mode.protobuf_full_name(), config::shard_local_cfg().iceberg_latest_schema_cache_ttl_ms.bind(), @@ -633,6 +636,8 @@ ss::future<> datalake_manager::handle_translator_state_change( auto type_resolver = make_type_resolver( mode, ntp.tp.topic, + partition->topic_cfg->properties.schema_registry_context.value_or( + pandaproxy::schema_registry::default_context), *_schema_registry, *_schema_cache, *_resolved_type_cache); diff --git a/src/v/datalake/record_schema_resolver.cc b/src/v/datalake/record_schema_resolver.cc index 656bcbf9fd7ec..c4f44f5131808 100644 --- a/src/v/datalake/record_schema_resolver.cc +++ b/src/v/datalake/record_schema_resolver.cc @@ -184,21 +184,23 @@ struct from_identifier_visitor { ss::future> get_schema( schema::registry* sr, std::optional> cache, + const ppsr::context& context, ppsr::schema_id id) { if (!sr->is_enabled()) { vlog(datalake_log.warn, "Schema registry is not enabled"); // TODO: should we treat this as transient? co_return type_resolver::errc::translation_error; } + context_schema_cache_key cache_key{.context = context, .schema_id = id}; if (cache.has_value()) { - auto cached_schema = cache->get().get_value(id); + auto cached_schema = cache->get().get_value(cache_key); if (cached_schema) { co_return std::move(*cached_schema); } } auto schema_fut = co_await ss::coroutine::as_future( - sr->get_valid_schema({ppsr::default_context, id})); + sr->get_valid_schema({context, id})); if (schema_fut.failed()) { auto ex = schema_fut.get_exception(); vlog(datalake_log.warn, "Error getting schema from registry: {}", ex); @@ -206,22 +208,29 @@ ss::future> get_schema( } auto resolved_schema = std::move(schema_fut.get()); if (!resolved_schema.has_value()) { - vlog(datalake_log.trace, "Schema ID {} not in registry", id); + vlog( + datalake_log.trace, + "Schema ID {} not in registry under context {}", + id, + context); co_return type_resolver::errc::bad_input; } auto shared_schema = ss::make_shared(std::move(resolved_schema.value())); if (cache.has_value()) { - cache->get().try_insert(id, shared_schema); + cache->get().try_insert(cache_key, shared_schema); } co_return std::move(shared_schema); } checked get_resolved_type( + const ppsr::context& context, schema_identifier&& ident, shared_schema_t&& schema, std::optional> cache) { + auto key = context_schema_identifier{ + .context = context, .identifier = std::move(ident)}; if (cache.has_value()) { - auto cached_val = cache->get().get_value(ident); + auto cached_val = cache->get().get_value(key); if (cached_val) { return *cached_val; } @@ -229,7 +238,7 @@ checked get_resolved_type( auto* schema_ptr = schema.get(); auto resolve_res = schema_ptr->visit( - from_identifier_visitor{std::move(ident), std::move(schema)}); + from_identifier_visitor{std::move(key.identifier), std::move(schema)}); if (resolve_res.has_error()) { return resolve_res.error(); } @@ -237,7 +246,10 @@ checked get_resolved_type( auto shared_val = ss::make_shared( std::move(resolve_res.value())); if (cache.has_value()) { - cache->get().try_insert(shared_val->id, shared_val); + cache->get().try_insert( + context_schema_identifier{ + .context = context, .identifier = shared_val->id}, + shared_val); } return shared_val; @@ -250,14 +262,14 @@ struct datalake_cache_traits; template<> struct datalake_cache_traits< - pandaproxy::schema_registry::schema_id, + context_schema_cache_key, pandaproxy::schema_registry::valid_schema> { static constexpr const char* metrics_group_name = "datalake:schema_cache"; static constexpr const char* item_label = "a schema"; }; template<> -struct datalake_cache_traits { +struct datalake_cache_traits { static constexpr const char* metrics_group_name = "datalake:resolved_type_cache"; static constexpr const char* item_label = "an Iceberg type"; @@ -325,9 +337,9 @@ void chunked_datalake_cache::setup_metrics() { } template class chunked_datalake_cache< - pandaproxy::schema_registry::schema_id, + context_schema_cache_key, pandaproxy::schema_registry::valid_schema>; -template class chunked_datalake_cache; +template class chunked_datalake_cache; resolved_schema::resolved_schema(ss::shared_ptr ir) : shared_schema_(std::move(ir)) @@ -347,7 +359,8 @@ binary_type_resolver::resolve_buf_type(std::optional b) const { } ss::future> -binary_type_resolver::resolve_identifier(schema_identifier) const { +binary_type_resolver::resolve_identifier( + schema_identifier, ppsr::context) const { // method is not expected to be called, as this resolver always returns // nullopt type. co_return type_resolver::errc::translation_error; @@ -362,11 +375,13 @@ test_binary_type_resolver::resolve_buf_type(std::optional b) const { } ss::future> -test_binary_type_resolver::resolve_identifier(schema_identifier id) const { +test_binary_type_resolver::resolve_identifier( + schema_identifier id, ppsr::context ctx) const { if (injected_error_.has_value()) { co_return *injected_error_; } - co_return co_await binary_type_resolver::resolve_identifier(std::move(id)); + co_return co_await binary_type_resolver::resolve_identifier( + std::move(id), std::move(ctx)); } ss::future> @@ -388,7 +403,7 @@ record_schema_resolver::resolve_buf_type(std::optional b) const { auto schema_id = schema_id_res.schema_id; auto buf_no_id = std::move(schema_id_res.shared_message_data); - auto schema_res = co_await get_schema(&sr_, cache_, schema_id); + auto schema_res = co_await get_schema(&sr_, cache_, context_, schema_id); if (schema_res.has_error()) { co_return schema_res.error(); } @@ -428,7 +443,10 @@ record_schema_resolver::resolve_buf_type(std::optional b) const { auto [ident, parsable_buf] = std::move(ident_res.value()); auto resolve_res = get_resolved_type( - std::move(ident), std::move(schema_res.value()), resolved_type_cache_); + context_, + std::move(ident), + std::move(schema_res.value()), + resolved_type_cache_); if (resolve_res.has_error()) { co_return resolve_res.error(); } @@ -440,24 +458,30 @@ record_schema_resolver::resolve_buf_type(std::optional b) const { } ss::future> -record_schema_resolver::resolve_identifier(schema_identifier ident) const { - auto schema_res = co_await get_schema(&sr_, cache_, ident.schema_id); +record_schema_resolver::resolve_identifier( + schema_identifier ident, ppsr::context ctx) const { + auto schema_res = co_await get_schema(&sr_, cache_, ctx, ident.schema_id); if (schema_res.has_error()) { co_return schema_res.error(); } co_return get_resolved_type( - std::move(ident), std::move(schema_res.value()), resolved_type_cache_); + ctx, + std::move(ident), + std::move(schema_res.value()), + resolved_type_cache_); } latest_subject_schema_resolver::latest_subject_schema_resolver( schema::registry& sr, + ppsr::context context, ppsr::subject subject, std::optional protobuf_message_name, config::binding cache_duration, std::optional> sc, std::optional> rc) : sr_(&sr) + , context_(std::move(context)) , subject_(std::move(subject)) , protobuf_message_name_(std::move(protobuf_message_name)) , cache_ttl_(std::move(cache_duration)) @@ -527,7 +551,7 @@ latest_subject_schema_resolver::resolve_buf_type(std::optional b) const { auto latest_schema_fut = co_await ss::coroutine::as_future( sr_->get_subject_schema( - {ppsr::default_context, subject_}, /*subject_version=*/std::nullopt)); + {context_, subject_}, /*subject_version=*/std::nullopt)); if (latest_schema_fut.failed()) { auto ex = latest_schema_fut.get_exception(); vlog( @@ -540,7 +564,8 @@ latest_subject_schema_resolver::resolve_buf_type(std::optional b) const { co_return type_resolver::errc::registry_error; } auto latest_schema = std::move(latest_schema_fut.get()); - auto schema_res = co_await get_schema(sr_, cache_, latest_schema.id); + auto schema_res = co_await get_schema( + sr_, cache_, context_, latest_schema.id); if (schema_res.has_error()) { schema_lookup_cache_ = schema_lookup_cache( schema_res.error(), last_sync_time); @@ -580,6 +605,7 @@ latest_subject_schema_resolver::resolve_buf_type(std::optional b) const { } auto resolve_res = get_resolved_type( + context_, std::move(schema_id_res.value()), std::move(schema_res.value()), resolved_type_cache_); @@ -606,14 +632,17 @@ latest_subject_schema_resolver::resolve_buf_type(std::optional b) const { ss::future> latest_subject_schema_resolver::resolve_identifier( - schema_identifier ident) const { - auto schema_res = co_await get_schema(sr_, cache_, ident.schema_id); + schema_identifier ident, ppsr::context ctx) const { + auto schema_res = co_await get_schema(sr_, cache_, ctx, ident.schema_id); if (schema_res.has_error()) { co_return schema_res.error(); } co_return get_resolved_type( - std::move(ident), std::move(schema_res.value()), resolved_type_cache_); + ctx, + std::move(ident), + std::move(schema_res.value()), + resolved_type_cache_); } } // namespace datalake diff --git a/src/v/datalake/record_schema_resolver.h b/src/v/datalake/record_schema_resolver.h index ce2a3c74b96f9..0caf0b4a9747a 100644 --- a/src/v/datalake/record_schema_resolver.h +++ b/src/v/datalake/record_schema_resolver.h @@ -77,10 +77,10 @@ class chunked_datalake_cache : public datalake_cache { }; using schema_cache = datalake_cache< - pandaproxy::schema_registry::schema_id, + context_schema_cache_key, pandaproxy::schema_registry::valid_schema>; using chunked_schema_cache = chunked_datalake_cache< - pandaproxy::schema_registry::schema_id, + context_schema_cache_key, pandaproxy::schema_registry::valid_schema>; using shared_schema_t @@ -130,9 +130,12 @@ using shared_resolved_type_t = ss::shared_ptr; // Note that the resolved type cache needs to be keyed by the // `schema_identifier` which can only be fully determined once the schema is // known. Hence the resolved type can't be stored inline to the schema cache. -using resolved_type_cache = datalake_cache; +// The key is further qualified by the schema registry context to keep +// resolutions from different contexts isolated. +using resolved_type_cache + = datalake_cache; using chunked_resolved_type_cache - = chunked_datalake_cache; + = chunked_datalake_cache; struct type_and_buf { std::optional type; @@ -168,11 +171,21 @@ class type_resolver { return fmt::format_to(out, "type_resolver::errc::invalid_config"); } } + // Used by the per-partition record translator. Implementations use a + // context baked in at construction time; the translator creates one + // resolver per partition with the topic's SR context. virtual ss::future> resolve_buf_type(std::optional b) const = 0; - // TODO(iceberg): This should be it's own interface. + + // TODO(iceberg): resolve_identifier belongs on its own interface. + // Used by the coordinator, which holds one shared resolver for all topics + // and therefore cannot bake a context in at construction — it passes one + // per call. These two methods are used by disjoint callers and the context + // asymmetry is intentional. If that changes, both should take an explicit + // context parameter and the stored context_ field can be dropped. virtual ss::future> - resolve_identifier(schema_identifier) const = 0; + resolve_identifier( + schema_identifier, pandaproxy::schema_registry::context) const = 0; virtual ~type_resolver() = default; }; @@ -183,8 +196,8 @@ class binary_type_resolver : public type_resolver { ss::future> resolve_buf_type(std::optional b) const override; - ss::future> - resolve_identifier(schema_identifier) const override; + ss::future> resolve_identifier( + schema_identifier, pandaproxy::schema_registry::context) const override; ~binary_type_resolver() override = default; }; @@ -193,8 +206,8 @@ class test_binary_type_resolver : public binary_type_resolver { ss::future> resolve_buf_type(std::optional b) const override; - ss::future> - resolve_identifier(schema_identifier) const override; + ss::future> resolve_identifier( + schema_identifier, pandaproxy::schema_registry::context) const override; ~test_binary_type_resolver() override = default; void set_fail_requests(type_resolver::errc e) { injected_error_ = e; } @@ -210,16 +223,19 @@ class record_schema_resolver : public type_resolver { schema::registry& sr, std::optional> sc = std::nullopt, std::optional> rc - = std::nullopt) + = std::nullopt, + pandaproxy::schema_registry::context ctx + = pandaproxy::schema_registry::default_context) : sr_(sr) , cache_(sc) - , resolved_type_cache_(rc) {} + , resolved_type_cache_(rc) + , context_(std::move(ctx)) {} ss::future> resolve_buf_type(std::optional b) const override; - ss::future> - resolve_identifier(schema_identifier) const override; + ss::future> resolve_identifier( + schema_identifier, pandaproxy::schema_registry::context) const override; ~record_schema_resolver() override = default; private: @@ -227,6 +243,7 @@ class record_schema_resolver : public type_resolver { std::optional> cache_; std::optional> resolved_type_cache_; + pandaproxy::schema_registry::context context_; }; // latest_subject_schema_resolver is a schema resolver that uses the latest @@ -239,6 +256,7 @@ class latest_subject_schema_resolver : public type_resolver { public: latest_subject_schema_resolver( schema::registry& sr, + pandaproxy::schema_registry::context context, pandaproxy::schema_registry::subject subject, std::optional protobuf_message_name, config::binding cache_ttl, @@ -256,11 +274,12 @@ class latest_subject_schema_resolver : public type_resolver { ss::future> resolve_buf_type(std::optional b) const override; - ss::future> - resolve_identifier(schema_identifier) const override; + ss::future> resolve_identifier( + schema_identifier, pandaproxy::schema_registry::context) const override; private: schema::registry* sr_; + pandaproxy::schema_registry::context context_; pandaproxy::schema_registry::subject subject_; std::optional protobuf_message_name_; config::binding cache_ttl_; diff --git a/src/v/datalake/schema_identifier.h b/src/v/datalake/schema_identifier.h index 20ba70ef409b2..7a1676e41accb 100644 --- a/src/v/datalake/schema_identifier.h +++ b/src/v/datalake/schema_identifier.h @@ -30,6 +30,24 @@ struct schema_identifier bool operator==(const schema_identifier&) const = default; }; +// In-memory cache key combining a schema registry context with a schema id. +// Used to keep per-shard schema caches correct when multiple topics look up +// schemas in different contexts (schema ids are unique within a context, not +// across). +struct context_schema_cache_key { + pandaproxy::schema_registry::context context; + pandaproxy::schema_registry::schema_id schema_id; + bool operator==(const context_schema_cache_key&) const = default; +}; + +// In-memory cache key wrapping a schema_identifier with its schema registry +// context. Not persisted: only used for the resolved type cache. +struct context_schema_identifier { + pandaproxy::schema_registry::context context; + schema_identifier identifier; + bool operator==(const context_schema_identifier&) const = default; +}; + // The components required to build the Iceberg schema of a record. struct record_schema_components : serde::envelope< @@ -75,4 +93,25 @@ struct hash { return h; } }; +template<> +struct hash { + size_t operator()(const datalake::context_schema_cache_key& k) const { + namespace ppsr = pandaproxy::schema_registry; + size_t h = 0; + boost::hash_combine(h, hash()(k.context)); + boost::hash_combine(h, hash()(k.schema_id)); + return h; + } +}; +template<> +struct hash { + size_t operator()(const datalake::context_schema_identifier& k) const { + namespace ppsr = pandaproxy::schema_registry; + size_t h = 0; + boost::hash_combine(h, hash()(k.context)); + boost::hash_combine( + h, hash()(k.identifier)); + return h; + } +}; } // namespace std diff --git a/src/v/datalake/tests/record_schema_resolver_test.cc b/src/v/datalake/tests/record_schema_resolver_test.cc index 56ea44b858bad..de90bcff6c11e 100644 --- a/src/v/datalake/tests/record_schema_resolver_test.cc +++ b/src/v/datalake/tests/record_schema_resolver_test.cc @@ -415,6 +415,7 @@ TEST_F(RecordSchemaResolverTest, TestLatestSubjectSchema_Protobuf) { auto resolver = latest_subject_schema_resolver( *sr, + default_context, subject("latest-proto"), std::nullopt, config::mock_binding(std::chrono::milliseconds(0s)), @@ -441,6 +442,7 @@ TEST_F(RecordSchemaResolverTest, TestLatestSubjectSchema_Protobuf_MessageName) { auto resolver = latest_subject_schema_resolver( *sr, + default_context, subject("latest-proto"), "datalake.proto.nested_message.inner_message_t1", config::mock_binding(std::chrono::milliseconds(0s)), @@ -479,6 +481,7 @@ TEST_F(RecordSchemaResolverTest, TestLatestSubjectSchema_Avro) { auto resolver = latest_subject_schema_resolver( *sr, + default_context, subject("latest-avro"), std::nullopt, config::mock_binding(std::chrono::milliseconds(0s)), @@ -510,6 +513,7 @@ TEST_F(RecordSchemaResolverTest, TestLatestSubjectSchema_Json) { auto resolver = latest_subject_schema_resolver( *sr, + default_context, subject("latest-json"), std::nullopt, config::mock_binding(std::chrono::milliseconds(0s)), @@ -859,7 +863,9 @@ TEST(CachedRecordSchemaResolverTest, TestSchemaCacheEviction) { // Try to get schema from cache. auto cached_schema_opt = schema_cache.get_value( - pandaproxy::schema_registry::schema_id{2}); + datalake::context_schema_cache_key{ + .context = default_context, + .schema_id = pandaproxy::schema_registry::schema_id{2}}); EXPECT_TRUE(cached_schema_opt); auto schema_buf = cached_schema_opt->get()->raw()().copy(); @@ -874,8 +880,10 @@ TEST(CachedRecordSchemaResolverTest, TestSchemaCacheEviction) { resolve_buffer_fn(true, 5, {0}, schema_3_expected_type); // Ensure schema 2 was evicted. - EXPECT_FALSE( - schema_cache.get_value(pandaproxy::schema_registry::schema_id{2})); + EXPECT_FALSE(schema_cache.get_value( + datalake::context_schema_cache_key{ + .context = default_context, + .schema_id = pandaproxy::schema_registry::schema_id{2}})); // Ensure that the shared pointer to the evicted schema is still valid. EXPECT_EQ(cached_schema_opt->get()->raw()(), schema_buf); @@ -884,3 +892,210 @@ TEST(CachedRecordSchemaResolverTest, TestSchemaCacheEviction) { // Ensure that schema 2 is once again in the cache. resolve_buffer_fn(false, 2, {2, 0}, schema_2_expected_type); } + +namespace { + +// A distinct Avro schema so the fake registry assigns it a new id rather +// than reusing one of the schemas created in the default context. +constexpr std::string_view avro_prod_record_schema = R"({ + "type": "record", + "name": "ProdRecord", + "fields" : [ + {"name": "prod_value", "type": "long"} + ] +})"; + +const context prod_context{".prod"}; + +} // namespace + +TEST(ContextAwareResolverTest, TestResolveSchemaInNonDefaultContext) { + auto sr = std::make_unique(); + auto ctx_schema_id + = sr->create_schema( + subject_schema{ + context_subject{prod_context, subject{"foo-value"}}, + schema_definition{avro_prod_record_schema, schema_type::avro}}) + .get(); + ASSERT_EQ(1, ctx_schema_id.id()); + ASSERT_EQ(prod_context, ctx_schema_id.ctx); + + iobuf buf; + buf.append("\0\0\0\0\1", 5); + buf.append(generate_dummy_body()); + + auto resolver = record_schema_resolver( + *sr, std::nullopt, std::nullopt, prod_context); + auto res = resolver.resolve_buf_type(buf.copy()).get(); + ASSERT_FALSE(res.has_error()); + auto& resolved_buf = res.value(); + ASSERT_TRUE(resolved_buf.type.has_value()); + EXPECT_EQ(1, (*resolved_buf.type)->id.schema_id()); +} + +TEST(ContextAwareResolverTest, TestSchemaNotInConfiguredContextIsStrict) { + // A schema registered only in the default context should not be visible + // to a resolver bound to a different context: no cross-context fallback. + auto sr = std::make_unique(); + auto default_id = sr->create_schema( + subject_schema{ + context_subject::unqualified("foo-value"), + schema_definition{ + avro_record_schema, schema_type::avro}}) + .get(); + ASSERT_EQ(1, default_id.id()); + + iobuf buf; + buf.append("\0\0\0\0\1", 5); + buf.append(generate_dummy_body()); + + auto resolver = record_schema_resolver( + *sr, std::nullopt, std::nullopt, prod_context); + auto res = resolver.resolve_buf_type(buf.copy()).get(); + ASSERT_TRUE(res.has_error()); + EXPECT_EQ(res.error(), type_resolver::errc::bad_input); +} + +TEST(ContextAwareResolverTest, TestLatestSubjectSchemaInNonDefaultContext) { + using namespace std::chrono_literals; + auto sr = std::make_unique(); + sr->create_schema( + subject_schema{ + context_subject{prod_context, subject{"prod-subject-value"}}, + schema_definition{avro_prod_record_schema, schema_type::avro}}) + .get(); + + iobuf buf; + buf.append(generate_dummy_body()); + + auto resolver = latest_subject_schema_resolver( + *sr, + prod_context, + subject("prod-subject-value"), + std::nullopt, + config::mock_binding(std::chrono::milliseconds(0s)), + std::nullopt, + std::nullopt); + auto res = resolver.resolve_buf_type(buf.copy()).get(); + ASSERT_FALSE(res.has_error()); + auto& resolved_buf = res.value(); + ASSERT_TRUE(resolved_buf.type.has_value()); + EXPECT_EQ(1, (*resolved_buf.type)->id.schema_id()); +} + +TEST(ContextAwareResolverTest, TestLatestSubjectSchemaStrictContext) { + // The configured subject exists in the default context but not in + // ".prod", so a ".prod"-bound latest_subject resolver must not fall back. + using namespace std::chrono_literals; + auto sr = std::make_unique(); + sr->create_schema( + subject_schema{ + context_subject::unqualified("subject-value"), + schema_definition{avro_record_schema, schema_type::avro}}) + .get(); + iobuf buf; + buf.append(generate_dummy_body()); + + auto resolver = latest_subject_schema_resolver( + *sr, + prod_context, + subject("subject-value"), + std::nullopt, + config::mock_binding(std::chrono::milliseconds(0s)), + std::nullopt, + std::nullopt); + auto res = resolver.resolve_buf_type(buf.copy()).get(); + ASSERT_TRUE(res.has_error()); + EXPECT_EQ(res.error(), type_resolver::errc::registry_error); +} + +TEST(ContextAwareResolverTest, TestSchemaCacheIsolationAcrossContexts) { + // Share a single schema cache between two resolvers bound to different + // contexts. Verify the cache isolates entries by (context, schema_id) + // so topics in different contexts cannot poison each other's lookups. + auto sr = std::make_unique(); + // Default-context schema → id=1. + auto default_id = sr->create_schema( + subject_schema{ + context_subject::unqualified("foo-value"), + schema_definition{ + avro_record_schema, schema_type::avro}}) + .get(); + ASSERT_EQ(1, default_id.id()); + ASSERT_EQ(default_context, default_id.ctx); + // ".prod" schema with different text → id=2. + auto prod_id = sr->create_schema( + subject_schema{ + context_subject{prod_context, subject{"bar-value"}}, + schema_definition{ + avro_prod_record_schema, schema_type::avro}}) + .get(); + ASSERT_EQ(2, prod_id.id()); + ASSERT_EQ(prod_context, prod_id.ctx); + + auto schema_cache = make_schema_cache(); + auto default_resolver = record_schema_resolver( + *sr, schema_cache, std::nullopt, default_context); + auto prod_resolver = record_schema_resolver( + *sr, schema_cache, std::nullopt, prod_context); + + // default_resolver successfully resolves id=1 from the default context. + { + iobuf buf; + buf.append("\0\0\0\0\1", 5); + buf.append(generate_dummy_body()); + auto res = default_resolver.resolve_buf_type(buf.copy()).get(); + ASSERT_FALSE(res.has_error()); + EXPECT_EQ(1, (*res.value().type)->id.schema_id()); + } + // prod_resolver successfully resolves id=2 from the ".prod" context. + { + iobuf buf; + buf.append("\0\0\0\0\2", 5); + buf.append(generate_dummy_body()); + auto res = prod_resolver.resolve_buf_type(buf.copy()).get(); + ASSERT_FALSE(res.has_error()); + EXPECT_EQ(2, (*res.value().type)->id.schema_id()); + } + + // Both entries are cached under their context-qualified keys. + EXPECT_TRUE(schema_cache.get_value( + datalake::context_schema_cache_key{ + .context = default_context, + .schema_id = pandaproxy::schema_registry::schema_id{1}})); + EXPECT_TRUE(schema_cache.get_value( + datalake::context_schema_cache_key{ + .context = prod_context, + .schema_id = pandaproxy::schema_registry::schema_id{2}})); + // The "mirrored" keys in the other context must be absent: otherwise + // a cross-context poisoning could let one topic observe the other's + // schema. + EXPECT_FALSE(schema_cache.get_value( + datalake::context_schema_cache_key{ + .context = default_context, + .schema_id = pandaproxy::schema_registry::schema_id{2}})); + EXPECT_FALSE(schema_cache.get_value( + datalake::context_schema_cache_key{ + .context = prod_context, + .schema_id = pandaproxy::schema_registry::schema_id{1}})); + + // A prod-bound resolver asking for id=1 must fail: id=1 lives only in + // the default context, and there is no fallback. + { + iobuf buf; + buf.append("\0\0\0\0\1", 5); + buf.append(generate_dummy_body()); + auto res = prod_resolver.resolve_buf_type(buf.copy()).get(); + ASSERT_TRUE(res.has_error()); + EXPECT_EQ(res.error(), type_resolver::errc::bad_input); + } + // Likewise, the default resolver must not see the ".prod" schema. + { + iobuf buf; + buf.append("\0\0\0\0\2", 5); + buf.append(generate_dummy_body()); + auto res = default_resolver.resolve_buf_type(buf.copy()).get(); + ASSERT_TRUE(res.has_error()); + EXPECT_EQ(res.error(), type_resolver::errc::bad_input); + } +} diff --git a/src/v/datalake/tests/test_utils.cc b/src/v/datalake/tests/test_utils.cc index 8c9f95ce58723..2edbce47de006 100644 --- a/src/v/datalake/tests/test_utils.cc +++ b/src/v/datalake/tests/test_utils.cc @@ -44,7 +44,8 @@ direct_table_creator::ensure_table( std::optional val_type; if (comps.val_identifier) { auto type_res = co_await type_resolver_.resolve_identifier( - comps.val_identifier.value()); + comps.val_identifier.value(), + pandaproxy::schema_registry::default_context); if (type_res.has_error()) { co_return errc::failed; } diff --git a/src/v/kafka/server/BUILD b/src/v/kafka/server/BUILD index 04ce1e723a2a1..c12eb1afd3e4a 100644 --- a/src/v/kafka/server/BUILD +++ b/src/v/kafka/server/BUILD @@ -36,6 +36,7 @@ redpanda_cc_library( "handlers/configs/config_response_utils.h", "handlers/configs/config_utils.h", "handlers/configs/storage_mode_properties.h", + "handlers/topics/sr_context_validator.h", "handlers/topics/topic_utils.h", "handlers/topics/types.h", "handlers/topics/validators.h", @@ -66,6 +67,7 @@ redpanda_cc_library( "//src/v/model", "//src/v/pandaproxy/schema_registry:config", "//src/v/pandaproxy/schema_registry:subject_name_strategy", + "//src/v/pandaproxy/schema_registry:types", "//src/v/security", "//src/v/serde:chrono", "//src/v/strings:string_switch", diff --git a/src/v/kafka/server/handlers/alter_configs.cc b/src/v/kafka/server/handlers/alter_configs.cc index 97057576fb530..a119216cdca45 100644 --- a/src/v/kafka/server/handlers/alter_configs.cc +++ b/src/v/kafka/server/handlers/alter_configs.cc @@ -26,6 +26,7 @@ #include "kafka/server/response.h" #include "model/fundamental.h" #include "model/metadata.h" +#include "pandaproxy/schema_registry/types.h" #include "strings/string_switch.h" #include @@ -98,7 +99,7 @@ create_topic_properties_update( std::apply(apply_op(op_t::none), update.custom_properties.serde_fields()); static_assert( - std::tuple_size_v == 44, + std::tuple_size_v == 45, "If you add a property, decide on its default alter config " "policy, and handle the update in the loop below"); static_assert( @@ -142,6 +143,7 @@ create_topic_properties_update( update.properties.delete_retention_ms.op = op_t::none; update.properties.storage_mode.op = op_t::none; + update.properties.schema_registry_context.op = op_t::none; // Now that the defaults are set, continue to set properties from the // request @@ -450,6 +452,34 @@ create_topic_properties_update( }); continue; } + if (cfg.name == topic_property_schema_registry_context) { + if ( + topic_cfg + && topic_cfg->properties.iceberg_mode + != model::iceberg_mode::disabled) { + return make_error_alter_config_resource_response< + alter_configs_resource_response>( + resource, + error_code::invalid_config, + "Cannot change redpanda.schema.registry.context while " + "Iceberg translation is enabled; set " + "redpanda.iceberg.mode=disabled first"); + } + // Empty string counts as unset. + const auto ctx_op = (!cfg.value || cfg.value->empty()) + ? kafka::config_resource_operation::remove + : kafka::config_resource_operation::set; + parse_and_set_property( + tp_ns, + update.properties.schema_registry_context, + cfg.value, + ctx_op, + schema_registry_context_validator{}, + [](const ss::sstring& s) { + return pandaproxy::schema_registry::context{s}; + }); + continue; + } if (cfg.name == topic_property_min_cleanable_dirty_ratio) { parse_and_set_tristate( diff --git a/src/v/kafka/server/handlers/configs/config_response_utils.cc b/src/v/kafka/server/handlers/configs/config_response_utils.cc index 0f77cb45051e0..972fc77a01e05 100644 --- a/src/v/kafka/server/handlers/configs/config_response_utils.cc +++ b/src/v/kafka/server/handlers/configs/config_response_utils.cc @@ -18,6 +18,7 @@ #include "config/node_config.h" #include "kafka/server/handlers/topics/types.h" #include "model/metadata.h" +#include "pandaproxy/schema_registry/types.h" #include @@ -181,6 +182,7 @@ consteval describe_configs_type property_config_type() { std::is_same_v || std::is_same_v || std::is_same_v || + std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v || std::is_same_v || @@ -1108,6 +1110,22 @@ config_response_container_t make_topic_configs( describe_as_string); } + add_topic_config_if_requested( + config_keys, + result, + topic_property_schema_registry_context, + pandaproxy::schema_registry::default_context, + topic_property_schema_registry_context, + topic_properties.schema_registry_context, + include_synonyms, + maybe_make_documentation( + include_documentation, + "Schema Registry context used to look up schemas referenced by " + "records in this topic (e.g. by the in-broker Iceberg translator). " + "Defaults to the Schema Registry default context ('.')."), + &describe_as_string, + /*hide_default_override=*/true); + add_topic_config_if_requested( config_keys, result, diff --git a/src/v/kafka/server/handlers/configs/config_utils.h b/src/v/kafka/server/handlers/configs/config_utils.h index 1b5a0e7fa96c7..53501130a8cdb 100644 --- a/src/v/kafka/server/handlers/configs/config_utils.h +++ b/src/v/kafka/server/handlers/configs/config_utils.h @@ -20,12 +20,14 @@ #include "datalake/partition_spec_parser.h" #include "kafka/protocol/errors.h" #include "kafka/protocol/fwd.h" +#include "kafka/server/handlers/topics/sr_context_validator.h" #include "kafka/server/handlers/topics/types.h" #include "model/fundamental.h" #include "model/metadata.h" #include "model/namespace.h" #include "pandaproxy/schema_registry/schema_id_validation.h" #include "pandaproxy/schema_registry/subject_name_strategy.h" +#include "pandaproxy/schema_registry/types.h" #include "security/acl.h" #include "serde/rw/chrono.h" @@ -307,6 +309,18 @@ struct iceberg_partition_spec_validator { } }; +struct schema_registry_context_validator { + std::optional operator()( + model::topic_namespace_view /*tns*/, + const ss::sstring& raw, + const std::optional& value) { + if (!value) { + return std::nullopt; + } + return validate_sr_context(raw); + } +}; + struct min_cleanable_dirty_ratio_validator { std::optional operator()(const ss::sstring&, const tristate& value) { diff --git a/src/v/kafka/server/handlers/configs/storage_mode_properties.h b/src/v/kafka/server/handlers/configs/storage_mode_properties.h index b0033d7806977..0880a41ba2e49 100644 --- a/src/v/kafka/server/handlers/configs/storage_mode_properties.h +++ b/src/v/kafka/server/handlers/configs/storage_mode_properties.h @@ -90,6 +90,7 @@ inline constexpr auto storage_mode_properties storage_mode_mask::all}, {topic_property_record_value_subject_name_strategy_compat, storage_mode_mask::all}, + {topic_property_schema_registry_context, storage_mode_mask::all}, // Properties valid for local, tiered, and tiered_cloud (NOT cloud). // In tiered_cloud mode data goes through raft and local storage, diff --git a/src/v/kafka/server/handlers/create_topics.cc b/src/v/kafka/server/handlers/create_topics.cc index d2e80b8084659..173576c689782 100644 --- a/src/v/kafka/server/handlers/create_topics.cc +++ b/src/v/kafka/server/handlers/create_topics.cc @@ -18,6 +18,7 @@ #include "kafka/protocol/timeout.h" #include "kafka/protocol/types.h" #include "kafka/server/connection_context.h" +#include "kafka/server/handlers/topics/sr_context_validator.h" #include "kafka/server/handlers/topics/topic_utils.h" #include "kafka/server/handlers/topics/types.h" #include "kafka/server/handlers/topics/validators.h" @@ -78,6 +79,7 @@ bool is_supported(std::string_view name) { topic_property_iceberg_partition_spec, topic_property_iceberg_invalid_record_action, topic_property_iceberg_target_lag_ms, + topic_property_schema_registry_context, topic_property_min_cleanable_dirty_ratio, topic_property_min_compaction_lag_ms, topic_property_max_compaction_lag_ms, @@ -118,6 +120,7 @@ using validators = make_validator_types< iceberg_config_validator, iceberg_invalid_record_action_validator, iceberg_target_lag_ms_validator, + schema_registry_context_create_validator, min_max_compaction_lag_ms_validator, storage_mode_config_validator>; diff --git a/src/v/kafka/server/handlers/incremental_alter_configs.cc b/src/v/kafka/server/handlers/incremental_alter_configs.cc index 62f4e1a66e3ff..a9b7e47cc9a2c 100644 --- a/src/v/kafka/server/handlers/incremental_alter_configs.cc +++ b/src/v/kafka/server/handlers/incremental_alter_configs.cc @@ -24,6 +24,7 @@ #include "kafka/server/response.h" #include "model/fundamental.h" #include "model/metadata.h" +#include "pandaproxy/schema_registry/types.h" #include "storage/ntp_config.h" #include "strings/string_switch.h" @@ -430,6 +431,35 @@ create_topic_properties_update( iceberg_target_lag_ms_validator); continue; } + if (cfg.name == topic_property_schema_registry_context) { + if ( + topic_cfg + && topic_cfg->properties.iceberg_mode + != model::iceberg_mode::disabled) { + return make_error_alter_config_resource_response< + resp_resource_t>( + resource, + error_code::invalid_config, + "Cannot change redpanda.schema.registry.context while " + "Iceberg translation is enabled; set " + "redpanda.iceberg.mode=disabled first"); + } + // Empty string counts as unset. + const auto ctx_op = (op == kafka::config_resource_operation::set + && cfg.value.value_or("").empty()) + ? kafka::config_resource_operation::remove + : op; + parse_and_set_property( + tp_ns, + update.properties.schema_registry_context, + cfg.value, + ctx_op, + schema_registry_context_validator{}, + [](const ss::sstring& s) { + return pandaproxy::schema_registry::context{s}; + }); + continue; + } if (cfg.name == topic_property_min_cleanable_dirty_ratio) { parse_and_set_tristate( diff --git a/src/v/kafka/server/handlers/topics/sr_context_validator.h b/src/v/kafka/server/handlers/topics/sr_context_validator.h new file mode 100644 index 0000000000000..8864ffdf07215 --- /dev/null +++ b/src/v/kafka/server/handlers/topics/sr_context_validator.h @@ -0,0 +1,63 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once +#include "features/feature_table.h" +#include "kafka/protocol/errors.h" +#include "kafka/server/handlers/topics/types.h" + +#include + +#include +#include + +namespace kafka { + +/// Returns an error message if the SR context string is invalid, nullopt if +/// valid. A valid context must start with '.', must not contain ':', and must +/// not be the reserved '.__GLOBAL' context. +inline std::optional validate_sr_context(std::string_view v) { + if ( + !v.starts_with('.') || v.find(':') != std::string_view::npos + || v == ".__GLOBAL") { + return fmt::format( + "redpanda.schema.registry.context `{}' is invalid: must start " + "with '.', must not contain ':', and must not be the reserved " + "'.__GLOBAL' context", + v); + } + return std::nullopt; +} + +struct schema_registry_context_create_validator { + static constexpr const char* error_message + = "Invalid redpanda.schema.registry.context: must start with '.', must " + "not contain ':', and must not be the reserved '.__GLOBAL' context."; + + static constexpr error_code ec = error_code::invalid_config; + + static bool is_valid(const creatable_topic& c, features::feature_table*) { + auto it = std::ranges::find( + c.configs, + topic_property_schema_registry_context, + &createable_topic_config::name); + if (it == c.configs.end() || !it->value.has_value()) { + return true; + } + const auto& v = it->value.value(); + if (v.empty()) { + return true; + } + return !validate_sr_context(v).has_value(); + } +}; + +} // namespace kafka diff --git a/src/v/kafka/server/handlers/topics/types.cc b/src/v/kafka/server/handlers/topics/types.cc index b5edaa0a8e766..ec7960efde772 100644 --- a/src/v/kafka/server/handlers/topics/types.cc +++ b/src/v/kafka/server/handlers/topics/types.cc @@ -18,6 +18,7 @@ #include "model/metadata.h" #include "model/namespace.h" #include "model/timestamp.h" +#include "pandaproxy/schema_registry/types.h" #include "strings/string_switch.h" #include "utils/tristate.h" @@ -317,6 +318,14 @@ cluster::topic_configuration to_topic_config( = get_duration_value( config_entries, topic_property_iceberg_target_lag_ms); + if ( + auto s = get_string_value( + config_entries, topic_property_schema_registry_context); + s.has_value() && !s->empty()) { + cfg.properties.schema_registry_context + = pandaproxy::schema_registry::context{std::move(*s)}; + } + cfg.properties.min_cleanable_dirty_ratio = get_tristate_value( config_entries, topic_property_min_cleanable_dirty_ratio); diff --git a/src/v/kafka/server/handlers/topics/types.h b/src/v/kafka/server/handlers/topics/types.h index aaf67d040962b..b877753d410db 100644 --- a/src/v/kafka/server/handlers/topics/types.h +++ b/src/v/kafka/server/handlers/topics/types.h @@ -56,6 +56,13 @@ inline constexpr std::string_view topic_property_write_caching inline constexpr std::string_view topic_property_flush_ms = "flush.ms"; inline constexpr std::string_view topic_property_flush_bytes = "flush.bytes"; +// Server side schema registry context. Binds a topic to a specific Schema +// Registry context; schema IDs are unique within a context but not across +// contexts, so any per-topic resolution of schema ids (Iceberg translator +// today, schema id validation in the future) must look schemas up here. +inline constexpr std::string_view topic_property_schema_registry_context + = "redpanda.schema.registry.context"; + // Server side schema id validation inline constexpr std::string_view topic_property_record_key_schema_id_validation = "redpanda.key.schema.id.validation"; diff --git a/src/v/kafka/server/tests/alter_config_test.cc b/src/v/kafka/server/tests/alter_config_test.cc index 16745b6add402..7d963d9cc81bb 100644 --- a/src/v/kafka/server/tests/alter_config_test.cc +++ b/src/v/kafka/server/tests/alter_config_test.cc @@ -750,6 +750,7 @@ FIXTURE_TEST( "flush.bytes", "redpanda.iceberg.mode", "redpanda.leaders.preference", + "redpanda.schema.registry.context", "delete.retention.ms", "min.cleanable.dirty.ratio", "redpanda.remote.allowgaps", @@ -1718,3 +1719,122 @@ FIXTURE_TEST(test_tristate_handling_alter_config, alter_config_test_fixture) { test_tp, "min.cleanable.dirty.ratio", "-1", describe_resp); } } + +FIXTURE_TEST( + test_schema_registry_context_locked_while_translation_enabled, + alter_config_test_fixture) { + scoped_config config; + config.get("iceberg_enabled").set_value(true); + + model::topic tp{"test-sr-ctx"}; + BOOST_REQUIRE_EQUAL( + create_topic(tp, {{"redpanda.iceberg.mode", "value_schema_id_prefix"}}) + .data.topics[0] + .error_code, + kafka::error_code::none); + + // Changing schema_registry_context while translation is enabled must fail. + { + absl::flat_hash_map props; + props.emplace("redpanda.schema.registry.context", ".mycontext"); + auto resp = alter_configs( + make_alter_topic_config_resource_cv(tp, props)); + BOOST_REQUIRE_EQUAL(resp.data.responses.size(), 1); + BOOST_REQUIRE_EQUAL( + resp.data.responses[0].error_code, kafka::error_code::invalid_config); + } + { + absl::flat_hash_map< + ss::sstring, + std:: + pair, kafka::config_resource_operation>> + props; + props.emplace( + "redpanda.schema.registry.context", + std::make_pair(".mycontext", kafka::config_resource_operation::set)); + auto resp = incremental_alter_configs( + make_incremental_alter_topic_config_resource_cv(tp, props)); + BOOST_REQUIRE_EQUAL(resp.data.responses.size(), 1); + BOOST_REQUIRE_EQUAL( + resp.data.responses[0].error_code, kafka::error_code::invalid_config); + } + + // Disable translation, then changing the context must succeed. + { + absl::flat_hash_map props; + props.emplace("redpanda.iceberg.mode", "disabled"); + BOOST_REQUIRE_EQUAL( + alter_configs(make_alter_topic_config_resource_cv(tp, props)) + .data.responses[0] + .error_code, + kafka::error_code::none); + } + { + absl::flat_hash_map props; + props.emplace("redpanda.schema.registry.context", ".mycontext"); + auto resp = alter_configs( + make_alter_topic_config_resource_cv(tp, props)); + BOOST_REQUIRE_EQUAL(resp.data.responses.size(), 1); + BOOST_REQUIRE_EQUAL( + resp.data.responses[0].error_code, kafka::error_code::none); + } +} + +FIXTURE_TEST( + test_schema_registry_context_and_iceberg_mode_at_creation, + alter_config_test_fixture) { + scoped_config config; + config.get("iceberg_enabled").set_value(true); + + // Setting both iceberg_mode and schema_registry_context in a single + // CreateTopics request is valid: the two validators are independent and + // both properties are applied atomically. + model::topic tp{"test-sr-ctx-create"}; + BOOST_REQUIRE_EQUAL( + create_topic( + tp, + {{"redpanda.iceberg.mode", "value_schema_id_prefix"}, + {"redpanda.schema.registry.context", ".mycontext"}}) + .data.topics[0] + .error_code, + kafka::error_code::none); + + auto describe_resp = describe_configs(tp); + assert_property_value( + tp, "redpanda.iceberg.mode", "value_schema_id_prefix", describe_resp); + assert_property_value( + tp, "redpanda.schema.registry.context", ".mycontext", describe_resp); +} + +FIXTURE_TEST( + test_schema_registry_context_sticky_in_alter_configs, + alter_config_test_fixture) { + scoped_config config; + config.get("iceberg_enabled").set_value(true); + + model::topic tp{"test-sr-ctx-sticky"}; + BOOST_REQUIRE_EQUAL( + create_topic( + tp, + {{"redpanda.iceberg.mode", "disabled"}, + {"redpanda.schema.registry.context", ".mycontext"}}) + .data.topics[0] + .error_code, + kafka::error_code::none); + + // AlterConfigs is full-replace, but schema_registry_context is sticky: + // omitting it must not silently reset it to default. + { + absl::flat_hash_map props; + props.emplace("retention.ms", "12345"); + BOOST_REQUIRE_EQUAL( + alter_configs(make_alter_topic_config_resource_cv(tp, props)) + .data.responses[0] + .error_code, + kafka::error_code::none); + } + + auto describe_resp = describe_configs(tp); + assert_property_value( + tp, "redpanda.schema.registry.context", ".mycontext", describe_resp); +} diff --git a/src/v/pandaproxy/schema_registry/types.cc b/src/v/pandaproxy/schema_registry/types.cc index 2b51a8a3f0b81..3ff59c8c9b43a 100644 --- a/src/v/pandaproxy/schema_registry/types.cc +++ b/src/v/pandaproxy/schema_registry/types.cc @@ -149,4 +149,18 @@ void validate_context_subject( } } +void validate_context(const context& ctx) { + static constexpr std::string_view global_context_name = ".__GLOBAL"; + const auto& v = ctx(); + if (v.empty() || v.front() != '.') { + throw as_exception(subject_invalid(v)); + } + if (v.contains(':')) { + throw as_exception(subject_invalid(v)); + } + if (v == global_context_name) { + throw as_exception(subject_invalid(v)); + } +} + } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/types.h b/src/v/pandaproxy/schema_registry/types.h index f1690d463d7ff..5afed4d2dfcef 100644 --- a/src/v/pandaproxy/schema_registry/types.h +++ b/src/v/pandaproxy/schema_registry/types.h @@ -240,6 +240,19 @@ void validate_context_subject( const context_subject& ctx_sub, is_config_or_mode is_config_or_mode = is_config_or_mode::no); +/// Validate that a context is well-formed. Enforces the minimum set of rules +/// required for the context to round-trip through the qualified-subject wire +/// format (`:.context:subject`, see parse_subject in types.cc): +/// - must start with '.' +/// - must not contain ':' +/// - must not be the reserved '.__GLOBAL' context +/// Other characters (including `/`, NUL, and whitespace) are intentionally +/// permitted: the format is implicitly constrained by the tooling that +/// consumes these strings, and policing "bananas but valid" context names is +/// out of scope for this check. Throws exception with +/// error_code::subject_invalid if invalid. +void validate_context(const context& ctx); + /// A reference subject that may be qualified or unqualified. /// Unqualified references are resolved relative to a parent schema's context. struct context_subject_reference { diff --git a/tests/rptest/clients/types.py b/tests/rptest/clients/types.py index f0d58e6affe92..63955b2f19033 100644 --- a/tests/rptest/clients/types.py +++ b/tests/rptest/clients/types.py @@ -49,6 +49,7 @@ class TopicSpec: PROPERTY_ICEBERG_INVALID_RECORD_ACTION = "redpanda.iceberg.invalid.record.action" PROPERTY_ICEBERG_TARGET_LAG_MS = "redpanda.iceberg.target.lag.ms" PROPERTY_ICEBERG_PARTITION_SPEC = "redpanda.iceberg.partition.spec" + PROPERTY_SCHEMA_REGISTRY_CONTEXT = "redpanda.schema.registry.context" PROPERTY_MIN_CLEANABLE_DIRTY_RATIO = "min.cleanable.dirty.ratio" PROPERTY_MIN_COMPACTION_LAG_MS = "min.compaction.lag.ms" PROPERTY_MAX_COMPACTION_LAG_MS = "max.compaction.lag.ms" diff --git a/tests/rptest/tests/datalake/schema_registry_context_test.py b/tests/rptest/tests/datalake/schema_registry_context_test.py new file mode 100644 index 0000000000000..be5eba7c4dad3 --- /dev/null +++ b/tests/rptest/tests/datalake/schema_registry_context_test.py @@ -0,0 +1,210 @@ +# Copyright 2025 Redpanda Data, Inc. +# +# Use of this software is governed by the Business Source License +# included in the file licenses/BSL.md +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0 + +import json + +from confluent_kafka import SerializingProducer +from confluent_kafka.schema_registry import SchemaRegistryClient +from confluent_kafka.schema_registry.avro import AvroSerializer +from ducktape.mark import matrix + +from rptest.clients.types import TopicSpec +from rptest.services.cluster import cluster +from rptest.services.redpanda import SISettings, SchemaRegistryConfig +from rptest.tests.datalake.catalog_service_factory import ( + filesystem_catalog_type, +) +from rptest.tests.datalake.datalake_services import DatalakeServices +from rptest.tests.datalake.query_engine_base import QueryEngineType +from rptest.tests.datalake.utils import supported_storage_types +from rptest.tests.redpanda_test import RedpandaTest + +SCHEMA_A = { + "type": "record", + "name": "RecordA", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "int"}, + ], +} + +SCHEMA_B = { + "type": "record", + "name": "RecordB", + "fields": [ + {"name": "color", "type": "string"}, + {"name": "size", "type": "double"}, + ], +} + +SCHEMA_C = { + "type": "record", + "name": "RecordC", + "fields": [ + {"name": "x", "type": "int"}, + ], +} + + +class DatalakeSchemaRegistryContextTest(RedpandaTest): + def __init__(self, test_context): + super().__init__( + test_context=test_context, + num_brokers=1, + extra_rp_conf={ + "iceberg_enabled": True, + "iceberg_catalog_commit_interval_ms": 5000, + "schema_registry_enable_qualified_subjects": True, + }, + schema_registry_config=SchemaRegistryConfig(), + si_settings=SISettings(test_context=test_context), + ) + + def setUp(self): + # DatalakeServices starts Redpanda. + pass + + def _sr_url(self): + return self.redpanda.schema_reg().split(",")[0] + + def _produce_confluent_records(self, topic, context, schema_dict, records): + """Produce Avro records using AvroSerializer with a context-qualified + subject name strategy. This mirrors how a real producer would use SR + contexts: the serializer registers/looks up the schema under the + context-qualified subject (e.g. ':.ctx1:topic-value') and encodes + records in standard Confluent wire format.""" + sr_client = SchemaRegistryClient({"url": self._sr_url()}) + + def subject_name_strategy(ctx, schema): + return f":{context}:{ctx.topic}-value" + + avro_serializer = AvroSerializer( + sr_client, + json.dumps(schema_dict), + conf={"subject.name.strategy": subject_name_strategy}, + ) + + producer = SerializingProducer( + { + "bootstrap.servers": self.redpanda.brokers(), + "value.serializer": avro_serializer, + } + ) + for record in records: + producer.produce(topic, value=record) + producer.flush() + + @cluster(num_nodes=3) + @matrix(cloud_storage_type=supported_storage_types()) + def test_per_topic_context_routing(self, cloud_storage_type): + """Each topic resolves schemas from its configured SR context.""" + + with DatalakeServices( + self.test_context, + redpanda=self.redpanda, + catalog_type=filesystem_catalog_type(), + include_query_engines=[QueryEngineType.SPARK], + ) as dl: + # Create topics with per-topic SR context. + dl.create_iceberg_enabled_topic( + "topic_a", + iceberg_mode="value_schema_id_prefix", + config={ + TopicSpec.PROPERTY_SCHEMA_REGISTRY_CONTEXT: ".ctx1", + }, + ) + dl.create_iceberg_enabled_topic( + "topic_b", + iceberg_mode="value_schema_id_prefix", + config={ + TopicSpec.PROPERTY_SCHEMA_REGISTRY_CONTEXT: ".ctx2", + }, + ) + + # Produce records — AvroSerializer registers SCHEMA_A under + # .ctx1 and SCHEMA_B under .ctx2. + records_a = [{"name": f"user_{i}", "age": 20 + i} for i in range(10)] + records_b = [{"color": f"color_{i}", "size": float(i)} for i in range(10)] + + self._produce_confluent_records("topic_a", ".ctx1", SCHEMA_A, records_a) + self._produce_confluent_records("topic_b", ".ctx2", SCHEMA_B, records_b) + + # Wait for translation. + dl.wait_for_translation("topic_a", msg_count=10) + dl.wait_for_translation("topic_b", msg_count=10) + + # Verify Iceberg table columns. + spark = dl.spark() + + desc_a = spark.run_query_fetch_all("describe redpanda.topic_a") + # Spark describe returns header row at [0] and partition info + # in the last 3 rows; strip those. + cols_a = {(r[0], r[1]) for r in desc_a[1:-3]} + assert ("name", "string") in cols_a, ( + f"Expected 'name' string column in topic_a, got {cols_a}" + ) + assert ("age", "int") in cols_a, ( + f"Expected 'age' int column in topic_a, got {cols_a}" + ) + + desc_b = spark.run_query_fetch_all("describe redpanda.topic_b") + cols_b = {(r[0], r[1]) for r in desc_b[1:-3]} + assert ("color", "string") in cols_b, ( + f"Expected 'color' string column in topic_b, got {cols_b}" + ) + assert ("size", "double") in cols_b, ( + f"Expected 'size' double column in topic_b, got {cols_b}" + ) + + # Negative: topic_a should NOT have topic_b's columns. + assert ("color", "string") not in cols_a, ( + "topic_a should not have topic_b's 'color' column" + ) + assert ("name", "string") not in cols_b, ( + "topic_b should not have topic_a's 'name' column" + ) + + @cluster(num_nodes=3) + @matrix(cloud_storage_type=supported_storage_types()) + def test_wrong_context_dlq(self, cloud_storage_type): + """Schema ID not present in the configured context sends records + to the dead-letter-queue table.""" + + with DatalakeServices( + self.test_context, + redpanda=self.redpanda, + catalog_type=filesystem_catalog_type(), + include_query_engines=[QueryEngineType.SPARK], + ) as dl: + # Create topic pointing to a context where the schema does + # not exist. + dl.create_iceberg_enabled_topic( + "topic_c", + iceberg_mode="value_schema_id_prefix", + config={ + TopicSpec.PROPERTY_SCHEMA_REGISTRY_CONTEXT: ".wrong", + TopicSpec.PROPERTY_ICEBERG_INVALID_RECORD_ACTION: "dlq_table", + }, + ) + + # Produce records encoded with the schema ID from .ctx1. + # The translator resolves IDs against .wrong, where SCHEMA_C + # does not exist, so all records are routed to the DLQ. + records = [{"x": i} for i in range(10)] + self._produce_confluent_records("topic_c", ".ctx1", SCHEMA_C, records) + + # Records should land in the DLQ table, not the main table. + dl.wait_for_translation( + "topic_c", + msg_count=10, + table_override="topic_c~dlq", + ) + assert dl.num_tables() == 1, ( + f"Expected only the DLQ table, got {dl.num_tables()} tables" + ) diff --git a/tests/rptest/tests/describe_topics_test.py b/tests/rptest/tests/describe_topics_test.py index e2c70970ec457..08b5a658bbae9 100644 --- a/tests/rptest/tests/describe_topics_test.py +++ b/tests/rptest/tests/describe_topics_test.py @@ -204,6 +204,13 @@ def test_describe_topics_with_documentation_and_types(self): value="io.confluent.kafka.serializers.subject.TopicNameStrategy", doc_string="The subject name strategy for values if confluent.value.schema.validation is enabled", ), + "redpanda.schema.registry.context": ConfigProperty( + config_type="STRING", + value=".", + doc_string="Schema Registry context used to look up schemas referenced by " + "records in this topic (e.g. by the in-broker Iceberg translator). " + "Defaults to the Schema Registry default context ('.').", + ), "initial.retention.local.target.bytes": ConfigProperty( config_type="LONG", value="-1", diff --git a/tools/offline_log_viewer/controller.py b/tools/offline_log_viewer/controller.py index 2a22518a1e355..768b0dafbf6bf 100644 --- a/tools/offline_log_viewer/controller.py +++ b/tools/offline_log_viewer/controller.py @@ -171,6 +171,11 @@ def read_topic_properties_serde(rdr: Reader, version): "storage_mode": rdr.read_serde_enum(), } + if version >= 14: + topic_properties |= { + "schema_registry_context": rdr.read_optional(Reader.read_string), + } + return topic_properties @@ -188,7 +193,7 @@ def read_topic_config(rdr: Reader, version): "topic": rdr.read_string(), "partitions": rdr.read_int32(), "replication_factor": rdr.read_int16(), - "properties": rdr.read_envelope(read_topic_properties_serde, reader_version=11), + "properties": rdr.read_envelope(read_topic_properties_serde, reader_version=14), } if version < 1: # see https://github.com/redpanda-data/redpanda/pull/6613 @@ -367,10 +372,40 @@ def incr_topic_upd(rdr: Reader, version): rdr, lambda r: r.read_optional(Reader.read_uuid) ), } + if version >= 9: + incr_obj |= { + "message_timestamp_before_max_ms": read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_int64) + ), + "message_timestamp_after_max_ms": read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_int64) + ), + } + if version >= 10: + incr_obj |= { + "remote_label": read_property_update_serde( + rdr, + lambda r: r.read_optional( + lambda r: r.read_envelope( + lambda r, _: {"cluster_uuid": r.read_uuid()}, + reader_version=0, + ) + ), + ), + "storage_mode": read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_serde_enum) + ), + } + if version >= 11: + incr_obj |= { + "schema_registry_context": read_property_update_serde( + rdr, lambda r: r.read_optional(Reader.read_string) + ), + } return incr_obj - return rdr.read_envelope(incr_topic_upd, reader_version=8) + return rdr.read_envelope(incr_topic_upd, reader_version=11) def read_create_partitions_serde(rdr: Reader): diff --git a/tools/type-checking/type-check-strictness.json b/tools/type-checking/type-check-strictness.json index 401c1b365c034..024404052873c 100644 --- a/tools/type-checking/type-check-strictness.json +++ b/tools/type-checking/type-check-strictness.json @@ -129,6 +129,7 @@ "rptest/tests/datalake/protobuf_json_test.py", "rptest/tests/datalake/recovery_mode_test.py", "rptest/tests/datalake/rest_catalog_connection_test.py", + "rptest/tests/datalake/schema_registry_context_test.py", "rptest/tests/datalake/simple_connect_test.py", "rptest/tests/datalake/spark_sql_server_smoke_test.py", "rptest/tests/datalake/throttling_test.py",