Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ansible-runner/project
1 change: 0 additions & 1 deletion modules/coact.py
Original file line number Diff line number Diff line change
Expand Up @@ -1076,7 +1076,6 @@ def overaged(self, data: dict, threshold: float = 100.0) -> Iterator[OveragePoin
)



# For backwards compatibility, allow running this module directly
if __name__ == '__main__':
coact(obj={})
127 changes: 125 additions & 2 deletions modules/coactd.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,8 @@ class RepoRegistration(Registration):
'RepoRemoveUser',
'RepoChangeComputeRequirement',
'RepoComputeAllocation',
'RepoUpdateFeature'
'RepoUpdateFeature',
'FacilityComputeAllocation'
]

REPO_USERS_GQL = gql("""
Expand Down Expand Up @@ -450,6 +451,24 @@ class RepoRegistration(Registration):
}
""")

# GraphQL query returning every repo of one facility (selected by the
# $facilityName variable) together with its current compute allocations:
# cluster name, percentage of the facility, start/end and allocated node count.
# Executed by do_facility_compute_allocation_cascade to find the repo
# allocations that belong to a given cluster.
REPOS_WITH_ALLOCATIONS_GQL = gql("""
query reposWithAllocations($facilityName: String!) {
repos(filter: {facility: $facilityName}) {
Id
name
facility
currentComputeAllocations {
Id
clustername
percentOfFacility
start
end
allocatedNodesCount
}
}
}
""")

FACILITY_CURRENT_COMPUTE_CGL = gql("""
query facility( $facility: String! ) {
facility(filter: {name: $facility}) {
Expand Down Expand Up @@ -515,6 +534,13 @@ def do(self, req_id, op_type, req_type, approval, req, dry_run):
elif req_type == 'RepoUpdateFeature':
return self.do_feature(repo, facility)

elif req_type == 'FacilityComputeAllocation':
clustername = req.get('clustername', None)
assert facility and clustername
return self.do_facility_compute_allocation_cascade(
facility, clustername, dry_run=dry_run
)

return None

def do_new_repo(
Expand Down Expand Up @@ -902,6 +928,103 @@ def do_compute_requirement(self, repo: str, facility: str, requirement: str, pla
def do_feature(self, repo, facility, dry_run: bool = False) -> bool:
    """
    Placeholder handler for 'RepoUpdateFeature' requests.

    Not implemented yet: every call raises NotImplementedError, regardless of
    the arguments passed.

    Raises:
        NotImplementedError: always.
    """
    raise NotImplementedError("do_feature not yet implemented")

def do_facility_compute_allocation_cascade(
    self,
    facility: str,
    clustername: str,
    dry_run: bool = False
) -> bool:
    """
    Handle facility-level compute allocation changes by updating all affected repo allocations.
    Queries the facility record directly for the current purchased node count.

    Each repo allocation on ``clustername`` keeps its ``percentOfFacility`` and
    receives a new node count of ``(percentOfFacility / 100) * purchased``,
    applied through ``do_repo_compute_allocation``.

    Args:
        facility: Name of the facility whose purchase changed.
        clustername: Cluster the purchase applies to; compared case-insensitively.
        dry_run: Forwarded unchanged to ``do_repo_compute_allocation``.

    Returns:
        False when the facility has no positive purchase record for the cluster,
        or when fetching/scanning the repo list fails. True otherwise, including
        when individual repo updates failed (those are logged and skipped).
    """
    # Loop-invariant: normalise the requested cluster name once.
    wanted_cluster = clustername.lower()

    # Fetch current purchased nodes from the facility record
    facility_resp = self.back_channel.execute(
        self.FACILITY_CURRENT_COMPUTE_CGL,
        {'facility': facility}
    )
    # dict.get's default only covers a *missing* key. A response that carries an
    # explicit null ({'facility': None}) would otherwise yield None and crash on
    # the next .get, so fall back to empty containers explicitly.
    fac_data = (facility_resp or {}).get('facility') or {}
    new_purchased = None
    for cp in fac_data.get('computepurchases') or []:
        if (cp.get('clustername') or '').lower() == wanted_cluster:
            new_purchased = cp.get('purchased')
            break

    if new_purchased is None:
        self.logger.error(f"No purchase record found for {facility}@{clustername} - cannot cascade")
        return False

    if new_purchased <= 0:
        self.logger.error(f"Invalid purchased nodes: {new_purchased} for {facility}@{clustername} - cannot cascade")
        return False

    self.logger.info(
        f"Processing facility compute allocation cascade: {facility}@{clustername} "
        f"-> {new_purchased} purchased nodes"
    )

    try:
        # Get all repositories with allocations on this facility/cluster
        repos_resp = self.back_channel.execute(
            self.REPOS_WITH_ALLOCATIONS_GQL,
            {'facilityName': facility}
        )

        affected_repos = [
            (repo, allocation)
            for repo in repos_resp['repos']
            for allocation in (repo.get('currentComputeAllocations') or [])
            if (allocation.get('clustername') or '').lower() == wanted_cluster
        ]

        self.logger.info(f"Found {len(affected_repos)} repo allocations to update on {facility}@{clustername}")

        # Process each affected repository allocation
        update_count = 0
        for repo, allocation in affected_repos:
            # Built with .get so the failure log below can never raise itself.
            repo_label = f"{repo.get('facility')}:{repo.get('name')}"

            # Everything specific to one repo lives inside this try, so a
            # malformed allocation (e.g. null percentOfFacility) skips only
            # that repo instead of aborting the whole cascade.
            try:
                # Calculate new node allocation maintaining the same percentage
                percent_of_facility = allocation['percentOfFacility']
                new_allocated_nodes = (percent_of_facility / 100.0) * new_purchased

                self.logger.info(
                    f"Updating {repo['facility']}:{repo['name']} on {clustername}: "
                    f"{percent_of_facility}% -> {new_allocated_nodes:.2f} nodes (was {allocation['allocatedNodesCount']})"
                )

                self.do_repo_compute_allocation(
                    repo['name'],
                    repo['facility'],
                    clustername,
                    percent_of_facility,
                    new_allocated_nodes,
                    pdl.parse(allocation['start'], timezone='UTC'),
                    pdl.parse(allocation['end'], timezone='UTC') if allocation['end'] else None,
                    dry_run=dry_run,
                )
                update_count += 1
                self.logger.info(f"Successfully updated {repo['facility']}:{repo['name']} allocation")

            except Exception as e:
                # Continue with other repos even if one fails
                self.logger.error(f"Failed to update {repo_label}: {e}")

        self.logger.info(
            f"Facility cascade update completed: {update_count}/{len(affected_repos)} "
            f"repo allocations updated on {facility}@{clustername}"
        )
        return True

    except Exception as e:
        self.logger.error(f"Facility compute allocation cascade failed: {e}")
        return False


@coactd.command(name='reporegistration')
@common_options
Expand All @@ -911,7 +1034,7 @@ def repo_registration(ctx, verbose, username, password_file, client_name, dry_ru
"""Workflow for repository maintenance.

Handles NewRepo, RepoMembership, RepoRemoveUser, RepoChangeComputeRequirement,
RepoComputeAllocation, and RepoUpdateFeature request types.
RepoComputeAllocation, RepoUpdateFeature, and FacilityComputeAllocation request types.
"""
configure_logging_from_verbose(verbose)
ctx.obj['verbose'] = verbose
Expand Down
15 changes: 15 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""
pytest configuration for the CLI test suite.

ansible_runner imports pkg_resources at the top level, which is a
setuptools utility not present in the uv-managed test environment. We stub it
out here, before any test module imports modules.coactd, so the module loads
cleanly without requiring the full ansible/setuptools stack at test time.
"""
import sys
from unittest.mock import MagicMock

if "pkg_resources" not in sys.modules:
sys.modules["pkg_resources"] = MagicMock()
if "ansible_runner" not in sys.modules:
sys.modules["ansible_runner"] = MagicMock()
172 changes: 172 additions & 0 deletions tests/test_facility_compute_allocation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
"""
Behavioral tests for FacilityComputeAllocation handling in the RepoRegistration daemon.
"""
from unittest.mock import MagicMock

from modules.coactd import RepoRegistration, RequestStatus


START = '2026-01-01T00:00:00Z'
END = '2031-01-01T00:00:00Z'


def make_handler():
    """
    Build a RepoRegistration instance for tests without running __init__.

    The instance is allocated with __new__ and then given the attributes the
    tests rely on, with MagicMock objects standing in for the logger and the
    back channel.
    """
    stub = RepoRegistration.__new__(RepoRegistration)
    attributes = {
        'logger': MagicMock(),
        'username': 'sdf-bot',
        'password_file': '/tmp/fake',
        'client_name': 'test-client',
        'dry_run': False,
        'back_channel': MagicMock(),
        'ident': 'test-req-id',
    }
    for attr_name, attr_value in attributes.items():
        setattr(stub, attr_name, attr_value)
    return stub


def test_approved_request_dispatches_cascade_with_payload_fields():
    """
    do() must hand an approved FacilityComputeAllocation request to
    do_facility_compute_allocation_cascade, passing the facility and cluster
    taken from the request dict plus the dry_run flag.
    """
    daemon = make_handler()
    cascade = MagicMock(return_value=True)
    daemon.do_facility_compute_allocation_cascade = cascade

    payload = {'facilityname': 'lcls', 'clustername': 'ada'}
    outcome = daemon.do(
        'req1',
        'INSERT',
        'FacilityComputeAllocation',
        RequestStatus.APPROVED,
        payload,
        dry_run=False,
    )

    assert outcome is True
    cascade.assert_called_once_with('lcls', 'ada', dry_run=False)


def test_cascade_recalculates_every_repo_allocation_by_percentage():
    """
    With 200 purchased nodes, each repo on the cluster must be re-allocated
    (percentOfFacility / 100) * purchased nodes, so its percentage share of
    the facility is preserved.

    The cascade is expected to delegate once per repo to
    do_repo_compute_allocation, which is replaced by a mock here.
    """
    daemon = make_handler()
    delegate = MagicMock(return_value=True)
    daemon.do_repo_compute_allocation = delegate

    alpha = {
        'Id': 'repo-a', 'name': 'alpha', 'facility': 'lcls',
        'currentComputeAllocations': [{
            'Id': 'alloc-a', 'clustername': 'ada',
            'percentOfFacility': 25.0, 'allocatedNodesCount': 25.0,
            'start': START, 'end': END,
        }],
    }
    beta = {
        'Id': 'repo-b', 'name': 'beta', 'facility': 'lcls',
        'currentComputeAllocations': [{
            'Id': 'alloc-b', 'clustername': 'ada',
            'percentOfFacility': 50.0, 'allocatedNodesCount': 50.0,
            'start': START, 'end': END,
        }],
    }
    # First execute() call answers the facility query, the second the repo query.
    facility_reply = {'facility': {'computepurchases': [{'clustername': 'ada', 'purchased': 200}]}}
    repos_reply = {'repos': [alpha, beta]}
    daemon.back_channel.execute.side_effect = [facility_reply, repos_reply]

    outcome = daemon.do_facility_compute_allocation_cascade('lcls', 'ada', dry_run=False)

    assert outcome is True
    assert delegate.call_count == 2

    # Positional args: (repo_name, facility, cluster, percent, allocated_resource, start, end)
    nodes_by_repo = {}
    for recorded in delegate.call_args_list:
        nodes_by_repo[recorded.args[0]] = recorded.args[4]

    assert nodes_by_repo['alpha'] == 50.0  # 25% of 200
    assert nodes_by_repo['beta'] == 100.0  # 50% of 200


def test_cascade_returns_false_when_no_purchase_record():
    """
    If the facility's computepurchases list has no entry for the requested
    cluster, the cascade must log exactly one error, return False and leave
    every repo untouched.
    """
    daemon = make_handler()
    delegate = MagicMock(return_value=True)
    daemon.do_repo_compute_allocation = delegate

    unrelated_purchase = {'clustername': 'other-cluster', 'purchased': 100}
    daemon.back_channel.execute.return_value = {
        'facility': {'computepurchases': [unrelated_purchase]},
    }

    outcome = daemon.do_facility_compute_allocation_cascade('lcls', 'ada', dry_run=False)

    assert outcome is False
    delegate.assert_not_called()
    daemon.logger.error.assert_called_once()


def test_cascade_returns_false_when_purchased_nodes_is_zero():
    """
    A matching purchase record with zero purchased nodes must be rejected:
    one error logged, False returned, no repo update attempted.
    """
    daemon = make_handler()
    delegate = MagicMock(return_value=True)
    daemon.do_repo_compute_allocation = delegate

    empty_purchase = {'clustername': 'ada', 'purchased': 0}
    daemon.back_channel.execute.return_value = {
        'facility': {'computepurchases': [empty_purchase]},
    }

    outcome = daemon.do_facility_compute_allocation_cascade('lcls', 'ada', dry_run=False)

    assert outcome is False
    delegate.assert_not_called()
    daemon.logger.error.assert_called_once()


def test_cascade_continues_after_per_repo_failure():
    """
    When do_repo_compute_allocation raises for the first repo, the cascade
    must log that single error, still process the second repo, and return
    True overall.
    """
    daemon = make_handler()

    alpha = {
        'Id': 'repo-a', 'name': 'alpha', 'facility': 'lcls',
        'currentComputeAllocations': [{
            'Id': 'alloc-a', 'clustername': 'ada',
            'percentOfFacility': 25.0, 'allocatedNodesCount': 50.0,
            'start': START, 'end': END,
        }],
    }
    beta = {
        'Id': 'repo-b', 'name': 'beta', 'facility': 'lcls',
        'currentComputeAllocations': [{
            'Id': 'alloc-b', 'clustername': 'ada',
            'percentOfFacility': 50.0, 'allocatedNodesCount': 100.0,
            'start': START, 'end': END,
        }],
    }
    # First execute() call answers the facility query, the second the repo query.
    facility_reply = {'facility': {'computepurchases': [{'clustername': 'ada', 'purchased': 200}]}}
    repos_reply = {'repos': [alpha, beta]}
    daemon.back_channel.execute.side_effect = [facility_reply, repos_reply]

    # The first delegate call fails, the second succeeds.
    delegate = MagicMock(side_effect=[Exception("slurm playbook failed"), True])
    daemon.do_repo_compute_allocation = delegate

    outcome = daemon.do_facility_compute_allocation_cascade('lcls', 'ada', dry_run=False)

    assert outcome is True
    assert delegate.call_count == 2
    daemon.logger.error.assert_called_once()