Skip to content
21 changes: 21 additions & 0 deletions src/v/pandaproxy/schema_registry/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,27 @@ redpanda_cc_library(
],
)

# RPC-backed schema registry transport: implements the :transport interface
# on top of the kafka data-plane RPC client.
redpanda_cc_library(
    name = "rpc_transport",
    srcs = ["rpc_transport.cc"],
    hdrs = ["rpc_transport.h"],
    visibility = ["//visibility:public"],
    deps = [
        ":transport",
        "//src/v/kafka/data/rpc",
    ],
    implementation_deps = [
        ":exceptions",
        "//src/v/kafka/protocol",
        "//src/v/model",
        "@seastar",
    ],
)

redpanda_cc_library(
name = "compatibility",
srcs = ["compatibility.cc"],
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,6 @@ class sharded_store;
class store;
class transport;
class kafka_client_transport;
// RPC-based implementation of `transport` (see rpc_transport.h), declared
// alongside the existing kafka-client-based implementation.
class rpc_transport;

} // namespace pandaproxy::schema_registry
147 changes: 147 additions & 0 deletions src/v/pandaproxy/schema_registry/rpc_transport.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "pandaproxy/schema_registry/rpc_transport.h"

#include "kafka/data/rpc/client.h"
#include "kafka/protocol/exceptions.h"
#include "model/fundamental.h"
#include "model/namespace.h"
#include "pandaproxy/schema_registry/exceptions.h"

#include <seastar/core/coroutine.hh>

namespace pandaproxy::schema_registry {

namespace {

/// Translate a cluster::errc surfaced by the RPC layer into the exception
/// types the existing error handling in service & seq_writer already
/// understands. timeout & not_leader matter most: most error paths treat
/// those as retryable. Everything else collapses into unknown_server_error
/// (the catchall surfaced to the API), so faults unique to the RPC layer
/// may now appear in 500 error bodies.
/// TODO: get rid of the kafka::exception dependency if possible and
/// make error handling at the next layer up generic to "transport".
[[noreturn]] void
throw_as_kafka_error(std::string_view context, cluster::errc ec) {
    // A missing _schemas topic is reported through the schema registry's
    // own exception type rather than kafka::exception.
    if (ec == cluster::errc::topic_not_exists) {
        throw exception(
          kafka::error_code::unknown_server_error,
          "_schemas topic does not exist");
    }
    const auto code = [ec] {
        switch (ec) {
        case cluster::errc::not_leader:
            return kafka::error_code::not_leader_for_partition;
        case cluster::errc::timeout:
            return kafka::error_code::request_timed_out;
        default:
            return kafka::error_code::unknown_server_error;
        }
    }();
    throw kafka::exception(code, fmt::format("{}: {}", context, ec));
}

} // namespace

// Borrows the shared RPC client (reference member, no ownership taken);
// the client must outlive this transport.
rpc_transport::rpc_transport(kafka::data::rpc::client& client)
  : _client(client) {}

/// Produce a single batch to the _schemas internal topic via the RPC client
/// and return its base offset. Leader changes are mitigated inside the
/// client call; any remaining failure is rethrown via throw_as_kafka_error.
ss::future<produce_result> rpc_transport::produce(model::record_batch batch) {
    auto reply = co_await _client.produce_with_leader_mitigation(
      model::schema_registry_internal_tp, std::move(batch));
    if (reply.ec != cluster::errc::success) {
        throw_as_kafka_error("RPC produce failed", reply.ec);
    }
    // A successful reply that lacks a base offset points at a client/server
    // version skew in the RPC protocol.
    vassert(
      reply.base_offset.has_value(),
      "Possible RPC produce version mismatch (response incomplete)");
    co_return produce_result{.base_offset = reply.base_offset.value()};
}

/// Fetch the current high watermark of the _schemas internal partition.
ss::future<model::offset> rpc_transport::get_high_watermark() {
    auto offsets = co_await _client.get_single_partition_offsets(
      model::schema_registry_internal_tp);
    if (offsets.has_error()) {
        throw_as_kafka_error(
          "RPC get_partition_offsets failed", offsets.error());
    }
    co_return kafka::offset_cast(offsets.value().high_watermark);
}

/// Stream record batches in [start, end) from the _schemas internal topic
/// into `consumer`. A single RPC consume call may not return the whole
/// range, so keep issuing calls until the range is exhausted or the
/// consumer asks to stop.
ss::future<> rpc_transport::consume_range(
  model::offset start,
  model::offset end,
  ss::noncopyable_function<ss::future<ss::stop_iteration>(model::record_batch)>
    consumer) {
    // Matches the Kafka transport default. KIP-74 obligatory batch
    // makes oversized batch handling identical across transports for
    // the single-partition _schemas topic.
    constexpr size_t consume_max_bytes = 1_MiB;
    constexpr auto consume_timeout = 5s;

    // Callers use the old kafka reader's exclusive upper bound; the RPC
    // service's record batch reader wants an inclusive one, so convert
    // once up front.
    const auto last_wanted = model::prev_offset(end);

    for (auto next = start; next < end;) {
        auto result = co_await _client.consume(
          model::schema_registry_internal_tp,
          offset_cast(next),
          offset_cast(last_wanted),
          1,
          consume_max_bytes,
          consume_timeout);
        if (result.has_error()) {
            throw_as_kafka_error("RPC consume failed", result.error());
        }
        auto& reply = result.value();
        if (reply.err != cluster::errc::success) {
            throw_as_kafka_error("RPC consume error", reply.err);
        }
        if (reply.batches.empty()) {
            // TODO: map to a retryable error_code (e.g. leader_not_available)
            // so service::do_start retries this. Note that the kclient version
            // has this issue in client_fetch_batch_reader.cc. Fix both
            // together.
            throw kafka::exception(
              kafka::error_code::unknown_server_error,
              fmt::format(
                "No records returned in range [{}, {})", next, end));
        }
        for (auto& batch : reply.batches) {
            // Capture the batch's last offset before handing ownership to
            // the consumer.
            const auto batch_last = batch.last_offset();
            auto stop = co_await consumer(std::move(batch));
            if (stop == ss::stop_iteration::yes) {
                co_return;
            }
            next = model::next_offset(batch_last);
        }
    }
}

/// Create a topic through the RPC client. Errors are returned to the caller
/// as a cluster::errc rather than thrown.
ss::future<cluster::errc> rpc_transport::create_topic(
  model::topic_namespace_view tp_ns,
  int32_t partition_count,
  cluster::topic_properties properties,
  int16_t replication_factor) {
    auto ec = co_await _client.create_topic(
      tp_ns, std::move(properties), partition_count, replication_factor);
    co_return ec;
}

} // namespace pandaproxy::schema_registry
51 changes: 51 additions & 0 deletions src/v/pandaproxy/schema_registry/rpc_transport.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include "kafka/data/rpc/fwd.h"
#include "pandaproxy/schema_registry/transport.h"

namespace pandaproxy::schema_registry {

/// Transport implementation that wraps kafka::data::rpc::client for schema
/// registry internal topic I/O.
///
/// TODO: Consider adding transport-layer retry with backoff for
/// topic_not_exists. Neither transport retries this today, but the kafka
/// path already bakes in SR-specific retry logic via
/// gated_retry_with_mitigation + mitigate_error, and this transport could
/// gain the equivalent. The k/d/rpc client is general-purpose and shouldn't
/// retry missing topics, but this transport knows the _schemas topic must
/// exist and can retry at the right layer. Currently the do_start loop in
/// service.cc handles this as a fallback.
Comment on lines +22 to +27
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this needed before GA?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debatable. comment is awkwardly worded in the sense that currently neither transport does this specific thing. if the topic doesn't exist that gets raised as an error. what i meant by "analogous" is just that we have custom SR-specific retry code baked into the kafka side.

so this isn't a deficit in the rpc transport. it would be a net improvement (IMO) that we could handle in common code.

class rpc_transport final : public transport {
public:
    /// Borrows `client` (held by reference); it must outlive this transport.
    explicit rpc_transport(kafka::data::rpc::client& client);

    /// No-op: the underlying RPC client is owned and stopped elsewhere.
    ss::future<> stop() final { return ss::now(); }

    /// Produce a batch to the _schemas internal topic; returns its base
    /// offset, throws on error.
    ss::future<produce_result> produce(model::record_batch batch) override;
    /// Current high watermark of the _schemas internal partition.
    ss::future<model::offset> get_high_watermark() override;
    /// Feed batches in [start, end) (end exclusive) to `consumer` until the
    /// range is exhausted or the consumer returns stop_iteration::yes.
    ss::future<> consume_range(
      model::offset start,
      model::offset end,
      ss::noncopyable_function<
        ss::future<ss::stop_iteration>(model::record_batch)> consumer) override;
    /// Create a topic; failures are reported via the returned cluster::errc
    /// rather than by throwing.
    ss::future<cluster::errc> create_topic(
      model::topic_namespace_view,
      int32_t partition_count,
      cluster::topic_properties,
      int16_t replication_factor) override;

private:
    /// Non-owning; see constructor lifetime requirement.
    kafka::data::rpc::client& _client;
};

} // namespace pandaproxy::schema_registry