diff --git a/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc b/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc index f8df65c453158..d15fed42d16c7 100644 --- a/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc +++ b/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc @@ -126,6 +126,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { // Update the cluster config (via the controller, rather than shard local). cluster::config_update_request req; req.upsert.emplace_back("log_segment_size_jitter_percent", "1"); + req.upsert.emplace_back("log_segment_size", "2147483649"); app.controller->get_config_frontend() .local() .patch(std::move(req), model::timeout_clock::now() + 30s) @@ -228,6 +229,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { raft0 = nullptr; restart(should_wipe::yes); task_local_cfg.get("log_segment_size_jitter_percent").reset(); + task_local_cfg.get("log_segment_size").reset(); RPTEST_REQUIRE_EVENTUALLY(5s, [this] { return app.storage.local().get_cluster_uuid().has_value(); }); @@ -239,6 +241,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { .has_value()); ASSERT_NE( 1, config::shard_local_cfg().log_segment_size_jitter_percent.value()); + ASSERT_NE(2147483649, config::shard_local_cfg().log_segment_size.value()); ASSERT_TRUE(!app.controller->get_credential_store().local().contains( security::credential_user{"userguy"})); ASSERT_EQ( @@ -274,14 +277,26 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { .is_recovery_active(); }); + bool has_restarted = false; // Validate the controller state is restored. auto validate_post_recovery = [&] { ASSERT_TRUE(app.controller->get_feature_table() .local() .get_configured_license() .has_value()); + // log_segment_size_jitter_percent is marked as needs_restart::yes. We + // won't see its recovered value reflected until the node is restarted. + auto log_segment_size_jitter_expected + = has_restarted ? 1 + : config::shard_local_cfg() + .log_segment_size_jitter_percent.default_value(); ASSERT_EQ( - 1, config::shard_local_cfg().log_segment_size_jitter_percent.value()); + log_segment_size_jitter_expected, + config::shard_local_cfg().log_segment_size_jitter_percent.value()); + // On the other hand, log_segment_size is marked as needs_restart::no, + // so we will see its value reflected immediately. + ASSERT_EQ( + 2147483649, config::shard_local_cfg().log_segment_size.value()); // Validate User restoration. ASSERT_TRUE(app.controller->get_credential_store().local().contains( @@ -339,6 +354,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { // Sanity check that the above invariants still hold after restarting. 
restart(should_wipe::no); + has_restarted = true; RPTEST_REQUIRE_EVENTUALLY(5s, [this] { auto latest_recovery = app.controller->get_cluster_recovery_table() .local() diff --git a/src/v/cluster/cluster_discovery.cc b/src/v/cluster/cluster_discovery.cc index faeedf6db9586..ab41b2cae1431 100644 --- a/src/v/cluster/cluster_discovery.cc +++ b/src/v/cluster/cluster_discovery.cc @@ -19,7 +19,7 @@ #include "features/feature_table.h" #include "model/fundamental.h" #include "model/metadata.h" -#include "storage/api.h" +#include "storage/ntp_config.h" #include "utils/directory_walker.h" #include @@ -33,12 +33,12 @@ namespace cluster { cluster_discovery::cluster_discovery( const model::node_uuid& node_uuid, - storage::api& storage, + std::optional cluster_uuid, ss::abort_source& as) : _node_uuid(node_uuid) - , _join_retry_jitter(config::shard_local_cfg().join_retry_timeout_ms()) + , _cluster_uuid(std::move(cluster_uuid)) + , _join_retry_jitter(config::shard_local_cfg_unsafe().join_retry_timeout_ms()) , _join_timeout(std::chrono::seconds(2)) - , _storage(storage) , _as(as) {} ss::future @@ -115,7 +115,7 @@ ss::future cluster_discovery::is_cluster_founder() { } _founding_brokers = brokers{make_self_broker(config::node())}; _node_ids_by_uuid = node_ids_by_uuid{ - {_storage.node_uuid(), _founding_brokers.front().id()}}; + {_node_uuid, _founding_brokers.front().id()}}; _is_cluster_founder = true; co_return *_is_cluster_founder; } @@ -209,6 +209,67 @@ cluster_discovery::dispatch_node_uuid_registration_to_seeds() { co_return std::nullopt; } +ss::future> +cluster_discovery::fetch_controller_snapshot_from_leader( + const std::vector& peers) { + constexpr auto fetch_timeout = std::chrono::seconds(2); + for (const auto& broker : peers) { + const auto& addr = broker.rpc_address(); + vlog( + clusterlog.info, + "Fetching controller snapshot from {} ({})", + broker.id(), + addr); + result r( + fetch_controller_snapshot_reply{}); + try { + r = co_await do_with_client_one_shot( + addr, + config::node().rpc_server_tls(), + fetch_timeout, + rpc::transport_version::v2, + [fetch_timeout](controller_client_protocol c) { + return c + .fetch_controller_snapshot( + fetch_controller_snapshot_request{ + features::feature_table::get_earliest_logical_version(), + features::feature_table::get_latest_logical_version()}, + rpc::client_opts(rpc::clock_type::now() + fetch_timeout)) + .then(&rpc::get_ctx_data); + }); + } catch (...) 
{ + vlog( + clusterlog.warn, + "Error fetching controller snapshot from {} ({}), retrying: {}", + broker.id(), + addr, + std::current_exception()); + continue; + } + if (r.has_error()) { + vlog( + clusterlog.warn, + "Error fetching controller snapshot from {} ({}): {}, retrying", + broker.id(), + addr, + r.error().message()); + continue; + } + auto& reply = r.value(); + if (!reply.controller_snapshot.has_value()) { + vlog( + clusterlog.debug, + "Peer {} ({}) not ready to produce controller snapshot, trying " + "next peer", + broker.id(), + addr); + continue; + } + co_return std::move(reply.controller_snapshot); + } + co_return std::nullopt; +} + ss::future cluster_discovery::request_cluster_bootstrap_info_single( net::unresolved_address addr) const { @@ -350,7 +411,7 @@ ss::future<> cluster_discovery::discover_founding_brokers() { _is_cluster_founder = false; co_return; } - if (_storage.get_cluster_uuid().has_value()) { + if (_cluster_uuid.has_value()) { _is_cluster_founder = false; co_return; } @@ -449,7 +510,7 @@ ss::future<> cluster_discovery::discover_founding_brokers() { vassert( broker.id() != model::unassigned_node_id, "Should have been assigned before"); - node_uuid = _storage.node_uuid(); + node_uuid = _node_uuid; } else { cluster::cluster_bootstrap_info_reply& reply = replies[seed.addr]; diff --git a/src/v/cluster/cluster_discovery.h b/src/v/cluster/cluster_discovery.h index 7672decb48a45..fa8b1c5369a59 100644 --- a/src/v/cluster/cluster_discovery.h +++ b/src/v/cluster/cluster_discovery.h @@ -20,11 +20,6 @@ #include #include -namespace storage { -class kvstore; -class api; -} // namespace storage - namespace cluster { struct cluster_bootstrap_info_reply; @@ -80,7 +75,9 @@ class cluster_discovery { }; cluster_discovery( - const model::node_uuid& node_uuid, storage::api&, ss::abort_source&); + const model::node_uuid& node_uuid, + std::optional cluster_uuid, + ss::abort_source&); // Register with the cluster: // - If we are a fresh cluster founder, broadcast to other founders @@ -124,6 +121,22 @@ class cluster_discovery { // \pre get_node_ids_by_uuid() has never been called node_ids_by_uuid& get_node_ids_by_uuid(); + // Fetch a fresh controller_join_snapshot from a peer. Used by + // restarting nodes (those that already have a node_id and are not + // going through register_with_cluster) so that bootstrap can apply + // the current cluster-config view to shard_local_cfg before any + // downstream service reads it. + // + // Iterates `peers` in order, returning the first valid response. + // The responder forwards to the controller leader if it is not the + // leader itself, so the result is leader-authoritative regardless + // of which peer answered. Returns nullopt if every peer fails or + // none have a snapshot ready; the caller falls through to whatever + // shard_local_cfg view was loaded from the local cache. + static ss::future> + fetch_controller_snapshot_from_leader( + const std::vector& peers); + private: // Sends requests to each seed server to register the local node UUID // until one succeeds. Returns nullopt if registration did not succeed. 
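For orientation, a rough sketch (not from this patch) of how a restarting node could stitch the pieces above together before its services come up: read the last-known peer set that members_manager persisted in the kvstore, ask any of those peers for a controller snapshot (the responder forwards to the leader), and apply the returned config before anything downstream reads it. The helper apply_join_snapshot_config() is hypothetical; the real wiring lives in application::bootstrap_controller_view().

// Sketch only: error handling and per-shard fan-out elided.
ss::future<> refresh_config_from_leader(storage::kvstore& kvs) {
    // Peer set persisted by members_manager::persist_members_in_kvstore().
    auto members = cluster::members_manager::read_members_from_kvstore(kvs);
    if (members.members.empty()) {
        co_return; // brand-new node, nobody to ask yet
    }
    auto snap = co_await cluster::cluster_discovery::
      fetch_controller_snapshot_from_leader(members.members);
    if (!snap.has_value()) {
        co_return; // every peer failed or none ready: keep the cached view
    }
    auto join_snap = serde::from_iobuf<cluster::controller_join_snapshot>(
      std::move(*snap));
    // Hypothetical helper: push join_snap.config into shard_local_cfg on
    // every shard before downstream services are constructed.
    co_await apply_join_snapshot_config(std::move(join_snap.config));
}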
@@ -147,11 +160,11 @@ class cluster_discovery { ss::future<> discover_founding_brokers(); const model::node_uuid _node_uuid; + const std::optional _cluster_uuid; simple_time_jitter _join_retry_jitter; const std::chrono::milliseconds _join_timeout; std::optional _is_cluster_founder; - storage::api& _storage; ss::abort_source& _as; brokers _founding_brokers; node_ids_by_uuid _node_ids_by_uuid; diff --git a/src/v/cluster/config_manager.cc b/src/v/cluster/config_manager.cc index c7eb506c77090..8e2b5cb604114 100644 --- a/src/v/cluster/config_manager.cc +++ b/src/v/cluster/config_manager.cc @@ -12,7 +12,6 @@ #include "config_manager.h" #include "base/vlog.h" -#include "cluster/cluster_recovery_table.h" #include "cluster/config_frontend.h" #include "cluster/controller_service.h" #include "cluster/controller_snapshot.h" @@ -57,15 +56,13 @@ config_manager::config_manager( ss::sharded& cc, ss::sharded& pl, ss::sharded& mt, - ss::sharded& as, - ss::sharded& rt) + ss::sharded& as) : _self(*config::node().node_id()) , _frontend(cf) , _connection_cache(cc) , _leaders(pl) , _members(mt) - , _as(as) - , _recovery_table(rt) { + , _as(as) { if (ss::this_shard_id() == controller_stm_shard) { // Only the controller stm shard handles updates: leave these // members in default initialized state on other shards. @@ -148,7 +145,7 @@ ss::future<> config_manager::wait_for_bootstrap() { ss::future<> config_manager::do_bootstrap() { config_update_request update; - config::shard_local_cfg().for_each( + config::shard_local_cfg_unsafe().for_each( [&update](const config::base_property& p) { if (!p.is_default()) { json::StringBuffer buf; @@ -189,41 +186,7 @@ ss::future<> config_manager::do_bootstrap() { } } -namespace { - -/// needs_restart properties accumulate pending values via set_pending_value() -/// during apply_local(). They are promoted (pending -> active) at three points: -/// -/// 1. start() — after STM replay on node restart -/// preload() seeds active values from the config cache, then STM replay -/// stages any values newer than cache state as pending. start() promotes -/// them all once replay is complete. -/// -/// 2. apply_delta() — during initial bootstrap or cluster recovery -/// The node is fresh with no running workload, so pending values are -/// promoted immediately and the restart flag is cleared. -/// -/// 3. apply_snapshot() — after controller snapshot installation -/// Wholesale state replacement; pending values are promoted -/// unconditionally. May overlap with (2) when recovery is active, but -/// promote_pending() on a property with no pending value is a no-op. -/// -/// During normal operation (a delta applied to a running node outside -/// bootstrap/recovery), needs_restart properties stay pending until the node -/// restarts and hits promotion point 1. -ss::future<> promote_all_pending() { - co_await ss::smp::invoke_on_all( - [] { config::shard_local_cfg().promote_pending(); }); -} -} // namespace - ss::future<> config_manager::start() { - // Promote any pending values accumulated during STM replay. - // With a fresh config cache, this is a no-op (replay values match - // preloaded active values). With a stale cache, this promotes - // values that replay set as pending. - co_await promote_all_pending(); - if (_seen_version == config_version_unset) { vlog(clusterlog.trace, "Starting config_manager... 
(initial)"); @@ -329,7 +292,7 @@ static void preload_local( const ss::sstring& raw_value, std::optional> result) { - auto& cfg = config::shard_local_cfg(); + auto& cfg = config::shard_local_cfg_unsafe(); if (cfg.contains(key)) { auto& property = cfg.get(key); try { @@ -344,7 +307,7 @@ static void preload_local( // for needs_restart properties: preload runs before anything // reads the config, so values go straight into _value. // STM replay later calls set_pending_value for any updates - // beyond the cache, and start() promotes them. + // beyond the cache. if (result.has_value() && set) { vlog( @@ -376,7 +339,7 @@ static void preload_local( const YAML::Node& value, std::optional> result) { - auto& cfg = config::shard_local_cfg(); + auto& cfg = config::shard_local_cfg_unsafe(); if (cfg.contains(key)) { auto& property = cfg.get(key); std::string raw_value; @@ -445,7 +408,7 @@ config_manager::preload(const YAML::Node& legacy_config) { // to set something in redpanda.yaml and it's not working. if (legacy_config["redpanda"]) { const auto nag_properties - = config::shard_local_cfg().property_names_and_aliases(); + = config::shard_local_cfg_unsafe().property_names_and_aliases(); for (const auto& node : legacy_config["redpanda"]) { auto name = node.first.as(); if (nag_properties.contains(name)) { @@ -484,7 +447,7 @@ ss::future config_manager::load_bootstrap() { // This node has never seen a cluster configuration message. // Bootstrap configuration from local yaml file. - auto errors = config::shard_local_cfg().read_yaml(config); + auto errors = config::shard_local_cfg_unsafe().read_yaml(config); for (const auto& i : errors) { vlog( clusterlog.warn, @@ -494,18 +457,19 @@ ss::future config_manager::load_bootstrap() { } co_await ss::smp::invoke_on_all( - [&config] { config::shard_local_cfg().read_yaml(config); }); + [&config] { config::shard_local_cfg_unsafe().read_yaml(config); }); co_return true; } ss::future<> config_manager::load_legacy(const YAML::Node& legacy_config) { - co_await ss::smp::invoke_on_all( - [&legacy_config] { config::shard_local_cfg().load(legacy_config); }); + co_await ss::smp::invoke_on_all([&legacy_config] { + config::shard_local_cfg_unsafe().load(legacy_config); + }); // This node has never seen a cluster configuration message. // Bootstrap configuration from local yaml file. - auto errors = config::shard_local_cfg().load(legacy_config); + auto errors = config::shard_local_cfg_unsafe().load(legacy_config); // Report any invalid properties. 
Do not refuse to start redpanda, // as the properties will have been either ignored or clamped @@ -662,7 +626,7 @@ ss::future<> config_manager::reconcile_status() { */ config_manager::apply_result apply_local(const cluster_config_delta_cmd_data& data, bool silent) { - auto& cfg = config::shard_local_cfg(); + auto& cfg = config::shard_local_cfg_unsafe(); auto result = config_manager::apply_result{}; for (const auto& u : data.upsert) { if (!cfg.contains(u.key)) { @@ -850,7 +814,7 @@ void config_manager::merge_apply_result( */ ss::future<> config_manager::store_delta(const cluster_config_delta_cmd_data& data) { - auto& cfg = config::shard_local_cfg(); + auto& cfg = config::shard_local_cfg_unsafe(); for (const auto& u : data.upsert) { /// skip section @@ -946,7 +910,6 @@ config_manager::apply_delta(cluster_config_delta_cmd&& cmd_in) { _seen_version); co_return errc::success; } - const bool is_initial_bootstrap = _seen_version == config_version_unset; _seen_version = delta_version; // version_shard is chosen to match controller_stm_shard, so // our raft0 stm apply operations do not need a core jump to @@ -972,16 +935,6 @@ config_manager::apply_delta(cluster_config_delta_cmd&& cmd_in) { co_await ss::smp::invoke_on_all([&data] { apply_local(data, true); }); - // During initial bootstrap or cluster recovery, the node is fresh and - // has nothing to "restart" from — promote pending values immediately - // so needs_restart properties take effect. - if (is_initial_bootstrap || _recovery_table.local().is_recovery_active()) { - co_await promote_all_pending(); - // Pending values have been promoted, so any computed restart - // requirement no longer applies. - apply_r.restart = false; - } - // Merge results from this delta into our status. my_latest_status.version = delta_version; merge_apply_result(my_latest_status, data, apply_r); @@ -1095,11 +1048,6 @@ config_manager::apply_snapshot(model::offset, const controller_snapshot& snap) { ec.message(), ec)); } - - // Snapshot application is a wholesale state replacement — promote - // all pending values immediately so needs_restart properties take - // effect (e.g. during cluster recovery). 
- co_await promote_all_pending(); } } // namespace cluster diff --git a/src/v/cluster/config_manager.h b/src/v/cluster/config_manager.h index 59302ebdc1ddb..c97ba08c45fe2 100644 --- a/src/v/cluster/config_manager.h +++ b/src/v/cluster/config_manager.h @@ -54,8 +54,7 @@ class config_manager final { ss::sharded&, ss::sharded&, ss::sharded&, - ss::sharded&, - ss::sharded&); + ss::sharded&); // Preload early in startup, from bootstrap file or config cache static ss::future preload(const YAML::Node&); @@ -140,7 +139,6 @@ class config_manager final { ss::condition_variable _reconcile_wait; ss::sharded& _as; - ss::sharded& _recovery_table; ss::gate _gate; }; diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 6f67e4643003a..7b6b949f742c2 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -336,8 +336,7 @@ ss::future<> controller::start( std::ref(_connections), std::ref(_partition_leaders), std::ref(_members_table), - std::ref(_as), - std::ref(_recovery_table)); + std::ref(_as)); if (auto bucket_opt = get_configured_bucket(); bucket_opt.has_value()) { co_await _topic_mount_handler.start( diff --git a/src/v/cluster/controller.json b/src/v/cluster/controller.json index 02bc00aa4d937..d034ac00ce1ad 100644 --- a/src/v/cluster/controller.json +++ b/src/v/cluster/controller.json @@ -15,6 +15,11 @@ "input_type": "join_node_request", "output_type": "join_node_reply" }, + { + "name": "fetch_controller_snapshot", + "input_type": "fetch_controller_snapshot_request", + "output_type": "fetch_controller_snapshot_reply" + }, { "name": "update_node_configuration", "input_type": "configuration_update_request", diff --git a/src/v/cluster/controller_stm.cc b/src/v/cluster/controller_stm.cc index c2a2cb6586c53..60bd4b7104ce4 100644 --- a/src/v/cluster/controller_stm.cc +++ b/src/v/cluster/controller_stm.cc @@ -67,44 +67,6 @@ bool controller_stm::ready_to_snapshot() const { return _metrics_reporter_cluster_info.is_initialized(); } -ss::future> controller_stm::maybe_make_join_snapshot() { - // We do **not** check the controller_snapshots feature flag here: - // even if snapshotting in general is turned off, it is still safe - // to send snapshots to joining nodes: they will just ignore - // it if they don't want it. - - // This function is a very quick operation, but because the generic - // fill_snapshot methods are async, we must hold mutex+gate to ensure - // safety in the same way as the more general - // mux_state_stm::maybe_write_snapshot does - auto gate_holder = _gate.hold(); - auto write_snapshot_mtx_holder = co_await _write_snapshot_mtx.get_units(); - - if (!ready_to_snapshot()) { - vlog(clusterlog.debug, "skipping join snapshotting, not ready"); - co_return std::nullopt; - } - - // The various stms fill_snapshot methods expect a full controller - // snapshot, so we will partially populate this and then move - // out the parts we want for the join snapshot. 
- controller_snapshot snapshot; - - auto apply_mtx_holder = co_await _apply_mtx.get_units(); - model::offset last_applied = get_last_applied_offset(); - co_await std::get(_state).fill_snapshot(snapshot); - co_await std::get(_state).fill_snapshot(snapshot); - co_await std::get(_state).fill_snapshot(snapshot); - apply_mtx_holder.return_all(); - - co_return serde::to_iobuf( - controller_join_snapshot{ - .last_applied = last_applied, - .bootstrap = std::move(snapshot.bootstrap), - .features = std::move(snapshot.features), - .config = std::move(snapshot.config)}); -} - ss::future> controller_stm::maybe_make_snapshot(ssx::semaphore_units apply_mtx_holder) { auto started_at = ss::steady_clock_type::now(); @@ -171,6 +133,10 @@ ss::future<> controller_stm::apply_snapshot( // applying the rest of the snapshot. co_await std::get(_state).apply_snapshot( offset, snapshot); + // apply config_manager next so that downstream backends see a + // fresh shard_local_cfg during their own apply_snapshot work. + co_await std::get(_state).apply_snapshot( + offset, snapshot); // apply members early so that we have rpc clients to all cluster nodes. co_await std::get(_state).apply_snapshot( offset, snapshot); @@ -181,7 +147,6 @@ ss::future<> controller_stm::apply_snapshot( // apply everything else in no particular order. co_await ss::when_all( - std::get(_state).apply_snapshot(offset, snapshot), std::get(_state).apply_snapshot(offset, snapshot), std::get(_state).apply_snapshot( offset, snapshot), diff --git a/src/v/cluster/controller_stm.h b/src/v/cluster/controller_stm.h index 9515175d34087..d036fc0a2f096 100644 --- a/src/v/cluster/controller_stm.h +++ b/src/v/cluster/controller_stm.h @@ -94,8 +94,24 @@ class controller_stm final bool ready_to_snapshot() const; - /// Compose a mini-snapshot for joining nodes: this is a specialized - /// peer of the more general maybe_make_snapshot + /// Compose a mini-snapshot for joining or restarting nodes: this is + /// a specialized peer of the more general maybe_make_snapshot. + /// + /// The Backends template pack selects which controller_stm sub-stms + /// contribute to the snapshot via fill_snapshot. The unselected + /// sub-stms' corresponding fields on controller_join_snapshot are + /// left default-constructed (the receiver applies whatever's + /// present and ignores the rest). + /// + /// Typical specializations: + /// - for the + /// join_node response (newly-registering nodes need the + /// cluster_uuid, the feature table, and the cluster config). + /// - for the + /// fetch_controller_snapshot RPC, used by restarting nodes to + /// refresh shard_local_cfg and the feature table from the + /// controller leader before downstream services come up. + template ss::future> maybe_make_join_snapshot(); /** @@ -125,4 +141,48 @@ class controller_stm final inline constexpr ss::shard_id controller_stm_shard = 0; +template +ss::future> controller_stm::maybe_make_join_snapshot() { + // We do **not** check the controller_snapshots feature flag here: + // even if snapshotting in general is turned off, it is still safe + // to send snapshots to joining nodes: they will just ignore + // it if they don't want it. + // + // Hold the gate + write_snapshot_mtx for the same reason + // mux_state_stm::maybe_write_snapshot does — guarantee a coherent + // point-in-time view across backends. 
+ auto gate_holder = _gate.hold(); + auto write_snapshot_mtx_holder = co_await _write_snapshot_mtx.get_units(); + + if (!ready_to_snapshot()) { + vlog(clusterlog.debug, "skipping join snapshotting, not ready"); + co_return std::nullopt; + } + + controller_snapshot snapshot; + + auto apply_mtx_holder = co_await _apply_mtx.get_units(); + model::offset last_applied = get_last_applied_offset(); + + // Build a sequential future chain calling fill_snapshot on each + // selected backend. Sequential because fill_snapshot mutations on + // the shared controller_snapshot must not race. + ss::future<> fill_fut = ss::now(); + auto call_fill = [&snapshot, &fill_fut](auto& backend) { + fill_fut = fill_fut.then( + [&backend, &snapshot] { return backend.fill_snapshot(snapshot); }); + }; + (call_fill(std::get(_state)), ...); + co_await std::move(fill_fut); + + apply_mtx_holder.return_all(); + + co_return serde::to_iobuf( + controller_join_snapshot{ + .last_applied = last_applied, + .bootstrap = std::move(snapshot.bootstrap), + .features = std::move(snapshot.features), + .config = std::move(snapshot.config)}); +} + } // namespace cluster diff --git a/src/v/cluster/members_manager.cc b/src/v/cluster/members_manager.cc index ab6690f23520b..7df28ee4298a7 100644 --- a/src/v/cluster/members_manager.cc +++ b/src/v/cluster/members_manager.cc @@ -97,7 +97,7 @@ ss::future<> members_manager::start(std::vector brokers) { _last_connection_update_offset = _raft0->get_latest_configuration_offset(); } else { - auto snapshot = read_members_from_kvstore(); + auto snapshot = read_members_from_kvstore(_storage.local().kvs()); brokers = std::move(snapshot.members); _last_connection_update_offset = snapshot.update_offset; } @@ -962,8 +962,12 @@ members_manager::make_join_node_success_reply(model::node_id id) { // Provide the joining node with a controller snapshot, so // that it may load correct configuration + feature table // before applying the controller log. 
- return _controller_stm.local().maybe_make_join_snapshot().then( - [id](std::optional snapshot) { + return _controller_stm.local() + .maybe_make_join_snapshot< + bootstrap_backend, + feature_backend, + config_manager>() + .then([id](std::optional snapshot) { vlog( clusterlog.debug, "Responding to node {} join with {} byte snapshot", @@ -1181,6 +1185,56 @@ auto members_manager::dispatch_rpc_to_leader( std::forward(f)); } +ss::future +members_manager::handle_fetch_controller_snapshot( + fetch_controller_snapshot_request req) { + vlog( + clusterlog.info, "Processing fetch_controller_snapshot request: {}", req); + + if (!_raft0->is_elected_leader()) { + vlog( + clusterlog.debug, + "Not the leader; dispatching fetch_controller_snapshot to leader " + "node"); + co_return co_await dispatch_rpc_to_leader( + _join_timeout, + [req, tout = rpc::clock_type::now() + _join_timeout]( + controller_client_protocol c) mutable { + return c + .fetch_controller_snapshot( + fetch_controller_snapshot_request(req), + rpc::client_opts(tout)) + .then(&rpc::get_ctx_data); + }) + .then([](result r) { + if (r.has_error()) { + vlog( + clusterlog.warn, + "Error dispatching fetch_controller_snapshot to leader: {}", + r.error().message()); + return fetch_controller_snapshot_reply{}; + } + return std::move(r.value()); + }) + .handle_exception([](const std::exception_ptr& e) { + vlog( + clusterlog.warn, + "Exception dispatching fetch_controller_snapshot to leader: " + "{}", + e); + return fetch_controller_snapshot_reply{}; + }); + } + + // This controller snapshot only carries the up-to-date feature table and + // cluster config state for restarting or joining nodes to establish a + // consistent view of the cluster-wide state early on in the bootstrap + // process. + auto snap = co_await _controller_stm.local() + .maybe_make_join_snapshot(); + co_return fetch_controller_snapshot_reply{std::move(snap)}; +} + ss::future> members_manager::replicate_new_node_uuid( const model::node_uuid& node_uuid, const std::optional& node_id) { @@ -1668,7 +1722,8 @@ ss::future<> members_manager::persist_members_in_kvstore(model::offset update_offset) { static const auto cluster_members_key = bytes::from_string( "cluster_members"); - auto current_members_snapshot = read_members_from_kvstore(); + auto current_members_snapshot = read_members_from_kvstore( + _storage.local().kvs()); if (current_members_snapshot.update_offset >= update_offset) { vlog( clusterlog.trace, @@ -1699,10 +1754,11 @@ members_manager::persist_members_in_kvstore(model::offset update_offset) { .members = std::move(brokers), .update_offset = update_offset})); } -members_manager::members_snapshot members_manager::read_members_from_kvstore() { +members_manager::members_snapshot +members_manager::read_members_from_kvstore(storage::kvstore& kvs) { static const auto cluster_members_key = bytes::from_string( "cluster_members"); - auto buffer = _storage.local().kvs().get( + auto buffer = kvs.get( storage::kvstore::key_space::controller, cluster_members_key); if (buffer) { return serde::from_iobuf(std::move(*buffer)); diff --git a/src/v/cluster/members_manager.h b/src/v/cluster/members_manager.h index ff49d7c0faa32..e509dd7acf1f0 100644 --- a/src/v/cluster/members_manager.h +++ b/src/v/cluster/members_manager.h @@ -152,6 +152,16 @@ class members_manager { ss::future> handle_join_request(const join_node_request r); + // Produce a controller_join_snapshot for a peer's + // fetch_controller_snapshot RPC, dispatching to the leader if this + // node is not the leader. 
The leader's view is authoritative; serving + // a follower's potentially-stale state is the bug this avoids. The + // reply's snapshot is std::nullopt on dispatch failure (no leader + // known, RPC error, etc.) so the client can fall through to the next + // seed. + ss::future + handle_fetch_controller_snapshot(fetch_controller_snapshot_request r); + // Applies a committed record batch, specializing handling based on the // batch type. ss::future apply_update(model::record_batch); @@ -252,6 +262,7 @@ class members_manager { ss::future make_join_node_success_reply(model::node_id id); +public: struct members_snapshot : serde::envelope< members_snapshot, @@ -261,6 +272,16 @@ class members_manager { model::offset update_offset; auto serde_fields() { return std::tie(members, update_offset); } }; + + // Reads the cluster-members snapshot persisted by + // persist_members_in_kvstore. Returns an empty snapshot if the key + // has never been written (e.g. on a brand-new node before its first + // membership-changing apply). Available pre-controller-start so that + // bootstrap can use the last-known peer set without depending on + // members_manager itself being constructed. + static members_snapshot read_members_from_kvstore(storage::kvstore&); + +private: /** * In order to be able to determine the current cluster configuration before * raft-0 log is replied or controller snapshot is applied we store @@ -268,7 +289,6 @@ class members_manager { * configuration changes. */ ss::future<> persist_members_in_kvstore(model::offset); - members_snapshot read_members_from_kvstore(); const std::vector _seed_servers; const model::broker _self; diff --git a/src/v/cluster/service.cc b/src/v/cluster/service.cc index e842cbaa3dbd7..3375ff2ddcbda 100644 --- a/src/v/cluster/service.cc +++ b/src/v/cluster/service.cc @@ -40,6 +40,7 @@ #include "model/timeout_clock.h" #include "rpc/connection_cache.h" #include "rpc/errc.h" +#include "ssx/sformat.h" #include #include @@ -87,36 +88,57 @@ service::service( , _quotas_frontend(quotas_frontend) , _cluster_link_frontend(cluster_link_frontend) {} -ss::future -service::join_node(join_node_request req, rpc::streaming_context&) { - cluster_version expect_version - = _feature_table.local().get_active_version(); - if (expect_version == invalid_version) { - // Feature table isn't initialized, fall back to requiring that - // joining node is as recent as this node. +namespace { + +/// Returns true if the requester's [earliest, latest] logical-version range +/// is compatible with our active cluster version, OR if +/// `config::node().upgrade_override_checks` is set. This gate is required for +/// RPCs that attempt to fetch a controller snapshot for application on the +/// requesting node. +bool is_request_logical_version_compatible( + const features::feature_table& ft, + cluster_version req_earliest, + cluster_version req_latest, + std::string_view rpc_label) { + cluster_version expect_version = ft.get_active_version(); + if (expect_version == cluster::invalid_version) { + // Feature table isn't initialized, fall back to requiring that the + // requester is as recent as this node. expect_version = features::feature_table::get_latest_logical_version(); } - if ( - (req.earliest_logical_version != cluster::invalid_version - && req.earliest_logical_version > expect_version) - || (req.latest_logical_version != cluster::invalid_version && req.latest_logical_version < expect_version)) { - // Our active version is outside the range of versions the - // joining node is compatible with. 
- bool permit_join = config::node().upgrade_override_checks(); - vlog( - clusterlog.warn, - "{}join request from incompatible node {}, our version {} vs " - "their {}-{}", - permit_join ? "" : "Rejecting ", - req.node, - expect_version, + const bool incompatible + = (req_earliest != cluster::invalid_version + && req_earliest > expect_version) + || (req_latest != cluster::invalid_version && req_latest < expect_version); + + if (!incompatible) { + return true; + } + + const bool permit = config::node().upgrade_override_checks(); + vlog( + clusterlog.warn, + "{}{} from incompatible node, our version {} vs their {}-{}", + permit ? "" : "Rejecting ", + rpc_label, + expect_version, + req_earliest, + req_latest); + return permit; +} + +} // namespace + +ss::future +service::join_node(join_node_request req, rpc::streaming_context&) { + if (!is_request_logical_version_compatible( + _feature_table.local(), req.earliest_logical_version, - req.latest_logical_version); - if (!permit_join) { - return ss::make_ready_future(join_node_reply{ - join_node_reply::status_code::incompatible, model::node_id{-1}}); - } + req.latest_logical_version, + ssx::sformat("join request from node {}", req.node))) { + return ss::make_ready_future(join_node_reply{ + join_node_reply::status_code::incompatible, model::node_id{-1}}); } return ss::with_scheduling_group( @@ -142,6 +164,28 @@ service::join_node(join_node_request req, rpc::streaming_context&) { }); } +ss::future service::fetch_controller_snapshot( + fetch_controller_snapshot_request req, rpc::streaming_context&) { + if (!is_request_logical_version_compatible( + _feature_table.local(), + req.earliest_logical_version, + req.latest_logical_version, + "fetch_controller_snapshot request")) { + return ss::make_ready_future( + fetch_controller_snapshot_reply{}); + } + + return ss::with_scheduling_group( + get_scheduling_group(), [this, req]() mutable { + return _members_manager.invoke_on( + members_manager::shard, + get_smp_service_group(), + [req](members_manager& mm) mutable { + return mm.handle_fetch_controller_snapshot(req); + }); + }); +} + ss::future service::create_topics(create_topics_request r, rpc::streaming_context&) { return ss::with_scheduling_group( diff --git a/src/v/cluster/service.h b/src/v/cluster/service.h index 43769a4ef3d55..a4c757f7ca825 100644 --- a/src/v/cluster/service.h +++ b/src/v/cluster/service.h @@ -52,6 +52,9 @@ class service : public controller_service { virtual ss::future join_node(join_node_request, rpc::streaming_context&) override; + ss::future fetch_controller_snapshot( + fetch_controller_snapshot_request, rpc::streaming_context&) final; + virtual ss::future create_topics(create_topics_request, rpc::streaming_context&) override; diff --git a/src/v/cluster/types.h b/src/v/cluster/types.h index 1883ed99ce5e1..c85bef6be80a4 100644 --- a/src/v/cluster/types.h +++ b/src/v/cluster/types.h @@ -381,6 +381,78 @@ struct join_node_reply } }; +/// Request a fresh controller_join_snapshot from a peer. Issued by +/// restarting nodes (those that already have a node_id and are not +/// going through register_with_cluster) so that bootstrap can apply +/// the current cluster-config view to shard_local_cfg before any +/// downstream service reads it. +/// +/// The responder forwards to the controller leader if it is not the +/// leader itself, so the snapshot is leader-authoritative regardless +/// of which seed the client happened to ask. 
The payload carries only the requester's supported +/// logical-version range, not any requester identification; mTLS at the +/// connection layer is the only access control. +struct fetch_controller_snapshot_request + : serde::envelope< + fetch_controller_snapshot_request, + serde::version<0>, + serde::compat_version<0>> { + fetch_controller_snapshot_request() noexcept = default; + + fetch_controller_snapshot_request( + cluster_version earliest, cluster_version latest) + : earliest_logical_version(earliest) + , latest_logical_version(latest) {} + + // The lowest logical version the requester supports; its feature + // table is already initialized to at least this version. + cluster_version earliest_logical_version{cluster::invalid_version}; + // The highest logical version the requester supports. + cluster_version latest_logical_version{cluster::invalid_version}; + + friend bool operator==( + const fetch_controller_snapshot_request&, + const fetch_controller_snapshot_request&) = default; + + fmt::iterator format_to(fmt::iterator it) const { + return fmt::format_to( + it, + "fetch_controller_snapshot_request{{logical_version {}-{}}}", + earliest_logical_version, + latest_logical_version); + } + + auto serde_fields() { + return std::tie(earliest_logical_version, latest_logical_version); + } +}; + +struct fetch_controller_snapshot_reply + : serde::envelope< + fetch_controller_snapshot_reply, + serde::version<0>, + serde::compat_version<0>> { + fetch_controller_snapshot_reply() noexcept = default; + + explicit fetch_controller_snapshot_reply(std::optional snap) + : controller_snapshot(std::move(snap)) {} + + /// The serialized controller_join_snapshot, or nullopt if the + /// responder is not yet ready to produce one (e.g. still bootstrapping). + std::optional controller_snapshot; + + fmt::iterator format_to(fmt::iterator it) const { + return fmt::format_to( + it, + "snap {}", + controller_snapshot.has_value() + ? controller_snapshot.value().size_bytes() + : 0); + } + + auto serde_fields() { return std::tie(controller_snapshot); } +}; + struct configuration_update_request : serde::envelope< configuration_update_request, diff --git a/src/v/config/configuration.cc b/src/v/config/configuration.cc index 5ac089f41481f..450cfcbb4c05d 100644 --- a/src/v/config/configuration.cc +++ b/src/v/config/configuration.cc @@ -4981,8 +4981,18 @@ std::unique_ptr make_config() { } } -configuration& shard_local_cfg() { +configuration& shard_local_cfg_unsafe() { static thread_local std::unique_ptr cfg = make_config(); return *cfg; } + +configuration& shard_local_cfg() { + auto& cfg = shard_local_cfg_unsafe(); + vassert( + cfg.is_ready(), + "Used shard_local_cfg() before it was marked as ready by the startup " + "process"); + return cfg; +} + } // namespace config diff --git a/src/v/config/configuration.h b/src/v/config/configuration.h index 62da9981137f5..24d7bafd76252 100644 --- a/src/v/config/configuration.h +++ b/src/v/config/configuration.h @@ -846,6 +846,9 @@ struct configuration final : public config_store { development_feature_property development_feature_property_testing_only; + void mark_ready(bool ready) { _ready = ready; } + bool is_ready() const { return _ready; } + private: // to query if experimental features are enabled in order to log a nag. it // does not use the query to control any experimental feature.
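The accessor split above amounts to a small readiness gate. A minimal sketch of the intended calling pattern, assuming single-shard code for brevity (in the patch the toggling is done by application::mark_config_ready(), which runs on every shard):

// Before a cluster-consistent view exists, only the unsafe accessor is legal.
config::shard_local_cfg_unsafe().mark_ready(false);
auto maybe_stale = config::shard_local_cfg_unsafe().log_segment_size();

// ... preload from the local config cache, fetch and apply the controller
// snapshot from the leader ...

// Once the node holds a leader-consistent view, flip the gate; from here on
// the asserting accessor is safe to call anywhere.
config::shard_local_cfg_unsafe().mark_ready(true);
auto fresh = config::shard_local_cfg().log_segment_size(); // would vassert if not ready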
@@ -874,6 +877,8 @@ struct configuration final : public config_store { } ss::sstring store_name() const override { return "cluster"; } + + bool _ready{true}; }; template @@ -884,6 +889,21 @@ bool development_feature_property::development_features_enabled( std::unique_ptr make_config(); +// Accessors for the shard-local cluster configuration. +// shard_local_cfg() contains a vassert which strictly enforces that the +// configuration has been marked ready by the startup process, i.e. that the +// node has preloaded values from its local state and has received an +// up-to-date snapshot of the cluster-wide configuration from the controller +// leader. +// However, a few properties must be read during bootstrap, before readiness +// can be established. For those cases we provide shard_local_cfg_unsafe(), +// which skips the readiness check. +// Keep its use to a minimum, and only reach for it when you are confident +// that reading a potentially stale value during bootstrap will have no +// lasting effect on the behavior of the system once the value is updated +// later in startup. configuration& shard_local_cfg(); +configuration& shard_local_cfg_unsafe(); } // namespace config diff --git a/src/v/config/node_config.cc b/src/v/config/node_config.cc index e25cebd51bd3b..3a6f1e522288f 100644 --- a/src/v/config/node_config.cc +++ b/src/v/config/node_config.cc @@ -341,7 +341,7 @@ node_config::error_map_t node_config::load(const YAML::Node& root_node) { throw std::invalid_argument("'redpanda' root is required"); } - auto ignore = shard_local_cfg().property_names_and_aliases(); + auto ignore = shard_local_cfg_unsafe().property_names_and_aliases(); auto errors = config_store::read_yaml(root_node["redpanda"]); validate_multi_node_property_config(errors); diff --git a/src/v/config/tls_config.cc b/src/v/config/tls_config.cc index 9009ef3d36a4c..3dc7337fced2b 100644 --- a/src/v/config/tls_config.cc +++ b/src/v/config/tls_config.cc @@ -63,7 +63,7 @@ bool ssl_clean_room(auto func) { ss::future> tls_config::get_credentials_builder() const& { if (_enabled) { - const auto& cfg = config::shard_local_cfg(); + const auto& cfg = config::shard_local_cfg_unsafe(); auto builder = co_await net::get_credentials_builder({ .truststore = _truststore_file.transform( [](auto& f) { return net::certificate(std::filesystem::path(f)); }), diff --git a/src/v/features/feature_table.cc b/src/v/features/feature_table.cc index f278f73c9f93e..603a3120356c4 100644 --- a/src/v/features/feature_table.cc +++ b/src/v/features/feature_table.cc @@ -139,6 +139,8 @@ std::string_view to_string_view(feature f) { return "cloud_topics"; case feature::tiered_cloud_topics: return "tiered_cloud_topics"; + case feature::fetch_controller_snapshot_rpc: + return "fetch_controller_snapshot_rpc"; /* * testing features @@ -384,7 +386,9 @@ feature_table::feature_table() { } } } +} +void feature_table::setup_metrics() { _probe = std::make_unique(*this); _probe->setup_metrics(); } @@ -824,7 +828,7 @@ feature_table::decode_version_fence(model::record_batch batch) { void feature_table::set_original_version(cluster::cluster_version v) { _original_version = v; if (v != cluster::invalid_version) { - config::shard_local_cfg().notify_original_version( + config::shard_local_cfg_unsafe().notify_original_version( config::legacy_version{v}); } } diff --git a/src/v/features/feature_table.h
b/src/v/features/feature_table.h index 1e28eed824e1a..5588ac5cb377f 100644 --- a/src/v/features/feature_table.h +++ b/src/v/features/feature_table.h @@ -57,6 +57,7 @@ enum class feature : std::uint64_t { cloud_topics = 1ULL << 13U, tiered_cloud_topics = 1ULL << 14U, batch_mirror_topic_status = 1ULL << 15U, + fetch_controller_snapshot_rpc = 1ULL << 16U, node_isolation = 1ULL << 19U, group_offset_retention = 1ULL << 20U, membership_change_controller_cmds = 1ULL << 22U, @@ -565,6 +566,12 @@ inline constexpr std::array feature_schema{ feature::batch_mirror_topic_status, feature_spec::available_policy::always, feature_spec::prepare_policy::always}, + feature_spec{ + release_version::v26_2_1, + "fetch_controller_snapshot_rpc", + feature::fetch_controller_snapshot_rpc, + feature_spec::available_policy::always, + feature_spec::prepare_policy::always}, }; std::string_view to_string_view(feature); @@ -662,7 +669,7 @@ class feature_table { std::optional resolve_name(std::string_view feature_name) const; void set_license(security::license license); - + void setup_metrics(); /// Sets the builtin trial license based on the cluster creation time void set_builtin_trial_license(model::timestamp cluster_creation_timestamp); diff --git a/src/v/kafka/client/test/fixture.h b/src/v/kafka/client/test/fixture.h index f7b7bde56a754..4dd040a4fc811 100644 --- a/src/v/kafka/client/test/fixture.h +++ b/src/v/kafka/client/test/fixture.h @@ -31,9 +31,14 @@ class kafka_client_fixture : public redpanda_thread_fixture { auto& config = config::shard_local_cfg(); config.get("disable_metrics").set_value(false); }).get(); + app.wire_up_and_start_crypto_services(); + app.wire_up_bootstrap_services(); + app.hydrate_cluster_config(make_minimal_cfg()); + app.bootstrap_from_kvstore(); + app.wire_up_and_start_rpc_service(); + app.establish_cluster_view(); app.initialize(proxy_config(), proxy_client_config()); app.check_environment(); - app.wire_up_and_start_crypto_services(); app.wire_up_and_start(*app_signal, test_mode); } diff --git a/src/v/kafka/server/tests/topic_recreate_test.cc b/src/v/kafka/server/tests/topic_recreate_test.cc index 06f8f449f4785..09ce3554bd254 100644 --- a/src/v/kafka/server/tests/topic_recreate_test.cc +++ b/src/v/kafka/server/tests/topic_recreate_test.cc @@ -137,9 +137,14 @@ class recreate_test_fixture : public redpanda_thread_fixture { auto& config = config::shard_local_cfg(); config.get("disable_metrics").set_value(false); }).get(); + app.wire_up_and_start_crypto_services(); + app.wire_up_bootstrap_services(); + app.hydrate_cluster_config(make_minimal_cfg()); + app.bootstrap_from_kvstore(); + app.wire_up_and_start_rpc_service(); + app.establish_cluster_view(); app.initialize(proxy_config(), proxy_client_config()); app.check_environment(); - app.wire_up_and_start_crypto_services(); app.wire_up_and_start(*app_signal, true); } }; diff --git a/src/v/net/BUILD b/src/v/net/BUILD index 8f3ffae66b9e0..e924847140873 100644 --- a/src/v/net/BUILD +++ b/src/v/net/BUILD @@ -71,6 +71,7 @@ redpanda_cc_library( "//src/v/ssx:mutex", "//src/v/ssx:semaphore", "//src/v/ssx:sformat", + "//src/v/utils:adjustable_semaphore", "//src/v/utils:inet_address", "//src/v/utils:log_hist", "//src/v/utils:named_type", diff --git a/src/v/net/probes.cc b/src/v/net/probes.cc index c16adb91e1462..88c69b9d59fa4 100644 --- a/src/v/net/probes.cc +++ b/src/v/net/probes.cc @@ -14,6 +14,7 @@ #include "net/client_probe.h" #include "net/server_probe.h" #include "net/tls_certificate_probe.h" +#include "net/types.h" #include "ssx/sformat.h" #include @@ 
-238,12 +239,13 @@ build_reloadable_server_credentials_with_probe( std::move(cb)); } +namespace { + template -ss::future> build_reloadable_credentials_with_probe( - ss::tls::credentials_builder builder, - ss::sstring area, - ss::sstring detail, - ss::tls::reload_callback cb) { +ss::future< + std::pair, ss::lw_shared_ptr>> +build_reloadable_credentials_and_probe( + ss::tls::credentials_builder builder, ss::tls::reload_callback cb) { auto probe = ss::make_lw_shared(); auto wrap_cb = [probe, cb = std::move(cb)]( @@ -259,15 +261,49 @@ ss::future> build_reloadable_credentials_with_probe( ss::shared_ptr cred; if constexpr (std::is_same::value) { - cred = co_await builder.build_reloadable_server_credentials(wrap_cb); + cred = co_await builder.build_reloadable_server_credentials( + std::move(wrap_cb)); } else { cred = co_await builder.build_reloadable_certificate_credentials( - wrap_cb); + std::move(wrap_cb)); } - probe->setup_metrics(std::move(area), std::move(detail)); probe->loaded(*cred, nullptr, builder.get_trust_file_blob()); - co_return cred; + co_return std::pair{std::move(cred), std::move(probe)}; +} + +} // namespace + +ss::future +build_reloadable_server_credentials_with_deferred_probe( + config::tls_config config, ss::tls::reload_callback cb) { + auto builder = co_await config.get_credentials_builder(); + if (!builder) { + co_return server_credentials_with_deferred_probe{}; + } + auto [cred, probe] = co_await build_reloadable_credentials_and_probe< + ss::tls::server_credentials>(std::move(*builder), std::move(cb)); + co_return server_credentials_with_deferred_probe{ + .credentials = std::move(cred), + .probe = std::move(probe), + }; +} + +template +ss::future> build_reloadable_credentials_with_probe( + ss::tls::credentials_builder builder, + ss::sstring area, + ss::sstring detail, + ss::tls::reload_callback cb) { + auto [cred, probe] = co_await build_reloadable_credentials_and_probe( + std::move(builder), std::move(cb)); + probe->setup_metrics( + std::move(area), + std::move(detail), + net::metrics_disabled(config::shard_local_cfg().disable_metrics), + net::public_metrics_disabled( + config::shard_local_cfg().disable_public_metrics)); + co_return std::move(cred); } template ss::future> @@ -341,7 +377,10 @@ void tls_certificate_probe::loaded( } void tls_certificate_probe::setup_metrics( - std::string_view area, std::string_view detail) { + std::string_view area, + std::string_view detail, + net::metrics_disabled metrics_disabled, + net::public_metrics_disabled public_metrics_disabled) { if (ss::this_shard_id() != 0) { return; } @@ -417,19 +456,21 @@ void tls_certificate_probe::setup_metrics( return defs; }; - if (!config::shard_local_cfg().disable_metrics()) { + if (!metrics_disabled && !_metrics_setup) { const auto aggregate_labels = config::shard_local_cfg().aggregate_metrics() ? 
std::vector{sm::shard_label} : std::vector{}; _metrics.add_group( prometheus_sanitize::metrics_name("tls"), setup(aggregate_labels)); + _metrics_setup = true; } - if (!config::shard_local_cfg().disable_public_metrics()) { + if (!public_metrics_disabled && !_public_metrics_setup) { _public_metrics.add_group( prometheus_sanitize::metrics_name("tls"), setup(std::vector{sm::shard_label})); + _public_metrics_setup = true; } } diff --git a/src/v/net/server.cc b/src/v/net/server.cc index 4dd86039ff48f..2055ad10895b5 100644 --- a/src/v/net/server.cc +++ b/src/v/net/server.cc @@ -36,7 +36,7 @@ namespace net { server::server(server_configuration c, ss::logger& log) : cfg(std::move(c)) , _log(log) - , _memory{size_t{static_cast(cfg.max_service_memory_per_core)}, "net/server-mem"} + , _memory{static_cast(cfg.max_service_memory_per_core), "net/server-mem"} , _probe(std::make_unique()) { vlog( _log.info, "Creating net::server for {} with config {}", cfg.name, cfg); @@ -48,15 +48,7 @@ server::server(ss::sharded* s, ss::logger& log) server::~server() = default; void server::start() { - if (!cfg.disable_metrics) { - setup_metrics(); - _probe->setup_metrics(_metrics, cfg.name.c_str()); - } - - if (!cfg.disable_public_metrics) { - setup_public_metrics(); - _probe->setup_public_metrics(_public_metrics, cfg.name.c_str()); - } + setup_metrics(); if (cfg.connection_rate_bindings) { connection_rate_bindings.emplace(cfg.connection_rate_bindings.value()); @@ -315,6 +307,9 @@ ss::future<> server::wait_for_shutdown() { } ss::future<> server::stop() { + _metrics.clear(); + _public_metrics.clear(); + // if shutdown input was already requested this method is nop, user has to // wait explicitly for shutdown to finish with `wait_for_shutdown` if (_as.abort_requested()) { @@ -325,18 +320,47 @@ ss::future<> server::stop() { return wait_for_shutdown(); } +void server::set_memory_capacity(uint64_t new_capacity) { + _memory.set_capacity(new_capacity); +} + +void server::set_metrics( + net::metrics_disabled disable_metrics, + net::public_metrics_disabled disable_public_metrics) { + cfg.disable_metrics = disable_metrics; + cfg.disable_public_metrics = disable_public_metrics; +} + void server::setup_metrics() { + setup_internal_metrics(); + setup_public_metrics(); + for (const auto& ep : cfg.addrs) { + if (ep.tls_probe) { + ep.tls_probe->setup_metrics( + "rpc", ep.name, cfg.disable_metrics, cfg.disable_public_metrics); + } + } +} + +void server::setup_internal_metrics() { + if (cfg.disable_metrics) { + return; + } + + if (_metrics_setup) { + return; + } namespace sm = ss::metrics; _metrics.add_group( prometheus_sanitize::metrics_name(cfg.name), {sm::make_total_bytes( "max_service_mem_bytes", - [this] { return cfg.max_service_memory_per_core; }, + [this] { return _memory.capacity(); }, sm::description( ssx::sformat("{}: Maximum memory allowed for RPC", cfg.name))), sm::make_total_bytes( "consumed_mem_bytes", - [this] { return cfg.max_service_memory_per_core - _memory.current(); }, + [this] { return _memory.outstanding(); }, sm::description( ssx::sformat( "{}: Memory consumed by request processing", cfg.name))), @@ -344,9 +368,18 @@ void server::setup_metrics() { "dispatch_handler_latency", [this] { return _hist.internal_histogram_logform(); }, sm::description(ssx::sformat("{}: Latency ", cfg.name)))}); + _probe->setup_metrics(_metrics, cfg.name.c_str()); + _metrics_setup = true; } void server::setup_public_metrics() { + if (cfg.disable_public_metrics) { + return; + } + + if (_public_metrics_setup) { + return; + } namespace sm = 
ss::metrics; std::string_view server_name(cfg.name); @@ -365,6 +398,8 @@ void server::setup_public_metrics() { {server_label(server_name)}, [this] { return _hist.public_histogram_logform(); }) .aggregate({sm::shard_label})}); + _probe->setup_public_metrics(_public_metrics, cfg.name.c_str()); + _public_metrics_setup = true; } fmt::iterator server_configuration::format_to(fmt::iterator it) const { diff --git a/src/v/net/server.h b/src/v/net/server.h index 8001d08dba884..44f8b84454096 100644 --- a/src/v/net/server.h +++ b/src/v/net/server.h @@ -17,8 +17,10 @@ #include "net/conn_quota.h" #include "net/connection.h" #include "net/connection_rate.h" +#include "net/tls_certificate_probe.h" #include "net/types.h" #include "ssx/semaphore.h" +#include "utils/adjustable_semaphore.h" #include "utils/log_hist.h" #include @@ -44,6 +46,13 @@ struct server_endpoint { ss::sstring name; ss::socket_address addr; ss::shared_ptr credentials; + // Optional handle to the probe associated with `credentials`, used so that + // metrics registration can be performed after the endpoint is constructed + // (e.g. when the probe is created during early bootstrap, before + // shard_local_cfg() is ready). The wrap_cb captured by `credentials` is + // the primary owner; this is a co-owning reference for late metric + // registration. + ss::lw_shared_ptr tls_probe{nullptr}; server_endpoint(ss::sstring name, ss::socket_address addr) : name(std::move(name)) @@ -134,18 +143,25 @@ class server { */ ss::future<> stop(); - const server_configuration cfg; // NOLINT + server_configuration cfg; // NOLINT const hist_t& histogram() const { return _hist; } virtual std::string_view name() const = 0; virtual ss::future<> apply(ss::lw_shared_ptr) = 0; server_probe& probe() { return *_probe; } - ssx::semaphore& memory() { return _memory; } + ssx::semaphore& memory() { return _memory.underlying(); } ss::gate& conn_gate() { return _conn_gate; } hist_t& hist() { return _hist; } ss::abort_source& abort_source() { return _as; } bool abort_requested() const { return _as.abort_requested(); } + void set_memory_capacity(uint64_t new_capacity); + void set_metrics( + net::metrics_disabled disable_metrics, + net::public_metrics_disabled disable_public_metrics); + +protected: + void setup_metrics(); private: struct listener { @@ -166,11 +182,11 @@ class server { print_exceptional_future(ss::future<>, const char*, ss::socket_address); ss::future<> apply_proto(ss::lw_shared_ptr, conn_quota::units); - void setup_metrics(); + void setup_internal_metrics(); void setup_public_metrics(); ss::logger& _log; - ssx::semaphore _memory; + adjustable_semaphore _memory; std::vector> _listeners; boost::intrusive::list _connections; ss::abort_source _as; @@ -180,6 +196,8 @@ class server { std::unique_ptr _probe; metrics::internal_metric_groups _metrics; metrics::public_metric_groups _public_metrics; + bool _metrics_setup{false}; + bool _public_metrics_setup{false}; std::optional connection_rate_bindings; std::optional> _connection_rates; diff --git a/src/v/net/tls_certificate_probe.h b/src/v/net/tls_certificate_probe.h index 8c0396f950fee..3925559d01a4f 100644 --- a/src/v/net/tls_certificate_probe.h +++ b/src/v/net/tls_certificate_probe.h @@ -15,6 +15,7 @@ #include "bytes/bytes.h" #include "config/tls_config.h" #include "metrics/metrics.h" +#include "net/types.h" #include "utils/named_type.h" #include @@ -44,7 +45,11 @@ class tls_certificate_probe { std::exception_ptr ex, std::optional trust_file_contents); - void setup_metrics(std::string_view area, std::string_view 
detail); + void setup_metrics( + std::string_view area, + std::string_view detail, + net::metrics_disabled metrics_disabled, + net::public_metrics_disabled public_metrics_disabled); private: struct cert { @@ -54,6 +59,8 @@ class tls_certificate_probe { }; metrics::internal_metric_groups _metrics; metrics::public_metric_groups _public_metrics; + bool _metrics_setup{false}; + bool _public_metrics_setup{false}; clock_type::time_point _load_time{}; std::optional _cert; std::optional _ca; @@ -86,6 +93,23 @@ build_reloadable_server_credentials_with_probe( ss::sstring listener_name, ss::tls::reload_callback cb = {}); +struct server_credentials_with_deferred_probe { + ss::shared_ptr credentials{nullptr}; + ss::lw_shared_ptr probe{nullptr}; +}; + +/// Like \ref build_reloadable_server_credentials_with_probe, but does not +/// call \ref tls_certificate_probe::setup_metrics on the returned probe. +/// +/// Use this when constructing TLS credentials before the cluster +/// configuration has been marked ready (e.g. during early bootstrap, where +/// \ref config::shard_local_cfg would assert). The caller is responsible +/// for invoking \ref tls_certificate_probe::setup_metrics on the returned +/// probe once the configuration is available. +ss::future +build_reloadable_server_credentials_with_deferred_probe( + config::tls_config tls_config, ss::tls::reload_callback cb = {}); + template concept TLSCreds = std::is_base_of::value; diff --git a/src/v/redpanda/BUILD b/src/v/redpanda/BUILD index a7fe67aeb9477..3f1eb08c0787f 100644 --- a/src/v/redpanda/BUILD +++ b/src/v/redpanda/BUILD @@ -39,6 +39,8 @@ redpanda_cc_library( deps = [ ":cli_parser", "//src/v/base", + "//src/v/bytes:iobuf_parser", + "//src/v/bytes:iostream", "//src/v/cloud_io:cache", "//src/v/cloud_io:remote", "//src/v/cloud_storage", @@ -66,6 +68,7 @@ redpanda_cc_library( "//src/v/cluster:offsets_lookup", "//src/v/cluster:partition_properties_stm", "//src/v/cluster:tx_manager_migrator_rpc", + "//src/v/cluster:types", "//src/v/cluster/utils:partition_change_notifier_impl", "//src/v/cluster_link:fwd", "//src/v/cluster_link:rpc_service", @@ -95,6 +98,7 @@ redpanda_cc_library( "//src/v/kafka/server:write_at_offset_stm", "//src/v/metrics", "//src/v/migrations", + "//src/v/model", "//src/v/net", "//src/v/net:tls", "//src/v/pandaproxy:core", diff --git a/src/v/redpanda/application.cc b/src/v/redpanda/application.cc index e45a6c7a90e2f..f5cafddf7ad76 100644 --- a/src/v/redpanda/application.cc +++ b/src/v/redpanda/application.cc @@ -16,6 +16,7 @@ #include "cloud_storage_clients/client_pool.h" #include "cluster/cloud_metadata/offsets_upload_router.h" #include "cluster/cloud_metadata/offsets_uploader.h" +#include "cluster/cluster_discovery.h" #include "cluster/config_manager.h" #include "cluster/controller.h" #include "cluster/node_isolation_watcher.h" @@ -30,10 +31,12 @@ #include "datalake/credential_manager.h" #include "datalake/datalake_manager.h" #include "datalake/datalake_usage_aggregator.h" +#include "features/feature_table.h" #include "kafka/client/configuration.h" #include "kafka/server/rm_group_frontend.h" #include "metrics/prometheus_sanitize.h" #include "migrations/migrators.h" +#include "net/tls_certificate_probe.h" #include "pandaproxy/rest/api.h" #include "pandaproxy/rest/configuration.h" #include "pandaproxy/schema_registry/api.h" @@ -338,7 +341,13 @@ int application::run(int ac, char** av) { // Cluster config validation uses OpenSSL (e.g. TLS cipher // checks), so crypto must be initialized first. 
wire_up_and_start_crypto_services(); + wire_up_bootstrap_services(); + mark_config_ready(false).get(); hydrate_cluster_config(node_cfg_yaml); + bootstrap_from_kvstore(); + wire_up_and_start_rpc_service(); + establish_cluster_view(); + log_cluster_config(); init_crashtracker(app_signal); initialize(); check_environment(); @@ -387,9 +396,20 @@ void application::initialize( std::optional schema_reg_client_cfg, std::optional audit_log_client_cfg) { ss::smp::invoke_on_all([] { - // initialize memory groups now that our configuration is loaded + // re-initialize memory groups now that our configuration is loaded + memory_groups_holder().reset(); memory_groups(); }).get(); + + // With memory groups re-initialized, we can now set a proper memory + // capacity in the _rpc server (which was constructed before a consistent + // cluster view was established). + _rpc + .invoke_on_all([](rpc::rpc_server& r) { + r.set_memory_capacity(memory_groups().rpc_total_memory()); + }) + .get(); + construct_service( _memory_sampling, std::ref(_log), ss::sharded_parameter([]() { return config::shard_local_cfg().sampled_memory_profile.bind(); @@ -460,21 +480,6 @@ void application::initialize( "data directory", config::node().data_directory().path); syschecks::pidfile_create(config::node().pidfile_path()); } - smp_groups::config smp_groups_cfg{ - .raft_group_max_non_local_requests - = config::shard_local_cfg().raft_smp_max_non_local_requests().value_or( - smp_groups::default_raft_non_local_requests( - config::shard_local_cfg().topic_partitions_per_shard())), - .proxy_group_max_non_local_requests - = config::shard_local_cfg().pp_sr_smp_max_non_local_requests().value_or( - smp_groups::default_max_nonlocal_requests)}; - - smp_service_groups.create_groups(smp_groups_cfg).get(); - _deferred.emplace_back( - [this] { smp_service_groups.destroy_groups().get(); }); - - // Ensure the scheduling groups singleton is initialized early - std::ignore = scheduling_groups::instance(); construct_service(_scheduling_groups_probe).get(); _scheduling_groups_probe @@ -514,6 +519,18 @@ void application::initialize( } void application::setup_metrics() { + // Two systems were created in the pre-bootstrapping process and may + // now need their metrics enabled.
+ feature_table.invoke_on_all(&features::feature_table::setup_metrics).get(); + _rpc + .invoke_on_all([](rpc::rpc_server& r) { + r.set_metrics( + net::metrics_disabled(config::shard_local_cfg().disable_metrics()), + net::public_metrics_disabled( + config::shard_local_cfg().disable_public_metrics())); + }) + .get(); + _rpc.invoke_on_all(&rpc::rpc_server::setup_metrics).get(); setup_internal_metrics(); setup_public_metrics(); } @@ -724,31 +741,76 @@ YAML::Node application::hydrate_node_config(const po::variables_map& cfg) { return config; } -void application::hydrate_cluster_config(const YAML::Node& config) { - auto config_printer = [this](std::string_view service, const auto& cfg) { - std::vector items; - cfg.for_each([&items, &service](const auto& item) { - items.push_back( - ssx::sformat("{}.{}\t- {}", service, item, item.desc())); - }); - std::sort(items.begin(), items.end()); - for (const auto& item : items) { - vlog(_log.info, "{}", item); - } - }; +// Forward declarations of helper functions defined in application_config.cc +std::optional read_file_sanitizer_config(); + +storage::kvstore_config kvstore_config_from_global_config( + std::optional sanitizer_config); + +storage::log_config manager_config_from_global_config( + scheduling_groups& sgs, + std::optional sanitizer_config); + +void application::wire_up_bootstrap_services() { + // Ensure the scheduling groups singleton is initialized early + std::ignore = scheduling_groups::instance(); + + // Construct the feature table + syschecks::systemd_message("Creating feature table").get(); + construct_service(feature_table).get(); + + // Construct local storage + const auto sanitizer_config = read_file_sanitizer_config(); + syschecks::systemd_message("Creating storage").get(); + construct_service( + storage, + [c = sanitizer_config]() mutable { + return kvstore_config_from_global_config(std::move(c)); + }, + [c = sanitizer_config]() mutable { + auto log_cfg = manager_config_from_global_config( + scheduling_groups::instance(), std::move(c)); + log_cfg.reclaim_opts.background_reclaimer_sg + = scheduling_groups::instance().cache_background_reclaim_sg(); + return log_cfg; + }, + std::ref(feature_table)) + .get(); + + // Construct smp groups using potentially stale cluster config. + auto& cfg = config::shard_local_cfg_unsafe(); + smp_groups::config smp_groups_cfg{ + .raft_group_max_non_local_requests + = cfg.raft_smp_max_non_local_requests().value_or( + smp_groups::default_raft_non_local_requests( + cfg.topic_partitions_per_shard())), + .proxy_group_max_non_local_requests + = cfg.pp_sr_smp_max_non_local_requests().value_or( + smp_groups::default_max_nonlocal_requests)}; + + smp_service_groups.create_groups(smp_groups_cfg).get(); + _deferred.emplace_back( + [this] { smp_service_groups.destroy_groups().get(); }); +} +void application::establish_cluster_view() { + bootstrap_controller_view().get(); + + // The shard_local_cfg() is now safe to use as we have a view consistent + // with the rest of the cluster. + mark_config_ready(true).get(); +} + +ss::future<> application::mark_config_ready(bool ready) { + co_await ss::smp::invoke_on_all( + [ready] { config::shard_local_cfg_unsafe().mark_ready(ready); }); +} + +void application::hydrate_cluster_config(const YAML::Node& config) { // This includes loading from local bootstrap file or legacy // config file on first-start or upgrade cases. 
_config_preload = cluster::config_manager::preload(config).get(); - vlog(_log.info, "Cluster configuration properties:"); - vlog(_log.info, "(use `rpk cluster config edit` to change)"); - config_printer("redpanda", config::shard_local_cfg()); - - vlog(_log.info, "Node configuration properties:"); - vlog(_log.info, "(use `rpk redpanda config set ` to change)"); - config_printer("redpanda", config::node()); - if (config["pandaproxy"]) { _proxy_config.emplace(config["pandaproxy"]); for (const auto& e : _proxy_config->errors()) { @@ -768,8 +830,6 @@ void application::hydrate_cluster_config(const YAML::Node& config) { set_local_kafka_client_config(_proxy_client_config, config::node()); } set_pp_kafka_client_defaults(*_proxy_config, *_proxy_client_config); - config_printer("pandaproxy", *_proxy_config); - config_printer("pandaproxy_client", *_proxy_client_config); } if (config["schema_registry"]) { _schema_reg_config.emplace(config["schema_registry"]); @@ -780,8 +840,6 @@ void application::hydrate_cluster_config(const YAML::Node& config) { _schema_reg_client_config, config::node()); } set_sr_kafka_client_defaults(*_schema_reg_client_config); - config_printer("schema_registry", *_schema_reg_config); - config_printer("schema_registry_client", *_schema_reg_client_config); } /// Auditing will be toggled via cluster config settings, internal audit /// client options can be configured via local config properties @@ -791,7 +849,44 @@ void application::hydrate_cluster_config(const YAML::Node& config) { set_local_kafka_client_config(_audit_log_client_config, config::node()); } set_auditing_kafka_client_defaults(*_audit_log_client_config); - config_printer("audit_log_client", *_audit_log_client_config); +} + +void application::log_cluster_config() { + auto config_printer = [this](std::string_view service, const auto& cfg) { + std::vector items; + cfg.for_each([&items, &service](const auto& item) { + items.push_back( + ssx::sformat("{}.{}\t- {}", service, item, item.desc())); + }); + std::sort(items.begin(), items.end()); + for (const auto& item : items) { + vlog(_log.info, "{}", item); + } + }; + + vlog(_log.info, "Cluster configuration properties:"); + vlog(_log.info, "(use `rpk cluster config edit` to change)"); + config_printer("redpanda", config::shard_local_cfg()); + + vlog(_log.info, "Node configuration properties:"); + vlog(_log.info, "(use `rpk redpanda config set ` to change)"); + config_printer("redpanda", config::node()); + + if (_proxy_config) { + config_printer("pandaproxy", *_proxy_config); + } + if (_proxy_client_config) { + config_printer("pandaproxy_client", *_proxy_client_config); + } + if (_schema_reg_config) { + config_printer("schema_registry", *_schema_reg_config); + } + if (_schema_reg_client_config) { + config_printer("schema_registry_client", *_schema_reg_client_config); + } + if (_audit_log_client_config) { + config_printer("audit_log_client", *_audit_log_client_config); + } } void application::check_environment() { diff --git a/src/v/redpanda/application.h b/src/v/redpanda/application.h index 2c374baa9dd8f..783608c155243 100644 --- a/src/v/redpanda/application.h +++ b/src/v/redpanda/application.h @@ -33,6 +33,7 @@ #include "datalake/credential_manager.h" #include "datalake/fwd.h" #include "debug_bundle/fwd.h" +#include "features/feature_table_snapshot.h" #include "features/fwd.h" #include "finjector/stress_fiber.h" #include "kafka/client/configuration.h" @@ -215,16 +216,52 @@ class application : public ssx::sharded_service_container { return _datalake_coordinator_fe; } + // At a 
minimum, we need to construct the feature table and storage systems + // in order to properly bootstrap the system. Public for test fixture + // access. + void wire_up_bootstrap_services(); + + // We need the RPC server and bootstrap service (at a minimum) running + // before cluster discovery can be performed. + void wire_up_and_start_rpc_service(); + + // Before we can continue in the bootstrap process, we need to establish + // a consistent view of the cluster-wide state - namely, the cluster + // configuration and feature table state. We do that by: + // 1. Applying any local kvstore snapshots (which contain potentially + // stale config and feature table state, as well as persisted + // node/cluster UUID information). + // 2. If we already have an identity and a persisted member set, + // refreshing that view by fetching an authoritative + // controller_join_snapshot from the controller leader via the + // `fetch_controller_snapshot` RPC. New nodes that lack persisted + // state skip this step; they will instead receive their snapshot + // later through the join_node response in resolve_node_identity. + // 3. Marking shard_local_cfg() as ready, after which downstream + // services may safely read cluster configuration. + // Public for test fixture access. + void establish_cluster_view(); + // Constructs and starts the services required to provide cryptographic // algorithm support to Redpanda. Public for test fixture access. void wire_up_and_start_crypto_services(); + // Public for test fixture access. + void hydrate_cluster_config(const YAML::Node& config); + + // Performs recovery on the local kvstore, applies a local feature table + // snapshot, and sets in-memory node/cluster UUIDs. + // Public for test fixture access. + void bootstrap_from_kvstore(); + private: - // Constructs services across shards required to get bootstrap metadata. - void wire_up_bootstrap_services(); + // Constructs storage services across shards required early on in the + // bootstrap process. + void wire_up_storage_services(); - // Starts services across shards required to get bootstrap metadata. - void start_bootstrap_services(); + // Starts storage services across shards required early on in the + // bootstrap process. + void start_storage_services(); // Constructs services across shards meant for Redpanda runtime. void wire_up_runtime_services( @@ -238,16 +275,33 @@ class application : public ssx::sharded_service_container { std::optional& bucket_name, cloud_topics::test_fixture_cfg ct_test_cfg); - void load_feature_table_snapshot(); + // Marks the shard_local_cfg as ready per the provided flag. + ss::future<> mark_config_ready(bool ready); + + // Applies the provided feature_table_snapshot directly to the in-memory + // feature table state. + ss::future<> + apply_feature_table_snapshot(const features::feature_table_snapshot& snap); + // Attempts to read a local feature table snapshot from the kvstore and + // apply it. + ss::future<> maybe_apply_local_feature_table_snapshot(); + // Performs cluster discovery for first time cluster joiners, or resolves + // node identity from persisted kvstore state. + ss::future<> resolve_node_identity(); + // Fetches and applies a view of the controller_stm from the current + // controller leader. + ss::future<> bootstrap_controller_view(); + // Refreshes the cluster config and feature table from the provided + // snapshot. 
+ ss::future<> + apply_controller_snapshot(const cluster::controller_join_snapshot&); void trigger_abort_source(); // Starts the services meant for Redpanda runtime. Must be called after // having constructed the subsystems via the corresponding `wire_up` calls. void start_runtime_services( - cluster::cluster_discovery&, - ::stop_signal&, - cloud_topics::test_fixture_cfg ct_test_cfg); + ::stop_signal&, cloud_topics::test_fixture_cfg ct_test_cfg); void start_kafka(const model::node_id&, ::stop_signal&); void add_runtime_rpc_services(rpc::rpc_server&, bool start_raft_rpc_early); @@ -255,7 +309,7 @@ class application : public ssx::sharded_service_container { ss::app_template::config setup_app_config(); void validate_arguments(const po::variables_map&); YAML::Node hydrate_node_config(const po::variables_map&); - void hydrate_cluster_config(const YAML::Node& config); + void log_cluster_config(); bool requires_cloud_io(); @@ -278,6 +332,9 @@ class application : public ssx::sharded_service_container { // in the log during startup. cluster::config_manager::preload_result _config_preload; + std::unique_ptr _cluster_discovery; + bool _node_uuid_needs_persisting{false}; + // When joining a cluster, we are tipped off as to the last applied // offset of the controller stm from another node. We will wait for // this offset to be replicated to our controller log before listening diff --git a/src/v/redpanda/application_bootstrap.cc b/src/v/redpanda/application_bootstrap.cc index 94e7fe371c117..66a0d8d8a4719 100644 --- a/src/v/redpanda/application_bootstrap.cc +++ b/src/v/redpanda/application_bootstrap.cc @@ -14,6 +14,8 @@ #include "cluster/controller.h" #include "cluster/controller_snapshot.h" #include "cluster/feature_manager.h" +#include "cluster/members_manager.h" +#include "cluster/types.h" #include "cluster_link/service.h" #include "config/configuration.h" #include "config/node_config.h" @@ -48,6 +50,15 @@ #include #include +namespace { + +bytes node_uuid_key() { + static const auto key = bytes::from_string("node_uuid"); + return key; +} + +} // namespace + void application::wire_up_and_start_crypto_services() { construct_single_service(thread_worker); thread_worker->start({.name = "worker"}).get(); @@ -80,18 +91,14 @@ void application::wire_up_and_start_crypto_services() { .get(); } -// Forward declarations of helper functions defined in application_config.cc -std::optional read_file_sanitizer_config(); - -storage::kvstore_config kvstore_config_from_global_config( - std::optional sanitizer_config); - -storage::log_config manager_config_from_global_config( - scheduling_groups& sgs, - std::optional sanitizer_config); - -void application::wire_up_bootstrap_services() { +void application::wire_up_storage_services() { // Wire up local storage. 
+ storage + .invoke_on_all([](storage::api& s) { + s.resources().update_append_chunk_size( + storage::internal::chunks().chunk_size()); + }) + .get(); ss::smp::invoke_on_all([] { return storage::internal::chunks().start(); }).get(); @@ -115,23 +122,6 @@ void application::wire_up_bootstrap_services() { std::ref(storage_node)) .get(); - const auto sanitizer_config = read_file_sanitizer_config(); - - construct_service( - storage, - [c = sanitizer_config]() mutable { - return kvstore_config_from_global_config(std::move(c)); - }, - [c = sanitizer_config]() mutable { - auto log_cfg = manager_config_from_global_config( - scheduling_groups::instance(), std::move(c)); - log_cfg.reclaim_opts.background_reclaimer_sg - = scheduling_groups::instance().cache_background_reclaim_sg(); - return log_cfg; - }, - std::ref(feature_table)) - .get(); - // Hook up local_monitor to update storage_resources when disk state changes auto storage_disk_notification = storage_node.local().register_disk_notification( @@ -145,92 +135,17 @@ void application::wire_up_bootstrap_services() { storage_node.local().unregister_disk_notification( storage::node::disk_type::data, storage_disk_notification); }); - - // Start empty, populated from snapshot in start_bootstrap_services - syschecks::systemd_message("Creating feature table").get(); - construct_service(feature_table).get(); - - // Wire up the internal RPC server. - ss::sharded rpc_cfg; - rpc_cfg.start(ss::sstring("internal_rpc")).get(); - auto stop_cfg = ss::defer([&rpc_cfg] { rpc_cfg.stop().get(); }); - rpc_cfg - .invoke_on_all([this](net::server_configuration& c) { - return ss::async([this, &c] { - auto rpc_server_addr - = net::resolve_dns(config::node().rpc_server()).get(); - // Use port based load_balancing_algorithm to make connection - // shard assignment deterministic. 
- c.load_balancing_algo - = ss::server_socket::load_balancing_algorithm::port; - c.max_service_memory_per_core = int64_t( - memory_groups().rpc_total_memory()); - c.disable_metrics = net::metrics_disabled( - config::shard_local_cfg().disable_metrics()); - c.disable_public_metrics = net::public_metrics_disabled( - config::shard_local_cfg().disable_public_metrics()); - c.listen_backlog - = config::shard_local_cfg().rpc_server_listen_backlog; - c.tcp_recv_buf - = config::shard_local_cfg().rpc_server_tcp_recv_buf; - c.tcp_send_buf - = config::shard_local_cfg().rpc_server_tcp_send_buf; - config::tls_config tls_config{ - config::node().rpc_server_tls().is_enabled(), - config::node().rpc_server_tls().get_key_cert_files(), - config::node().rpc_server_tls().get_truststore_file(), - config::node().rpc_server_tls().get_crl_file(), - config::node().rpc_server_tls().get_require_client_auth(), - config::node() - .rpc_server_tls() - .get_tls_v1_2_cipher_suites() - .value_or(ss::sstring{}), - config::node() - .rpc_server_tls() - .get_tls_v1_3_cipher_suites() - .value_or(ss::sstring{net::tls_v1_3_cipher_suites_strict}), - config::node().rpc_server_tls().get_min_tls_version().value_or( - config::tls_version::v1_3), - config::node() - .rpc_server_tls() - .get_enable_renegotiation() - .value_or(false)}; - auto credentials - = net::build_reloadable_server_credentials_with_probe( - tls_config, - "rpc", - "", - [this]( - const std::unordered_set& updated, - const std::exception_ptr& eptr) { - rpc::log_certificate_reload_event( - _log, "Internal RPC TLS", updated, eptr); - }) - .get(); - c.addrs.emplace_back(rpc_server_addr, credentials); - }); - }) - .get(); - - syschecks::systemd_message( - "Constructing internal RPC services {}", rpc_cfg.local()) - .get(); - construct_service(_rpc, &rpc_cfg).get(); } -void application::start_bootstrap_services() { - syschecks::systemd_message("Starting storage services").get(); - - // single instance - storage_node.invoke_on_all(&storage::node::start).get(); - local_monitor.invoke_on_all(&cluster::node::local_monitor::start).get(); - - storage.invoke_on_all(&storage::api::start).get(); +void application::bootstrap_from_kvstore() { + // We need to recover the kvstore's state from snapshots & segments for + // read-only purposes during bootstrapping. + storage.invoke_on_all(&storage::api::recover_kvstore).get(); - // As soon as storage is up, load our feature_table snapshot, if any, + // As soon as storage is up, load our local feature_table snapshot, if any, // so that all other services may rely on having features activated as soon // as they start. - load_feature_table_snapshot(); + maybe_apply_local_feature_table_snapshot().get(); // Before we start up our bootstrapping RPC service, load any relevant // on-disk state we may need: existing cluster UUID, node ID, etc. @@ -321,10 +236,9 @@ void application::start_bootstrap_services() { // Load the local node UUID, or create one if none exists. 
auto& kvs = storage.local().kvs(); - static const auto node_uuid_key = bytes::from_string("node_uuid"); model::node_uuid node_uuid; auto node_uuid_buf = kvs.get( - storage::kvstore::key_space::controller, node_uuid_key); + storage::kvstore::key_space::controller, node_uuid_key()); if (node_uuid_buf) { node_uuid = serde::from_iobuf( std::move(*node_uuid_buf)); @@ -335,12 +249,7 @@ void application::start_bootstrap_services() { } else { node_uuid = model::node_uuid(uuid_t::create()); vlog(_log.info, "Generated new UUID for node: {}", node_uuid); - kvs - .put( - storage::kvstore::key_space::controller, - node_uuid_key, - serde::to_iobuf(node_uuid)) - .get(); + _node_uuid_needs_persisting = true; } _node_overrides.maybe_set_overrides( @@ -354,57 +263,38 @@ void application::start_bootstrap_services() { node_uuid, u.value()); node_uuid = u.value(); - kvs - .put( - storage::kvstore::key_space::controller, - node_uuid_key, - serde::to_iobuf(node_uuid)) - .get(); + _node_uuid_needs_persisting = true; } storage .invoke_on_all([node_uuid](storage::api& storage) mutable { storage.set_node_uuid(node_uuid); }) .get(); - - syschecks::systemd_message("Starting internal RPC bootstrap service").get(); - _rpc - .invoke_on_all([this](rpc::rpc_server& s) { - std::vector> bootstrap_service; - bootstrap_service.push_back( - std::make_unique( - scheduling_groups::instance().cluster_sg(), - smp_service_groups.cluster_smp_sg(), - std::ref(storage))); - s.add_services(std::move(bootstrap_service)); - }) - .get(); - _rpc.invoke_on_all(&rpc::rpc_server::start).get(); - vlog( - _log.info, - "Started RPC server listening at {}", - config::node().rpc_server()); } -void application::wire_up_and_start( - ::stop_signal& app_signal, - bool test_mode, - cloud_topics::test_fixture_cfg ct_test_cfg) { - // Setup the app level abort service - construct_service(_as).get(); +void application::start_storage_services() { + syschecks::systemd_message("Starting storage services").get(); - // Bootstrap services. - wire_up_bootstrap_services(); - start_bootstrap_services(); + // single instance + storage_node.invoke_on_all(&storage::node::start).get(); + local_monitor.invoke_on_all(&cluster::node::local_monitor::start).get(); - // Begin the cluster discovery manager so we can confirm our initial node - // ID. A valid node ID is required before we can initialize the rest of our - // subsystems. - const auto& node_uuid = storage.local().node_uuid(); - cluster::cluster_discovery cd( - node_uuid, storage.local(), app_signal.abort_source()); + storage.invoke_on_all(&storage::api::start).get(); +} + +ss::future<> application::resolve_node_identity() { + auto& kvs = storage.local().kvs(); - auto invariants_buf = storage.local().kvs().get( + // We need to persist the node's local UUID before potentially joining the + // cluster for the first time. 
+ if (_node_uuid_needs_persisting) { + co_await kvs.put( + storage::kvstore::key_space::controller, + node_uuid_key(), + serde::to_iobuf(storage.local().node_uuid())); + } + + auto invariants_buf = kvs.get( storage::kvstore::key_space::controller, cluster::controller::invariants_key()); @@ -432,26 +322,23 @@ void application::wire_up_and_start( node_id = id.value(); // null out the config'ed ID indiscriminately; it will be set outside // the conditional - ss::smp::invoke_on_all([] { - config::node().node_id.set_value(std::nullopt); - }).get(); + co_await ss::smp::invoke_on_all( + [] { config::node().node_id.set_value(std::nullopt); }); if (invariants_buf.has_value()) { auto invariants = reflection::from_iobuf( std::move(invariants_buf.value())); invariants.node_id = node_id; - storage.local() - .kvs() - .put( - storage::kvstore::key_space::controller, - cluster::controller::invariants_key(), - reflection::to_iobuf( - cluster::configuration_invariants{invariants})) - .get(); + co_await kvs.put( + storage::kvstore::key_space::controller, + cluster::controller::invariants_key(), + reflection::to_iobuf( + cluster::configuration_invariants{invariants})); vlog(_log.debug, "Force-updated local node_id to {}", node_id); } } else { - auto registration_result = cd.register_with_cluster().get(); + auto registration_result + = co_await _cluster_discovery->register_with_cluster(); node_id = registration_result.assigned_node_id; if (registration_result.newly_registered) { @@ -464,36 +351,7 @@ void application::wire_up_and_start( auto snap = serde::from_iobuf( std::move(registration_result.controller_snapshot.value())); - - // The controller is not started yet, so write state directly - // into the feature table and configuration object. We do not - // currently use the rest of the snapshot, but reserve the right - // to do so in future (e.g. to prime all the controller stms - // from the snapshot) - auto ftsnap = std::move(snap.features.snap); - ss::smp::invoke_on_all([ftsnap, &ft = feature_table] { - ftsnap.apply(ft.local()); - }).get(); - cluster::feature_backend::do_save_local_snapshot( - storage.local(), ftsnap) - .get(); - - // The preload object is usually generated from loading a local - // cache or from the bootstrap file. The configuration received - // from the cluster during join takes precedence over either of - // these, and we replace it. - _config_preload - = cluster::config_manager::preload_join(snap).get(); - cluster::config_manager::write_local_cache( - _config_preload.version, _config_preload.raw_values) - .get(); - - // During controller::start, we wait to reach an applied offset. - // By priming this from the join snapshot, we may ensure that - // we wait until this node has replicated all the controller - // metadata since it joined, before we proceed with e.g. - // listening for Kafka API requests. - _await_controller_last_applied = snap.last_applied; + co_await apply_controller_snapshot(snap); } } } @@ -501,10 +359,10 @@ void application::wire_up_and_start( if (config::node().node_id() == std::nullopt) { // If we previously didn't have a node ID, set it in the config. We // will persist it in the kvstore when the controller starts up. 
- ss::smp::invoke_on_all([node_id] { + co_await ss::smp::invoke_on_all([node_id] { config::node().node_id.set_value( std::make_optional(node_id)); - }).get(); + }); } vlog( @@ -512,7 +370,185 @@ void application::wire_up_and_start( "Starting Redpanda with node_id {}, cluster UUID {}", node_id, storage.local().get_cluster_uuid()); +} + +ss::future<> application::bootstrap_controller_view() { + if (!feature_table.local().is_active( + features::feature::fetch_controller_snapshot_rpc)) { + vlog( + _log.debug, + "fetch_controller_snapshot_rpc feature not active locally; " + "skipping bootstrap snapshot fetch (shard_local_cfg and feature " + "table will reflect local cache only)"); + co_return; + } + // Use the cluster-members snapshot persisted by members_manager as + // the candidate set of peers. A founder on first boot or a + // non-founder joiner that has not yet completed + // register_with_cluster will have no persisted members; in that + // case, fall through to the local cache. + auto persisted = cluster::members_manager::read_members_from_kvstore( + storage.local().kvs()); + const auto self_id = config::node().node_id(); + if (self_id.has_value()) { + std::erase_if(persisted.members, [self_id](const model::broker& b) { + return b.id() == *self_id; + }); + } + if (persisted.members.empty()) { + vlog( + _log.debug, + "No persisted cluster members; skipping controller snapshot " + "fetch (shard_local_cfg and feature table will reflect local " + "cache only)"); + co_return; + } + auto fetched = co_await cluster::cluster_discovery:: + fetch_controller_snapshot_from_leader(persisted.members); + if (!fetched.has_value()) { + vlog( + _log.warn, + "Failed to fetch controller snapshot from any persisted " + "member; shard_local_cfg and feature table will reflect local " + "cache only"); + co_return; + } + + auto snap = serde::from_iobuf( + std::move(fetched.value())); + co_await apply_controller_snapshot(snap); +} + +ss::future<> application::apply_controller_snapshot( + const cluster::controller_join_snapshot& snap) { + co_await apply_feature_table_snapshot(snap.features.snap); + + // Only apply the snapshot's cluster config state if its version is higher + // than the existing preloaded state. + if (snap.config.version > _config_preload.version) { + _config_preload = co_await cluster::config_manager::preload_join(snap); + co_await cluster::config_manager::write_local_cache( + _config_preload.version, _config_preload.raw_values); + } + + _await_controller_last_applied = snap.last_applied; +} + +void application::wire_up_and_start_rpc_service() { + // Construct the rpc service. + ss::sharded rpc_cfg; + rpc_cfg.start(ss::sstring("internal_rpc")).get(); + auto stop_cfg = ss::defer([&rpc_cfg] { rpc_cfg.stop().get(); }); + rpc_cfg + .invoke_on_all([this](net::server_configuration& c) { + return ss::async([this, &c] { + auto rpc_server_addr + = net::resolve_dns(config::node().rpc_server()).get(); + // Because this rpc server is spun up before any consistent + // cluster view is established, we will be relying on stale local + // configs for the duration of this server's uptime. + auto& cfg = config::shard_local_cfg_unsafe(); + // Use port based load_balancing_algorithm to make connection + // shard assignment deterministic. 
+ c.load_balancing_algo + = ss::server_socket::load_balancing_algorithm::port; + c.max_service_memory_per_core = int64_t( + memory_groups().rpc_total_memory()); + // Metrics are disabled for now - they can be enabled later in + // `application::setup_metrics()` after a consistent cluster view + // is established. + c.disable_metrics = net::metrics_disabled(true); + c.disable_public_metrics = net::public_metrics_disabled(true); + c.listen_backlog = cfg.rpc_server_listen_backlog; + c.tcp_recv_buf = cfg.rpc_server_tcp_recv_buf; + c.tcp_send_buf = cfg.rpc_server_tcp_send_buf; + config::tls_config tls_config{ + config::node().rpc_server_tls().is_enabled(), + config::node().rpc_server_tls().get_key_cert_files(), + config::node().rpc_server_tls().get_truststore_file(), + config::node().rpc_server_tls().get_crl_file(), + config::node().rpc_server_tls().get_require_client_auth(), + config::node() + .rpc_server_tls() + .get_tls_v1_2_cipher_suites() + .value_or(ss::sstring{}), + config::node() + .rpc_server_tls() + .get_tls_v1_3_cipher_suites() + .value_or(ss::sstring{net::tls_v1_3_cipher_suites_strict}), + config::node().rpc_server_tls().get_min_tls_version().value_or( + config::tls_version::v1_3), + config::node() + .rpc_server_tls() + .get_enable_renegotiation() + .value_or(false)}; + auto deferred + = net::build_reloadable_server_credentials_with_deferred_probe( + std::move(tls_config), + [this]( + const std::unordered_set& updated, + const std::exception_ptr& eptr) { + rpc::log_certificate_reload_event( + _log, "Internal RPC TLS", updated, eptr); + }) + .get(); + auto& ep = c.addrs.emplace_back( + rpc_server_addr, std::move(deferred.credentials)); + ep.tls_probe = std::move(deferred.probe); + }); + }) + .get(); + + syschecks::systemd_message( + "Constructing internal RPC services {}", rpc_cfg.local()) + .get(); + construct_service(_rpc, &rpc_cfg).get(); + + syschecks::systemd_message("Starting internal RPC bootstrap service").get(); + _rpc + .invoke_on_all([this](rpc::rpc_server& s) { + std::vector> bootstrap_service; + bootstrap_service.push_back( + std::make_unique( + scheduling_groups::instance().cluster_sg(), + smp_service_groups.cluster_smp_sg(), + std::ref(storage))); + s.add_services(std::move(bootstrap_service)); + }) + .get(); + _rpc.invoke_on_all(&rpc::rpc_server::start).get(); + vlog( + _log.info, + "Started RPC server listening at {}", + config::node().rpc_server()); +} +void application::wire_up_and_start( + ::stop_signal& app_signal, + bool test_mode, + cloud_topics::test_fixture_cfg ct_test_cfg) { + // Setup the app level abort service + construct_service(_as).get(); + + // Storage services. + wire_up_storage_services(); + start_storage_services(); + + // Begin the cluster discovery manager so we can confirm our initial node + // ID. A valid node ID is required before we can initialize the rest of our + // subsystems. 
+ _cluster_discovery = std::make_unique( + storage.local().node_uuid(), + storage.local().get_cluster_uuid(), + app_signal.abort_source()); + + resolve_node_identity().get(); + + vassert( + config::node().node_id().has_value(), + "config::node().node_id() should have an assigned value at this point in " + "the start-up process."); + auto node_id = config::node().node_id().value(); wire_up_runtime_services(node_id, app_signal, ct_test_cfg); if (test_mode) { @@ -544,11 +580,12 @@ void application::wire_up_and_start( *controller)); } - if (cd.is_cluster_founder().get()) { + vassert(_cluster_discovery, "_cluster_discovery not constructed"); + if (_cluster_discovery->is_cluster_founder().get()) { controller->set_ready().get(); } - start_runtime_services(cd, app_signal, ct_test_cfg); + start_runtime_services(app_signal, ct_test_cfg); if (_proxy_config && !config::node().recovery_mode_enabled) { _proxy->start().get(); @@ -618,14 +655,44 @@ void application::wire_up_and_start( * e.g. the controller raft group) may rely on up to date knowledge of which * feature bits are enabled. */ -void application::load_feature_table_snapshot() { +ss::future<> application::apply_feature_table_snapshot( + const features::feature_table_snapshot& snap) { + auto my_version = features::feature_table::get_latest_logical_version(); + if (my_version < snap.version) { + vlog( + _log.error, + "Incompatible downgrade detected! My version {}, feature table {} " + "indicates that all nodes in cluster were previously >= that version", + my_version, + snap.version); + vassert( + config::node().upgrade_override_checks || my_version >= snap.version, + "Incompatible downgrade detected"); + } else { + vlog( + _log.debug, + "Loaded feature table snapshot at cluster version {} (vs my binary " + "{})", + snap.version, + my_version); + } + + co_await feature_table.invoke_on_all( + [snap](features::feature_table& ft) { snap.apply(ft); }); + + // Having loaded a snapshot, do our strict check for version compat. + feature_table.local().assert_compatible_version( + config::node().upgrade_override_checks); +} + +ss::future<> application::maybe_apply_local_feature_table_snapshot() { auto val_bytes_opt = storage.local().kvs().get( storage::kvstore::key_space::controller, features::feature_table_snapshot::kvstore_key()); if (!val_bytes_opt) { // No snapshot? Probably we are yet to join cluster. - return; + co_return; } features::feature_table_snapshot snap; @@ -643,36 +710,10 @@ void application::load_feature_table_snapshot() { #ifndef NDEBUG vunreachable("Snapshot decode failed"); #endif - return; + co_return; } - auto my_version = features::feature_table::get_latest_logical_version(); - if (my_version < snap.version) { - vlog( - _log.error, - "Incompatible downgrade detected! My version {}, feature table {} " - "indicates that all nodes in cluster were previously >= that version", - my_version, - snap.version); - vassert( - config::node().upgrade_override_checks || my_version >= snap.version, - "Incompatible downgrade detected"); - } else { - vlog( - _log.debug, - "Loaded feature table snapshot at cluster version {} (vs my binary " - "{})", - snap.version, - my_version); - } - - feature_table - .invoke_on_all([snap](features::feature_table& ft) { snap.apply(ft); }) - .get(); - - // Having loaded a snapshot, do our strict check for version compat. 
- feature_table.local().assert_compatible_version( - config::node().upgrade_override_checks); + co_await apply_feature_table_snapshot(snap); } /** diff --git a/src/v/redpanda/application_config.cc b/src/v/redpanda/application_config.cc index c312ca20050f7..c0e00cee61b41 100644 --- a/src/v/redpanda/application_config.cc +++ b/src/v/redpanda/application_config.cc @@ -175,8 +175,8 @@ storage::kvstore_config kvstore_config_from_global_config( * - ... #cores */ return storage::kvstore_config( - config::shard_local_cfg().kvstore_max_segment_size(), - config::shard_local_cfg().kvstore_flush_interval.bind(), + config::shard_local_cfg_unsafe().kvstore_max_segment_size(), + config::shard_local_cfg_unsafe().kvstore_flush_interval.bind(), config::node().data_directory().as_sstring(), sanitizer_config); } diff --git a/src/v/redpanda/application_start.cc b/src/v/redpanda/application_start.cc index 9ad3e2e0f56b6..b40ada6156729 100644 --- a/src/v/redpanda/application_start.cc +++ b/src/v/redpanda/application_start.cc @@ -56,9 +56,7 @@ #include void application::start_runtime_services( - cluster::cluster_discovery& cd, - ::stop_signal& app_signal, - cloud_topics::test_fixture_cfg ct_test_cfg) { + ::stop_signal& app_signal, cloud_topics::test_fixture_cfg ct_test_cfg) { // single instance node_status_backend.invoke_on_all(&cluster::node_status_backend::start) .get(); @@ -112,7 +110,8 @@ void application::start_runtime_services( // Initialize the Raft RPC endpoint before the rest of the runtime RPC // services so the cluster seeds can elect a leader and write a cluster // UUID before proceeding with the rest of bootstrap. - const bool start_raft_rpc_early = cd.is_cluster_founder().get(); + const bool start_raft_rpc_early + = _cluster_discovery->is_cluster_founder().get(); if (start_raft_rpc_early) { syschecks::systemd_message("Starting RPC/raft").get(); _rpc @@ -153,7 +152,7 @@ void application::start_runtime_services( } controller ->start( - cd, + *_cluster_discovery, app_signal.abort_source(), std::move(offsets_upload_requestor), producer_id_recovery_manager, @@ -211,9 +210,11 @@ void application::start_runtime_services( // By this point during startup we have enough information to evaluate both // the state of the license and what enterprise features are used. // - If redpanda has been restarted on an existing node, we have already - // loaded the feature table from the local snapshot in - // application::load_feature_table_snapshot and replayed the local - // controller log in controller::start. + // loaded the feature table from the local snapshot via + // application::maybe_apply_local_feature_table_snapshot and replayed + // the local controller log in controller::start. We may also have + // refreshed the feature table from a fresh controller snapshot + // fetched via fetch_controller_snapshot in establish_cluster_view. 
// - If this is a new node joining an existing cluster, by this point we // have received a controller snapshot from another node in the join // response and have replicated and replayed the the controller stm to the diff --git a/src/v/redpanda/tests/fixture.cc b/src/v/redpanda/tests/fixture.cc index f023887c0c1e1..ae1f824b8bc72 100644 --- a/src/v/redpanda/tests/fixture.cc +++ b/src/v/redpanda/tests/fixture.cc @@ -127,8 +127,13 @@ redpanda_thread_fixture::redpanda_thread_fixture( return std::make_optional(proxy_client_config(kafka_port)); }), audit_log_client_config(kafka_port)); - app.check_environment(); app.wire_up_and_start_crypto_services(); + app.wire_up_bootstrap_services(); + app.hydrate_cluster_config(make_minimal_cfg()); + app.bootstrap_from_kvstore(); + app.wire_up_and_start_rpc_service(); + app.establish_cluster_view(); + app.check_environment(); app.wire_up_and_start(*app_signal, true, ct_test_cfg); } catch (...) { // shutdown half-initialized app nicely so that its destructor doesn't @@ -333,8 +338,13 @@ void redpanda_thread_fixture::restart(should_wipe w) { config.get("disable_metrics").set_value(false); }).get(); app.initialize(proxy_config(), proxy_client_config()); - app.check_environment(); app.wire_up_and_start_crypto_services(); + app.wire_up_bootstrap_services(); + app.hydrate_cluster_config(make_minimal_cfg()); + app.bootstrap_from_kvstore(); + app.wire_up_and_start_rpc_service(); + app.establish_cluster_view(); + app.check_environment(); app.wire_up_and_start(*app_signal, true, ct_test_cfg); } diff --git a/src/v/redpanda/tests/fixture.h b/src/v/redpanda/tests/fixture.h index 5d96e9c36cb90..53ce661b10945 100644 --- a/src/v/redpanda/tests/fixture.h +++ b/src/v/redpanda/tests/fixture.h @@ -283,6 +283,11 @@ class redpanda_thread_fixture { const ss::sstring& username, const ss::sstring& password); + YAML::Node make_minimal_cfg() { + static const YAML::Node minimal_cfg = YAML::Load("redpanda: {}"); + return minimal_cfg; + } + application app; std::optional proxy_port; std::optional schema_reg_port; diff --git a/src/v/resource_mgmt/memory_groups.cc b/src/v/resource_mgmt/memory_groups.cc index 4510b9d2156ae..8d1a3f0e32890 100644 --- a/src/v/resource_mgmt/memory_groups.cc +++ b/src/v/resource_mgmt/memory_groups.cc @@ -21,16 +21,16 @@ namespace { bool wasm_enabled() { - return config::shard_local_cfg().data_transforms_enabled.value() + return config::shard_local_cfg_unsafe().data_transforms_enabled.value() && !config::node().emergency_disable_data_transforms.value(); } bool datalake_enabled() { - return config::shard_local_cfg().iceberg_enabled.value(); + return config::shard_local_cfg_unsafe().iceberg_enabled.value(); } bool cloud_topics_enabled() { - return config::shard_local_cfg().cloud_topics_enabled(); + return config::shard_local_cfg_unsafe().cloud_topics_enabled(); } struct memory_shares { @@ -207,7 +207,7 @@ system_memory_groups& memory_groups() { } size_t total = ss::memory::stats().total_memory(); bool wasm = wasm_enabled(); - const auto& cfg = config::shard_local_cfg(); + const auto& cfg = config::shard_local_cfg_unsafe(); if (wasm) { size_t wasm_memory_reservation = cfg.data_transforms_per_core_memory_reservation.value(); diff --git a/src/v/rpc/rpc_server.cc b/src/v/rpc/rpc_server.cc index 09f0fdb06a8f9..bd4d29f87e876 100644 --- a/src/v/rpc/rpc_server.cc +++ b/src/v/rpc/rpc_server.cc @@ -75,7 +75,7 @@ ss::future<> rpc_server::apply(ss::lw_shared_ptr conn) { ss::future<> rpc_server::send_reply(ss::lw_shared_ptr ctx, netbuf buf) { - if 
(config::shard_local_cfg().rpc_server_compress_replies()) { + if (_compress_replies()) { buf.set_min_compression_bytes(reply_min_compression_bytes); buf.set_compression(rpc::compression_type::zstd); } diff --git a/src/v/rpc/rpc_server.h b/src/v/rpc/rpc_server.h index 42897e40a4afc..4fb95dfa5d424 100644 --- a/src/v/rpc/rpc_server.h +++ b/src/v/rpc/rpc_server.h @@ -10,6 +10,7 @@ #include "base/vassert.h" #include "config/configuration.h" +#include "config/property.h" #include "net/server.h" #include "rpc/service.h" @@ -23,10 +24,16 @@ struct server_context_impl; class rpc_server : public net::server { public: explicit rpc_server(net::server_configuration s) - : net::server(std::move(s), rpclog) {} + : net::server(std::move(s), rpclog) + , _compress_replies( + config::shard_local_cfg_unsafe().rpc_server_compress_replies.bind()) { + } explicit rpc_server(ss::sharded* s) - : net::server(s, rpclog) {} + : net::server(s, rpclog) + , _compress_replies( + config::shard_local_cfg_unsafe().rpc_server_compress_replies.bind()) { + } ~rpc_server() override = default; @@ -35,7 +42,14 @@ class rpc_server : public net::server { rpc_server(const rpc_server&) = delete; rpc_server& operator=(const rpc_server&) = delete; - void set_all_services_added() { _all_services_added = true; } + void set_all_services_added() { + _all_services_added = true; + if (!cfg.disable_metrics) { + for (auto& s : _services) { + s->setup_metrics(); + } + } + } // Adds the given services to the protocol. // May be called whether or not the server has already been started. @@ -43,11 +57,6 @@ class rpc_server : public net::server { vassert( !_all_services_added, "Adding service after all services already added"); - if (!config::shard_local_cfg().disable_metrics()) { - for (auto& s : services) { - s->setup_metrics(); - } - } std::move( services.begin(), services.end(), std::back_inserter(_services)); } @@ -63,6 +72,8 @@ class rpc_server : public net::server { _services.push_back(std::make_unique(std::forward(args)...)); } + void setup_metrics() { net::server::setup_metrics(); } + private: ss::future<> dispatch_method_once(header, ss::lw_shared_ptr); @@ -72,6 +83,8 @@ class rpc_server : public net::server { bool _all_services_added{false}; std::vector> _services; + + config::binding _compress_replies; }; } // namespace rpc diff --git a/src/v/storage/api.h b/src/v/storage/api.h index fed90cce59f09..78a1fdff1a318 100644 --- a/src/v/storage/api.h +++ b/src/v/storage/api.h @@ -33,16 +33,27 @@ class api : public ss::peering_sharded_service { ss::sharded& feature_table) noexcept : _kv_conf_cb(std::move(kv_conf_cb)) , _log_conf_cb(std::move(log_conf_cb)) - , _feature_table(feature_table) {} + , _feature_table(feature_table) + , _kvstore( + std::make_unique( + _kv_conf_cb(), ss::this_shard_id(), _resources, _feature_table)) {} ss::future<> start() { - _kvstore = std::make_unique( - _kv_conf_cb(), ss::this_shard_id(), _resources, _feature_table); - return _kvstore->start().then([this] { + if (!_kvstore) { + _kvstore = std::make_unique( + _kv_conf_cb(), ss::this_shard_id(), _resources, _feature_table); + } + co_await _kvstore->start(_kvstore_needs_recovery); + if (!_log_mgr) { _log_mgr = std::make_unique( _log_conf_cb(), kvs(), _resources, _feature_table); - return _log_mgr->start(); - }); + } + co_await _log_mgr->start(); + } + + ss::future<> recover_kvstore() { + co_await _kvstore->recover(); + _kvstore_needs_recovery = kvstore::recover_t::no; } ss::future> @@ -71,6 +82,12 @@ class api : public ss::peering_sharded_service { return f; } + void 
reset() { + _log_mgr.reset(); + _kvstore.reset(); + _kvstore_needs_recovery = kvstore::recover_t::yes; + } + void set_node_uuid(const model::node_uuid& node_uuid) { _node_uuid = node_uuid; } @@ -127,6 +144,7 @@ class api : public ss::peering_sharded_service { std::optional _cluster_uuid; ss::condition_variable _has_cluster_uuid_cond; + kvstore::recover_t _kvstore_needs_recovery{kvstore::recover_t::yes}; }; } // namespace storage diff --git a/src/v/storage/kvstore.cc b/src/v/storage/kvstore.cc index a7a0a51935e84..b7f1ddf4bf3a6 100644 --- a/src/v/storage/kvstore.cc +++ b/src/v/storage/kvstore.cc @@ -55,7 +55,7 @@ kvstore::kvstore( kvstore::~kvstore() noexcept = default; -ss::future<> kvstore::start() { +ss::future<> kvstore::start(recover_t do_recover) { vlog(lg.debug, "Starting kvstore: dir {}", _ntpc.work_directory()); bool is_main_instance = static_cast(ss::this_shard_id()) @@ -91,37 +91,40 @@ ss::future<> kvstore::start() { }); } - return recover() - .then([this] { - _started = true; - - // Flushing background fiber - ssx::spawn_with_gate(_gate, [this] { - return ss::do_until( - [this] { return _gate.is_closed(); }, - [this] { - // semaphore used here instead of condition variable so that - // we don't lose wake-ups if they occur while flushing. - // consume at least one unit to avoid spinning on wait(0). - auto units = std::max(_sem.current(), size_t(1)); - return _sem.wait(units).then([this] { - if (_gate.is_closed()) { - return ss::now(); - } - return roll().then( - [this] { return flush_and_apply_ops(); }); - }); - }); + if (do_recover) { + co_await recover(); + } + vassert( + _recovered, + "kvstore::start(recover_t::no) called before recover() at dir {}", + _ntpc.work_directory()); + + _started = true; + + // Flushing background fiber + ssx::spawn_with_gate(_gate, [this] { + return ss::do_until( + [this] { return _gate.is_closed(); }, + [this] { + // semaphore used here instead of condition variable so that + // we don't lose wake-ups if they occur while flushing. + // consume at least one unit to avoid spinning on wait(0). 
+ auto units = std::max(_sem.current(), size_t(1)); + return _sem.wait(units).then([this] { + if (_gate.is_closed()) { + return ss::now(); + } + return roll().then([this] { return flush_and_apply_ops(); }); + }); }); - }) - .handle_exception_type([](const ss::gate_closed_exception&) { - lg.trace("Shutdown requested during recovery"); - }); + }); } ss::future<> kvstore::stop() { vlog(lg.info, "Stopping kvstore: dir {}", _ntpc.work_directory()); + _probe.metrics.clear(); + _as.request_abort(); // prevent new ops, signal flusher to exit @@ -163,7 +166,7 @@ static inline bytes make_spaced_key(kvstore::key_space ks, bytes_view key) { std::optional kvstore::get(key_space ks, bytes_view key) { _probe.entry_fetched(); - vassert(_started, "kvstore has not been started"); + vassert(_recovered, "kvstore::get called before recover()"); // do not re-assign to string_view -> temporary auto kkey = make_spaced_key(ks, key); @@ -202,7 +205,7 @@ ss::future<> kvstore::put(key_space ks, bytes key, std::optional value) { ss::future<> kvstore::for_each( key_space ks, ss::noncopyable_function visitor) { - vassert(_started, "kvstore has not been started"); + vassert(_recovered, "kvstore::for_each called before recover()"); auto gh = _gate.hold(); auto units = co_await _db_mut.get_units(); @@ -413,6 +416,11 @@ ss::future<> kvstore::save_snapshot() { } ss::future<> kvstore::recover() { + vassert( + !_recovered, + "kvstore::recover called twice for dir {}", + _ntpc.work_directory()); + /* * after loading _next_offset will be set to either zero if no snapshot * is found, or the offset immediately following the snapshot offset. @@ -421,17 +429,19 @@ ss::future<> kvstore::recover() { auto segments = co_await recover_segments( partition_path(_ntpc), - _ntpc.is_locally_compacted(), + /*is_compacted_enabled=*/false, [] { return std::nullopt; }, _as, - config::shard_local_cfg().storage_read_buffer_size(), - config::shard_local_cfg().storage_read_readahead_count(), + config::shard_local_cfg_unsafe().storage_read_buffer_size(), + config::shard_local_cfg_unsafe().storage_read_readahead_count(), std::nullopt, _resources, _feature_table, _ntp_sanitizer_config); co_await replay_segments(std::move(segments)); + + _recovered = true; } ss::future<> kvstore::load_snapshot() { diff --git a/src/v/storage/kvstore.h b/src/v/storage/kvstore.h index 6445a1732c888..df84ae9a7e58f 100644 --- a/src/v/storage/kvstore.h +++ b/src/v/storage/kvstore.h @@ -115,7 +115,20 @@ class kvstore { ss::sharded& feature_table); ~kvstore() noexcept; - ss::future<> start(); + /// Load the snapshot and replay segments into the in-memory map. + /// After this returns, get()/empty()/for_each() are usable. + /// Bootstrap code that needs read access before the writer fiber + /// runs may call this directly and then pass recover_t::no to + /// start(). Asserts if called twice. + ss::future<> recover(); + + using recover_t = ss::bool_class; + + /// Wire up the writer fiber and accept put/remove. With + /// recover_t::yes (default), runs recover() first if it has not + /// already been called externally. With recover_t::no, asserts + /// recover() has already completed. 
+ ss::future<> start(recover_t recover = recover_t::yes); ss::future<> stop(); std::optional get(key_space ks, bytes_view key); @@ -129,7 +142,7 @@ class kvstore { ss::noncopyable_function visitor); bool empty() const { - vassert(_started, "kvstore has not been started"); + vassert(_recovered, "kvstore::empty called before recover()"); return _db.empty(); } @@ -149,6 +162,7 @@ class kvstore { ss::gate _gate; ss::abort_source _as; simple_snapshot_manager _snap; + bool _recovered{false}; bool _started{false}; /** @@ -188,12 +202,11 @@ class kvstore { ss::future<> save_snapshot(); /* - * Recovery + * Recovery (recover() itself is a public entry point declared above) * * 1. load snapshot if found * 2. then recover from segments */ - ss::future<> recover(); ss::future<> load_snapshot(); ss::future<> load_snapshot_from_reader(snapshot_reader&); ss::future<> replay_segments(segment_set); diff --git a/src/v/storage/storage_resources.cc b/src/v/storage/storage_resources.cc index 330d9b435c946..61aa244c72970 100644 --- a/src/v/storage/storage_resources.cc +++ b/src/v/storage/storage_resources.cc @@ -31,12 +31,13 @@ storage_resources::storage_resources( config::binding falloc_step, config::binding target_replay_bytes, config::binding max_concurrent_replay, - config::binding compaction_index_memory) + config::binding compaction_index_memory, + size_t append_chunk_size) : _segment_fallocation_step(falloc_step) , _global_target_replay_bytes(target_replay_bytes) , _max_concurrent_replay(max_concurrent_replay) , _compaction_index_mem_limit(compaction_index_memory) - , _append_chunk_size(internal::chunks().chunk_size()) + , _append_chunk_size(append_chunk_size) , _offset_translator_dirty_bytes( _global_target_replay_bytes() / ss::smp::count) , _configuration_manager_dirty_bytes( @@ -78,16 +79,22 @@ storage_resources::storage_resources( storage_resources::storage_resources(config::binding falloc_step) : storage_resources( std::move(falloc_step), - config::shard_local_cfg().storage_target_replay_bytes.bind(), - config::shard_local_cfg().storage_max_concurrent_replay.bind(), - config::shard_local_cfg().storage_compaction_index_memory.bind()) {} + config::shard_local_cfg_unsafe().storage_target_replay_bytes.bind(), + config::shard_local_cfg_unsafe().storage_max_concurrent_replay.bind(), + config::shard_local_cfg_unsafe().storage_compaction_index_memory.bind(), + config::shard_local_cfg_unsafe().append_chunk_size()) {} storage_resources::storage_resources() : storage_resources( - config::shard_local_cfg().segment_fallocation_step.bind(), - config::shard_local_cfg().storage_target_replay_bytes.bind(), - config::shard_local_cfg().storage_max_concurrent_replay.bind(), - config::shard_local_cfg().storage_compaction_index_memory.bind()) {} + config::shard_local_cfg_unsafe().segment_fallocation_step.bind(), + config::shard_local_cfg_unsafe().storage_target_replay_bytes.bind(), + config::shard_local_cfg_unsafe().storage_max_concurrent_replay.bind(), + config::shard_local_cfg_unsafe().storage_compaction_index_memory.bind(), + config::shard_local_cfg_unsafe().append_chunk_size()) {} + +void storage_resources::update_append_chunk_size(size_t chunk_size) { + _append_chunk_size = chunk_size; +} void storage_resources::update_allowance(uint64_t total, uint64_t free) { // TODO: also take as an input the disk consumption of the SI cache: diff --git a/src/v/storage/storage_resources.h b/src/v/storage/storage_resources.h index d2772aceed233..cbd9f0b5c1d7f 100644 --- a/src/v/storage/storage_resources.h +++ 
b/src/v/storage/storage_resources.h @@ -44,14 +44,15 @@ class storage_resources { config::binding, config::binding, config::binding, - config::binding); + config::binding, + size_t); storage_resources(const storage_resources&) = delete; /** * Call this when the storage::node_api state is updated */ void update_allowance(uint64_t total, uint64_t free); - + void update_append_chunk_size(size_t chunk_size); /** * Call this when topics_table gets updated */ diff --git a/src/v/storage/tests/utils/disk_log_builder.cc b/src/v/storage/tests/utils/disk_log_builder.cc index f7f68612bad6f..a51e0e6025517 100644 --- a/src/v/storage/tests/utils/disk_log_builder.cc +++ b/src/v/storage/tests/utils/disk_log_builder.cc @@ -182,7 +182,9 @@ void disk_log_builder::add_closed_segment_bytes(ssize_t bytes) { ss::future<> disk_log_builder::stop() { _log->stm_hookset()->stop(); - return _storage.stop().then([this]() { return _feature_table.stop(); }); + co_await _storage.stop(); + _storage.reset(); + co_await _feature_table.stop(); } // Low lever interface access diff --git a/src/v/utils/adjustable_semaphore.h b/src/v/utils/adjustable_semaphore.h index e4403bb548e5b..47bd6db6a97ab 100644 --- a/src/v/utils/adjustable_semaphore.h +++ b/src/v/utils/adjustable_semaphore.h @@ -126,6 +126,16 @@ class adjustable_semaphore { uint64_t capacity() { return _capacity; } + /** + * Expose the underlying ssx::semaphore for callers that need to use + * free-function helpers such as ss::get_units(sem, n) or to check + * waiters(). The capacity tracking on this adjustable_semaphore is + * unaffected; callers that signal()/consume() directly through the + * underlying handle bypass capacity bookkeeping. + */ + ssx::semaphore& underlying() noexcept { return _sem; } + const ssx::semaphore& underlying() const noexcept { return _sem; } + private: ssx::semaphore _sem; diff --git a/tests/rptest/tests/cluster_config_test.py b/tests/rptest/tests/cluster_config_test.py index 4545bf40112be..85f6b1cb84a8c 100644 --- a/tests/rptest/tests/cluster_config_test.py +++ b/tests/rptest/tests/cluster_config_test.py @@ -43,7 +43,7 @@ RedpandaVersion, ) from rptest.tests.redpanda_test import RedpandaTest -from rptest.util import expect_exception, expect_http_error +from rptest.util import expect_exception, expect_http_error, wait_until_result from rptest.utils.si_utils import BucketView BOOTSTRAP_CONFIG = { @@ -71,7 +71,10 @@ def check_restart_clears(admin, redpanda, nodes=None): nodes = redpanda.nodes status = admin.get_cluster_config_status() - for n in status: + relevant_ids = {redpanda.node_id(n) for n in nodes} + relevant = [s for s in status if s["node_id"] in relevant_ids] + assert len(relevant_ids) == len(relevant) + for n in relevant: assert n["restart"] is True first_node = nodes[0] @@ -141,6 +144,33 @@ def is_complete(node): ) +def wait_for_active_nodes_version_status_sync(admin, redpanda, version, nodes): + """ + Like wait_for_version_status_sync, but only requires the subset of + `active_nodes` to agree on `version`. Statuses for other nodes (e.g. a + downed node still listed by the controller) are ignored. 
+ """ + active_ids = {redpanda.node_id(n) for n in nodes} + + def is_complete(node): + node_status = admin.get_cluster_config_status(node=node) + relevant = [s for s in node_status if s["node_id"] in active_ids] + return len(relevant) == len(active_ids) and { + s["config_version"] for s in relevant + } == {version} + + for node in nodes: + wait_until( + lambda n=node: is_complete(n), + timeout_sec=10, + backoff_sec=0.5, + err_msg=( + f"Config status did not converge on {version} for active " + f"nodes {sorted(active_ids)}" + ), + ) + + class ClusterConfigBootstrapTest(RedpandaTest): def __init__(self, *args, **kwargs): super().__init__(*args, extra_rp_conf={}, **kwargs) @@ -2599,6 +2629,337 @@ def assert_restart_status(expect: bool): assert n["restart"] is False +class ClusterConfigMultiNodeBootstrapTest(RedpandaTest): + def __init__(self, test_context): + super().__init__( + test_context, num_brokers=3, si_settings=SISettings(test_context) + ) + self.admin = Admin(self.redpanda) + self.rpk = RpkTool(self.redpanda) + + def setUp(self): + # Skip starting redpanda, so that test can explicitly start + # it with some override_cfg_params + pass + + def _local_replica_stms(self, node, topic_name, partition): + """ + Return the set of stm names registered for `topic_name`/`partition` on + `node`'s local replica, or None if the replica has not materialized yet. + Suitable for passing to wait_until_result. + """ + node_id = self.redpanda.node_id(node) + state = self.admin.get_partition_state( + "kafka", topic_name, partition, node=node + ) + for r in state.get("replicas", []): + if r.get("raft_state", {}).get("node_id") == node_id: + return {s["name"] for s in r["raft_state"].get("stms", [])} + return None + + @cluster(num_nodes=3) + def test_node_delayed_restart(self): + """ + A node which has gone down should see the most up to date cluster config immediately in the bootstrap process, instead of needing to restart again. + """ + + def assert_restart_status_on_nodes(expect: bool, relevant_nodes): + relevant_ids = {self.redpanda.node_id(n) for n in relevant_nodes} + status = self.admin.get_cluster_config_status() + relevant = [s for s in status if s["node_id"] in relevant_ids] + assert len(relevant_ids) == len(relevant) + for n in relevant: + assert n["restart"] is expect, ( + f"Expected restart status {n['restart']} to be {expect}" + ) + + active_nodes = self.redpanda.nodes[0:2] + down_node = self.redpanda.nodes[2] + all_nodes = self.redpanda.nodes + self.redpanda.start(all_nodes) + + # Wait for config status to populate + wait_until( + lambda: len(self.admin.get_cluster_config_status()) == 3, + timeout_sec=30, + backoff_sec=1, + ) + + assert_restart_status_on_nodes(False, all_nodes) + + # Bring one of the nodes down. 
+    @cluster(num_nodes=3)
+    def test_node_delayed_restart(self):
+        """
+        A node that has been down should see the most up-to-date cluster
+        config as part of the bootstrap process, instead of needing a second
+        restart to pick it up.
+        """
+
+        def assert_restart_status_on_nodes(expect: bool, relevant_nodes):
+            relevant_ids = {self.redpanda.node_id(n) for n in relevant_nodes}
+            status = self.admin.get_cluster_config_status()
+            relevant = [s for s in status if s["node_id"] in relevant_ids]
+            assert len(relevant_ids) == len(relevant)
+            for n in relevant:
+                assert n["restart"] is expect, (
+                    f"Expected restart status {n['restart']} to be {expect}"
+                )
+
+        active_nodes = self.redpanda.nodes[0:2]
+        down_node = self.redpanda.nodes[2]
+        all_nodes = self.redpanda.nodes
+        self.redpanda.start(all_nodes)
+
+        # Wait for config status to populate.
+        wait_until(
+            lambda: len(self.admin.get_cluster_config_status()) == 3,
+            timeout_sec=30,
+            backoff_sec=1,
+        )
+
+        assert_restart_status_on_nodes(False, all_nodes)
+
+        # Bring one of the nodes down.
+        self.redpanda.stop_node(down_node)
+
+        # An arbitrary restart-requiring setting with a non-default value.
+        new_setting = (CLOUD_TOPICS_CONFIG_STR, True)
+        patch_result = self.admin.patch_cluster_config(upsert=dict([new_setting]))
+        new_version = patch_result["config_version"]
+        wait_for_active_nodes_version_status_sync(
+            self.admin, self.redpanda, new_version, nodes=active_nodes
+        )
+        assert_restart_status_on_nodes(True, active_nodes)
+
+        # Restart the nodes that stayed up to get them into a clean state.
+        check_restart_clears(self.admin, self.redpanda, nodes=active_nodes)
+
+        config = {
+            TopicSpec.PROPERTY_STORAGE_MODE: TopicSpec.STORAGE_MODE_CLOUD,
+        }
+        topic_name = "tapioca"
+        self.rpk.create_topic(
+            topic=topic_name,
+            partitions=1,
+            replicas=3,
+            config=config,
+        )
+        topic_desc = self.rpk.describe_topic_configs(topic_name)
+        assert (
+            topic_desc[TopicSpec.PROPERTY_STORAGE_MODE][0]
+            == TopicSpec.STORAGE_MODE_CLOUD
+        )
+
+        # Start the node back up.
+        self.redpanda.start_node(down_node)
+
+        # Verify ctp_stm is registered on down_node's local replica of the
+        # cloud topic. This proves bootstrap applied cloud_topics_enabled=true
+        # (a needs_restart=yes property) before partition_manager constructed
+        # the partition; otherwise ctp_stm would be missing on this node
+        # until another restart.
+        down_node_id = self.redpanda.node_id(down_node)
+        stm_names = wait_until_result(
+            lambda: self._local_replica_stms(down_node, topic_name, 0),
+            timeout_sec=30,
+            backoff_sec=1,
+            err_msg=f"{topic_name} replica never materialized on restarted "
+            f"node {down_node_id}",
+        )
+        assert "ctp_stm" in stm_names, (
+            f"ctp_stm missing on restarted node {down_node_id}; got stms "
+            f"{stm_names}. Bootstrap did not apply cloud_topics_enabled "
+            f"before partition_manager built {topic_name}."
+        )
+
+        status = self.admin.get_cluster_config_status()
+        for n in status:
+            assert n["restart"] is False
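Aside (sketch, not part of the patch): the restart-flag lifecycle that test_node_delayed_restart leans on can be condensed to a few lines. `prop`, `value`, and `up_nodes` are placeholders; the helpers are the ones defined earlier in this file:

# Sketch of the expected lifecycle for a needs_restart=yes property.
ver = admin.patch_cluster_config(upsert={prop: value})["config_version"]
wait_for_active_nodes_version_status_sync(admin, redpanda, ver, nodes=up_nodes)
up_ids = {redpanda.node_id(n) for n in up_nodes}
status = admin.get_cluster_config_status()
# Every node that saw the new version now reports restart=True...
assert all(s["restart"] for s in status if s["node_id"] in up_ids)
# ...and check_restart_clears restarts those nodes and waits for the flag to clear.
check_restart_clears(admin, redpanda, nodes=up_nodes)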
+    @cluster(num_nodes=3)
+    def test_cloud_topic_on_joining_node(self):
+        """
+        A node joining a cluster for the first time should pick up
+        cloud_topics_enabled (a needs_restart=yes property) from the
+        register_with_cluster join snapshot, so that partition_manager
+        registers ctp_stm when the cloud topic's partition is constructed.
+        """
+        seed_nodes = self.redpanda.nodes[0:2]
+        joiner_node = self.redpanda.nodes[2]
+
+        # Bring up a 2-node cluster first.
+        self.redpanda.start(seed_nodes)
+        wait_until(
+            lambda: len(self.admin.get_cluster_config_status()) == 2,
+            timeout_sec=30,
+            backoff_sec=1,
+        )
+
+        # Enable cloud_topics_enabled and restart both seeds so the value
+        # is in active on whichever seed ends up serving the joiner's
+        # register_with_cluster RPC and validating the cloud topic create.
+        new_setting = (CLOUD_TOPICS_CONFIG_STR, True)
+        patch_result = self.admin.patch_cluster_config(upsert=dict([new_setting]))
+        new_version = patch_result["config_version"]
+        wait_for_active_nodes_version_status_sync(
+            self.admin, self.redpanda, new_version, nodes=seed_nodes
+        )
+        self.redpanda.restart_nodes(seed_nodes)
+        seed_ids = {self.redpanda.node_id(n) for n in seed_nodes}
+        wait_until(
+            lambda: all(
+                s["restart"] is False
+                for s in self.admin.get_cluster_config_status()
+                if s["node_id"] in seed_ids
+            ),
+            timeout_sec=30,
+            backoff_sec=1,
+            err_msg="seed restart flag did not clear after seed restart",
+        )
+
+        # Join the third node for the first time.
+        self.redpanda.start_node(joiner_node)
+        wait_until(
+            lambda: len(self.admin.get_cluster_config_status()) == 3,
+            timeout_sec=30,
+            backoff_sec=1,
+        )
+
+        # Create a cloud topic with rf=3 so the joiner hosts a replica.
+        topic_name = "tapioca_joiner"
+        self.rpk.create_topic(
+            topic=topic_name,
+            partitions=1,
+            replicas=3,
+            config={
+                TopicSpec.PROPERTY_STORAGE_MODE: TopicSpec.STORAGE_MODE_CLOUD,
+            },
+        )
+
+        # ctp_stm must be registered on the joiner's replica. If the
+        # joiner's bootstrap left cloud_topics_enabled in pending instead
+        # of active, partition_manager would build the partition without
+        # ctp_stm and we'd silently lose cloud-topics functionality on
+        # this node until another restart.
+        joiner_id = self.redpanda.node_id(joiner_node)
+        stm_names = wait_until_result(
+            lambda: self._local_replica_stms(joiner_node, topic_name, 0),
+            timeout_sec=30,
+            backoff_sec=1,
+            err_msg=f"{topic_name} replica never materialized on joiner "
+            f"node {joiner_id}",
+        )
+        assert "ctp_stm" in stm_names, (
+            f"ctp_stm missing on joiner node {joiner_id}; got stms {stm_names}. "
+            f"register_with_cluster snapshot did not apply cloud_topics_enabled "
+            f"before partition_manager built {topic_name}."
+        )
+
+        status = self.admin.get_cluster_config_status()
+        for n in status:
+            assert n["restart"] is False, (
+                f"Unexpected restart=true after fresh join: {status}"
+            )
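Aside (sketch, not part of the patch): the recovery test below tells the active value apart from the pending-aware view by toggling suppress_pending on the same Admin call it already uses; `prop` and `node` are placeholders:

# Active value only: ignores anything staged in pending.
active = admin.get_cluster_config(node=node, key=prop, suppress_pending=True)[prop]
# Default view: includes the pending value once recovery (or a patch) has staged it.
pending_aware = admin.get_cluster_config(node=node, key=prop)[prop]
# For a needs_restart=yes property right after recovery:
#   active        -> still the pre-recovery/default value
#   pending_aware -> the recovered value, promoted to active only after a restart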
+    @cluster(num_nodes=3)
+    def test_cluster_recovery_needs_restart_property(self):
+        """
+        After cluster recovery applies a needs_restart=yes property, the
+        active value should remain at the default until nodes restart.
+        After a restart, the recovered value should be in active because
+        bootstrap reads the local cache (which apply_delta -> store_delta
+        wrote during recovery).
+        """
+        # Faster cluster metadata upload so the source backup is captured
+        # quickly. enable_cluster_metadata_upload_loop is true by default.
+        self.redpanda.add_extra_rp_conf(
+            {
+                "controller_snapshot_max_age_sec": 1,
+                "cloud_storage_cluster_metadata_upload_interval_ms": 1000,
+            }
+        )
+
+        all_nodes = self.redpanda.nodes
+        self.redpanda.start(all_nodes)
+        wait_until(
+            lambda: len(self.admin.get_cluster_config_status()) == 3,
+            timeout_sec=30,
+            backoff_sec=1,
+        )
+
+        PROPERTY_NAME = "storage_compaction_key_map_memory_limit_percent"
+        PROPERTY_DEFAULT = 12
+        NEW_PROPERTY_VALUE = 6
+        # storage_compaction_key_map_memory_limit_percent is needs_restart=yes
+        # with a default of 12.
+        new_setting = (
+            PROPERTY_NAME,
+            NEW_PROPERTY_VALUE,
+        )
+        patch_result = self.admin.patch_cluster_config(upsert=dict([new_setting]))
+        new_version = patch_result["config_version"]
+        wait_for_active_nodes_version_status_sync(
+            self.admin, self.redpanda, new_version, nodes=all_nodes
+        )
+
+        # Let the metadata upload loop capture the post-patch state.
+        time.sleep(5)
+
+        # Wipe and bring up a fresh cluster.
+        self.redpanda.stop()
+        for n in all_nodes:
+            self.redpanda.remove_local_data(n)
+        self.redpanda.restart_nodes(all_nodes)
+        self.admin.await_stable_leader(
+            "controller",
+            partition=0,
+            namespace="redpanda",
+            timeout_s=60,
+            backoff_s=2,
+        )
+
+        # Use suppress_pending=True so we read the active value only, not
+        # the pending-aware view that get_cluster_config returns by default.
+        # We want to verify that the recovered value lands in pending while
+        # active stays unchanged until restart.
+        for n in all_nodes:
+            v = self.admin.get_cluster_config(
+                node=n, key=PROPERTY_NAME, suppress_pending=True
+            )[PROPERTY_NAME]
+            assert v == PROPERTY_DEFAULT, (
+                f"Expected active {PROPERTY_NAME}={v} to be default value "
+                f"{PROPERTY_DEFAULT=} on {n.name} pre-recovery"
+            )
+
+        # Run cluster recovery.
+        self.admin.initialize_cluster_recovery()
+
+        def cluster_recovery_complete():
+            return (
+                "inactive" in self.admin.get_cluster_recovery_status().json()["state"]
+            )
+
+        wait_until(cluster_recovery_complete, timeout_sec=60, backoff_sec=1)
+
+        status = self.admin.get_cluster_config_status()
+        for n in status:
+            assert n["restart"] is True, (
+                f"Expected restart=true after recovery for needs_restart "
+                f"property, got status {status}"
+            )
+
+        # After recovery, the needs_restart=yes property is in pending.
+        # Active stays at the pre-recovery value; the pending-aware view
+        # already reflects the recovered value.
+        for n in all_nodes:
+            active = self.admin.get_cluster_config(
+                node=n, key=PROPERTY_NAME, suppress_pending=True
+            )[PROPERTY_NAME]
+            assert active == PROPERTY_DEFAULT, (
+                f"Expected active {PROPERTY_NAME}={active} to still be "
+                f"default {PROPERTY_DEFAULT=} on {n.name} after recovery "
+                f"(needs_restart=yes properties land in pending, not active)"
+            )
+            pending = self.admin.get_cluster_config(node=n, key=PROPERTY_NAME)[
+                PROPERTY_NAME
+            ]
+            assert pending == NEW_PROPERTY_VALUE, (
+                f"Expected pending-aware view of {PROPERTY_NAME}={pending} "
+                f"to reflect the recovered value {NEW_PROPERTY_VALUE=} on "
+                f"{n.name}"
+            )
+
+        self.redpanda.restart_nodes(all_nodes)
+        self.admin.await_stable_leader(
+            "controller",
+            partition=0,
+            namespace="redpanda",
+            timeout_s=60,
+            backoff_s=2,
+        )
+
+        # After restart, hydrate_cluster_config -> load_cache ->
+        # preload_local writes the recovered value into active.
+        for n in all_nodes:
+            v = self.admin.get_cluster_config(
+                node=n, key=PROPERTY_NAME, suppress_pending=True
+            )[PROPERTY_NAME]
+            assert v == NEW_PROPERTY_VALUE, (
+                f"Expected active {PROPERTY_NAME}={v} to be "
+                f"{NEW_PROPERTY_VALUE=} on {n.name} after recovery + restart"
+            )
+        status = self.admin.get_cluster_config_status()
+        for n in status:
+            assert n["restart"] is False, (
+                f"Unexpected restart=true after post-recovery restart: {status}"
+            )
+
+
 class ClusterConfigLegacyDefaultTest(RedpandaTest, ClusterConfigHelpersMixin):
     """
     Test config::legacy_default feature, that defaults for features can be