NOTE(review): line breaks and indentation in this patch were rebuilt from a
whitespace-collapsed copy. Context-line whitespace is a best-effort guess and
the "index" blob ids were not recomputed, so apply with
"git apply --ignore-whitespace" and check the result against the target tree.
diff --git a/ansible-runner/project b/ansible-runner/project
index 7f18b37..6b58301 160000
--- a/ansible-runner/project
+++ b/ansible-runner/project
@@ -1 +1 @@
-Subproject commit 7f18b37e90139968561017baf0debb0a18615fe5
+Subproject commit 6b5830182271ebdf3e8ac21666784de3cf4458d4
diff --git a/modules/coact.py b/modules/coact.py
index d20b3bf..2e04345 100644
--- a/modules/coact.py
+++ b/modules/coact.py
@@ -1076,7 +1076,6 @@ def overaged(self, data: dict, threshold: float = 100.0) -> Iterator[OveragePoin
         )
 
 
-
 # For backwards compatibility, allow running this module directly
 if __name__ == '__main__':
     coact(obj={})
diff --git a/modules/coactd.py b/modules/coactd.py
index 94f226c..c1afbf1 100644
--- a/modules/coactd.py
+++ b/modules/coactd.py
@@ -404,7 +404,8 @@ class RepoRegistration(Registration):
         'RepoRemoveUser',
         'RepoChangeComputeRequirement',
         'RepoComputeAllocation',
-        'RepoUpdateFeature'
+        'RepoUpdateFeature',
+        'FacilityComputeAllocation'
     ]
 
     REPO_USERS_GQL = gql("""
@@ -450,6 +451,24 @@ class RepoRegistration(Registration):
         }
     """)
 
+    REPOS_WITH_ALLOCATIONS_GQL = gql("""
+        query reposWithAllocations($facilityName: String!) {
+            repos(filter: {facility: $facilityName}) {
+                Id
+                name
+                facility
+                currentComputeAllocations {
+                    Id
+                    clustername
+                    percentOfFacility
+                    start
+                    end
+                    allocatedNodesCount
+                }
+            }
+        }
+    """)
+
     FACILITY_CURRENT_COMPUTE_CGL = gql("""
         query facility( $facility: String! ) {
             facility(filter: {name: $facility}) {
@@ -515,6 +534,13 @@ def do(self, req_id, op_type, req_type, approval, req, dry_run):
         elif req_type == 'RepoUpdateFeature':
             return self.do_feature(repo, facility)
 
+        elif req_type == 'FacilityComputeAllocation':
+            clustername = req.get('clustername', None)
+            assert facility and clustername
+            return self.do_facility_compute_allocation_cascade(
+                facility, clustername, dry_run=dry_run
+            )
+
         return None
 
     def do_new_repo(
@@ -902,6 +928,125 @@ def do_compute_requirement(self, repo: str, facility: str, requirement: str, pla
     def do_feature(self, repo, facility, dry_run: bool = False) -> bool:
         raise NotImplementedError("do_feature not yet implemented")
 
+    def do_facility_compute_allocation_cascade(
+        self,
+        facility: str,
+        clustername: str,
+        dry_run: bool = False
+    ) -> bool:
+        """
+        Cascade a facility purchase change to every repo allocation on one cluster.
+
+        Reads the facility's purchased node count for ``clustername`` and
+        re-applies each matching repo allocation through
+        ``do_repo_compute_allocation``, keeping its ``percentOfFacility`` and
+        recomputing the node count as ``percentOfFacility / 100 * purchased``.
+
+        Args:
+            facility: Facility name passed to both GraphQL queries.
+            clustername: Cluster to cascade on; compared case-insensitively.
+            dry_run: Forwarded unchanged to ``do_repo_compute_allocation``.
+
+        Returns:
+            False if an argument is empty, the cluster has no positive purchase
+            record, or the repo query fails. True once the per-repo loop has
+            run, even when some repos failed (those are logged and skipped).
+        """
+        # do() guards these only with `assert`, which `python -O` strips.
+        if not facility or not clustername:
+            self.logger.error(
+                f"Cannot cascade without both a facility and a cluster (got {facility!r}@{clustername!r})"
+            )
+            return False
+        cluster_key = clustername.lower()
+
+        # Fetch current purchased nodes from the facility record
+        facility_resp = self.back_channel.execute(
+            self.FACILITY_CURRENT_COMPUTE_CGL,
+            {'facility': facility}
+        )
+        # `or` rather than a .get() default: an explicit null in the response
+        # must behave like a missing key, not raise on the next access.
+        fac_data = facility_resp.get('facility') or {}
+        new_purchased = None
+        for cp in fac_data.get('computepurchases') or []:
+            if cp['clustername'].lower() == cluster_key:
+                new_purchased = cp['purchased']
+                break
+
+        if new_purchased is None:
+            self.logger.error(f"No purchase record found for {facility}@{clustername} - cannot cascade")
+            return False
+
+        if new_purchased <= 0:
+            self.logger.error(f"Invalid purchased nodes: {new_purchased} for {facility}@{clustername} - cannot cascade")
+            return False
+
+        self.logger.info(
+            f"Processing facility compute allocation cascade: {facility}@{clustername} "
+            f"-> {new_purchased} purchased nodes"
+        )
+
+        try:
+            # Get all repositories with allocations on this facility/cluster
+            repos_resp = self.back_channel.execute(
+                self.REPOS_WITH_ALLOCATIONS_GQL,
+                {'facilityName': facility}
+            )
+
+            affected_repos = [
+                (repo, allocation)
+                for repo in repos_resp['repos']
+                for allocation in repo['currentComputeAllocations']
+                if allocation['clustername'].lower() == cluster_key
+            ]
+
+            self.logger.info(f"Found {len(affected_repos)} repo allocations to update on {facility}@{clustername}")
+
+            # Process each affected repository allocation
+            update_count = 0
+            for repo, allocation in affected_repos:
+                # The share calculation sits inside the per-repo try so that one
+                # malformed allocation (e.g. a null percentOfFacility) is logged
+                # and skipped instead of aborting the repos that follow it.
+                try:
+                    # Calculate new node allocation maintaining the same percentage
+                    percent_of_facility = allocation['percentOfFacility']
+                    new_allocated_nodes = (percent_of_facility / 100.0) * new_purchased
+
+                    self.logger.info(
+                        f"Updating {repo['facility']}:{repo['name']} on {clustername}: "
+                        f"{percent_of_facility}% -> {new_allocated_nodes:.2f} nodes (was {allocation['allocatedNodesCount']})"
+                    )
+
+                    self.do_repo_compute_allocation(
+                        repo['name'],
+                        repo['facility'],
+                        clustername,
+                        percent_of_facility,
+                        new_allocated_nodes,
+                        pdl.parse(allocation['start'], timezone='UTC'),
+                        pdl.parse(allocation['end'], timezone='UTC') if allocation['end'] else None,
+                        dry_run=dry_run,
+                    )
+                    update_count += 1
+                    self.logger.info(f"Successfully updated {repo['facility']}:{repo['name']} allocation")
+
+                except Exception as e:
+                    self.logger.error(f"Failed to update {repo['facility']}:{repo['name']}: {e}")
+                    # Continue with other repos even if one fails
+                    continue
+
+            self.logger.info(
+                f"Facility cascade update completed: {update_count}/{len(affected_repos)} "
+                f"repo allocations updated on {facility}@{clustername}"
+            )
+            return True
+
+        except Exception as e:
+            self.logger.error(f"Facility compute allocation cascade failed: {e}")
+            return False
+
 
 @coactd.command(name='reporegistration')
 @common_options
@@ -911,7 +1056,7 @@ def repo_registration(ctx, verbose, username, password_file, client_name, dry_ru
     """Workflow for repository maintenance.
 
Handles NewRepo, RepoMembership, RepoRemoveUser, RepoChangeComputeRequirement, - RepoComputeAllocation, and RepoUpdateFeature request types. + RepoComputeAllocation, RepoUpdateFeature, and FacilityComputeAllocation request types. """ configure_logging_from_verbose(verbose) ctx.obj['verbose'] = verbose diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..c553324 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,15 @@ +""" +pytest configuration for the CLI test suite. + +ansible_runner imports pkg_resources at the top level, which is a +setuptools utility not present in the uv-managed test environment. We stub it +out here, before any test module imports modules.coactd, so the module loads +cleanly without requiring the full ansible/setuptools stack at test time. +""" +import sys +from unittest.mock import MagicMock + +if "pkg_resources" not in sys.modules: + sys.modules["pkg_resources"] = MagicMock() +if "ansible_runner" not in sys.modules: + sys.modules["ansible_runner"] = MagicMock() diff --git a/tests/test_facility_compute_allocation.py b/tests/test_facility_compute_allocation.py new file mode 100644 index 0000000..8765daa --- /dev/null +++ b/tests/test_facility_compute_allocation.py @@ -0,0 +1,172 @@ +""" +Behavioral tests for FacilityComputeAllocation handling in the RepoRegistration daemon. 
+""" +from unittest.mock import MagicMock + +from modules.coactd import RepoRegistration, RequestStatus + + +START = '2026-01-01T00:00:00Z' +END = '2031-01-01T00:00:00Z' + + +def make_handler(): + handler = RepoRegistration.__new__(RepoRegistration) + handler.logger = MagicMock() + handler.username = 'sdf-bot' + handler.password_file = '/tmp/fake' + handler.client_name = 'test-client' + handler.dry_run = False + handler.back_channel = MagicMock() + handler.ident = 'test-req-id' + return handler + + +def test_approved_request_dispatches_cascade_with_payload_fields(): + """ + An approved FacilityComputeAllocation request routes to + do_facility_compute_allocation_cascade with facility and cluster + extracted from the request dict. + """ + handler = make_handler() + handler.do_facility_compute_allocation_cascade = MagicMock(return_value=True) + + req = { + 'facilityname': 'lcls', + 'clustername': 'ada', + } + result = handler.do('req1', 'INSERT', 'FacilityComputeAllocation', RequestStatus.APPROVED, req, dry_run=False) + + assert result is True + handler.do_facility_compute_allocation_cascade.assert_called_once_with( + 'lcls', 'ada', dry_run=False + ) + + +def test_cascade_recalculates_every_repo_allocation_by_percentage(): + """ + When purchased nodes change, every repo on that cluster receives a new + absolute allocation of (percentOfFacility / 100) * purchased, preserving + each repo's percentage share of the facility. + + do_repo_compute_allocation is the single delegate for each repo; it owns + the SLURM feature-flag check, the DB upsert, the SLURM playbook call, and + the user sync. 
+ """ + handler = make_handler() + handler.do_repo_compute_allocation = MagicMock(return_value=True) + + repos = [ + { + 'Id': 'repo-a', 'name': 'alpha', 'facility': 'lcls', + 'currentComputeAllocations': [{ + 'Id': 'alloc-a', 'clustername': 'ada', + 'percentOfFacility': 25.0, 'allocatedNodesCount': 25.0, + 'start': START, 'end': END, + }], + }, + { + 'Id': 'repo-b', 'name': 'beta', 'facility': 'lcls', + 'currentComputeAllocations': [{ + 'Id': 'alloc-b', 'clustername': 'ada', + 'percentOfFacility': 50.0, 'allocatedNodesCount': 50.0, + 'start': START, 'end': END, + }], + }, + ] + handler.back_channel.execute.side_effect = [ + {'facility': {'computepurchases': [{'clustername': 'ada', 'purchased': 200}]}}, # facility query + {'repos': repos}, + ] + + result = handler.do_facility_compute_allocation_cascade( + 'lcls', 'ada', dry_run=False + ) + + assert result is True + assert handler.do_repo_compute_allocation.call_count == 2 + + # args: (repo_name, facility, cluster, percent, allocated_resource, start, end) + by_repo = { + c.args[0]: c.args[4] + for c in handler.do_repo_compute_allocation.call_args_list + } + assert by_repo['alpha'] == 50.0 # 25% of 200 + assert by_repo['beta'] == 100.0 # 50% of 200 + + +def test_cascade_returns_false_when_no_purchase_record(): + """ + When the facility has no computepurchases entry for the requested cluster, + the cascade logs an error and returns False without touching any repos. 
+ """ + handler = make_handler() + handler.do_repo_compute_allocation = MagicMock(return_value=True) + + handler.back_channel.execute.return_value = { + 'facility': {'computepurchases': [{'clustername': 'other-cluster', 'purchased': 100}]} + } + + result = handler.do_facility_compute_allocation_cascade('lcls', 'ada', dry_run=False) + + assert result is False + handler.do_repo_compute_allocation.assert_not_called() + handler.logger.error.assert_called_once() + + +def test_cascade_returns_false_when_purchased_nodes_is_zero(): + """ + A purchase record with zero nodes is rejected before any repo is updated. + """ + handler = make_handler() + handler.do_repo_compute_allocation = MagicMock(return_value=True) + + handler.back_channel.execute.return_value = { + 'facility': {'computepurchases': [{'clustername': 'ada', 'purchased': 0}]} + } + + result = handler.do_facility_compute_allocation_cascade('lcls', 'ada', dry_run=False) + + assert result is False + handler.do_repo_compute_allocation.assert_not_called() + handler.logger.error.assert_called_once() + + +def test_cascade_continues_after_per_repo_failure(): + """ + If do_repo_compute_allocation raises for one repo, the cascade logs the + error and continues processing the remaining repos, returning True overall. 
+ """ + handler = make_handler() + + repos = [ + { + 'Id': 'repo-a', 'name': 'alpha', 'facility': 'lcls', + 'currentComputeAllocations': [{ + 'Id': 'alloc-a', 'clustername': 'ada', + 'percentOfFacility': 25.0, 'allocatedNodesCount': 50.0, + 'start': START, 'end': END, + }], + }, + { + 'Id': 'repo-b', 'name': 'beta', 'facility': 'lcls', + 'currentComputeAllocations': [{ + 'Id': 'alloc-b', 'clustername': 'ada', + 'percentOfFacility': 50.0, 'allocatedNodesCount': 100.0, + 'start': START, 'end': END, + }], + }, + ] + handler.back_channel.execute.side_effect = [ + {'facility': {'computepurchases': [{'clustername': 'ada', 'purchased': 200}]}}, + {'repos': repos}, + ] + handler.do_repo_compute_allocation = MagicMock( + side_effect=[Exception("slurm playbook failed"), True] + ) + + result = handler.do_facility_compute_allocation_cascade('lcls', 'ada', dry_run=False) + + assert result is True + assert handler.do_repo_compute_allocation.call_count == 2 + handler.logger.error.assert_called_once()