Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
// Update the cluster config (via the controller, rather than shard local).
cluster::config_update_request req;
req.upsert.emplace_back("log_segment_size_jitter_percent", "1");
req.upsert.emplace_back("log_segment_size", "2147483649");
app.controller->get_config_frontend()
.local()
.patch(std::move(req), model::timeout_clock::now() + 30s)
Expand Down Expand Up @@ -228,6 +229,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
raft0 = nullptr;
restart(should_wipe::yes);
task_local_cfg.get("log_segment_size_jitter_percent").reset();
task_local_cfg.get("log_segment_size").reset();
RPTEST_REQUIRE_EVENTUALLY(5s, [this] {
return app.storage.local().get_cluster_uuid().has_value();
});
Expand All @@ -239,6 +241,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
.has_value());
ASSERT_NE(
1, config::shard_local_cfg().log_segment_size_jitter_percent.value());
ASSERT_NE(2147483649, config::shard_local_cfg().log_segment_size.value());
ASSERT_TRUE(!app.controller->get_credential_store().local().contains(
security::credential_user{"userguy"}));
ASSERT_EQ(
Expand Down Expand Up @@ -274,14 +277,26 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {
.is_recovery_active();
});

bool has_restarted = false;
// Validate the controller state is restored.
auto validate_post_recovery = [&] {
ASSERT_TRUE(app.controller->get_feature_table()
.local()
.get_configured_license()
.has_value());
// log_segment_size_jitter_percent is marked as needs_restart::yes. We
// won't see its recovered value reflected until the node is restarted.
auto log_segment_size_jitter_expected
= has_restarted ? 1
: config::shard_local_cfg()
.log_segment_size_jitter_percent.default_value();
ASSERT_EQ(
1, config::shard_local_cfg().log_segment_size_jitter_percent.value());
log_segment_size_jitter_expected,
config::shard_local_cfg().log_segment_size_jitter_percent.value());
// On the other hand, log_segment_size is marked as needs_restart::no,
// so we will see its value reflected immediately.
ASSERT_EQ(
2147483649, config::shard_local_cfg().log_segment_size.value());

// Validate User restoration.
ASSERT_TRUE(app.controller->get_credential_store().local().contains(
Expand Down Expand Up @@ -339,6 +354,7 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) {

// Sanity check that the above invariants still hold after restarting.
restart(should_wipe::no);
has_restarted = true;
RPTEST_REQUIRE_EVENTUALLY(5s, [this] {
auto latest_recovery = app.controller->get_cluster_recovery_table()
.local()
Expand Down
75 changes: 68 additions & 7 deletions src/v/cluster/cluster_discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "features/feature_table.h"
#include "model/fundamental.h"
#include "model/metadata.h"
#include "storage/api.h"
#include "storage/ntp_config.h"
#include "utils/directory_walker.h"

#include <seastar/core/seastar.hh>
Expand All @@ -33,12 +33,12 @@ namespace cluster {

cluster_discovery::cluster_discovery(
const model::node_uuid& node_uuid,
storage::api& storage,
std::optional<model::cluster_uuid> cluster_uuid,
ss::abort_source& as)
: _node_uuid(node_uuid)
, _join_retry_jitter(config::shard_local_cfg().join_retry_timeout_ms())
, _cluster_uuid(std::move(cluster_uuid))
, _join_retry_jitter(config::shard_local_cfg_unsafe().join_retry_timeout_ms())
, _join_timeout(std::chrono::seconds(2))
, _storage(storage)
, _as(as) {}

ss::future<cluster_discovery::registration_result>
Expand Down Expand Up @@ -115,7 +115,7 @@ ss::future<bool> cluster_discovery::is_cluster_founder() {
}
_founding_brokers = brokers{make_self_broker(config::node())};
_node_ids_by_uuid = node_ids_by_uuid{
{_storage.node_uuid(), _founding_brokers.front().id()}};
{_node_uuid, _founding_brokers.front().id()}};
_is_cluster_founder = true;
co_return *_is_cluster_founder;
}
Expand Down Expand Up @@ -209,6 +209,67 @@ cluster_discovery::dispatch_node_uuid_registration_to_seeds() {
co_return std::nullopt;
}

// Try each peer in order and return the first controller snapshot a peer is
// able to serve. Any single-peer failure -- connection/transport error, RPC
// error result, or a reply without a snapshot -- is logged and treated as
// retriable by moving on to the next peer. Returns nullopt only after every
// peer has been exhausted.
ss::future<std::optional<iobuf>>
cluster_discovery::fetch_controller_snapshot_from_leader(
  const std::vector<model::broker>& peers) {
    // Per-peer deadline: bounds both connection setup (passed to
    // do_with_client_one_shot) and the RPC itself (via client_opts below).
    constexpr auto fetch_timeout = std::chrono::seconds(2);
    for (const auto& broker : peers) {
        const auto& addr = broker.rpc_address();
        vlog(
          clusterlog.info,
          "Fetching controller snapshot from {} ({})",
          broker.id(),
          addr);
        // Default-constructed reply; overwritten by a successful RPC.
        // Declared outside the try block so it can be inspected afterwards.
        result<fetch_controller_snapshot_reply> r(
          fetch_controller_snapshot_reply{});
        try {
            // One-shot client: establishes a connection to `addr`, runs the
            // lambda against it, and tears the connection down when done.
            r = co_await do_with_client_one_shot<controller_client_protocol>(
              addr,
              config::node().rpc_server_tls(),
              fetch_timeout,
              rpc::transport_version::v2,
              [fetch_timeout](controller_client_protocol c) {
                  // The request advertises this node's supported logical
                  // version range so the responder can judge compatibility.
                  return c
                    .fetch_controller_snapshot(
                      fetch_controller_snapshot_request{
                        features::feature_table::get_earliest_logical_version(),
                        features::feature_table::get_latest_logical_version()},
                      rpc::client_opts(rpc::clock_type::now() + fetch_timeout))
                    .then(&rpc::get_ctx_data<fetch_controller_snapshot_reply>);
              });
        } catch (...) {
            // Connection or transport-level failure: log and fall through to
            // the next peer rather than failing the whole fetch.
            vlog(
              clusterlog.warn,
              "Error fetching controller snapshot from {} ({}), retrying: {}",
              broker.id(),
              addr,
              std::current_exception());
            continue;
        }
        if (r.has_error()) {
            // The RPC completed but carried an error code: try the next peer.
            vlog(
              clusterlog.warn,
              "Error fetching controller snapshot from {} ({}): {}, retrying",
              broker.id(),
              addr,
              r.error().message());
            continue;
        }
        auto& reply = r.value();
        if (!reply.controller_snapshot.has_value()) {
            // Peer answered but had no snapshot to offer -- presumably not
            // ready or unable to produce one yet; the exact reason is up to
            // the responder. Debug-level only, since this is expected during
            // normal operation.
            vlog(
              clusterlog.debug,
              "Peer {} ({}) not ready to produce controller snapshot, trying "
              "next peer",
              broker.id(),
              addr);
            continue;
        }
        // First usable snapshot wins; move it out of the reply.
        co_return std::move(reply.controller_snapshot);
    }
    // Exhausted all peers without obtaining a snapshot.
    co_return std::nullopt;
}

ss::future<cluster_bootstrap_info_reply>
cluster_discovery::request_cluster_bootstrap_info_single(
net::unresolved_address addr) const {
Expand Down Expand Up @@ -350,7 +411,7 @@ ss::future<> cluster_discovery::discover_founding_brokers() {
_is_cluster_founder = false;
co_return;
}
if (_storage.get_cluster_uuid().has_value()) {
if (_cluster_uuid.has_value()) {
_is_cluster_founder = false;
co_return;
}
Expand Down Expand Up @@ -449,7 +510,7 @@ ss::future<> cluster_discovery::discover_founding_brokers() {
vassert(
broker.id() != model::unassigned_node_id,
"Should have been assigned before");
node_uuid = _storage.node_uuid();
node_uuid = _node_uuid;
} else {
cluster::cluster_bootstrap_info_reply& reply = replies[seed.addr];

Expand Down
27 changes: 20 additions & 7 deletions src/v/cluster/cluster_discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,6 @@
#include <optional>
#include <vector>

namespace storage {
class kvstore;
class api;
} // namespace storage

namespace cluster {
struct cluster_bootstrap_info_reply;

Expand Down Expand Up @@ -80,7 +75,9 @@ class cluster_discovery {
};

cluster_discovery(
const model::node_uuid& node_uuid, storage::api&, ss::abort_source&);
const model::node_uuid& node_uuid,
std::optional<model::cluster_uuid> cluster_uuid,
ss::abort_source&);

// Register with the cluster:
// - If we are a fresh cluster founder, broadcast to other founders
Expand Down Expand Up @@ -124,6 +121,22 @@ class cluster_discovery {
// \pre get_node_ids_by_uuid() has never been called
node_ids_by_uuid& get_node_ids_by_uuid();

// Fetch a fresh controller_join_snapshot from a peer. Used by
// restarting nodes (those that already have a node_id and are not
// going through register_with_cluster) so that bootstrap can apply
// the current cluster-config view to shard_local_cfg before any
// downstream service reads it.
//
// Iterates `peers` in order, returning the first valid response.
// The responder forwards to the controller leader if it is not the
// leader itself, so the result is leader-authoritative regardless
// of which peer answered. Returns nullopt if every peer fails or
// none have a snapshot ready; the caller falls through to whatever
// shard_local_cfg view was loaded from the local cache.
static ss::future<std::optional<iobuf>>
fetch_controller_snapshot_from_leader(
const std::vector<model::broker>& peers);

private:
// Sends requests to each seed server to register the local node UUID
// until one succeeds. Returns nullopt if registration did not succeed.
Expand All @@ -147,11 +160,11 @@ class cluster_discovery {
ss::future<> discover_founding_brokers();

const model::node_uuid _node_uuid;
const std::optional<model::cluster_uuid> _cluster_uuid;
simple_time_jitter<model::timeout_clock> _join_retry_jitter;
const std::chrono::milliseconds _join_timeout;

std::optional<bool> _is_cluster_founder;
storage::api& _storage;
ss::abort_source& _as;
brokers _founding_brokers;
node_ids_by_uuid _node_ids_by_uuid;
Expand Down
Loading
Loading