Skip to content
Merged
8 changes: 8 additions & 0 deletions src/v/config/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2000,6 +2000,14 @@ configuration::configuration()
"produce audit log messages using a Kafka client instead.",
{.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
true)
, schema_registry_use_rpc(
*this,
"schema_registry_use_rpc",
"Use internal Redpanda RPCs for schema registry internal topic I/O. "
"When disabled, use a Kafka client for schema registry internal topic "
"I/O instead.",
{.needs_restart = needs_restart::yes, .visibility = visibility::tunable},
true)
, cloud_storage_enabled(
Comment on lines +2003 to 2011
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: default true for a new feature? This means all existing deployments will switch to the RPC transport path on upgrade (assuming cluster version is sufficient). While the feature-gating on v26_2_1 in api.cc is a good safety net for mixed-version clusters, defaulting to true is aggressive for a brand new code path.

Consider defaulting to false for the initial release and switching to true once the RPC path has been validated in production. This gives operators an opt-in experience rather than a surprise behavioral change on upgrade.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤠

*this,
true,
Expand Down
1 change: 1 addition & 0 deletions src/v/config/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ struct configuration final : public config_store {
property<std::vector<ss::sstring>> audit_excluded_principals;
enum_property<audit_failure_policy> audit_failure_policy;
property<bool> audit_use_rpc;
property<bool> schema_registry_use_rpc;

// Archival storage
enterprise<property<bool>> cloud_storage_enabled;
Expand Down
1 change: 0 additions & 1 deletion src/v/kafka/data/rpc/deps.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

#include "cluster/cluster_link/frontend.h"
#include "cluster/controller.h"
#include "cluster/fwd.h"
#include "cluster/metadata_cache.h"
#include "cluster/partition_manager.h"
#include "cluster/shard_table.h"
Expand Down
2 changes: 0 additions & 2 deletions src/v/kafka/data/rpc/deps.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
#include "kafka/data/rpc/serde.h"
#include "model/fundamental.h"
#include "model/ktp.h"
#include "model/metadata.h"
#include "model/transform.h"

#include <seastar/util/noncopyable_function.hh>

Expand Down
2 changes: 0 additions & 2 deletions src/v/kafka/data/rpc/serde.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@

#include <seastar/core/chunked_fifo.hh>

#include <iosfwd>

namespace kafka::data::rpc {

struct kafka_topic_data
Expand Down
1 change: 0 additions & 1 deletion src/v/kafka/data/rpc/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#include "kafka/data/partition_proxy.h"
#include "logger.h"
#include "model/ktp.h"
#include "model/metadata.h"
#include "model/record.h"
#include "model/record_batch_reader.h"
#include "model/timeout_clock.h"
Expand Down
1 change: 0 additions & 1 deletion src/v/kafka/data/rpc/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "kafka/data/rpc/deps.h"
#include "kafka/data/rpc/rpc_service.h"
#include "kafka/data/rpc/serde.h"
#include "model/fundamental.h"
#include "ssx/semaphore.h"

#include <seastar/core/chunked_fifo.hh>
Expand Down
1 change: 0 additions & 1 deletion src/v/kafka/data/rpc/test/kafka_data_rpc_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
*/

#include "kafka/data/rpc/test/deps.h"
#include "model/metadata.h"
#include "model/namespace.h"
#include "net/server.h"
#include "rpc/connection_cache.h"
Expand Down
26 changes: 24 additions & 2 deletions src/v/pandaproxy/schema_registry/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,27 @@ redpanda_cc_library(
],
)

# Library target for the schema registry RPC transport
# (rpc_transport.cc / rpc_transport.h).
# `deps` carries :transport and //src/v/kafka/data/rpc; everything else is
# listed under `implementation_deps`.
redpanda_cc_library(
name = "rpc_transport",
srcs = [
"rpc_transport.cc",
],
hdrs = [
"rpc_transport.h",
],
implementation_deps = [
":exceptions",
"//src/v/kafka/protocol",
"//src/v/model",
"@seastar",
],
visibility = ["//visibility:public"],
deps = [
":transport",
"//src/v/kafka/data/rpc",
],
)

redpanda_cc_library(
name = "compatibility",
srcs = ["compatibility.cc"],
Expand Down Expand Up @@ -346,15 +367,16 @@ redpanda_cc_library(
":configuration",
":exceptions",
":kafka_client_transport",
":rpc_transport",
":seq_writer",
":server",
":store",
"//src/v/cluster:ephemeral_credential_frontend",
"//src/v/cluster:members_table",
"//src/v/cluster:node_status_table",
"//src/v/config",
"//src/v/features",
"//src/v/kafka/client:configuration",
"//src/v/kafka/data/rpc",
"//src/v/kafka/protocol:create_topics",
"//src/v/kafka/server:topic_config_utils",
"//src/v/metrics",
Expand All @@ -370,6 +392,7 @@ redpanda_cc_library(
":types",
"//src/v/base",
"//src/v/cluster",
"//src/v/kafka/data/rpc",
"//src/v/model",
"//src/v/security",
"@seastar",
Expand Down Expand Up @@ -415,7 +438,6 @@ redpanda_cc_library(
":avro",
":context_router",
":json_schema",
":kafka_client_transport",
":protobuf_schema",
":seq_writer",
":store",
Expand Down
155 changes: 140 additions & 15 deletions src/v/pandaproxy/schema_registry/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,14 @@
#include "cluster/cluster_link/frontend.h"
#include "cluster/controller.h"
#include "config/configuration.h"
#include "features/feature_table.h"
Comment thread
oleiman marked this conversation as resolved.
#include "kafka/client/configuration.h"
#include "kafka/data/rpc/client.h"
#include "kafka/data/rpc/deps.h"
#include "pandaproxy/logger.h"
#include "pandaproxy/schema_registry/configuration.h"
#include "pandaproxy/schema_registry/kafka_client_transport.h"
#include "pandaproxy/schema_registry/rpc_transport.h"
#include "pandaproxy/schema_registry/schema_id_cache.h"
#include "pandaproxy/schema_registry/seq_writer.h"
#include "pandaproxy/schema_registry/service.h"
Expand All @@ -29,9 +32,122 @@
#include <algorithm>
#include <functional>
#include <memory>
#include <variant>

namespace pandaproxy::schema_registry {

/// Owns the schema registry's sharded transport and hides which concrete
/// implementation is active: the internal-RPC transport or the Kafka client
/// transport. The alternative is chosen once, in the constructor.
struct api::transport_impl {
    /// Picks the transport alternative and logs the resulting mode.
    ///
    /// \param rpc_client sharded RPC client; may be null, in which case the
    ///        Kafka client transport is selected.
    /// \param client_cfg configuration handed to the Kafka client transport
    ///        by start().
    /// \param controller provides the feature table consulted by
    ///        should_use_rpc(); also handed to the Kafka client transport
    ///        by start().
    transport_impl(
      ss::sharded<kafka::data::rpc::client>* rpc_client,
      kafka::client::configuration& client_cfg,
      cluster::controller& controller)
      : _rpc_client(rpc_client)
      , _client_cfg(client_cfg)
      , _controller(controller) {
        if (!should_use_rpc(rpc_client, controller)) {
            _v.emplace<ss::sharded<kafka_client_transport>>();
        }
        // else: _v default-constructs to sharded<rpc_transport>.
        vlog(
          srlog.info,
          "Schema registry in {} mode",
          std::holds_alternative<ss::sharded<rpc_transport>>(_v)
            ? "RPC"
            : "Kafka client");
    }

    /// Decide whether to use the RPC transport at startup.
    ///
    /// Returns false without logging when schema_registry_use_rpc is
    /// disabled. When the config asks for RPC but a precondition isn't met
    /// (no RPC client, or the active cluster version is below v26_2_1),
    /// logs the reason at info level and returns false so the caller falls
    /// back to the kafka::client transport.
    static bool should_use_rpc(
      const ss::sharded<kafka::data::rpc::client>* rpc_client,
      cluster::controller& controller) {
        if (!config::shard_local_cfg().schema_registry_use_rpc()) {
            return false;
        }
        if (!rpc_client) {
            vlog(
              srlog.info,
              "schema_registry_use_rpc enabled but RPC client not available. "
              "Falling back to Kafka client.");
            return false;
        }
        const bool rpc_available
          = controller.get_feature_table().local().get_active_version()
            >= features::to_cluster_version(
              features::release_version::v26_2_1);
        if (!rpc_available) {
            vlog(
              srlog.info,
              "schema_registry_use_rpc enabled but cluster version too old. "
              "Falling back to Kafka client. RPC mode will be available "
              "on the next restart after all brokers are upgraded.");
            return false;
        }
        return true;
    }

    /// Starts whichever sharded transport was selected in the constructor.
    /// The RPC transport receives the shard-local RPC client; the Kafka
    /// client transport receives the client configuration, the controller
    /// and a default topic creator.
    ss::future<> start() {
        return ss::visit(
          _v,
          [this](ss::sharded<rpc_transport>& t) {
              // The RPC alternative is only selected when should_use_rpc()
              // saw a non-null client, so the dereference is safe here.
              return t.start(ss::sharded_parameter(
                [this] { return std::ref(_rpc_client->local()); }));
          },
          [this](ss::sharded<kafka_client_transport>& t) {
              return t.start(
                std::ref(_client_cfg),
                std::ref(_controller),
                ss::sharded_parameter([this] {
                    return kafka::data::rpc::topic_creator::make_default(
                      &_controller);
                }));
          });
    }

    /// Only kafka_client_transport has per-shard credentials to load; the
    /// RPC path needs no configuration.
    ss::future<> configure() {
        return ss::visit(
          _v,
          [](ss::sharded<rpc_transport>&) { return ss::now(); },
          [](ss::sharded<kafka_client_transport>& t) {
              return t.invoke_on_all(&kafka_client_transport::configure);
          });
    }

    /// Invokes stop() on every shard-local transport instance.
    ss::future<> invoke_stop_on_all() {
        return ss::visit(_v, [](auto& t) {
            return t.invoke_on_all([](auto& local) { return local.stop(); });
        });
    }

    /// Stops the active sharded<> transport container.
    ss::future<> stop() {
        return ss::visit(_v, [](auto& t) { return t.stop(); });
    }

    /// Returns the shard-local instance of the active transport as a
    /// transport&.
    transport& local() {
        return ss::visit(
          _v, [](auto& t) -> transport& { return t.local(); });
    }

    /// Always false for the RPC transport; otherwise asks the shard-local
    /// Kafka client transport.
    bool has_ephemeral_credentials() const {
        return ss::visit(
          _v,
          [](const ss::sharded<rpc_transport>&) { return false; },
          [](const ss::sharded<kafka_client_transport>& t) {
              return t.local().has_ephemeral_credentials();
          });
    }

private:
    // Declared first so it is default-constructed (to sharded<rpc_transport>)
    // before the constructor body decides whether to switch alternatives.
    std::variant<ss::sharded<rpc_transport>, ss::sharded<kafka_client_transport>>
      _v;
    ss::sharded<kafka::data::rpc::client>* _rpc_client;
    kafka::client::configuration& _client_cfg;
    cluster::controller& _controller;
};

class sequence_state_checker_impl : public sequence_state_checker {
public:
explicit sequence_state_checker_impl(
Expand All @@ -56,15 +172,17 @@ api::api(
configuration& cfg,
ss::sharded<cluster::metadata_cache>* metadata_cache,
std::unique_ptr<cluster::controller>& c,
ss::sharded<security::audit::audit_log_manager>& audit_mgr) noexcept
ss::sharded<security::audit::audit_log_manager>& audit_mgr,
ss::sharded<kafka::data::rpc::client>* rpc_client) noexcept
: _node_id{node_id}
, _sg{sg}
, _max_memory{max_memory}
, _client_cfg{client_cfg}
, _cfg{cfg}
, _metadata_cache(metadata_cache)
, _controller(c)
, _audit_mgr(audit_mgr) {}
, _audit_mgr(audit_mgr)
, _rpc_client(rpc_client) {}

api::~api() noexcept = default;

Expand All @@ -83,17 +201,18 @@ ss::future<> api::start() {
return config::shard_local_cfg()
.kafka_schema_id_validation_cache_capacity.bind();
}));
co_await _transport.start(
std::ref(_client_cfg),
std::ref(*_controller),
ss::sharded_parameter([this]() {
return kafka::data::rpc::topic_creator::make_default(
_controller.get());
}));
// Build the transport locally and only install it into _transport
// on successful start. If start() throws, the local impl unwinds and
// the partially-initialized sharded<> goes with it.
auto impl = std::make_unique<transport_impl>(
_rpc_client, _client_cfg, *_controller);
co_await impl->start();
_transport = std::move(impl);
Comment thread
oleiman marked this conversation as resolved.

co_await _sequencer.start(
_node_id,
_sg,
ss::sharded_parameter([this] { return std::ref(_transport.local()); }),
ss::sharded_parameter([this] { return std::ref(_transport->local()); }),
std::ref(*_store),
ss::sharded_parameter([this] {
return std::make_unique<sequence_state_checker_impl>(_controller);
Expand All @@ -102,7 +221,7 @@ ss::future<> api::start() {
config::to_yaml(_cfg, config::redact_secrets::no),
_sg,
_max_memory,
ss::sharded_parameter([this] { return std::ref(_transport.local()); }),
ss::sharded_parameter([this] { return std::ref(_transport->local()); }),
std::ref(*_store),
std::ref(_sequencer),
ss::sharded_parameter([this]() {
Expand All @@ -112,7 +231,7 @@ ss::future<> api::start() {
std::ref(_controller),
std::ref(_audit_mgr));

co_await _transport.invoke_on_all(&kafka_client_transport::configure);
co_await _transport->configure();
co_await _service.invoke_on_all(&service::start);

if (ss::this_shard_id() == 0) {
Expand Down Expand Up @@ -141,10 +260,16 @@ ss::future<> api::stop() {
// Reset gate to support api restart
_metrics_gate = ss::gate{};
}
co_await _transport.invoke_on_all(&kafka_client_transport::stop);
if (_transport) {
co_await _transport->invoke_stop_on_all();
}
co_await _service.stop();
co_await _sequencer.stop();
co_await _transport.stop();
if (_transport) {
co_await _transport->stop();
_transport.reset();
}

co_await _schema_id_cache.stop();
co_await _schema_id_validation_probe.stop();
if (_store) {
Expand All @@ -166,7 +291,7 @@ const kafka::client::configuration& api::get_client_config() const {
}

/// Reports whether the schema registry transport holds ephemeral
/// credentials.
// NOTE(review): this appears to be a diff view with the +/- markers
// stripped: the first return looks like the removed line (direct
// `_transport.local()` access) and the second its replacement, which
// evaluates to false when `_transport` is unset. Confirm against the
// merged file.
bool api::has_ephemeral_credentials() const {
return _transport.local().has_ephemeral_credentials();
return _transport && _transport->has_ephemeral_credentials();
}

ss::future<> api::contribute_metrics(
Expand Down
Loading
Loading