diff --git a/src/v/cluster/cloud_metadata/cluster_recovery_backend.cc b/src/v/cluster/cloud_metadata/cluster_recovery_backend.cc index 593542fe77e32..81e978c672b4d 100644 --- a/src/v/cluster/cloud_metadata/cluster_recovery_backend.cc +++ b/src/v/cluster/cloud_metadata/cluster_recovery_backend.cc @@ -58,6 +58,7 @@ cluster_recovery_backend::cluster_recovery_backend( cluster::members_table& members_table, features::feature_table& features, security::credential_store& creds, + security::role_store& roles, cluster::topic_table& topics, cluster::controller_api& api, cluster::feature_manager& feature_manager, @@ -75,6 +76,7 @@ cluster_recovery_backend::cluster_recovery_backend( , _members_table(members_table) , _features(features) , _creds(creds) + , _roles(roles) , _topics(topics) , _controller_api(api) , _feature_manager(feature_manager) @@ -225,6 +227,19 @@ ss::future cluster_recovery_backend::do_action( co_return cluster::errc::replication_error; } } + + // Role recovery is bundled within ACL recovery stage in order to + // avoid adding a new recovery stage enum that isn't backportable. + retry_chain_node roles_retry(&parent_retry); + for (auto& role : actions.roles) { + std::error_code err = co_await _security_frontend.create_role( + std::move(role.name), + std::move(role.role), + roles_retry.get_deadline()); + if (err != make_error_code(errc::success)) { + co_return cluster::errc::replication_error; + } + } break; } case recovery_stage::recovered_remote_topic_data: { @@ -506,7 +521,7 @@ ss::future<> cluster_recovery_backend::recover_until_term_change() { // We may need to restore state from the controller snapshot. cloud_metadata::controller_snapshot_reconciler reconciler( - _recovery_table.local(), _features, _creds, _topics); + _recovery_table.local(), _features, _creds, _roles, _topics); auto controller_actions = reconciler.get_actions( controller_snap.value()); vlog( diff --git a/src/v/cluster/cloud_metadata/cluster_recovery_backend.h b/src/v/cluster/cloud_metadata/cluster_recovery_backend.h index 819d01479cdb0..f01c15cd4c36a 100644 --- a/src/v/cluster/cloud_metadata/cluster_recovery_backend.h +++ b/src/v/cluster/cloud_metadata/cluster_recovery_backend.h @@ -42,6 +42,7 @@ class cluster_recovery_backend { cluster::members_table&, features::feature_table&, security::credential_store&, + security::role_store&, cluster::topic_table&, cluster::controller_api&, cluster::feature_manager&, @@ -106,6 +107,7 @@ class cluster_recovery_backend { cluster::members_table& _members_table; features::feature_table& _features; security::credential_store& _creds; + security::role_store& _roles; cluster::topic_table& _topics; cluster::controller_api& _controller_api; diff --git a/src/v/cluster/cloud_metadata/tests/cluster_metadata_utils.h b/src/v/cluster/cloud_metadata/tests/cluster_metadata_utils.h index 66c5afcfcf107..ee94fd58963e3 100644 --- a/src/v/cluster/cloud_metadata/tests/cluster_metadata_utils.h +++ b/src/v/cluster/cloud_metadata/tests/cluster_metadata_utils.h @@ -30,11 +30,10 @@ inline security::license get_test_license() { return security::make_license(license_str); } -inline security::acl_binding binding_for_user(const ss::sstring& user) { - const security::acl_principal principal{ - security::principal_type::ephemeral_user, user}; +inline security::acl_binding +binding_for_principal(security::acl_principal principal) { security::acl_entry acl_entry{ - principal, + std::move(principal), security::acl_host::wildcard_host(), security::acl_operation::all, security::acl_permission::allow}; @@ -47,6 +46,16 @@ inline security::acl_binding binding_for_user(const ss::sstring& user) { return binding; } +inline security::acl_binding binding_for_user(const ss::sstring& user) { + return binding_for_principal( + security::acl_principal{security::principal_type::ephemeral_user, user}); +} + +inline security::acl_binding binding_for_role(const ss::sstring& role_name) { + return binding_for_principal( + security::acl_principal{security::principal_type::role, role_name}); +} + inline topic_properties uploadable_topic_properties() { auto props = random_topic_properties(); if ( diff --git a/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc b/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc index 12ac2c340c353..2e02b16452a1e 100644 --- a/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc +++ b/src/v/cluster/cloud_metadata/tests/cluster_recovery_backend_test.cc @@ -32,6 +32,7 @@ #include "partition_manager.h" #include "redpanda/application.h" #include "redpanda/tests/fixture.h" +#include "security/role_store.h" #include "security/scram_credential.h" #include "security/types.h" #include "ssx/future-util.h" @@ -134,12 +135,27 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { model::timeout_clock::now() + 30s) .get(); - // Create an ACL. + // Create a Role. + security::role_name role_name{"test_role"}; + app.controller->get_security_frontend() + .local() + .create_role( + role_name, + security::role({{security::role_member_type::user, "userguy"}}), + model::timeout_clock::now() + 30s) + .get(); + + // Create ACLs auto binding = binding_for_user("__pandaproxy"); app.controller->get_security_frontend() .local() .create_acls({binding}, 5s) .get(); + auto role_binding = binding_for_role(role_name); + app.controller->get_security_frontend() + .local() + .create_acls({role_binding}, 5s) + .get(); // Create some topics, but disable the upload loop so we can manually flush // their manifests. @@ -261,11 +277,32 @@ TEST_P(ClusterRecoveryBackendLeadershipParamTest, TestRecoveryControllerState) { .has_value()); ASSERT_EQ( 1, config::shard_local_cfg().log_segment_size_jitter_percent.value()); + + // Validate User restoration. ASSERT_TRUE(app.controller->get_credential_store().local().contains( security::credential_user{"userguy"})); + + // Validate Role restoration. + const auto& role_store = app.controller->get_role_store().local(); + ASSERT_TRUE(role_store.contains(role_name)); + auto role = role_store.get(role_name); + ASSERT_TRUE(role.has_value()); + const auto& role_members = role.value().members(); + ASSERT_EQ(role_members.size(), 1); ASSERT_EQ( - 1, - app.controller->get_authorizer().local().all_bindings().get().size()); + role_members.contains( + security::role_member{security::role_member_type::user, "userguy"}), + true); + + // Validate ACL restoration. + auto acl_bindings + = app.controller->get_authorizer().local().all_bindings().get(); + ASSERT_EQ(acl_bindings.size(), 2); + absl::flat_hash_set bindings_set( + acl_bindings.begin(), acl_bindings.end()); + ASSERT_TRUE(bindings_set.contains(binding)); + ASSERT_TRUE(bindings_set.contains(role_binding)); + // NOTE: internal topics may be created. auto topic_count = app.controller->get_topics_state().local().all_topics_count(); diff --git a/src/v/cluster/cloud_metadata/tests/controller_snapshot_reconciliation_test.cc b/src/v/cluster/cloud_metadata/tests/controller_snapshot_reconciliation_test.cc index 60771471b5186..e09708912c280 100644 --- a/src/v/cluster/cloud_metadata/tests/controller_snapshot_reconciliation_test.cc +++ b/src/v/cluster/cloud_metadata/tests/controller_snapshot_reconciliation_test.cc @@ -54,6 +54,7 @@ class controller_snapshot_reconciliation_fixture app.controller->get_cluster_recovery_table().local(), app.feature_table.local(), app.controller->get_credential_store().local(), + app.controller->get_role_store().local(), app.controller->get_topics_state().local()) {} void SetUp() override { @@ -106,7 +107,7 @@ void validate_actions( actions_contain(actions, cluster::recovery_stage::recovered_users)); ASSERT_EQ( - !actions.acls.empty(), + !actions.acls.empty() || !actions.roles.empty(), actions_contain(actions, cluster::recovery_stage::recovered_acls)); ASSERT_EQ( @@ -238,6 +239,19 @@ TEST_F(controller_snapshot_reconciliation_fixture, test_reconciler_acls) { validate_actions(actions); } +TEST_F(controller_snapshot_reconciliation_fixture, test_reconciler_roles) { + cluster::controller_snapshot snap; + auto& security_snap = snap.security; + security_snap.roles.emplace_back( + security::role_name("role_name"), + security::role({{security::role_member_type::user, "test_user"}})); + + auto actions = reconciler.get_actions(snap); + ASSERT_TRUE( + actions_contain(actions, cluster::recovery_stage::recovered_acls)); + validate_actions(actions); +} + TEST_F( controller_snapshot_reconciliation_fixture, test_reconcile_remote_topics) { cluster::controller_snapshot snap; diff --git a/src/v/cluster/cluster_recovery_reconciler.cc b/src/v/cluster/cluster_recovery_reconciler.cc index ea609deaa8586..a2d8e6c2fc863 100644 --- a/src/v/cluster/cluster_recovery_reconciler.cc +++ b/src/v/cluster/cluster_recovery_reconciler.cc @@ -99,7 +99,16 @@ controller_snapshot_reconciler::get_actions( // since this is idempotent. actions.acls.emplace_back(binding); } - if (!actions.acls.empty()) { + + // Role recovery is bundled within ACL recovery stage in order to + // avoid adding a new recovery stage enum that isn't backportable. + for (const auto& snap_role : snap.security.roles) { + if (!_roles.contains(snap_role.name)) { + actions.roles.emplace_back(snap_role.name, snap_role.role); + } + } + + if (!actions.acls.empty() || !actions.roles.empty()) { actions.stages.emplace_back(recovery_stage::recovered_acls); } } diff --git a/src/v/cluster/cluster_recovery_reconciler.h b/src/v/cluster/cluster_recovery_reconciler.h index 6fd2bef82c6d0..b8973973f69bf 100644 --- a/src/v/cluster/cluster_recovery_reconciler.h +++ b/src/v/cluster/cluster_recovery_reconciler.h @@ -15,6 +15,7 @@ #include "cluster/types.h" #include "features/feature_table.h" #include "security/credential_store.h" +#include "security/role_store.h" #include @@ -23,6 +24,11 @@ struct user_credential { security::credential_user user; security::scram_credential cred; }; + +struct role_recovery { + security::role_name name; + security::role role; +}; } // namespace cluster namespace cluster::cloud_metadata { @@ -41,6 +47,7 @@ class controller_snapshot_reconciler { config_update_request config; fragmented_vector users; fragmented_vector acls; + fragmented_vector roles; fragmented_vector remote_topics; fragmented_vector local_topics; // TODO: restore wasm plugins/transforms @@ -60,10 +67,12 @@ class controller_snapshot_reconciler { cluster::cluster_recovery_table& recovery, features::feature_table& features, security::credential_store& creds, + security::role_store& roles, cluster::topic_table& topics) : _recovery_table(recovery) , _feature_table(features) , _creds(creds) + , _roles(roles) , _topic_table(topics) {} // Returns a list of properties that are explicitly not recovered, since @@ -82,6 +91,7 @@ class controller_snapshot_reconciler { cluster::cluster_recovery_table& _recovery_table; features::feature_table& _feature_table; security::credential_store& _creds; + security::role_store& _roles; cluster::topic_table& _topic_table; }; diff --git a/src/v/cluster/controller.cc b/src/v/cluster/controller.cc index 121b92be5bc31..5fa5a3798cb61 100644 --- a/src/v/cluster/controller.cc +++ b/src/v/cluster/controller.cc @@ -780,6 +780,7 @@ ss::future<> controller::start( _members_table.local(), _feature_table.local(), _credentials.local(), + _roles.local(), _tp_state.local(), _api.local(), _feature_manager.local(), diff --git a/tests/rptest/clients/rpk.py b/tests/rptest/clients/rpk.py index 60d947a19c706..17321b7113954 100644 --- a/tests/rptest/clients/rpk.py +++ b/tests/rptest/clients/rpk.py @@ -16,7 +16,7 @@ import time import typing from collections import namedtuple -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any, Iterator, Optional from ducktape.cluster.cluster import ClusterNode @@ -217,6 +217,24 @@ class RpkTable: rows: list[list[str]] +@dataclass +class RPKACLInput: + # Can't use mutables in defaults of dataclass + # https://docs.python.org/3/library/dataclasses.html#dataclasses.field + allow_principal: list[str] = field(default_factory=list[str]) + deny_principal: list[str] = field(default_factory=list[str]) + allow_role: list[str] = field(default_factory=list[str]) + deny_role: list[str] = field(default_factory=list[str]) + allow_host: list[str] = field(default_factory=list[str]) + deny_host: list[str] = field(default_factory=list[str]) + topic: list[str] = field(default_factory=list[str]) + group: list[str] = field(default_factory=list[str]) + operation: list[str] = field(default_factory=list[str]) + txn_id: list[str] = field(default_factory=list[str]) + cluster: bool = False + resource_pattern_type: str = "" + + def parse_rpk_table(out): lines = out.splitlines() return parse_rpk_table_lines(lines) @@ -641,7 +659,7 @@ def topic_line(line): lines = output.splitlines() for i, line in enumerate(lines): if line.split() == ["NAME", "PARTITIONS", "REPLICAS"]: - return map(topic_line, lines[i + 1 :]) + return list(map(topic_line, lines[i + 1 :])) assert False, "Unexpected output format" @@ -1620,6 +1638,81 @@ def acl_create_allow_cluster( return output + def acl_create(self, acl: RPKACLInput): + cmd = ( + [ + self._rpk_binary(), + "security", + "acl", + "create", + ] + + self._schema_registry_conn_settings() + + self._kafka_conn_settings() + ) + + def append_flag(flag: str, values: list[str]): + if values: + cmd.extend([flag, ",".join(values)]) + + def append_bool_flag(flag: str, value: bool): + if value: + cmd.append(flag) + + append_flag("--allow-principal", acl.allow_principal) + append_flag("--deny-principal", acl.deny_principal) + append_flag("--allow-role", acl.allow_role) + append_flag("--deny-role", acl.deny_role) + append_flag("--allow-host", acl.allow_host) + append_flag("--deny-host", acl.deny_host) + append_flag("--topic", acl.topic) + append_flag("--group", acl.group) + append_flag("--operation", acl.operation) + append_flag("--transactional-id", acl.txn_id) + append_bool_flag("--cluster", acl.cluster) + + if acl.resource_pattern_type: + cmd += ["--resource-pattern-type", acl.resource_pattern_type] + + return self._execute(cmd) + + def acl_delete(self, acl: RPKACLInput): + cmd = ( + [ + self._rpk_binary(), + "security", + "acl", + "delete", + "--no-confirm", + ] + + self._schema_registry_conn_settings() + + self._kafka_conn_settings() + ) + + def append_flag(flag: str, values: list[str]): + if values: + cmd.extend([flag, ",".join(values)]) + + def append_bool_flag(flag: str, value: bool): + if value: + cmd.append(flag) + + append_flag("--allow-principal", acl.allow_principal) + append_flag("--deny-principal", acl.deny_principal) + append_flag("--allow-role", acl.allow_role) + append_flag("--deny-role", acl.deny_role) + append_flag("--allow-host", acl.allow_host) + append_flag("--deny-host", acl.deny_host) + append_flag("--topic", acl.topic) + append_flag("--group", acl.group) + append_flag("--operation", acl.operation) + append_flag("--transactional-id", acl.txn_id) + append_bool_flag("--cluster", acl.cluster) + + if acl.resource_pattern_type: + cmd += ["--resource-pattern-type", acl.resource_pattern_type] + + return self._execute(cmd) + def cluster_metadata_id(self): """ Calls 'cluster metadata' and returns the cluster ID, if set, @@ -2191,28 +2284,28 @@ def _sasl_mechanism(self): def _tls_enabled(self): return self._security.tls_enabled - def create_role(self, role_name): + def create_role(self, role_name) -> dict[str, Any]: return self._run_role(["create", role_name]) - def list_roles(self): + def list_roles(self) -> dict[str, Any]: return self._run_role(["list"]) - def delete_role(self, role_name): + def delete_role(self, role_name) -> dict[str, Any]: cmd = ["delete", role_name, "--no-confirm"] + self._kafka_conn_settings() return self._run_role(cmd) - def assign_role(self, role_name, principals): + def assign_role(self, role_name, principals) -> dict[str, Any]: cmd = ["assign", role_name, "--principal", ",".join(principals)] return self._run_role(cmd) - def unassign_role(self, role_name, principals): + def unassign_role(self, role_name, principals) -> dict[str, Any]: cmd = ["unassign", role_name, "--principal", ",".join(principals)] return self._run_role(cmd) - def describe_role(self, role_name): + def describe_role(self, role_name) -> dict[str, Any]: return self._run_role(["describe", role_name] + self._kafka_conn_settings()) - def _run_role(self, cmd, output_format="json"): + def _run_role(self, cmd, output_format="json") -> dict[str, Any] | str: cmd = [ self._rpk_binary(), "security", diff --git a/tests/rptest/tests/cluster_recovery_test.py b/tests/rptest/tests/cluster_recovery_test.py index 5670c87aeeb76..f0e47ef839f8b 100644 --- a/tests/rptest/tests/cluster_recovery_test.py +++ b/tests/rptest/tests/cluster_recovery_test.py @@ -13,7 +13,8 @@ from ducktape.tests.test import TestContext from ducktape.utils.util import wait_until -from rptest.clients.rpk import RpkTool +from rptest.services.admin import RoleMember +from rptest.clients.rpk import RPKACLInput, RpkTool from rptest.clients.types import TopicSpec from rptest.services.cluster import cluster from rptest.services.kgo_verifier_services import KgoVerifierProducer @@ -91,12 +92,25 @@ def test_basic_controller_snapshot_restore(self): algorithm = "SCRAM-SHA-256" users = dict() users["admin"] = None # Created by the RedpandaService. + roles: dict[str, set[RoleMember]] = dict() for _ in range(3): user = f"user-{random_string(6)}" password = f"user-{random_string(6)}" users[user] = password self.redpanda._admin.create_user(user, password, algorithm) rpk.acl_create_allow_cluster(user, op="describe") + + role_name: str = f"role-{random_string(6)}" + role_members = [RoleMember.User(user)] + roles[role_name] = set(role_members) + self.redpanda._admin.create_role(role_name) + self.redpanda._admin.update_role_members(role_name, add=role_members) + + role_acl = RPKACLInput() + role_acl.allow_role = [role_name] + role_acl.cluster = True + role_acl.operation = ["ALL"] + rpk.acl_create(role_acl) rpk.acl_create_allow_cluster("admin", op="describe") time.sleep(5) @@ -111,6 +125,20 @@ def test_basic_controller_snapshot_restore(self): "controller", partition=0, namespace="redpanda", timeout_s=60, backoff_s=2 ) + self.logger.info("Verifying that no data is present before recovery") + + assert len(rpk.list_topics()) == 0, "Expected no topics before recovery" + assert len(self.redpanda._admin.list_users()) == 0, ( + "Expected no users before recovery" + ) + # Expecting 1 line for the header only. + assert len(rpk.acl_list().splitlines()) == 1, "Expected no ACLs before recovery" + assert len(rpk.list_roles().get("roles", [])) == 0, ( + "Expected no roles before recovery" + ) + + self.logger.info("Initializing cluster recovery") + self.redpanda._admin.initialize_cluster_recovery() def cluster_recovery_complete(): @@ -140,6 +168,31 @@ def cluster_recovery_complete(): found = True assert found, f"Couldn't find {u} in {acls_lines}" + self.logger.info("Verifying roles") + + restored_roles = rpk.list_roles().get("roles", []) + assert set(restored_roles) == set(roles.keys()), ( + f"{restored_roles} vs {roles.keys()}" + ) + + for role_name, members in roles.items(): + res = rpk.describe_role(role_name) + restored_role_members = set( + RoleMember.User(member["name"]) + for member in res.get("members", []) + if member["principal_type"] == RoleMember.PrincipalType.USER.value + ) + + assert restored_role_members == members, ( + f"Role {role_name} members mismatch: {restored_role_members} vs {members}" + ) + + found_role_acl = False + for l in acls_lines: + if role_name in l and "ALLOW" in l and "ALL" in l: + found_role_acl = True + assert found_role_acl, f"Couldn't find {role_name} in {acls_lines}" + @cluster(num_nodes=4) def test_bootstrap_with_recovery(self): """