From 1935f09fd73ee6f21fb7d8988f0825a0edf4fe38 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Mon, 23 Feb 2026 23:00:04 -0800 Subject: [PATCH 01/11] sr: Introduce rpc_transport Signed-off-by: Oren Leiman --- src/v/pandaproxy/schema_registry/BUILD | 21 +++ src/v/pandaproxy/schema_registry/fwd.h | 1 + .../schema_registry/rpc_transport.cc | 147 ++++++++++++++++++ .../schema_registry/rpc_transport.h | 51 ++++++ 4 files changed, 220 insertions(+) create mode 100644 src/v/pandaproxy/schema_registry/rpc_transport.cc create mode 100644 src/v/pandaproxy/schema_registry/rpc_transport.h diff --git a/src/v/pandaproxy/schema_registry/BUILD b/src/v/pandaproxy/schema_registry/BUILD index 0f81066a4e258..6883852fa1b2c 100644 --- a/src/v/pandaproxy/schema_registry/BUILD +++ b/src/v/pandaproxy/schema_registry/BUILD @@ -132,6 +132,27 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "rpc_transport", + srcs = [ + "rpc_transport.cc", + ], + hdrs = [ + "rpc_transport.h", + ], + implementation_deps = [ + ":exceptions", + "//src/v/kafka/protocol", + "//src/v/model", + "@seastar", + ], + visibility = ["//visibility:public"], + deps = [ + ":transport", + "//src/v/kafka/data/rpc", + ], +) + redpanda_cc_library( name = "compatibility", srcs = ["compatibility.cc"], diff --git a/src/v/pandaproxy/schema_registry/fwd.h b/src/v/pandaproxy/schema_registry/fwd.h index 3b5f4cf11e5c8..45249d398069b 100644 --- a/src/v/pandaproxy/schema_registry/fwd.h +++ b/src/v/pandaproxy/schema_registry/fwd.h @@ -24,5 +24,6 @@ class sharded_store; class store; class transport; class kafka_client_transport; +class rpc_transport; } // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/rpc_transport.cc b/src/v/pandaproxy/schema_registry/rpc_transport.cc new file mode 100644 index 0000000000000..e5fffcb96d225 --- /dev/null +++ b/src/v/pandaproxy/schema_registry/rpc_transport.cc @@ -0,0 +1,147 @@ +/* + * Copyright 2026 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "pandaproxy/schema_registry/rpc_transport.h" + +#include "kafka/data/rpc/client.h" +#include "kafka/protocol/exceptions.h" +#include "model/fundamental.h" +#include "model/namespace.h" +#include "pandaproxy/schema_registry/exceptions.h" + +#include + +namespace pandaproxy::schema_registry { + +namespace { + +/// Translate cluster::errc from the rpc layer into a pp::sr or kafka +/// exception with the goal of fitting directly into existing error +/// handling logic in service & seq_writer. timeout & not_leader are +/// of particular interest since most error paths will treat those +/// as retryable. unknown_server_error is the catchall for other faults +/// and is generally surfaced to the API. For that reason, faults +/// unique to the RPC layer may now appear in 500 error bodies. +/// TODO: get rid of the kafka::exception dependency if possible and +/// make error handling at the next layer up generic to "transport". 
+[[noreturn]] void +throw_as_kafka_error(std::string_view context, cluster::errc ec) { + switch (ec) { + case cluster::errc::topic_not_exists: + throw exception( + kafka::error_code::unknown_server_error, + "_schemas topic does not exist"); + case cluster::errc::not_leader: + throw kafka::exception( + kafka::error_code::not_leader_for_partition, + fmt::format("{}: {}", context, ec)); + case cluster::errc::timeout: + throw kafka::exception( + kafka::error_code::request_timed_out, + fmt::format("{}: {}", context, ec)); + default: + throw kafka::exception( + kafka::error_code::unknown_server_error, + fmt::format("{}: {}", context, ec)); + } +} + +} // namespace + +rpc_transport::rpc_transport(kafka::data::rpc::client& client) + : _client(client) {} + +ss::future rpc_transport::produce(model::record_batch batch) { + auto res = co_await _client.produce_with_leader_mitigation( + model::schema_registry_internal_tp, std::move(batch)); + if (res.ec != cluster::errc::success) { + throw_as_kafka_error("RPC produce failed", res.ec); + } + vassert( + res.base_offset.has_value(), + "Possible RPC produce version mismatch (response incomplete)"); + co_return produce_result{.base_offset = *res.base_offset}; +} + +ss::future rpc_transport::get_high_watermark() { + auto result = co_await _client.get_single_partition_offsets( + model::schema_registry_internal_tp); + if (result.has_error()) { + throw_as_kafka_error( + "RPC get_partition_offsets failed", result.error()); + } + co_return kafka::offset_cast(result.value().high_watermark); +} + +ss::future<> rpc_transport::consume_range( + model::offset start, + model::offset end, + ss::noncopyable_function(model::record_batch)> + consumer) { + // The RPC consume API may not return all records in a single call, + // so loop until we've consumed up to the desired end offset. + + // Matches the Kafka transport default. 
KIP-74 obligatory batch + // makes oversized batch handling identical across transports for + // the single-partition _schemas topic. + constexpr size_t consume_max_bytes = 1_MiB; + constexpr auto consume_timeout = 5s; + auto current = start; + // Adapt the existing exclusive upper bound semantics of the old kafka + // reader to the inclusive upper bound semantics of the record batch reader + // in the RPC service. + auto end_inclusive = model::prev_offset(end); + while (current < end) { + auto result = co_await _client.consume( + model::schema_registry_internal_tp, + offset_cast(current), + offset_cast(end_inclusive), + 1, + consume_max_bytes, + consume_timeout); + if (result.has_error()) { + throw_as_kafka_error("RPC consume failed", result.error()); + } + auto& reply = result.value(); + if (reply.err != cluster::errc::success) { + throw_as_kafka_error("RPC consume error", reply.err); + } + if (reply.batches.empty()) { + // TODO: map to a retryable error_code (e.g. leader_not_available) + // so service::do_start retries this. Note that the kclient version + // has this issue in client_fetch_batch_reader.cc. Fix both + // together. 
+ throw kafka::exception( + kafka::error_code::unknown_server_error, + fmt::format( + "No records returned in range [{}, {})", current, end)); + } + for (auto& batch : reply.batches) { + auto last = batch.last_offset(); + auto stop = co_await consumer(std::move(batch)); + current = model::next_offset(last); + if (stop == ss::stop_iteration::yes) { + co_return; + } + } + } +} + +ss::future rpc_transport::create_topic( + model::topic_namespace_view tp_ns, + int32_t partition_count, + cluster::topic_properties properties, + int16_t replication_factor) { + co_return co_await _client.create_topic( + tp_ns, std::move(properties), partition_count, replication_factor); +} + +} // namespace pandaproxy::schema_registry diff --git a/src/v/pandaproxy/schema_registry/rpc_transport.h b/src/v/pandaproxy/schema_registry/rpc_transport.h new file mode 100644 index 0000000000000..c3fc6c894d12c --- /dev/null +++ b/src/v/pandaproxy/schema_registry/rpc_transport.h @@ -0,0 +1,51 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include "kafka/data/rpc/fwd.h" +#include "pandaproxy/schema_registry/transport.h" + +namespace pandaproxy::schema_registry { + +/// Transport implementation that wraps kafka::data::rpc::client for schema +/// registry internal topic I/O. +/// +/// TODO: Add transport-layer retry for topic_not_exists with backoff, +/// analogous to how kafka_client_transport gets SR-specific retry via +/// gated_retry_with_mitigation + mitigate_error. The k/d/rpc client is +/// general-purpose and shouldn't retry missing topics, but this transport +/// knows the _schemas topic must exist and can retry at the right layer. 
+/// Currently the do_start loop in service.cc handles this as a fallback. +class rpc_transport final : public transport { +public: + explicit rpc_transport(kafka::data::rpc::client& client); + + ss::future<> stop() final { return ss::now(); } + + ss::future produce(model::record_batch batch) override; + ss::future get_high_watermark() override; + ss::future<> consume_range( + model::offset start, + model::offset end, + ss::noncopyable_function< + ss::future(model::record_batch)> consumer) override; + ss::future create_topic( + model::topic_namespace_view, + int32_t partition_count, + cluster::topic_properties, + int16_t replication_factor) override; + +private: + kafka::data::rpc::client& _client; +}; + +} // namespace pandaproxy::schema_registry From 8c82821406011acf20f9ffac35eb16db4718527d Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 24 Feb 2026 11:17:29 -0800 Subject: [PATCH 02/11] sr/test: Add rpc_transport_test Some of this might be better placed under k/d/rpc/tests, but this is the specific functionality required by schema registry itself. 
Signed-off-by: Oren Leiman --- src/v/pandaproxy/schema_registry/test/BUILD | 21 ++ .../test/rpc_transport_test.cc | 272 ++++++++++++++++++ 2 files changed, 293 insertions(+) create mode 100644 src/v/pandaproxy/schema_registry/test/rpc_transport_test.cc diff --git a/src/v/pandaproxy/schema_registry/test/BUILD b/src/v/pandaproxy/schema_registry/test/BUILD index 997396f757544..eb201333d81ee 100644 --- a/src/v/pandaproxy/schema_registry/test/BUILD +++ b/src/v/pandaproxy/schema_registry/test/BUILD @@ -600,6 +600,27 @@ redpanda_cc_bench( ], ) +redpanda_cc_gtest( + name = "rpc_transport_test", + timeout = "short", + srcs = ["rpc_transport_test.cc"], + cpu = 1, + memory = "256MiB", + deps = [ + "//src/v/kafka/data/rpc", + "//src/v/kafka/data/rpc/test:test_deps", + "//src/v/kafka/protocol", + "//src/v/model", + "//src/v/net", + "//src/v/pandaproxy/schema_registry:rpc_transport", + "//src/v/rpc", + "//src/v/storage:record_batch_builder", + "//src/v/test_utils:gtest", + "@googletest//:gtest", + "@seastar", + ], +) + redpanda_test_cc_library( name = "random", hdrs = [ diff --git a/src/v/pandaproxy/schema_registry/test/rpc_transport_test.cc b/src/v/pandaproxy/schema_registry/test/rpc_transport_test.cc new file mode 100644 index 0000000000000..3b36dcb31d23d --- /dev/null +++ b/src/v/pandaproxy/schema_registry/test/rpc_transport_test.cc @@ -0,0 +1,272 @@ +/* + * Copyright 2025 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "kafka/data/rpc/test/deps.h" +#include "kafka/protocol/exceptions.h" +#include "model/namespace.h" +#include "net/server.h" +#include "pandaproxy/schema_registry/rpc_transport.h" +#include "rpc/connection_cache.h" +#include "rpc/rpc_server.h" +#include "storage/record_batch_builder.h" + +#include + +#include +#include + +#include + +namespace pps = pandaproxy::schema_registry; +namespace kdr = kafka::data::rpc; + +namespace { + +constexpr uint16_t test_server_port = 8081; +constexpr model::node_id self_node = model::node_id(1); +constexpr model::node_id other_node = model::node_id(2); + +struct test_parameters { + model::node_id leader_node; + + friend std::ostream& + operator<<(std::ostream& os, const test_parameters& tp) { + return os << "{leader_node: " << tp.leader_node << "}"; + } +}; + +class SchemaRegistryRpcTransportTest + : public ::testing::TestWithParam { +public: + void SetUp() override { + _as.start().get(); + _kd = std::make_unique( + self_node, &_conn_cache, other_node); + _kd->wire_up_and_start(); + + net::server_configuration scfg("sr_rpc_transport_test_server"); + scfg.addrs.emplace_back( + ss::socket_address(ss::ipv4_addr("127.0.0.1", test_server_port))); + scfg.max_service_memory_per_core = 1_GiB; + scfg.disable_metrics = net::metrics_disabled::yes; + scfg.disable_public_metrics = net::public_metrics_disabled::yes; + _server = std::make_unique<::rpc::rpc_server>(scfg); + std::vector> rpc_services; + _kd->register_services(rpc_services); + _server->add_services(std::move(rpc_services)); + _server->start(); + + _conn_cache.start(std::ref(_as), std::nullopt).get(); + ::rpc::transport_configuration tcfg( + net::unresolved_address("127.0.0.1", 
test_server_port)); + tcfg.disable_metrics = net::metrics_disabled::yes; + _conn_cache.local() + .emplace( + other_node, + tcfg, + ::make_exponential_backoff_policy(1s, 3s)) + .get(); + + _kd->topic_creator()->set_default_new_topic_leader(leader_node()); + + _transport = std::make_unique( + _kd->client().local()); + + auto ec = _transport + ->create_topic( + model::topic_namespace_view{ + model::kafka_namespace, + model::schema_registry_internal_tp.topic}, + 1, + cluster::topic_properties{}, + 1) + .get(); + ASSERT_EQ(ec, cluster::errc::success); + } + + void TearDown() override { + _transport.reset(); + _conn_cache.stop().get(); + _server->stop().get(); + _server.reset(); + _as.stop().get(); + _kd->reset(); + } + + model::node_id leader_node() const { return GetParam().leader_node; } + + pps::rpc_transport& transport() { return *_transport; } + + model::offset hwm() { return transport().get_high_watermark().get(); } + + void set_errors(int n) { + _kd->local_partition_manager()->set_errors(n); + _kd->remote_partition_manager()->set_errors(n); + } + + model::record_batch make_batch(std::string_view key, std::string_view val) { + storage::record_batch_builder rb{ + model::record_batch_type::raft_data, model::offset{0}}; + iobuf key_buf; + key_buf.append(key.data(), key.size()); + iobuf val_buf; + val_buf.append(val.data(), val.size()); + rb.add_raw_kv(std::move(key_buf), std::move(val_buf)); + return std::move(rb).build(); + } + + model::record_batch make_multi_record_batch( + std::vector> kvs) { + storage::record_batch_builder rb{ + model::record_batch_type::raft_data, model::offset{0}}; + for (auto& [k, v] : kvs) { + iobuf key_buf; + key_buf.append(k.data(), k.size()); + iobuf val_buf; + val_buf.append(v.data(), v.size()); + rb.add_raw_kv(std::move(key_buf), std::move(val_buf)); + } + return std::move(rb).build(); + } + +private: + std::unique_ptr<::rpc::rpc_server> _server; + ss::sharded<::rpc::connection_cache> _conn_cache; + std::unique_ptr _kd; + ss::sharded 
_as; + std::unique_ptr _transport; +}; + +} // namespace + +TEST_P(SchemaRegistryRpcTransportTest, ProduceReturnsBaseOffset) { + auto batch = make_batch("key1", "val1"); + auto result = transport().produce(std::move(batch)).get(); + EXPECT_GE(result.base_offset, model::offset(0)); +} + +TEST_P(SchemaRegistryRpcTransportTest, ProduceMultiRecordReturnsBaseOffset) { + auto batch = make_multi_record_batch({{"k1", "v1"}, {"k2", "v2"}}); + ASSERT_EQ(batch.record_count(), 2); + auto result = transport().produce(std::move(batch)).get(); + // 2-record batch starting at offset 0: base=0, last=1. + EXPECT_EQ(result.base_offset, model::offset{0}); +} + +TEST_P(SchemaRegistryRpcTransportTest, GetHighWatermarkAfterProduce) { + // No records produced HWM should be 0 (next_offset of -1). + EXPECT_EQ(hwm(), model::offset(0)); + transport().produce(make_batch("k1", "v1")).get(); + EXPECT_EQ(hwm(), model::offset(1)); +} + +TEST_P( + SchemaRegistryRpcTransportTest, GetHighWatermarkRetriesOnTransientError) { + // Inject 2 transient errors; get_high_watermark should retry and succeed + transport().produce(make_batch("k1", "v1")).get(); + set_errors(2); + EXPECT_EQ(hwm(), model::offset(1)); +} + +TEST_P(SchemaRegistryRpcTransportTest, ConsumeRangeProducedBatches) { + std::vector records_produced; + records_produced.push_back(make_batch("k1", "v1")); + records_produced.push_back(make_batch("k2", "v2")); + records_produced.push_back(make_batch("k3", "v3")); + + model::offset max_produced{0}; + for (auto& b : records_produced) { + b.header().base_offset + = transport().produce(b.copy()).get().base_offset; + max_produced = std::max(max_produced, b.header().base_offset); + } + + std::vector records_consumed; + model::offset max_consumed{0}; + // Note that the offset range is partially open [beg, end) + transport() + .consume_range( + model::offset(0), + model::offset(3), + [&records_consumed, &max_consumed]( + this auto, model::record_batch b) -> ss::future { + max_consumed = 
std::max(max_consumed, b.base_offset()); + records_consumed.push_back(std::move(b)); + co_return ss::stop_iteration::no; + }) + .get(); + + EXPECT_EQ(records_consumed.size(), records_produced.size()); + for (const auto& [c, p] : + std::views::zip(records_consumed, records_produced)) { + EXPECT_EQ(c, p); + } + EXPECT_EQ(max_consumed, max_produced); +} + +TEST_P(SchemaRegistryRpcTransportTest, ConsumeRangeRetriesOnTransientError) { + transport().produce(make_batch("k1", "v1")).get(); + + set_errors(2); + int consumed_count = 0; + transport() + .consume_range( + model::offset(0), + model::offset(1), + [&consumed_count]( + this auto, model::record_batch) -> ss::future { + ++consumed_count; + co_return ss::stop_iteration::no; + }) + .get(); + EXPECT_EQ(consumed_count, 1); +} + +TEST_P(SchemaRegistryRpcTransportTest, ConsumeRangeEmpty) { + // Consuming an empty range should return without hitting the callback but + // NOT error. + int consumed_count = 0; + transport() + .consume_range( + model::offset(0), + model::offset(0), + [&consumed_count]( + this auto, model::record_batch) -> ss::future { + ++consumed_count; + co_return ss::stop_iteration::no; + }) + .get(); + EXPECT_EQ(consumed_count, 0); +} + +TEST_P(SchemaRegistryRpcTransportTest, ConsumeRangePastHwmThrows) { + // This is a schema registry thing. If we can't consume the whole range, + // then the in-memory store will be out of date. It's always an error. 
+ transport().produce(make_batch("k1", "v1")).get(); + + EXPECT_THROW( + transport() + .consume_range( + model::offset(0), + hwm() + model::offset(10), + [](model::record_batch) -> ss::future { + co_return ss::stop_iteration::no; + }) + .get(), + kafka::exception); +} + +INSTANTIATE_TEST_SUITE_P( + LocalAndRemoteLeader, + SchemaRegistryRpcTransportTest, + ::testing::Values( + test_parameters{.leader_node = self_node}, + test_parameters{.leader_node = other_node})); From 39db2531850a00fb4eed2225fa53357476c3638d Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Mon, 23 Feb 2026 17:46:42 -0800 Subject: [PATCH 03/11] config: schema_registry_use_rpc Defaults to true. Signed-off-by: Oren Leiman --- src/v/config/configuration.cc | 8 ++++++++ src/v/config/configuration.h | 1 + 2 files changed, 9 insertions(+) diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index c7157909ef287..f3525dd75a0d9 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -2000,6 +2000,14 @@ configuration::configuration() "produce audit log messages using a Kafka client instead.", {.needs_restart = needs_restart::yes, .visibility = visibility::tunable}, true) + , schema_registry_use_rpc( + *this, + "schema_registry_use_rpc", + "Use internal Redpanda RPCs for schema registry internal topic I/O. 
" + "When disabled, use a Kafka client for schema registry internal topic " + "I/O instead.", + {.needs_restart = needs_restart::yes, .visibility = visibility::tunable}, + true) , cloud_storage_enabled( *this, true, diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 1af365c018b1e..08a4b9175f5d6 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -370,6 +370,7 @@ struct configuration final : public config_store { property> audit_excluded_principals; enum_property audit_failure_policy; property audit_use_rpc; + property schema_registry_use_rpc; // Archival storage enterprise> cloud_storage_enabled; From 97abf8727c127ce5692136ac025a1651660c11ac Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Mon, 23 Feb 2026 18:50:44 -0800 Subject: [PATCH 04/11] sr: Wire k/d/rpc/client into pp/sr/api Signed-off-by: Oren Leiman --- src/v/pandaproxy/schema_registry/BUILD | 2 +- src/v/pandaproxy/schema_registry/api.cc | 7 +++++-- src/v/pandaproxy/schema_registry/api.h | 5 ++++- src/v/redpanda/application_runtime.cc | 3 ++- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/BUILD b/src/v/pandaproxy/schema_registry/BUILD index 6883852fa1b2c..07eb1009113d5 100644 --- a/src/v/pandaproxy/schema_registry/BUILD +++ b/src/v/pandaproxy/schema_registry/BUILD @@ -375,7 +375,6 @@ redpanda_cc_library( "//src/v/cluster:node_status_table", "//src/v/config", "//src/v/kafka/client:configuration", - "//src/v/kafka/data/rpc", "//src/v/kafka/protocol:create_topics", "//src/v/kafka/server:topic_config_utils", "//src/v/metrics", @@ -391,6 +390,7 @@ redpanda_cc_library( ":types", "//src/v/base", "//src/v/cluster", + "//src/v/kafka/data/rpc", "//src/v/model", "//src/v/security", "@seastar", diff --git a/src/v/pandaproxy/schema_registry/api.cc b/src/v/pandaproxy/schema_registry/api.cc index 586cd8a569e5a..c46a7e8eab155 100644 --- a/src/v/pandaproxy/schema_registry/api.cc +++ 
b/src/v/pandaproxy/schema_registry/api.cc @@ -13,6 +13,7 @@ #include "cluster/controller.h" #include "config/configuration.h" #include "kafka/client/configuration.h" +#include "kafka/data/rpc/client.h" #include "kafka/data/rpc/deps.h" #include "pandaproxy/logger.h" #include "pandaproxy/schema_registry/configuration.h" @@ -56,7 +57,8 @@ api::api( configuration& cfg, ss::sharded* metadata_cache, std::unique_ptr& c, - ss::sharded& audit_mgr) noexcept + ss::sharded& audit_mgr, + ss::sharded* rpc_client) noexcept : _node_id{node_id} , _sg{sg} , _max_memory{max_memory} @@ -64,7 +66,8 @@ api::api( , _cfg{cfg} , _metadata_cache(metadata_cache) , _controller(c) - , _audit_mgr(audit_mgr) {} + , _audit_mgr(audit_mgr) + , _rpc_client(rpc_client) {} api::~api() noexcept = default; diff --git a/src/v/pandaproxy/schema_registry/api.h b/src/v/pandaproxy/schema_registry/api.h index 08cacc12b5f15..d1f0ce0f23ee3 100644 --- a/src/v/pandaproxy/schema_registry/api.h +++ b/src/v/pandaproxy/schema_registry/api.h @@ -13,6 +13,7 @@ #include "base/seastarx.h" #include "cluster/metrics_reporter.h" +#include "kafka/data/rpc/fwd.h" #include "model/metadata.h" #include "pandaproxy/schema_registry/fwd.h" #include "security/fwd.h" @@ -49,7 +50,8 @@ class api { configuration& cfg, ss::sharded* metadata_cache, std::unique_ptr&, - ss::sharded&) noexcept; + ss::sharded&, + ss::sharded* rpc_client) noexcept; ~api() noexcept; ss::future<> start(); @@ -83,6 +85,7 @@ class api { ss::sharded _service; ss::sharded _sequencer; ss::sharded& _audit_mgr; + ss::sharded* _rpc_client; // Metrics telemetry support - only used on shard 0 ss::gate _metrics_gate{}; diff --git a/src/v/redpanda/application_runtime.cc b/src/v/redpanda/application_runtime.cc index e26ba36babe11..5c629f5ce608c 100644 --- a/src/v/redpanda/application_runtime.cc +++ b/src/v/redpanda/application_runtime.cc @@ -67,7 +67,8 @@ void application::wire_up_runtime_services( *_schema_reg_config, &metadata_cache, std::reference_wrapper(controller), - 
std::ref(audit_mgr)); + std::ref(audit_mgr), + &_kafka_data_rpc_client); } if (wasm_data_transforms_enabled()) { From e4ab32e3097c646a918212ddc2ce3d34f56a5f4c Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Fri, 24 Apr 2026 00:24:12 -0700 Subject: [PATCH 05/11] sr/api: Introduce transport_impl Pimpl wrapper for kafka_client_transport and rpc_transport. Dispatch to various methods as appropriate. Signed-off-by: Oren Leiman --- src/v/pandaproxy/schema_registry/BUILD | 3 +- src/v/pandaproxy/schema_registry/api.cc | 115 ++++++++++++++++++++++++ src/v/pandaproxy/schema_registry/api.h | 2 + 3 files changed, 119 insertions(+), 1 deletion(-) diff --git a/src/v/pandaproxy/schema_registry/BUILD b/src/v/pandaproxy/schema_registry/BUILD index 07eb1009113d5..fc209ea69b4a3 100644 --- a/src/v/pandaproxy/schema_registry/BUILD +++ b/src/v/pandaproxy/schema_registry/BUILD @@ -367,6 +367,7 @@ redpanda_cc_library( ":configuration", ":exceptions", ":kafka_client_transport", + ":rpc_transport", ":seq_writer", ":server", ":store", @@ -374,6 +375,7 @@ redpanda_cc_library( "//src/v/cluster:members_table", "//src/v/cluster:node_status_table", "//src/v/config", + "//src/v/features", "//src/v/kafka/client:configuration", "//src/v/kafka/protocol:create_topics", "//src/v/kafka/server:topic_config_utils", @@ -436,7 +438,6 @@ redpanda_cc_library( ":avro", ":context_router", ":json_schema", - ":kafka_client_transport", ":protobuf_schema", ":seq_writer", ":store", diff --git a/src/v/pandaproxy/schema_registry/api.cc b/src/v/pandaproxy/schema_registry/api.cc index c46a7e8eab155..bc25d6752a61b 100644 --- a/src/v/pandaproxy/schema_registry/api.cc +++ b/src/v/pandaproxy/schema_registry/api.cc @@ -12,12 +12,14 @@ #include "cluster/cluster_link/frontend.h" #include "cluster/controller.h" #include "config/configuration.h" +#include "features/feature_table.h" #include "kafka/client/configuration.h" #include "kafka/data/rpc/client.h" #include "kafka/data/rpc/deps.h" #include "pandaproxy/logger.h" #include 
"pandaproxy/schema_registry/configuration.h" #include "pandaproxy/schema_registry/kafka_client_transport.h" +#include "pandaproxy/schema_registry/rpc_transport.h" #include "pandaproxy/schema_registry/schema_id_cache.h" #include "pandaproxy/schema_registry/seq_writer.h" #include "pandaproxy/schema_registry/service.h" @@ -30,9 +32,122 @@ #include #include #include +#include namespace pandaproxy::schema_registry { +struct api::transport_impl { + transport_impl( + ss::sharded* rpc_client, + kafka::client::configuration& client_cfg, + cluster::controller& controller) + : _rpc_client(rpc_client) + , _client_cfg(client_cfg) + , _controller(controller) { + if (!should_use_rpc(rpc_client, controller)) { + _v.emplace>(); + } + // else: _v default-constructs to sharded. + vlog( + srlog.info, + "Schema registry in {} mode", + std::holds_alternative>(_v) + ? "RPC" + : "Kafka client"); + } + + /// Decide whether to use the RPC transport at startup. Falls back to + /// the kafka::client transport (with a warning) if the config asks for + /// RPC but preconditions aren't met. + static bool should_use_rpc( + const ss::sharded* rpc_client, + cluster::controller& controller) { + if (!config::shard_local_cfg().schema_registry_use_rpc()) { + return false; + } + if (!rpc_client) { + vlog( + srlog.info, + "schema_registry_use_rpc enabled but RPC client not available. " + "Falling back to Kafka client."); + return false; + } + const bool rpc_available + = controller.get_feature_table().local().get_active_version() + >= features::to_cluster_version(features::release_version::v26_2_1); + if (!rpc_available) { + vlog( + srlog.info, + "schema_registry_use_rpc enabled but cluster version too old. " + "Falling back to Kafka client. 
RPC mode will be available " + "on the next restart after all brokers are upgraded."); + return false; + } + return true; + } + + ss::future<> start() { + return ss::visit( + _v, + [this](ss::sharded& t) { + return t.start(ss::sharded_parameter([this] { + return std::ref(_rpc_client->local()); + })); + }, + [this](ss::sharded& t) { + return t.start( + std::ref(_client_cfg), + std::ref(_controller), + ss::sharded_parameter([this] { + return kafka::data::rpc::topic_creator::make_default( + &_controller); + })); + }); + } + + /// Only kafka_client_transport has per-shard credentials to load; the + /// RPC path needs no configuration. + ss::future<> configure() { + return ss::visit( + _v, + [](ss::sharded&) { return ss::now(); }, + [](ss::sharded& t) { + return t.invoke_on_all(&kafka_client_transport::configure); + }); + } + + ss::future<> invoke_stop_on_all() { + return ss::visit(_v, [](auto& t) { + return t.invoke_on_all([](auto& local) { return local.stop(); }); + }); + } + + ss::future<> stop() { + return ss::visit(_v, [](auto& t) { return t.stop(); }); + } + + transport& local() { + return ss::visit(_v, [](auto& t) -> transport& { return t.local(); }); + } + + bool has_ephemeral_credentials() const { + return ss::visit( + _v, + [](const ss::sharded&) { return false; }, + [](const ss::sharded& t) { + return t.local().has_ephemeral_credentials(); + }); + } + +private: + std:: + variant, ss::sharded> + _v; + ss::sharded* _rpc_client; + kafka::client::configuration& _client_cfg; + cluster::controller& _controller; +}; + class sequence_state_checker_impl : public sequence_state_checker { public: explicit sequence_state_checker_impl( diff --git a/src/v/pandaproxy/schema_registry/api.h b/src/v/pandaproxy/schema_registry/api.h index d1f0ce0f23ee3..fac6c0c41a2a9 100644 --- a/src/v/pandaproxy/schema_registry/api.h +++ b/src/v/pandaproxy/schema_registry/api.h @@ -68,6 +68,8 @@ class api { contribute_metrics(cluster::metrics_reporter::metrics_snapshot&) const; private: + 
struct transport_impl; + friend class schema_id_validator; friend class schema::registry; model::node_id _node_id; From 1604d8956fea389260187181e4fead73aa3c8b16 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Mon, 23 Feb 2026 20:47:02 -0800 Subject: [PATCH 06/11] sr/api: Wire up transport_impl And plumb into seq_writer & service. Transport selected at runtime dependent on config & cluster version. Signed-off-by: Oren Leiman --- src/v/pandaproxy/schema_registry/api.cc | 33 +++++++++++++++---------- src/v/pandaproxy/schema_registry/api.h | 4 ++- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/v/pandaproxy/schema_registry/api.cc b/src/v/pandaproxy/schema_registry/api.cc index bc25d6752a61b..aeabeb7c575ef 100644 --- a/src/v/pandaproxy/schema_registry/api.cc +++ b/src/v/pandaproxy/schema_registry/api.cc @@ -201,17 +201,18 @@ ss::future<> api::start() { return config::shard_local_cfg() .kafka_schema_id_validation_cache_capacity.bind(); })); - co_await _transport.start( - std::ref(_client_cfg), - std::ref(*_controller), - ss::sharded_parameter([this]() { - return kafka::data::rpc::topic_creator::make_default( - _controller.get()); - })); + // Build the transport locally and only install it into _transport + // on successful start. If start() throws, the local impl unwinds and + // the partially-initialized sharded<> goes with it. 
+ auto impl = std::make_unique( + _rpc_client, _client_cfg, *_controller); + co_await impl->start(); + _transport = std::move(impl); + co_await _sequencer.start( _node_id, _sg, - ss::sharded_parameter([this] { return std::ref(_transport.local()); }), + ss::sharded_parameter([this] { return std::ref(_transport->local()); }), std::ref(*_store), ss::sharded_parameter([this] { return std::make_unique(_controller); @@ -220,7 +221,7 @@ ss::future<> api::start() { config::to_yaml(_cfg, config::redact_secrets::no), _sg, _max_memory, - ss::sharded_parameter([this] { return std::ref(_transport.local()); }), + ss::sharded_parameter([this] { return std::ref(_transport->local()); }), std::ref(*_store), std::ref(_sequencer), ss::sharded_parameter([this]() { @@ -230,7 +231,7 @@ ss::future<> api::start() { std::ref(_controller), std::ref(_audit_mgr)); - co_await _transport.invoke_on_all(&kafka_client_transport::configure); + co_await _transport->configure(); co_await _service.invoke_on_all(&service::start); if (ss::this_shard_id() == 0) { @@ -259,10 +260,16 @@ ss::future<> api::stop() { // Reset gate to support api restart _metrics_gate = ss::gate{}; } - co_await _transport.invoke_on_all(&kafka_client_transport::stop); + if (_transport) { + co_await _transport->invoke_stop_on_all(); + } co_await _service.stop(); co_await _sequencer.stop(); - co_await _transport.stop(); + if (_transport) { + co_await _transport->stop(); + _transport.reset(); + } + co_await _schema_id_cache.stop(); co_await _schema_id_validation_probe.stop(); if (_store) { @@ -284,7 +291,7 @@ const kafka::client::configuration& api::get_client_config() const { } bool api::has_ephemeral_credentials() const { - return _transport.local().has_ephemeral_credentials(); + return _transport && _transport->has_ephemeral_credentials(); } ss::future<> api::contribute_metrics( diff --git a/src/v/pandaproxy/schema_registry/api.h b/src/v/pandaproxy/schema_registry/api.h index fac6c0c41a2a9..9813d4122f98f 100644 --- 
a/src/v/pandaproxy/schema_registry/api.h +++ b/src/v/pandaproxy/schema_registry/api.h @@ -21,6 +21,8 @@ #include #include +#include + namespace YAML { class Node; } @@ -80,7 +82,7 @@ class api { ss::sharded* _metadata_cache; std::unique_ptr& _controller; - ss::sharded _transport; + std::unique_ptr _transport; std::unique_ptr _store; ss::sharded _schema_id_validation_probe; ss::sharded _schema_id_cache; From 9abbfa5ca6df765082ab25a9ea24a5df9b2cff49 Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Mon, 23 Feb 2026 22:38:47 -0800 Subject: [PATCH 07/11] dt/sr: SchemaRegistryRpcTransportTest & friends Signed-off-by: Oren Leiman --- tests/rptest/tests/schema_registry_test.py | 124 +++++++++++++++++++-- 1 file changed, 116 insertions(+), 8 deletions(-) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 659dc4438bcb3..6ef80058aeaa4 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -5181,20 +5181,27 @@ def test_id_lookup_multiple_matches(self): self.assert_equal(result_raw.json()["id"], 2) -class SchemaRegistryContextTest(SchemaRegistryEndpoints): +class SchemaRegistryContextTestBase(SchemaRegistryEndpoints): """ Tests for context-qualified subject functionality. These tests verify that Schema Registry correctly handles context-qualified subjects (e.g., ":.ctx:subject") for isolation, references, config, and mode. + + Base class. Use SchemaRegistryContextTest (kafka client) + or SchemaRegistryContextRpcTransportTest (RPC). 
""" def __init__(self, context: TestContext, **kwargs: Any): schema_registry_config = SchemaRegistryConfig() schema_registry_config.mode_mutability = True + extra_rp_conf = {} + if "extra_rp_conf" in kwargs: + extra_rp_conf.update(kwargs.pop("extra_rp_conf")) super().__init__( context, schema_registry_config=schema_registry_config, + extra_rp_conf=extra_rp_conf, **kwargs, ) @@ -7320,12 +7327,37 @@ def test_context_prefix_serde_client(self, client_type): ) -class SchemaRegistryBasicAuthTest(SchemaRegistryEndpoints): +class SchemaRegistryContextTest(SchemaRegistryContextTestBase): + """Kafka client transport variant of the context tests.""" + + def __init__(self, context: TestContext, **kwargs: Any): + super().__init__( + context, + extra_rp_conf={"schema_registry_use_rpc": False}, + **kwargs, + ) + + +class SchemaRegistryContextRpcTransportTest(SchemaRegistryContextTestBase): + """RPC transport variant of the context tests.""" + + def __init__(self, context: TestContext, **kwargs: Any): + super().__init__( + context, + extra_rp_conf={"schema_registry_use_rpc": True}, + **kwargs, + ) + + +class SchemaRegistryBasicAuthTestBase(SchemaRegistryEndpoints): """ Test schema registry against a redpanda cluster with HTTP Basic Auth enabled. + + Base class. Use SchemaRegistryBasicAuthTest (kafka client) + or SchemaRegistryBasicAuthRpcTransportTest (RPC). 
""" - def __init__(self, context): + def __init__(self, context, **kwargs): security = SecurityConfig() security.enable_sasl = True security.endpoint_authn_method = "sasl" @@ -7334,8 +7366,16 @@ def __init__(self, context): schema_registry_config.authn_method = "http_basic" schema_registry_config.mode_mutability = True - super(SchemaRegistryBasicAuthTest, self).__init__( - context, security=security, schema_registry_config=schema_registry_config + extra_rp_conf = {} + if "extra_rp_conf" in kwargs: + extra_rp_conf.update(kwargs.pop("extra_rp_conf")) + + super(SchemaRegistryBasicAuthTestBase, self).__init__( + context, + security=security, + schema_registry_config=schema_registry_config, + extra_rp_conf=extra_rp_conf, + **kwargs, ) superuser = self.redpanda.SUPERUSER_CREDENTIALS @@ -8115,15 +8155,40 @@ def schema_no_longer_present(): ) +class SchemaRegistryBasicAuthTest(SchemaRegistryBasicAuthTestBase): + """Kafka client transport variant of the basic auth tests.""" + + def __init__(self, context, **kwargs): + super().__init__( + context, + extra_rp_conf={"schema_registry_use_rpc": False}, + **kwargs, + ) + + +class SchemaRegistryBasicAuthRpcTransportTest(SchemaRegistryBasicAuthTestBase): + """RPC transport variant of the basic auth tests.""" + + def __init__(self, context, **kwargs): + super().__init__( + context, + extra_rp_conf={"schema_registry_use_rpc": True}, + **kwargs, + ) + + class SchemaRegistryTest(SchemaRegistryTestMethods): """ Test schema registry against a redpanda cluster without auth. - This derived class inherits all the tests from SchemaRegistryTestMethods. + Uses the Kafka client transport. The RPC transport variant is + SchemaRegistryRpcTransportTest. 
""" def __init__(self, context): - super(SchemaRegistryTest, self).__init__(context) + super(SchemaRegistryTest, self).__init__( + context, extra_rp_conf={"schema_registry_use_rpc": False} + ) @cluster(num_nodes=3) def test_nodejs_serde_client(self): @@ -8139,6 +8204,20 @@ def test_nodejs_serde_client(self): ) +class SchemaRegistryRpcTransportTest(SchemaRegistryTestMethods): + """ + Test schema registry using the internal RPC transport instead of the + Kafka client transport. + + This derived class inherits all the tests from SchemaRegistryTestMethods. + """ + + def __init__(self, context): + super(SchemaRegistryRpcTransportTest, self).__init__( + context, extra_rp_conf={"schema_registry_use_rpc": True} + ) + + class SchemaRegistryAutoAuthTest(SchemaRegistryTestMethods): """ Test schema registry against a redpanda cluster with Auto Auth enabled. @@ -10921,17 +11000,24 @@ def test_enterprise_sanctions(self): ) -class SchemaRegistryContextAuthzTest(SchemaRegistryAclAuthzTestBase): +class SchemaRegistryContextAuthzTestBase(SchemaRegistryAclAuthzTestBase): """ Authorization tests for context-qualified subject functionality. These tests verify that Schema Registry correctly enforces ACL authorization when using context-qualified subjects and the subject query parameter. + + Base class. Use SchemaRegistryContextAuthzTest (kafka client) + or SchemaRegistryContextAuthzRpcTransportTest (RPC). 
""" def __init__(self, context: TestContext, **kwargs: Any): + extra_rp_conf = {} + if "extra_rp_conf" in kwargs: + extra_rp_conf.update(kwargs.pop("extra_rp_conf")) super().__init__( context, + extra_rp_conf=extra_rp_conf, **kwargs, ) @@ -11283,3 +11369,25 @@ def test_context_prefix_all_subject_operations_protected(self): 403, f"GET schemas/ids/{sid}/schema should be 403, got {result.status_code}", ) + + +class SchemaRegistryContextAuthzTest(SchemaRegistryContextAuthzTestBase): + """Kafka client transport variant of the context authz tests.""" + + def __init__(self, context: TestContext, **kwargs: Any): + super().__init__( + context, + extra_rp_conf={"schema_registry_use_rpc": False}, + **kwargs, + ) + + +class SchemaRegistryContextAuthzRpcTransportTest(SchemaRegistryContextAuthzTestBase): + """RPC transport variant of the context authz tests.""" + + def __init__(self, context: TestContext, **kwargs: Any): + super().__init__( + context, + extra_rp_conf={"schema_registry_use_rpc": True}, + **kwargs, + ) From 2af7072cde8c663088d6240b7ece4ef1b2c4c85b Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 3 Mar 2026 19:23:58 -0800 Subject: [PATCH 08/11] dt: Enable RPC transport for relevant tests, pin the rest to KClient SchemaRegistryEndpoints now asserts that every subclass explicitly sets schema_registry_use_rpc. 
RPC-transport-enabled tests: - SchemaRegistryModeNotMutableTest - SchemaRegistryModeMutableTest - SchemaRegistryContextRpcTransportTest - SchemaRegistryBasicAuthRpcTransportTest - SchemaRegistryRpcTransportTest - SchemaRegistryAutoAuthTest - SchemaRegistryConfluentClient - SchemaRegistryCompatibilityModes - SchemaRegistryACLTest - SchemaRegistryContextAuthzRpcTransportTest - SchemaRegistryRpcTransportStressTest - ClusterLinkingSchemaRegistry - SchemaRegistryContextMetricsTest Signed-off-by: Oren Leiman --- tests/rptest/tests/admin_api_auth_test.py | 6 ++- tests/rptest/tests/audit_log_test.py | 4 ++ .../cluster_linking_topic_syncing_test.py | 1 + tests/rptest/tests/crl_test.py | 3 +- tests/rptest/tests/metrics_reporter_test.py | 1 + tests/rptest/tests/redpanda_oauth_test.py | 1 + tests/rptest/tests/rpk_registry_test.py | 1 + tests/rptest/tests/schema_registry_test.py | 46 +++++++++++++++---- tests/rptest/tests/security_report_test.py | 15 +++++- tests/rptest/tests/tls_metrics_test.py | 4 +- tests/rptest/tests/tls_version_test.py | 5 +- 11 files changed, 73 insertions(+), 14 deletions(-) diff --git a/tests/rptest/tests/admin_api_auth_test.py b/tests/rptest/tests/admin_api_auth_test.py index 260fc3cc0a408..cd88d6a52aef1 100644 --- a/tests/rptest/tests/admin_api_auth_test.py +++ b/tests/rptest/tests/admin_api_auth_test.py @@ -336,7 +336,11 @@ def __init__(self, context): security.endpoint_authn_method = "sasl" security.auto_auth = True - super(AdminApiListUsersTest, self).__init__(context, security=security) + super(AdminApiListUsersTest, self).__init__( + context, + security=security, + extra_rp_conf={"schema_registry_use_rpc": False}, + ) self.superuser = self.redpanda.SUPERUSER_CREDENTIALS self.superuser_admin = Admin( diff --git a/tests/rptest/tests/audit_log_test.py b/tests/rptest/tests/audit_log_test.py index cb7167a591499..c6780ea02874b 100644 --- a/tests/rptest/tests/audit_log_test.py +++ b/tests/rptest/tests/audit_log_test.py @@ -2898,6 +2898,9 @@ def 
__init__(self, test_context, **kwargs): sr_config = SchemaRegistryConfig() sr_config.authn_method = "http_basic" sr_config.mode_mutability = True + extra_rp_conf = {"schema_registry_use_rpc": False} + if "extra_rp_conf" in kwargs: + extra_rp_conf.update(kwargs.pop("extra_rp_conf")) super(AuditLogTestSchemaRegistryBase, self).__init__( test_context=test_context, audit_log_config=AuditLogConfig( @@ -2907,6 +2910,7 @@ def __init__(self, test_context, **kwargs): "info", logger_levels={"auditing": "trace", "schemaregistry": "trace"} ), schema_registry_config=sr_config, + extra_rp_conf=extra_rp_conf, **kwargs, ) diff --git a/tests/rptest/tests/cluster_linking_topic_syncing_test.py b/tests/rptest/tests/cluster_linking_topic_syncing_test.py index 4f897b0b5ce3e..3ff6df4485c58 100644 --- a/tests/rptest/tests/cluster_linking_topic_syncing_test.py +++ b/tests/rptest/tests/cluster_linking_topic_syncing_test.py @@ -810,6 +810,7 @@ def __init__(self, test_context, *args, **kwargs): secondary_cluster_args=SecondaryClusterArgs( schema_registry_config=SchemaRegistryConfig() ), + extra_rp_conf={"schema_registry_use_rpc": True}, log_config=LoggingConfig( "info", logger_levels={ diff --git a/tests/rptest/tests/crl_test.py b/tests/rptest/tests/crl_test.py index 91e3d04625f64..b239b5adc793b 100644 --- a/tests/rptest/tests/crl_test.py +++ b/tests/rptest/tests/crl_test.py @@ -104,7 +104,8 @@ def __init__(self, *args, **kwargs): { "kafka_mtls_principal_mapping_rules": [ self.security.principal_mapping_rules - ] + ], + "schema_registry_use_rpc": False, } ) diff --git a/tests/rptest/tests/metrics_reporter_test.py b/tests/rptest/tests/metrics_reporter_test.py index e032312c71743..066792f904dc6 100644 --- a/tests/rptest/tests/metrics_reporter_test.py +++ b/tests/rptest/tests/metrics_reporter_test.py @@ -404,6 +404,7 @@ def __init__(self, test_ctx): num_brokers=1, extra_rp_conf={ "health_monitor_max_metadata_age": 1000, + "schema_registry_use_rpc": True, **self.metrics.rp_conf(), }, 
schema_registry_config=SchemaRegistryConfig(), diff --git a/tests/rptest/tests/redpanda_oauth_test.py b/tests/rptest/tests/redpanda_oauth_test.py index 383cb97932fa1..f9e51edba1b41 100644 --- a/tests/rptest/tests/redpanda_oauth_test.py +++ b/tests/rptest/tests/redpanda_oauth_test.py @@ -152,6 +152,7 @@ def __init__( "oidc_token_audience": TOKEN_AUDIENCE, "kafka_sasl_max_reauth_ms": sasl_max_reauth_ms, "group_initial_rebalance_delay": 0, + "schema_registry_use_rpc": False, }, security=security, pandaproxy_config=pandaproxy_config, diff --git a/tests/rptest/tests/rpk_registry_test.py b/tests/rptest/tests/rpk_registry_test.py index 68d123adf57ac..965d5f69bf414 100644 --- a/tests/rptest/tests/rpk_registry_test.py +++ b/tests/rptest/tests/rpk_registry_test.py @@ -56,6 +56,7 @@ def __init__(self, ctx, schema_registry_config=SchemaRegistryConfig()): super(RpkRegistryTest, self).__init__( test_context=ctx, schema_registry_config=schema_registry_config, + extra_rp_conf={"schema_registry_use_rpc": False}, node_ready_timeout_s=60, ) # SASL Config diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 6ef80058aeaa4..5f17710d15341 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -1400,6 +1400,9 @@ def __init__( merged_rp_conf = {"auto_create_topics_enabled": False} if extra_rp_conf: merged_rp_conf.update(extra_rp_conf) + assert "schema_registry_use_rpc" in merged_rp_conf, ( + "schema_registry_use_rpc must be explicitly set by each test class" + ) super(SchemaRegistryEndpoints, self).__init__( context, extra_rp_conf=merged_rp_conf, @@ -3972,7 +3975,10 @@ def __init__(self, context, **kwargs): self.schema_registry_config.mode_mutability = False super(SchemaRegistryModeNotMutableTest, self).__init__( - context, schema_registry_config=self.schema_registry_config, **kwargs + context, + schema_registry_config=self.schema_registry_config, + 
extra_rp_conf={"schema_registry_use_rpc": True}, + **kwargs, ) @cluster(num_nodes=3) @@ -4020,7 +4026,10 @@ def __init__(self, context: TestContext, **kwargs: Any): self.schema_registry_config = SchemaRegistryConfig() self.schema_registry_config.mode_mutability = True super(SchemaRegistryModeMutableTest, self).__init__( - context, schema_registry_config=self.schema_registry_config, **kwargs + context, + schema_registry_config=self.schema_registry_config, + extra_rp_conf={"schema_registry_use_rpc": True}, + **kwargs, ) @cluster(num_nodes=3) @@ -8223,6 +8232,8 @@ class SchemaRegistryAutoAuthTest(SchemaRegistryTestMethods): Test schema registry against a redpanda cluster with Auto Auth enabled. This derived class inherits all the tests from SchemaRegistryTestMethods. + + No RPC variant in this case. RPC transport doesn't require auth at all.. """ def __init__(self, context): @@ -8231,7 +8242,11 @@ def __init__(self, context): security.endpoint_authn_method = "sasl" security.auto_auth = True - super(SchemaRegistryAutoAuthTest, self).__init__(context, security=security) + super(SchemaRegistryAutoAuthTest, self).__init__( + context, + security=security, + extra_rp_conf={"schema_registry_use_rpc": False}, + ) class SchemaRegistryMTLSBase(SchemaRegistryEndpoints): @@ -8240,7 +8255,9 @@ class SchemaRegistryMTLSBase(SchemaRegistryEndpoints): ] def __init__(self, *args, **kwargs): - super(SchemaRegistryMTLSBase, self).__init__(*args, **kwargs) + super(SchemaRegistryMTLSBase, self).__init__( + *args, extra_rp_conf={"schema_registry_use_rpc": False}, **kwargs + ) self.security = SecurityConfig() @@ -8666,7 +8683,9 @@ class SchemaRegistryConfluentClient(SchemaRegistryEndpoints): """ def __init__(self, context, **kwargs): - super(SchemaRegistryConfluentClient, self).__init__(context, **kwargs) + super(SchemaRegistryConfluentClient, self).__init__( + context, extra_rp_conf={"schema_registry_use_rpc": True}, **kwargs + ) # Replace the Redpanda SR client. 
self._base_uri = self.sr_client.base_uri() @@ -8878,7 +8897,12 @@ def test_references(self): class SchemaRegistryCompatibilityModes(SchemaRegistryEndpoints): def __init__(self, test_context, **kwargs): - super().__init__(test_context, num_brokers=1, **kwargs) + super().__init__( + test_context, + num_brokers=1, + extra_rp_conf={"schema_registry_use_rpc": True}, + **kwargs, + ) self._csr_client = SchemaRegistryClient({"url": self.sr_client.base_uri()}) self._topic = "test-topic" @@ -9127,7 +9151,9 @@ class SchemaRegistryACLTest(SchemaRegistryEndpoints): VALID_PATTERN_TYPES = ["LITERAL", "PREFIXED"] def __init__(self, context, **kwargs): - super(SchemaRegistryACLTest, self).__init__(context, **kwargs) + super(SchemaRegistryACLTest, self).__init__( + context, extra_rp_conf={"schema_registry_use_rpc": True}, **kwargs + ) def _create_test_acl( self, @@ -10427,12 +10453,16 @@ def __init__(self, context, extra_rp_conf: dict | None = None, **kwargs): schema_registry_config.authn_method = "http_basic" schema_registry_config.mode_mutability = True + merged_rp_conf = {"schema_registry_use_rpc": False} + if extra_rp_conf: + merged_rp_conf.update(extra_rp_conf) + super().__init__( context, security=security, num_brokers=1, schema_registry_config=schema_registry_config, - extra_rp_conf=extra_rp_conf, + extra_rp_conf=merged_rp_conf, **kwargs, ) diff --git a/tests/rptest/tests/security_report_test.py b/tests/rptest/tests/security_report_test.py index a2308c450c5ac..b6405ba6a6da9 100644 --- a/tests/rptest/tests/security_report_test.py +++ b/tests/rptest/tests/security_report_test.py @@ -844,7 +844,12 @@ def test_security_report(self, auto_auth, enable_auth): class SchemaRegistryNoSecurityReportTest(RedpandaTest): def __init__(self, *args, **kwargs): - super().__init__(*args, schema_registry_config=SchemaRegistryConfig(), **kwargs) + super().__init__( + *args, + schema_registry_config=SchemaRegistryConfig(), + extra_rp_conf={"schema_registry_use_rpc": False}, + **kwargs, + ) def 
setUp(self): super().setUp() @@ -952,6 +957,7 @@ def __init__(self, *args, **kwargs): *args, extra_rp_conf={ "schema_registry_enable_authorization": True, + "schema_registry_use_rpc": False, }, **kwargs, ) @@ -1014,7 +1020,12 @@ def test_security_report(self, auto_auth): class SchemaRegistryClientSecurityReportTest(RedpandaTest): def __init__(self, *args, **kwargs): - super().__init__(*args, schema_registry_config=SchemaRegistryConfig(), **kwargs) + super().__init__( + *args, + schema_registry_config=SchemaRegistryConfig(), + extra_rp_conf={"schema_registry_use_rpc": False}, + **kwargs, + ) def setUp(self): super().setUp() diff --git a/tests/rptest/tests/tls_metrics_test.py b/tests/rptest/tests/tls_metrics_test.py index 4162989f5d9b7..a6af936394525 100644 --- a/tests/rptest/tests/tls_metrics_test.py +++ b/tests/rptest/tests/tls_metrics_test.py @@ -95,7 +95,9 @@ class TLSMetricsTestBase(RedpandaTest): ] def __init__(self, *args, broker_faketime="-0d", client_faketime="-0d", **kwargs): - super().__init__(*args, **kwargs) + super().__init__( + *args, extra_rp_conf={"schema_registry_use_rpc": False}, **kwargs + ) self.broker_faketime = broker_faketime self.client_faketime = client_faketime diff --git a/tests/rptest/tests/tls_version_test.py b/tests/rptest/tests/tls_version_test.py index 7f76a169700ab..959d4ce9e63b9 100644 --- a/tests/rptest/tests/tls_version_test.py +++ b/tests/rptest/tests/tls_version_test.py @@ -96,7 +96,10 @@ class TLSVersionTestBase(RedpandaTest): """ def __init__(self, test_context, key_type: TLSKeyType): - super(TLSVersionTestBase, self).__init__(test_context=test_context) + super(TLSVersionTestBase, self).__init__( + test_context=test_context, + extra_rp_conf={"schema_registry_use_rpc": False}, + ) self.security = SecurityConfig() self.tls = TLSCertManager(self.logger, key_type=key_type) self.key_type = key_type From 7182a377320eeeaf461ccb7a5a4c21898f77b41d Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Sun, 15 Mar 2026 02:02:26 -0700 Subject: 
[PATCH 09/11] chore: Clean up unused includes kafka/data/rpc & pp/schema_registry Signed-off-by: Oren Leiman --- src/v/kafka/data/rpc/deps.cc | 1 - src/v/kafka/data/rpc/deps.h | 2 -- src/v/kafka/data/rpc/serde.h | 2 -- src/v/kafka/data/rpc/service.cc | 1 - src/v/kafka/data/rpc/service.h | 1 - src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc | 1 - 6 files changed, 8 deletions(-) diff --git a/src/v/kafka/data/rpc/deps.cc b/src/v/kafka/data/rpc/deps.cc index 2ea30f6b8ec61..a186c3372706e 100644 --- a/src/v/kafka/data/rpc/deps.cc +++ b/src/v/kafka/data/rpc/deps.cc @@ -13,7 +13,6 @@ #include "cluster/cluster_link/frontend.h" #include "cluster/controller.h" -#include "cluster/fwd.h" #include "cluster/metadata_cache.h" #include "cluster/partition_manager.h" #include "cluster/shard_table.h" diff --git a/src/v/kafka/data/rpc/deps.h b/src/v/kafka/data/rpc/deps.h index 6cd4e9fe8e773..03c2238aa14e2 100644 --- a/src/v/kafka/data/rpc/deps.h +++ b/src/v/kafka/data/rpc/deps.h @@ -17,8 +17,6 @@ #include "kafka/data/rpc/serde.h" #include "model/fundamental.h" #include "model/ktp.h" -#include "model/metadata.h" -#include "model/transform.h" #include diff --git a/src/v/kafka/data/rpc/serde.h b/src/v/kafka/data/rpc/serde.h index 8888fd4bd7dfc..6d8a8fe639d34 100644 --- a/src/v/kafka/data/rpc/serde.h +++ b/src/v/kafka/data/rpc/serde.h @@ -21,8 +21,6 @@ #include -#include - namespace kafka::data::rpc { struct kafka_topic_data diff --git a/src/v/kafka/data/rpc/service.cc b/src/v/kafka/data/rpc/service.cc index fd40718dca23c..11d5a623b8fda 100644 --- a/src/v/kafka/data/rpc/service.cc +++ b/src/v/kafka/data/rpc/service.cc @@ -16,7 +16,6 @@ #include "kafka/data/partition_proxy.h" #include "logger.h" #include "model/ktp.h" -#include "model/metadata.h" #include "model/record.h" #include "model/record_batch_reader.h" #include "model/timeout_clock.h" diff --git a/src/v/kafka/data/rpc/service.h b/src/v/kafka/data/rpc/service.h index efe56be4c0bb4..ca187b8ff0d36 100644 --- 
a/src/v/kafka/data/rpc/service.h +++ b/src/v/kafka/data/rpc/service.h @@ -12,7 +12,6 @@ #include "kafka/data/rpc/deps.h" #include "kafka/data/rpc/rpc_service.h" #include "kafka/data/rpc/serde.h" -#include "model/fundamental.h" #include "ssx/semaphore.h" #include diff --git a/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc b/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc index 50623014b5352..a63d5412cacb0 100644 --- a/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc +++ b/src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc @@ -10,7 +10,6 @@ */ #include "kafka/data/rpc/test/deps.h" -#include "model/metadata.h" #include "model/namespace.h" #include "net/server.h" #include "rpc/connection_cache.h" From 10c800d11961289a581f652c2d6101b4490907aa Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Wed, 4 Mar 2026 11:29:00 -0800 Subject: [PATCH 10/11] dt/sr: Stress test for RPC resilience to leadership transfers Restricted to release builds Signed-off-by: Oren Leiman --- tests/rptest/tests/schema_registry_test.py | 261 ++++++++++++++++++++- 1 file changed, 260 insertions(+), 1 deletion(-) diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index 5f17710d15341..7ac66c1c86bed 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -62,7 +62,7 @@ wait_until_result, ) from rptest.utils.log_utils import wait_until_nag_is_set -from rptest.utils.mode_checks import skip_fips_mode +from rptest.utils.mode_checks import skip_debug_mode, skip_fips_mode Headers: TypeAlias = dict[str, str] | None @@ -11421,3 +11421,262 @@ def __init__(self, context: TestContext, **kwargs: Any): extra_rp_conf={"schema_registry_use_rpc": True}, **kwargs, ) + + +class SchemaRegistryTransportStressTest(SchemaRegistryEndpoints): + """ + Stress test for schema registry transport resilience. Performs + concurrent SR read/write operations while transferring leadership of + the _schemas topic. 
500 errors should be infrequent and SR should + stay queryable. + """ + + def __init__(self, context: TestContext, **kwargs): + super().__init__( + context, + **kwargs, + ) + + @cluster(num_nodes=3) + @skip_debug_mode + def test_no_errors_during_leadership_transfers(self): + import threading + + admin = Admin(self.redpanda) + + # --- Setup: register initial schemas so reads have data --- + num_subjects = 3 + schema_ids = [] + subjects = [] + for i in range(num_subjects): + subject = f"stress-test-subject-{i}" + subjects.append(subject) + data = json.dumps( + { + "schema": json.dumps( + { + "type": "record", + "name": f"rec{i}", + "fields": [{"name": "f1", "type": "string"}], + } + ), + } + ) + result = self.sr_client.post_subjects_subject_versions( + subject=subject, data=data + ) + assert result.status_code == 200, ( + f"Setup: failed to register schema: {result.status_code} {result.text}" + ) + schema_ids.append(result.json()["id"]) + + self.logger.info( + f"Setup complete: {num_subjects} subjects, schema_ids={schema_ids}" + ) + + # --- Background workers --- + request_counter = 0 + request_counter_lock = threading.Lock() + errors: list[str] = [] + stop_event = threading.Event() + + def count_request(): + nonlocal request_counter + with request_counter_lock: + request_counter += 1 + + # Short timeout so threads don't block teardown. 
+ req_timeout = 10 + + def reader_worker(): + """Continuously read subjects and schemas from random nodes.""" + while not stop_event.is_set(): + for node in self.redpanda.nodes: + if stop_event.is_set(): + break + hostname = node.account.hostname + try: + count_request() + r = self.sr_client.get_subjects( + hostname=hostname, timeout=req_timeout + ) + if r.status_code == 500: + errors.append(f"GET /subjects on {hostname}: 500 {r.text}") + for sid in schema_ids: + if stop_event.is_set(): + break + count_request() + r = self.sr_client.request( + "GET", + f"schemas/ids/{sid}", + hostname=hostname, + headers=HTTP_GET_HEADERS, + timeout=req_timeout, + ) + if r.status_code == 500: + errors.append( + f"GET /schemas/ids/{sid} on {hostname}: " + f"500 {r.text}" + ) + except Exception as e: + self.logger.warn(f"Reader exception on {hostname}: {e}") + + def writer_worker(): + """Continuously register new schema versions.""" + seq = 0 + while not stop_event.is_set(): + seq += 1 + subject = subjects[seq % num_subjects] + data = json.dumps( + { + "schema": json.dumps( + { + "type": "record", + "name": f"rec{seq % num_subjects}", + "fields": [ + {"name": "f1", "type": ["null", "string"]}, + { + "name": f"f_write_{seq}", + "type": "string", + "default": "x", + }, + ], + } + ), + } + ) + try: + count_request() + r = self.sr_client.post_subjects_subject_versions( + subject=subject, + data=data, + timeout=req_timeout, + ) + if r.status_code == 500: + errors.append( + f"POST /subjects/{subject}/versions: 500 {r.text}" + ) + except Exception as e: + self.logger.warn(f"Writer exception: {e}") + + # Pace writes to avoid overwhelming the cluster. Using + # stop_event.wait lets teardown cancel the pause instead of + # running out a full 0.5s of sleep. 
+ if stop_event.wait(0.5): + break + + # Start 2 reader threads and 1 writer thread + threads = [] + for _ in range(2): + t = threading.Thread(target=reader_worker) + t.start() + threads.append(t) + t = threading.Thread(target=writer_worker) + t.start() + threads.append(t) + + # --- Perturbation: leadership transfers --- + num_transfers = 20 + node_ids = [self.redpanda.node_id(n) for n in self.redpanda.nodes] + for i in range(num_transfers): + leader = admin.get_partition_leader( + namespace="kafka", topic="_schemas", partition=0 + ) + # Pick a specific target so the same node doesn't re-elect itself. + targets = [n for n in node_ids if n != leader] + target = targets[i % len(targets)] + self.logger.info( + f"Transfer {i + 1}/{num_transfers}: moving leadership " + f"from node {leader} to node {target}" + ) + admin.partition_transfer_leadership( + namespace="kafka", + topic="_schemas", + partition=0, + target_id=target, + ) + + # Wait for the specific target to become leader, not just "any + # leader other than the old one". + wait_until( + lambda: admin.get_partition_leader( + namespace="kafka", topic="_schemas", partition=0 + ) + == target, + timeout_sec=10, + backoff_sec=1, + err_msg=f"Leadership did not transfer to node {target}", + ) + + # --- Teardown --- + stop_event.set() + for t in threads: + t.join(timeout=req_timeout + 5) + alive = [t for t in threads if t.is_alive()] + assert not alive, f"{len(alive)} worker thread(s) still alive after join" + + total_requests = request_counter + error_rate = len(errors) / total_requests if total_requests else 0 + self.logger.info( + f"Stress test complete: {num_transfers} leadership transfers, " + f"{total_requests} requests, {len(errors)} errors " + f"({error_rate:.2%})" + ) + + # A small number of transient 500s during rapid leadership + # transfers is acceptable. The internal retry budget can be + # exhausted if a transfer is slow to propagate. 
The important + # thing is that the error rate is low: the system recovers + # quickly and subsequent requests succeed. A real retry-path + # regression spikes well above 1%, so this catches meaningful + # breakage without too much CI noise. + assert error_rate < 0.01, ( + f"Error rate {error_rate:.2%} exceeds 1% threshold " + f"({len(errors)} errors in {total_requests} requests):\n" + + "\n".join(errors[:20]) + ) + + # Verify the system recovers after transfers complete: every + # node must be able to serve a basic read within a reasonable + # window. wait_until absorbs transient CI slowness while still + # catching real breakage. + def all_nodes_healthy(): + for node in self.redpanda.nodes: + r = self.sr_client.get_subjects( + hostname=node.account.hostname, timeout=req_timeout + ) + if r.status_code != 200: + return False + return True + + wait_until( + all_nodes_healthy, + timeout_sec=30, + backoff_sec=2, + err_msg="Schema registry did not recover on all nodes " + "after leadership transfers", + ) + + +class SchemaRegistryRpcTransportStressTest(SchemaRegistryTransportStressTest): + """ + RPC transport variant of the leadership transfer stress test. + """ + + def __init__(self, context: TestContext): + super().__init__( + context, + extra_rp_conf={"schema_registry_use_rpc": True}, + ) + + +class SchemaRegistryKafkaClientTransportStressTest(SchemaRegistryTransportStressTest): + """ + Kafka client transport variant of the leadership transfer stress test. 
The general idea is to verify that the RPC transport and the
    Kafka-client produce path agree on offset assignment for
    _schemas records.
+ """ + + # Last release line before the v26.2.1 SR rpc_transport gate. + INITIAL_LINE: tuple[int, int] = (26, 1) + + def __init__(self, test_context: TestContext): + super().__init__( + test_context=test_context, + num_brokers=3, + extra_rp_conf={"auto_create_topics_enabled": False}, + resource_settings=ResourceSettings(num_cpus=1), + log_config=log_config, + pandaproxy_config=PandaproxyConfig(), + schema_registry_config=SchemaRegistryConfig(), + ) + self.sr_client = SchemaRegistryRedpandaClient(redpanda=self.redpanda) + self.installer = self.redpanda._installer + + def setUp(self): + if self.test_context.function_name == "test_upgrade_kafka_to_rpc": + # released_versions is descending; first hit on INITIAL_LINE + # is the latest patch. We walk the list directly because dev + # builds report v0.0.0-dev, which makes the feature-line + # install path silently no-op. + candidates = [ + v + for v in self.installer.released_versions + if v[:2] == self.INITIAL_LINE + ] + assert candidates, ( + f"no v{self.INITIAL_LINE[0]}.{self.INITIAL_LINE[1]}.x in " + f"{self.installer.released_versions[:5]}" + ) + self.initial_version: tuple[int, int, int] = candidates[0] + self.installer.install(self.redpanda.nodes, self.initial_version) + super().setUp() + + def _register_schema(self, subject: str, record_name: str) -> int: + schema = json.dumps( + { + "schema": json.dumps( + { + "type": "record", + "name": record_name, + "fields": [{"name": "f1", "type": "string"}], + } + ) + } + ) + r = self.sr_client.post_subjects_subject_versions(subject=subject, data=schema) + assert r.status_code == 200, f"register {subject}: {r.status_code} {r.text}" + return r.json()["id"] + + def _read_schema(self, sid: int, hostname: str) -> str: + r = self.sr_client.get_schemas_ids_id(id=sid, hostname=hostname) + assert r.status_code == 200, ( + f"read schemas/ids/{sid} on {hostname}: {r.status_code} {r.text}" + ) + return r.json()["schema"] + + def _wait_for_sr_responsive(self, hostname: str) -> None: + 
# schema_registry_use_rpc is a needs_restart property: api::start
        # selects the transport once at process startup, so the rolling
        # restart below is required for the new value to take effect.
+ """ + for node in self.redpanda.nodes: + for sid, payload in prior: + got = self._read_schema(sid, node.account.hostname) + assert got == payload, ( + f"id {sid} on {node.name}: payload mismatch " + f"(expected={payload!r}, got={got!r})" + ) + + max_prior = max((sid for sid, _ in prior), default=0) + host = self.redpanda.nodes[0].account.hostname + new_writes: list[tuple[int, str]] = [] + for i in range(n): + sid = self._register_schema(f"sr-{prefix}-{i}", f"{record_name}{i}") + assert sid > max_prior, ( + f"new id {sid} <= max prior {max_prior}; " + f"loaded_offset reconstruction regressed" + ) + new_writes.append((sid, self._read_schema(sid, host))) + + for node in self.redpanda.nodes: + for sid, payload in new_writes: + got = self._read_schema(sid, node.account.hostname) + assert got == payload, ( + f"new id {sid} on {node.name}: payload mismatch " + f"(expected={payload!r}, got={got!r})" + ) + + return prior + new_writes + + @cluster( + num_nodes=3, + log_allow_list=RESTART_LOG_ALLOW_LIST + PREV_VERSION_LOG_ALLOW_LIST, + ) + @skip_fips_mode + def test_upgrade_kafka_to_rpc(self): + """ + kafka-client writes from a prior version survive a rolling upgrade and + replay correctly under the post-upgrade rpc transport; new + rpc-transport writes propagate to every node. + """ + initial_version_str = "v{}.{}.{}".format(*self.initial_version) + + # install() silently falls back to HEAD on dev builds; verify + # the cluster is actually on the prior version before writing. 
Runtime flips between the rpc and kafka-client transports preserve
        _schemas content. Cycles RPC -> kafka -> RPC -> kafka, exercising both
        directions of cross-transport replay with writes under each transport.
+ """ + for node in self.redpanda.nodes: + self._wait_for_sr_responsive(node.account.hostname) + + n = 4 + after_rpc1 = self._verify_phase([], "rpc1", "Rpc1Rec", n) + + self._flip_transport(use_rpc=False) + after_kafka1 = self._verify_phase(after_rpc1, "kafka1", "Kafka1Rec", n) + + self._flip_transport(use_rpc=True) + after_rpc2 = self._verify_phase(after_kafka1, "rpc2", "Rpc2Rec", n) + + self._flip_transport(use_rpc=False) + self._verify_phase(after_rpc2, "kafka2", "Kafka2Rec", n)