Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,7 @@ redpanda_cc_library(
"//src/v/cloud_storage:remote_label",
"//src/v/model",
"//src/v/pandaproxy/schema_registry:subject_name_strategy",
"//src/v/pandaproxy/schema_registry:types",
"//src/v/reflection:adl",
"//src/v/serde",
"//src/v/serde:chrono",
Expand Down Expand Up @@ -777,6 +778,7 @@ redpanda_cc_library(
"//src/v/container:contiguous_range_map",
"//src/v/model",
"//src/v/pandaproxy/schema_registry:subject_name_strategy",
"//src/v/pandaproxy/schema_registry:types",
"//src/v/raft",
"//src/v/security",
"//src/v/security:license",
Expand Down Expand Up @@ -2037,6 +2039,7 @@ redpanda_cc_library(
"//src/v/base",
"//src/v/config",
"//src/v/metrics",
"//src/v/pandaproxy/schema_registry:types",
"//src/v/storage",
"//src/v/utils:uuid",
"@seastar",
Expand Down Expand Up @@ -2447,6 +2450,7 @@ redpanda_cc_library(
"//src/v/net:types",
"//src/v/pandaproxy/schema_registry:config",
"//src/v/pandaproxy/schema_registry:subject_name_strategy",
"//src/v/pandaproxy/schema_registry:types",
"//src/v/raft",
"//src/v/random:generators",
"//src/v/random:time_jitter",
Expand Down
8 changes: 8 additions & 0 deletions src/v/cluster/tests/topic_properties_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "base/units.h"
#include "cluster/types.h"
#include "model/tests/randoms.h"
#include "pandaproxy/schema_registry/types.h"
#include "random/generators.h"
#include "test_utils/randoms.h"

Expand Down Expand Up @@ -79,5 +80,12 @@ inline cluster::topic_properties random_topic_properties() {
model::redpanda_storage_mode::tiered,
model::redpanda_storage_mode::cloud});

// Mix of default context and randomly generated non-default contexts.
if (tests::random_bool()) {
properties.schema_registry_context
= pandaproxy::schema_registry::context{
"." + random_generators::gen_alphanum_string(8)};
}

return properties;
}
5 changes: 4 additions & 1 deletion src/v/cluster/topic_properties.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ fmt::iterator topic_properties::format_to(fmt::iterator it) const {
"remote_topic_allow_gaps: {}, "
"batch_max_bytes: {}, retention_local_target_bytes: {}, "
"retention_local_target_ms: {}, remote_delete: {}, segment_ms: {}, "
"schema_registry_context: {}, "
"record_key_schema_id_validation: {}, "
"record_key_schema_id_validation_compat: {}, "
"record_key_subject_name_strategy: {}, "
Expand Down Expand Up @@ -73,6 +74,7 @@ fmt::iterator topic_properties::format_to(fmt::iterator it) const {
retention_local_target_ms,
remote_delete,
segment_ms,
schema_registry_context,
record_key_schema_id_validation,
record_key_schema_id_validation_compat,
record_key_subject_name_strategy,
Expand Down Expand Up @@ -165,7 +167,8 @@ bool topic_properties::has_overrides() const {
|| remote_topic_allow_gaps.has_value()
|| message_timestamp_before_max_ms.has_value()
|| message_timestamp_after_max_ms.has_value()
|| storage_mode != storage::ntp_config::default_storage_mode;
|| storage_mode != storage::ntp_config::default_storage_mode
|| schema_registry_context.has_value();

return overrides;
}
Expand Down
16 changes: 14 additions & 2 deletions src/v/cluster/topic_properties.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "model/metadata.h"
#include "model/timestamp.h"
#include "pandaproxy/schema_registry/subject_name_strategy.h"
#include "pandaproxy/schema_registry/types.h"
#include "reflection/adl.h"
#include "serde/rw/chrono.h"
#include "serde/rw/envelope.h"
Expand All @@ -34,7 +35,7 @@ namespace cluster {
*/
struct topic_properties
: serde::
envelope<topic_properties, serde::version<13>, serde::compat_version<0>> {
envelope<topic_properties, serde::version<14>, serde::compat_version<0>> {
topic_properties() noexcept = default;
topic_properties(
std::optional<model::compression> compression,
Expand Down Expand Up @@ -176,6 +177,16 @@ struct topic_properties

tristate<std::chrono::milliseconds> segment_ms{std::nullopt};

// Schema Registry context — the namespace within which schema ids and
// subjects resolve for this topic. Contexts are a Schema Registry
// namespacing mechanism: schema ids are unique within a context but not
// across them. Consumed by the in-broker Iceberg translator (today) and
// intended to also cover record_{key,value}_schema_id_validation below
// in the future, since a given schema id on a given record can only
// resolve to one schema. std::nullopt means the SR default context (".");
// has_overrides/describe treat nullopt as unset.
std::optional<pandaproxy::schema_registry::context> schema_registry_context;

std::optional<bool> record_key_schema_id_validation;
std::optional<bool> record_key_schema_id_validation_compat;
std::optional<pandaproxy::schema_registry::subject_name_strategy>
Expand Down Expand Up @@ -317,7 +328,8 @@ struct topic_properties
max_compaction_lag_ms,
message_timestamp_before_max_ms,
message_timestamp_after_max_ms,
storage_mode);
storage_mode,
schema_registry_context);
}

friend bool
Expand Down
4 changes: 4 additions & 0 deletions src/v/cluster/topic_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include "container/chunked_hash_map.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "pandaproxy/schema_registry/types.h"
#include "storage/ntp_config.h"

#include <seastar/coroutine/maybe_yield.hh>
Expand Down Expand Up @@ -1231,6 +1232,9 @@ topic_properties topic_table::update_topic_properties(
updated_properties.storage_mode,
overrides.storage_mode,
storage::ntp_config::default_storage_mode);
incremental_update(
updated_properties.schema_registry_context,
overrides.schema_registry_context);
return updated_properties;
}

Expand Down
2 changes: 2 additions & 0 deletions src/v/cluster/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,7 @@ fmt::iterator incremental_topic_updates::format_to(fmt::iterator it) const {
"segment_size: {} retention_bytes: {} retention_duration: {} "
"shadow_indexing: {}, batch_max_bytes: {}, retention_local_target_bytes: "
"{}, retention_local_target_ms: {}, remote_delete: {}, segment_ms: {}, "
"schema_registry_context: {}, "
"record_key_schema_id_validation: {}"
"record_key_schema_id_validation_compat: {}"
"record_key_subject_name_strategy: {}"
Expand Down Expand Up @@ -355,6 +356,7 @@ fmt::iterator incremental_topic_updates::format_to(fmt::iterator it) const {
retention_local_target_ms,
remote_delete,
segment_ms,
schema_registry_context,
record_key_schema_id_validation,
record_key_schema_id_validation_compat,
record_key_subject_name_strategy,
Expand Down
9 changes: 7 additions & 2 deletions src/v/cluster/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "model/timeout_clock.h"
#include "model/transform.h"
#include "pandaproxy/schema_registry/subject_name_strategy.h"
#include "pandaproxy/schema_registry/types.h"
#include "raft/errc.h"
#include "raft/fwd.h"
#include "raft/transfer_leadership.h"
Expand Down Expand Up @@ -591,7 +592,7 @@ struct property_update<tristate<T>>
struct incremental_topic_updates
: serde::envelope<
incremental_topic_updates,
serde::version<10>,
serde::version<11>,
serde::compat_version<0>> {
static constexpr int8_t version_with_data_policy = -1;
static constexpr int8_t version_with_shadow_indexing = -3;
Expand Down Expand Up @@ -683,6 +684,9 @@ struct incremental_topic_updates
message_timestamp_after_max_ms;
property_update<std::optional<model::redpanda_storage_mode>> storage_mode;

property_update<std::optional<pandaproxy::schema_registry::context>>
schema_registry_context;

// Not a regular topic property. Used to assign topic UUIDs to pre-25-2
// topics that were created without one.
property_update<std::optional<model::topic_id>> topic_id;
Expand Down Expand Up @@ -745,7 +749,8 @@ struct incremental_topic_updates
message_timestamp_before_max_ms,
message_timestamp_after_max_ms,
remote_label,
storage_mode);
storage_mode,
schema_registry_context);
}

fmt::iterator format_to(fmt::iterator it) const;
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster_link/utils/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ redpanda_cc_library(
"//src/v/base",
"//src/v/cluster:types",
"//src/v/kafka/server:topic_config_utils",
"//src/v/pandaproxy/schema_registry:types",
"//src/v/reflection:type_traits",
],
)
16 changes: 16 additions & 0 deletions src/v/cluster_link/utils/topic_properties_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "base/type_traits.h"
#include "kafka/server/handlers/configs/config_utils.h"
#include "pandaproxy/schema_registry/types.h"
#include "reflection/type_traits.h"

namespace {
Expand Down Expand Up @@ -289,6 +290,21 @@ bool maybe_append_update(
topic_config.properties.iceberg_target_lag_ms,
kafka::iceberg_target_lag_ms_validator);
}
if (config_name == kafka::topic_property_schema_registry_context) {
// context is named_type<ss::sstring>, not ss::sstring, so we need an
// explicit parse function.
kafka::parse_and_set_property(
topic_config.tp_ns,
update.properties.schema_registry_context,
config_value,
kafka::config_resource_operation::set,
kafka::schema_registry_context_validator{},
[](const ss::sstring& s) {
return pandaproxy::schema_registry::context{s};
});
return update.properties.schema_registry_context.value
!= topic_config.properties.schema_registry_context;
}

if (config_name == kafka::topic_property_min_cleanable_dirty_ratio) {
return parse_and_set(
Expand Down
20 changes: 16 additions & 4 deletions src/v/datalake/coordinator/coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "datalake/coordinator/state_update.h"
#include "datalake/logger.h"
#include "datalake/partition_spec_parser.h"
#include "datalake/record_schema_resolver.h"
#include "datalake/record_translator.h"
#include "datalake/table_id_provider.h"
#include "model/fundamental.h"
Expand Down Expand Up @@ -401,8 +402,10 @@ coordinator::do_ensure_table_exists(

struct coordinator::main_table_schema_provider
: public coordinator::table_schema_provider {
explicit main_table_schema_provider(coordinator& parent)
: parent(parent) {}
main_table_schema_provider(
coordinator& parent, pandaproxy::schema_registry::context ctx)
: parent(parent)
, ctx_(std::move(ctx)) {}

iceberg::table_identifier
get_table_id(const model::topic& topic) const final {
Expand All @@ -414,7 +417,7 @@ struct coordinator::main_table_schema_provider
std::optional<shared_resolved_type_t> val_type;
if (comps.val_identifier) {
auto type_res = co_await parent.type_resolver_.resolve_identifier(
comps.val_identifier.value());
comps.val_identifier.value(), ctx_);
if (type_res.has_error()) {
co_return errc::failed;
}
Expand All @@ -432,6 +435,7 @@ struct coordinator::main_table_schema_provider
}

const coordinator& parent;
pandaproxy::schema_registry::context ctx_;
};

ss::future<checked<std::nullopt_t, coordinator::errc>>
Expand All @@ -447,12 +451,20 @@ coordinator::sync_ensure_table_exists(
if (waiter_fut.has_value()) {
co_return co_await std::move(*waiter_fut);
}
auto topic_md = topic_table_.get_topic_metadata_ref(
model::topic_namespace_view{model::kafka_namespace, topic});
auto sr_ctx = topic_md ? topic_md->get()
.get_configuration()
.properties.schema_registry_context.value_or(
pandaproxy::schema_registry::default_context)
: pandaproxy::schema_registry::default_context;
Comment on lines +454 to +460
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you consider having the translators pass the context to the coordinator in the RPC? It's a bit surprising to me that that isn't the case, particularly because the RPC request contains other topic + schema information already. I guess we would expect them to always be the same...

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, but the coordinator already has access to the topic config so that it can read the context info. There's an invariant that context doesn't change while translation is active, so there can't be a mismatch between what the coordinator sees as the context and what the translator did.


auto res_fut = co_await ss::coroutine::as_future(do_ensure_table_exists(
topic,
topic_revision,
std::move(comps),
"sync_ensure_table_exists",
main_table_schema_provider{*this}));
main_table_schema_provider{*this, std::move(sr_ctx)}));

if (res_fut.failed()) {
// NOTE: we don't expect any exceptions given we're using result types,
Expand Down
7 changes: 6 additions & 1 deletion src/v/datalake/datalake_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ namespace {
static std::unique_ptr<type_resolver> make_type_resolver(
const model::iceberg_mode& mode,
model::topic_view topic_name,
pandaproxy::schema_registry::context sr_context,
schema::registry& sr,
schema_cache& cache,
resolved_type_cache& type_cache) {
Expand All @@ -50,7 +51,8 @@ static std::unique_ptr<type_resolver> make_type_resolver(
case model::iceberg_mode::variant::key_value:
return std::make_unique<binary_type_resolver>();
case model::iceberg_mode::variant::value_schema_id_prefix:
return std::make_unique<record_schema_resolver>(sr, cache, type_cache);
return std::make_unique<record_schema_resolver>(
sr, cache, type_cache, std::move(sr_context));
case model::iceberg_mode::variant::value_schema_latest:
auto subject = pandaproxy::schema_registry::subject(
fmt::format("{}-value", topic_name));
Expand All @@ -59,6 +61,7 @@ static std::unique_ptr<type_resolver> make_type_resolver(
}
return std::make_unique<latest_subject_schema_resolver>(
sr,
std::move(sr_context),
subject,
mode.protobuf_full_name(),
config::shard_local_cfg().iceberg_latest_schema_cache_ttl_ms.bind(),
Expand Down Expand Up @@ -633,6 +636,8 @@ ss::future<> datalake_manager::handle_translator_state_change(
auto type_resolver = make_type_resolver(
mode,
ntp.tp.topic,
partition->topic_cfg->properties.schema_registry_context.value_or(
pandaproxy::schema_registry::default_context),
*_schema_registry,
*_schema_cache,
*_resolved_type_cache);
Expand Down
Loading
Loading