diff --git a/framework/dev/k8s/README.md b/framework/dev/k8s/README.md index b261533bf71a..5d74216a6c7e 100644 --- a/framework/dev/k8s/README.md +++ b/framework/dev/k8s/README.md @@ -1,9 +1,33 @@ # Local k8s Launch-Path Harness This dev-only harness builds local Flower runtime images, configures a k3d -cluster, starts SuperLink and SuperExec, seeds one ServerApp run through the -Control API, and verifies that the Kubernetes executor creates a TaskExecutor -Pod that reaches `Succeeded`. +cluster, starts SuperLink and SuperExec, seeds deterministic ServerApp runs +through the Control API, and verifies that the Kubernetes executor creates +TaskExecutor Pods that reach `Succeeded`. + +It has two main modes: + +- the default one-task launch-path proof; and +- the `--capacity-cleanup-proof` mode, which uses active Pod budget `1`, seeds + two tasks, observes SuperExec waiting for capacity, and verifies completed + TaskExecutor Pod/Secret cleanup before broad namespace cleanup. + +## Prerequisites + +Run commands from the repository root. The wrapper expects these tools on +`PATH`: + +- `docker`; +- `k3d`; +- `kubectl`; +- `uv`; +- `python`. + +The Docker daemon must already be running. If `--skip-build` is used, the local +runtime images selected by the wrapper must already exist and be importable into +k3d. + +## Quick Runs Run the full local smoke test from the repository root: @@ -17,11 +41,47 @@ To reuse previously built images: ./framework/dev/k8s/test-real-launch-path.sh --skip-build ``` +To run the budget-1/two-task capacity and cleanup proof: + +```bash +output_dir=/private/tmp/f7d-v2-capacity-cleanup-proof-$(date +%Y%m%d-%H%M%S) +./framework/dev/k8s/test-real-launch-path.sh \ + --capacity-cleanup-proof \ + --output-dir "${output_dir}" +``` + +To verify the saved capacity evidence manually after the wrapper finishes: + +```bash +python framework/dev/k8s/verify_evidence.py "${output_dir}" \ + --expected-result local-k8s-capacity-cleanup-proof +``` + +`/private/tmp` is only an example local scratch location. For handoff or review, +choose a durable writable directory, or archive the completed evidence directory +after saving the verifier report. + +The wrapper prints verifier output to stdout. To make the verifier report part +of an evidence bundle for review, rerun the verifier and save the output: + +```bash +python framework/dev/k8s/verify_evidence.py "${output_dir}" \ + --expected-result local-k8s-capacity-cleanup-proof \ + > "${output_dir}/diagnostics/verifier-output.txt" +``` + The wrapper deletes the test namespace by default. To inspect resources after a run: ```bash -./framework/dev/k8s/test-real-launch-path.sh --skip-cleanup +output_dir=/private/tmp/f7d-v2-capacity-cleanup-proof-live-$(date +%Y%m%d-%H%M%S) +./framework/dev/k8s/test-real-launch-path.sh \ + --capacity-cleanup-proof \ + --skip-cleanup \ + --output-dir "${output_dir}" +python framework/dev/k8s/verify_evidence.py "${output_dir}" \ + --expected-result local-k8s-capacity-cleanup-proof \ + --no-require-cleanup ``` ## Defaults @@ -32,7 +92,12 @@ run: | Namespace | `flower-local-k8s` | | Seed Job | `flower-local-k8s-seed-run` | | Executor ConfigMap | `flower-local-k8s-executor-config` | -| Result | `local-k8s-launch-path` | +| Default result | `local-k8s-launch-path` | +| Capacity-proof result | `local-k8s-capacity-cleanup-proof` | +| Default seeded runs | `1` | +| Capacity-proof seeded runs | `2` | +| Capacity-proof active Pod budget | `1` | +| Capacity-proof probe hold | `5.0` seconds | | ServerApp marker | `K8s launch probe ServerApp ran` | ## Output @@ -61,12 +126,26 @@ Evidence is written under the selected output directory: | `taskexecutor-secrets.redacted.json` | Redacted per-task Secret evidence with key names and byte lengths. | | `final-state.json` | Pre-cleanup resource counts and object summaries for the run selectors. | | `proof-checklist.json` | Reviewer-facing map from claims to artifact fields, with out-of-scope claims. | +| `harness.log` | Short harness result log. | +| `sanitized-config.yaml` | Sanitized copy of the selected harness profile. | +| `objects/capacity-blocked-pods.json` | Capacity-proof snapshot of the first active TaskExecutor Pod. | +| `objects/executor-config.yaml` | Rendered Kubernetes executor config, including capacity settings. | +| `objects/executor-config.json` | JSON form of the rendered Kubernetes executor config. | +| `objects/secrets-before-cleanup.redacted.json` | Capacity-proof redacted Secret snapshot before executor cleanup. | +| `objects/cleanup-pods.json` | Capacity-proof TaskExecutor Pod state after capacity opens. | +| `objects/secrets-after-cleanup.redacted.json` | Capacity-proof redacted Secret snapshot after executor cleanup. | | `objects/real-launch.yaml` | Rendered SuperLink, executor config, and SuperExec objects. | | `objects/seed-job.yaml` | Rendered seed ConfigMap and Job. | | `objects/pods.json` | Observed TaskExecutor Pod list and phases. | | `diagnostics/commands.txt` | Planned or executed host commands. | +| `diagnostics/failures.txt` | Failure messages when the harness records failures. | +| `diagnostics/image-preflight.json` | Docker image inspection and k3d import plan/results. | +| `diagnostics/image-preflight.txt` | Docker image inspection and k3d import command output. | +| `diagnostics/cleanup.json` | Namespace cleanup command plan/results. | +| `diagnostics/superexec-logs.txt` | Captured SuperExec logs used for claim and capacity-wait evidence. | | `diagnostics/taskexecutor-logs.txt` | Captured TaskExecutor logs. | | `diagnostics/cleanup.txt` | Cleanup defaults and the namespace delete command. | +| `diagnostics/verifier-output.txt` | Optional saved verifier report when rerun manually with shell redirection. | ## How The Evidence Proves Correctness @@ -78,7 +157,8 @@ map in machine-readable form. Open `invocation.json` and check: - - `mode` is `local-k8s-launch-path`; + - `mode` is `local-k8s-launch-path` for the default proof or + `local-k8s-capacity-cleanup-proof` for the capacity cleanup proof; - `dry_run` is `false`; - `repo.branch` and `repo.sha` match the checkout under review; - `equivalent_argv` shows the harness mode, output directory, namespace, @@ -92,12 +172,16 @@ map in machine-readable form. contain the executor config used to render TaskExecutor Pods, including the namespace, image, resource pool, and harness-run label. -3. Confirm one deterministic ServerApp task was seeded through AppIo. +3. Confirm deterministic ServerApp tasks were seeded through AppIo. Open `objects/seed-job.yaml` and check that the Job runs `/opt/flower-local-k8s/seed_run.py` against the SuperLink Control API. - Then check `summary.json` and `task-lineage.json`: `seed_run_id` and - `seeded_run_id` should be present and should match. + Then check `summary.json` and `task-lineage.json`. + + For the default proof, `seed_run_id` and `seeded_run_id` should be present + and should match. For the capacity cleanup proof, `summary.json` should list + two `seed_run_ids`, `task-lineage.json` should list the same + `seeded_run_ids`, and `seeded_task_count` should be `2`. 4. Confirm the Kubernetes executor created the TaskExecutor Pod. @@ -133,8 +217,8 @@ map in machine-readable form. Open `final-state.json`. It records the Pod, Secret, Job, Service, and Namespace observation commands plus resource counts before namespace deletion. This proves what remained at the end of the proof stage. It does - not claim executor-owned completed Pod or Secret cleanup; that is deliberately - out of scope for this slice. + not claim completed Pod or Secret cleanup for the default one-task proof; + that cleanup assertion belongs to the capacity cleanup proof. 8. Confirm the verifier accepted the bundle. @@ -144,6 +228,31 @@ map in machine-readable form. counts, a `Succeeded` phase, and a successful cleanup command when cleanup was required. +For `--capacity-cleanup-proof`, additionally confirm: + +1. `objects/executor-config.yaml` sets `active-pod-budget: 1`. +2. `summary.json` lists two `seed_run_ids`, and `task-lineage.json` records at + least two observed TaskExecutor task records. +3. `events.jsonl` has a passing `capacity.wait_observed` event. Also open + `diagnostics/superexec-logs.txt`; it should include the specific SuperExec + wait marker + `waiting for kubernetes taskexecutor capacity`; `active pods` and `budget` + are useful context, but they are not sufficient evidence on their own. +4. `summary.json` has `cleanup_observed.observed: true`, removed Pod and Secret + names for the completed task, and at least one remaining/new TaskExecutor Pod + after cleanup. Removed and remaining Pod names should be disjoint. +5. `objects/cleanup-pods.json` and + `objects/secrets-after-cleanup.redacted.json` show the post-wait selector + state before broad namespace cleanup. +6. The capacity verifier report should identify the result as + `local-k8s-capacity-cleanup-proof`, show `Task lineage records: 2`, show + `Capacity wait observed: True`, include non-empty removed Pod/Secret lines, + and end with `Verification: PASSED`. + + In this mode, `TaskExecutor Pods: 1` in the verifier report is expected + after cleanup: it is the remaining/new TaskExecutor Pod. The two-task + evidence comes from `Task lineage records: 2` and `task-lineage.json`. + ## What Is Tested | Area | Tested | Notes | @@ -156,20 +265,19 @@ map in machine-readable form. | TaskExecutor Pod creation | Yes | Polls for a Pod matching the run selector before failing. | | TaskExecutor terminal phase | Yes | Waits for observed TaskExecutor Pods to reach `Succeeded`. | | ServerApp execution marker | Yes | Verifies `K8s launch probe ServerApp ran` in TaskExecutor logs. | +| Capacity wait | Optional | `--capacity-cleanup-proof` seeds two runs with active Pod budget `1` and requires SuperExec wait evidence. | +| Sweeper cleanup | Optional | `--capacity-cleanup-proof` requires the first completed TaskExecutor Pod and Secret to be removed before namespace cleanup. | | Wrapper cleanup | Yes | Default wrapper behavior deletes the namespace and verifies cleanup evidence. | ## Out Of Scope | Area | Tested | Notes | | --- | --- | --- | -| Capacity waiting | No | No capacity queue or resource-pool wait behavior is asserted. | -| Sweeper cleanup | No | No reconciler or orphan cleanup loop is validated. | -| Executor-owned Pod deletion | No | Namespace cleanup removes resources; executor deletion behavior is not proven. | -| Executor-owned Secret deletion | No | Secret RBAC is checked, but per-task Secret lifecycle is not asserted. | +| Cardinality proof | No | The capacity proof uses budget `1` and two tasks; budget `2`/three-task cardinality is a later slice. | | AppIo result completion semantics | No | This slice observes launch and Pod success, not full result semantics. | | ClientApp execution | No | The probe includes a minimal ClientApp file only because the FAB schema expects it. | | TLS, CNI/NetworkPolicy, production RBAC | No | This is local/dev-only and uses insecure local AppIo. | -| Concurrency, retry, failure behavior | No | The harness starts one deterministic run. | +| Concurrency, retry, failure behavior | No | The default proof starts one deterministic run; the capacity proof starts two deterministic runs only to exercise budget waiting and cleanup. | ## Useful Commands @@ -177,13 +285,45 @@ Inspect resources after `--skip-cleanup`: ```bash kubectl --context k3d-flower-local-k8s get pods -n flower-local-k8s +kubectl --context k3d-flower-local-k8s get jobs,secrets -n flower-local-k8s kubectl --context k3d-flower-local-k8s logs pod/flower-superlink -n flower-local-k8s kubectl --context k3d-flower-local-k8s logs pod/flower-superexec -n flower-local-k8s ``` +Verify an existing default launch-path bundle: + +```bash +python framework/dev/k8s/verify_evidence.py "${output_dir}" +``` + +Verify an existing capacity cleanup bundle: + +```bash +python framework/dev/k8s/verify_evidence.py "${output_dir}" \ + --expected-result local-k8s-capacity-cleanup-proof +``` + +Verify a bundle from a run that used `--skip-cleanup`: + +```bash +python framework/dev/k8s/verify_evidence.py "${output_dir}" \ + --expected-result local-k8s-capacity-cleanup-proof \ + --no-require-cleanup +``` + Remove the namespace manually: ```bash kubectl --context k3d-flower-local-k8s delete namespace flower-local-k8s \ --ignore-not-found=true --wait=true ``` + +If Docker was restarted and an existing local k3d cluster appears stale, recreate +only the local harness cluster and rerun: + +```bash +k3d cluster delete flower-local-k8s +./framework/dev/k8s/test-real-launch-path.sh \ + --capacity-cleanup-proof \ + --output-dir "${output_dir}" +``` diff --git a/framework/dev/k8s/assets/probe_app/launch_probe/server_app.py b/framework/dev/k8s/assets/probe_app/launch_probe/server_app.py index 12542a48b6f8..aa7ad2509ef5 100644 --- a/framework/dev/k8s/assets/probe_app/launch_probe/server_app.py +++ b/framework/dev/k8s/assets/probe_app/launch_probe/server_app.py @@ -12,11 +12,20 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== +"""Probe ServerApp used by the local k8s launch-path harness.""" + +import time + import flwr as fl app = fl.serverapp.ServerApp() +_PROBE_HOLD_SECONDS_CONFIG_KEY = "local-k8s.probe-hold-seconds" @app.main() def main(grid, context): + """Run the probe ServerApp and optionally stay active for capacity tests.""" print("K8s launch probe ServerApp ran") + hold_seconds = context.run_config.get(_PROBE_HOLD_SECONDS_CONFIG_KEY, 0.0) + if isinstance(hold_seconds, (float, int)) and hold_seconds > 0: + time.sleep(float(hold_seconds)) diff --git a/framework/dev/k8s/assets/probe_app/pyproject.toml b/framework/dev/k8s/assets/probe_app/pyproject.toml index 9164c65d6c48..5f868e09ac2b 100644 --- a/framework/dev/k8s/assets/probe_app/pyproject.toml +++ b/framework/dev/k8s/assets/probe_app/pyproject.toml @@ -17,3 +17,4 @@ serverapp = "launch_probe.server_app:app" clientapp = "launch_probe.client_app:app" [tool.flwr.app.config] +local-k8s.probe-hold-seconds = 0.0 diff --git a/framework/dev/k8s/assets/seed_run.py b/framework/dev/k8s/assets/seed_run.py index 6e9d4f39d521..9f7306ead577 100644 --- a/framework/dev/k8s/assets/seed_run.py +++ b/framework/dev/k8s/assets/seed_run.py @@ -12,6 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # ============================================================================== +"""Seed deterministic ServerApp runs for the local k8s harness.""" + import argparse import hashlib from pathlib import Path @@ -19,34 +21,51 @@ import grpc from flwr.cli.build import build_fab_from_disk -from flwr.common.serde import fab_to_proto +from flwr.common.serde import fab_to_proto, scalar_to_proto from flwr.proto.control_pb2 import StartRunRequest from flwr.proto.control_pb2_grpc import ControlStub from flwr.supercore.constant import NOOP_FEDERATION from flwr.supercore.fab import Fab _PROBE_APP_DIR = Path("/opt/flower-local-k8s/probe_app") +_PROBE_HOLD_SECONDS_CONFIG_KEY = "local-k8s.probe-hold-seconds" def main() -> None: + """Create one or more deterministic ServerApp runs through Control API.""" parser = argparse.ArgumentParser() parser.add_argument("--control-api-address", required=True) + parser.add_argument("--run-count", type=int, default=1) + parser.add_argument("--probe-hold-seconds", type=float, default=0.0) args = parser.parse_args() + if args.run_count < 1: + raise ValueError("--run-count must be at least 1") fab_bytes = build_fab_from_disk(_PROBE_APP_DIR) fab_hash = hashlib.sha256(fab_bytes).hexdigest() channel = grpc.insecure_channel(args.control_api_address) grpc.channel_ready_future(channel).result(timeout=60) stub = ControlStub(channel) - response = stub.StartRun( - StartRunRequest( - fab=fab_to_proto(Fab(fab_hash, fab_bytes, {})), - federation=NOOP_FEDERATION, + override_config = {} + if args.probe_hold_seconds > 0: + override_config[_PROBE_HOLD_SECONDS_CONFIG_KEY] = scalar_to_proto( + args.probe_hold_seconds + ) + run_ids = [] + for _ in range(args.run_count): + response = stub.StartRun( + StartRunRequest( + fab=fab_to_proto(Fab(fab_hash, fab_bytes, {})), + override_config=override_config, + federation=NOOP_FEDERATION, + ) ) - ) - if not response.HasField("run_id"): - raise RuntimeError("Control API did not return a run_id") - print(f"K8s launch seed created run_id={response.run_id}") + if not response.HasField("run_id"): + raise RuntimeError("Control API did not return a run_id") + run_ids.append(response.run_id) + print(f"K8s launch seed created run_id={response.run_id}") + joined_run_ids = ",".join(str(run_id) for run_id in run_ids) + print(f"K8s launch seed created run_ids={joined_run_ids}") if __name__ == "__main__": diff --git a/framework/dev/k8s/common.py b/framework/dev/k8s/common.py index 08d6d789b2c6..dc375a5a29eb 100644 --- a/framework/dev/k8s/common.py +++ b/framework/dev/k8s/common.py @@ -22,7 +22,7 @@ import shlex import subprocess from collections.abc import Mapping, Sequence -from dataclasses import asdict, dataclass, field +from dataclasses import asdict, dataclass, field, replace from datetime import UTC, datetime from pathlib import Path from typing import cast @@ -85,6 +85,8 @@ "--root-certificates", "--root-certificates-bytes", } + + @dataclass class HarnessEvent: """One JSONL event emitted by the local k8s harness.""" @@ -158,6 +160,11 @@ class HarnessProfile: # pylint: disable=too-many-instance-attributes executor_config_name: str = "flower-local-k8s-executor-config" appio_api_port: int = 9091 control_api_port: int = 9093 + active_pod_budget: int | None = None + capacity_poll_interval: float | None = None + capacity_log_interval: float | None = None + seed_run_count: int = 1 + probe_hold_seconds: float = 0.0 def to_mapping(self) -> dict[str, object]: """Return the profile as a JSON/YAML-ready mapping.""" @@ -171,6 +178,12 @@ def to_mapping(self) -> dict[str, object]: } if self.executor_config_path is not None: executor_config["path"] = self.executor_config_path + if self.active_pod_budget is not None: + executor_config["active-pod-budget"] = self.active_pod_budget + if self.capacity_poll_interval is not None: + executor_config["capacity-poll-interval"] = self.capacity_poll_interval + if self.capacity_log_interval is not None: + executor_config["capacity-log-interval"] = self.capacity_log_interval return { "schema-version": SCHEMA_VERSION, "name": self.name, @@ -197,6 +210,8 @@ def to_mapping(self) -> dict[str, object]: "superlink-name": self.superlink_name, "superexec-name": self.superexec_name, "seed-job-name": self.seed_job_name, + "seed-run-count": self.seed_run_count, + "probe-hold-seconds": self.probe_hold_seconds, "appio-api-port": self.appio_api_port, "control-api-port": self.control_api_port, }, @@ -317,6 +332,28 @@ def generic_k3d_profile() -> HarnessProfile: ) +def capacity_cleanup_profile(profile: HarnessProfile) -> HarnessProfile: + """Return profile defaults for the local capacity and cleanup proof.""" + return replace( + profile, + active_pod_budget=( + profile.active_pod_budget if profile.active_pod_budget is not None else 1 + ), + capacity_poll_interval=( + profile.capacity_poll_interval + if profile.capacity_poll_interval is not None + else 1.0 + ), + capacity_log_interval=( + profile.capacity_log_interval + if profile.capacity_log_interval is not None + else 1.0 + ), + seed_run_count=max(profile.seed_run_count, 2), + probe_hold_seconds=max(profile.probe_hold_seconds, 5.0), + ) + + def build_tls_material_contract(profile: HarnessProfile) -> dict[str, object]: """Return sanitized TLS material evidence without storing PEM contents.""" path = ( @@ -583,6 +620,28 @@ def _format_taskexecutor_logs(results: Sequence[CommandResult]) -> str: return "\n\n".join(sections) + "\n" +def _format_superexec_logs(results: Sequence[CommandResult]) -> str: + if not results: + return "No SuperExec logs were captured.\n" + sections: list[str] = [] + for result in results: + command = shlex.join(result.args) + sections.append( + "\n".join( + [ + f"$ {command}", + f"returncode={result.returncode}", + f"dry_run={str(result.dry_run).lower()}", + "stdout:", + result.stdout.rstrip() or "", + "stderr:", + result.stderr.rstrip() or "", + ] + ) + ) + return "\n\n".join(sections) + "\n" + + def _unique_values(values: Sequence[str]) -> list[str]: unique: list[str] = [] for value in values: @@ -597,8 +656,7 @@ def _run_rbac_checks( command_results: list[CommandResult], ) -> dict[str, object]: subject = ( - f"system:serviceaccount:{profile.namespace}:" - f"{profile.superexec_service_account}" + f"system:serviceaccount:{profile.namespace}:{profile.superexec_service_account}" ) checks: list[dict[str, object]] = [] failures: list[str] = [] @@ -797,9 +855,7 @@ def _status_from_command(result: CommandResult, *, planned_status: str) -> str: return "failed" -def _combined_status( - results: Sequence[CommandResult], *, planned_status: str -) -> str: +def _combined_status(results: Sequence[CommandResult], *, planned_status: str) -> str: if any(result.returncode != 0 and not result.dry_run for result in results): return "failed" if any(result.dry_run for result in results): diff --git a/framework/dev/k8s/harness.py b/framework/dev/k8s/harness.py index ca6af0023913..ea74702e4e30 100644 --- a/framework/dev/k8s/harness.py +++ b/framework/dev/k8s/harness.py @@ -27,8 +27,8 @@ if str(_THIS_DIR) not in sys.path: sys.path.insert(0, str(_THIS_DIR)) -import real_launch -from common import ( +import real_launch # noqa: E402,F401 +from common import ( # noqa: E402,F401 CommandResult, EvidenceBundleWriter, HarnessEvent, @@ -49,14 +49,14 @@ redact_command_args, redact_sensitive_data, ) -from manifests import ( +from manifests import ( # noqa: E402,F401 render_appio_seed_manifests, render_kubernetes_executor_config, render_namespace_manifest, render_real_launch_manifests, render_superexec_rbac_manifests, ) -from real_launch import run_local_k8s_launch_path +from real_launch import run_local_k8s_launch_path # noqa: E402 def run_contract_scaffold( @@ -94,7 +94,10 @@ def run_contract_scaffold( HarnessEvent( event="policy.not_validated_locally", status="not_validated", - message="The local k8s contract scaffold does not validate CNI, RBAC, or Kubernetes runtime policy.", + message=( + "The local k8s contract scaffold does not validate CNI, RBAC, " + "or Kubernetes runtime policy." + ), details={ "expected_cni": profile.expected_cni, "reason": "contract scaffold only", @@ -390,18 +393,24 @@ def _parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: "Write local k8s harness evidence. The default mode writes " "the contract scaffold; infra-proof mode writes " "infra/TLS/RBAC evidence; local-k8s-launch-path mode writes " - "SuperLink/SuperExec/TaskExecutor evidence. Host commands only run " - "with --execute." + "SuperLink/SuperExec/TaskExecutor evidence; capacity-cleanup-proof " + "mode writes the budget-1/two-task capacity and cleanup proof. " + "Host commands only run with --execute." ) ) default_profile = generic_k3d_profile() parser.add_argument( "--mode", - choices=("contract-scaffold", "infra-proof", "local-k8s-launch-path"), + choices=( + "contract-scaffold", + "infra-proof", + "local-k8s-launch-path", + "capacity-cleanup-proof", + ), default="contract-scaffold", help=( "Write the contract scaffold, infra/TLS/RBAC proof bundle, " - "or local k8s launch-path bundle." + "local k8s launch-path bundle, or capacity/cleanup proof bundle." ), ) parser.add_argument( @@ -504,6 +513,36 @@ def _parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: default=default_profile.seed_job_name, help="Control API seed Job name rendered for local-k8s-launch-path mode.", ) + parser.add_argument( + "--seed-run-count", + type=int, + default=default_profile.seed_run_count, + help="Number of deterministic ServerApp runs created by the seed Job.", + ) + parser.add_argument( + "--probe-hold-seconds", + type=float, + default=default_profile.probe_hold_seconds, + help="Seconds each probe ServerApp should stay active before returning.", + ) + parser.add_argument( + "--active-pod-budget", + type=int, + default=None, + help="Optional Kubernetes executor active Pod budget.", + ) + parser.add_argument( + "--capacity-poll-interval", + type=float, + default=None, + help="Optional Kubernetes executor capacity poll interval.", + ) + parser.add_argument( + "--capacity-log-interval", + type=float, + default=None, + help="Optional Kubernetes executor capacity log interval.", + ) parser.add_argument( "--rbac-name", default=default_profile.rbac_name, @@ -513,8 +552,9 @@ def _parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: "--execute", action="store_true", help=( - "Run host k3d/kubectl commands in infra-proof or local-k8s-launch-path " - "mode. Without this, commands are only recorded as dry-run evidence." + "Run host k3d/kubectl commands in infra-proof, local-k8s-launch-path, " + "or capacity-cleanup-proof mode. Without this, commands are only " + "recorded as dry-run evidence." ), ) parser.add_argument( @@ -575,6 +615,11 @@ def _profile_from_args(args: argparse.Namespace) -> HarnessProfile: superlink_name=args.superlink_name, superexec_name=args.superexec_name, seed_job_name=args.seed_job_name, + seed_run_count=args.seed_run_count, + probe_hold_seconds=args.probe_hold_seconds, + active_pod_budget=args.active_pod_budget, + capacity_poll_interval=args.capacity_poll_interval, + capacity_log_interval=args.capacity_log_interval, rbac_name=args.rbac_name, labels={ "flower.ai/harness": "local-k8s-launch-path", @@ -586,7 +631,7 @@ def _profile_from_args(args: argparse.Namespace) -> HarnessProfile: def main(argv: Sequence[str] | None = None) -> int: """Write the requested local k8s evidence bundle.""" args = _parse_args(argv) - if args.mode == "local-k8s-launch-path": + if args.mode in {"local-k8s-launch-path", "capacity-cleanup-proof"}: summary = run_local_k8s_launch_path( args.output_dir, profile=_profile_from_args(args), @@ -595,6 +640,7 @@ def main(argv: Sequence[str] | None = None) -> int: apply_manifests=args.apply_manifests, import_images=args.import_images, cleanup=args.cleanup, + capacity_cleanup_proof=args.mode == "capacity-cleanup-proof", ) elif args.mode == "infra-proof": summary = run_infra_proof( diff --git a/framework/dev/k8s/manifests.py b/framework/dev/k8s/manifests.py index 25eedf72ef27..6c6b9fc67652 100644 --- a/framework/dev/k8s/manifests.py +++ b/framework/dev/k8s/manifests.py @@ -77,13 +77,20 @@ def render_kubernetes_executor_config( """Render the trusted root-mapping config consumed by SuperExec.""" labels = dict(profile.labels) labels["flower.ai/harness-run"] = run_id - return { + config: dict[str, object] = { "namespace": profile.namespace, "image": profile.image, "image-pull-policy": profile.image_pull_policy, "resource-pool": profile.resource_pool, "labels": labels, } + if profile.active_pod_budget is not None: + config["active-pod-budget"] = profile.active_pod_budget + if profile.capacity_poll_interval is not None: + config["capacity-poll-interval"] = profile.capacity_poll_interval + if profile.capacity_log_interval is not None: + config["capacity-log-interval"] = profile.capacity_log_interval + return config def render_real_launch_manifests( @@ -140,6 +147,8 @@ def render_appio_seed_manifests( "runtime_image_pull_policy": profile.runtime_image_pull_policy, "control_address": f"{profile.superlink_name}:{profile.control_api_port}", "local_k8s_root": _LOCAL_K8S_ROOT, + "seed_run_count": str(profile.seed_run_count), + "probe_hold_seconds": str(profile.probe_hold_seconds), }, ) seed_config, seed_job = manifests diff --git a/framework/dev/k8s/manifests/seed-job.yaml b/framework/dev/k8s/manifests/seed-job.yaml index a567b56a8010..9673f81d1520 100644 --- a/framework/dev/k8s/manifests/seed-job.yaml +++ b/framework/dev/k8s/manifests/seed-job.yaml @@ -27,6 +27,10 @@ spec: - ${local_k8s_root}/seed_run.py - --control-api-address - ${control_address} + - --run-count + - "${seed_run_count}" + - --probe-hold-seconds + - "${probe_hold_seconds}" volumeMounts: - name: seed-assets mountPath: ${local_k8s_root} diff --git a/framework/dev/k8s/observations.py b/framework/dev/k8s/observations.py index ce2acaae0f50..626ce9575757 100644 --- a/framework/dev/k8s/observations.py +++ b/framework/dev/k8s/observations.py @@ -103,9 +103,10 @@ def _taskexecutor_phase_status( def _seed_observation(result: CommandResult) -> dict[str, object]: - match = re.search(r"\brun_id=(\d+)\b", result.stdout) + run_ids = [int(run_id) for run_id in re.findall(r"\brun_id=(\d+)\b", result.stdout)] return { - "run_id": int(match.group(1)) if match is not None else None, + "run_id": run_ids[0] if run_ids else None, + "run_ids": run_ids, "dry_run": result.dry_run, } @@ -120,6 +121,21 @@ def _superexec_claim_observation(result: CommandResult) -> dict[str, object]: return {"observed": bool(markers), "markers": markers} +def _superexec_capacity_wait_observation(result: CommandResult) -> dict[str, object]: + combined = f"{result.stdout}\n{result.stderr}".lower() + wait_marker = "waiting for kubernetes taskexecutor capacity" + markers = [ + marker + for marker in ( + wait_marker, + "active pods", + "budget", + ) + if marker in combined + ] + return {"observed": wait_marker in markers, "markers": markers} + + def _pod_observation(result: CommandResult) -> dict[str, object]: if result.dry_run or not result.stdout.strip(): return {"items": [], "phases": []} @@ -152,6 +168,31 @@ def _pod_observation(result: CommandResult) -> dict[str, object]: return {"items": items, "phases": phases} +def _secret_observation(result: CommandResult) -> dict[str, object]: + if result.dry_run or not result.stdout.strip(): + return {"items": []} + try: + raw = json.loads(result.stdout) + except json.JSONDecodeError as err: + return {"items": [], "error": f"invalid Secret JSON: {err}"} + items: list[dict[str, object]] = [] + for secret in raw.get("items", []): + if not isinstance(secret, Mapping): + continue + metadata = secret.get("metadata", {}) + if not isinstance(metadata, Mapping): + continue + items.append( + { + "name": metadata.get("name"), + "namespace": metadata.get("namespace"), + "labels": metadata.get("labels", {}), + "type": secret.get("type"), + } + ) + return {"items": items} + + def _pod_names(observation: Mapping[str, object]) -> list[str]: names: list[str] = [] raw_items = observation.get("items", []) @@ -166,6 +207,20 @@ def _pod_names(observation: Mapping[str, object]) -> list[str]: return names +def _secret_names(observation: Mapping[str, object]) -> list[str]: + names: list[str] = [] + raw_items = observation.get("items", []) + if not isinstance(raw_items, Sequence): + return names + for item in raw_items: + if not isinstance(item, Mapping): + continue + name = item.get("name") + if isinstance(name, str) and name: + names.append(name) + return names + + def _pod_phases(observation: Mapping[str, object]) -> list[str]: raw_phases = observation.get("phases", []) if not isinstance(raw_phases, Sequence): diff --git a/framework/dev/k8s/real_launch.py b/framework/dev/k8s/real_launch.py index fc75134006c2..cf8434a4efc2 100644 --- a/framework/dev/k8s/real_launch.py +++ b/framework/dev/k8s/real_launch.py @@ -39,6 +39,7 @@ _combined_status, _format_cleanup_plan, _format_image_preflight, + _format_superexec_logs, _format_taskexecutor_logs, _kubectl_args, _kubectl_context, @@ -50,6 +51,7 @@ build_cleanup_plan, build_image_preflight, build_tls_material_contract, + capacity_cleanup_profile, generic_k3d_profile, redact_command_args, ) @@ -67,6 +69,9 @@ _pod_observation, _pod_phases, _seed_observation, + _secret_names, + _secret_observation, + _superexec_capacity_wait_observation, _superexec_claim_observation, _taskexecutor_phase_status, _taskexecutor_pods_args, @@ -89,9 +94,12 @@ def run_local_k8s_launch_path( apply_manifests: bool = False, import_images: bool = False, cleanup: bool = False, + capacity_cleanup_proof: bool = False, ) -> HarnessSummary: """Write local k8s AppIo/SuperExec/TaskExecutor launch-path evidence.""" profile = profile or generic_k3d_profile() + if capacity_cleanup_proof: + profile = capacity_cleanup_profile(profile) runner = runner or HostCommandRunner(dry_run=not execute) writer = EvidenceBundleWriter(output_dir) writer.initialize() @@ -99,6 +107,11 @@ def run_local_k8s_launch_path( started_at = _utc_now() command_results: list[CommandResult] = [] failures: list[str] = [] + mode_name = ( + "local-k8s-capacity-cleanup-proof" + if capacity_cleanup_proof + else "local-k8s-launch-path" + ) namespace_manifest = render_namespace_manifest(profile) rbac_manifest_list = { @@ -124,6 +137,7 @@ def run_local_k8s_launch_path( apply_manifests=apply_manifests, import_images=import_images, cleanup=cleanup, + capacity_cleanup_proof=capacity_cleanup_proof, ), ) writer.write_yaml("sanitized-config.yaml", profile.to_mapping()) @@ -158,7 +172,7 @@ def write_event(event: str, status: str, message: str, details: object) -> None: message=message, details={ "run_id": run_id, - "mode": "local-k8s-launch-path", + "mode": mode_name, "dry_run": not execute, "data": details, }, @@ -433,15 +447,21 @@ def run_command( record_failure=False, ) seed_observation = _seed_observation(seed_logs_result) - if execute and seed_observation["run_id"] is None: - failures.append("AppIo seed Job did not report a run_id.") + seed_run_ids = cast(list[int], seed_observation["run_ids"]) + if execute and len(seed_run_ids) < profile.seed_run_count: + failures.append( + "AppIo seed Job reported " + f"{len(seed_run_ids)} run IDs, expected {profile.seed_run_count}." + ) write_event( "appio.seeded", _appio_seed_status(seed_apply_result, seed_wait_result, seed_observation), - "Control API seed Job recorded one deterministic ServerApp run.", + "Control API seed Job recorded deterministic ServerApp runs.", { "job": profile.seed_job_name, "run_id": seed_observation["run_id"], + "run_ids": seed_run_ids, + "expected_run_count": profile.seed_run_count, "manifest": "objects/seed-job.yaml", "delete_previous": ( _command_record(seed_prune_result) if seed_prune_result.args else None @@ -484,6 +504,27 @@ def run_command( taskexecutor_pod_attempts: list[CommandResult] = [] taskexecutor_pods_result = CommandResult(args=[], returncode=0, dry_run=not execute) taskexecutor_observation: dict[str, object] = {"items": [], "phases": []} + taskexecutor_wait_results: list[CommandResult] = [] + capacity_wait_results: list[CommandResult] = [] + before_cleanup_secret_evidence: dict[str, object] = { + "schema_version": SCHEMA_VERSION, + "redacted": True, + "selector": taskexecutor_selector, + "items": [], + } + after_cleanup_secret_evidence: dict[str, object] = { + "schema_version": SCHEMA_VERSION, + "redacted": True, + "selector": taskexecutor_selector, + "items": [], + } + cleanup_observation: dict[str, object] = { + "observed": False, + "removed_pods": [], + "removed_secrets": [], + "remaining_pods": [], + "remaining_secrets": [], + } taskexecutor_deadline = time.monotonic() + profile.timeout_seconds while True: taskexecutor_pods_result = run_command( @@ -516,7 +557,182 @@ def run_command( max(0.0, taskexecutor_deadline - time.monotonic()), ) ) - taskexecutor_wait_results: list[CommandResult] = [] + first_taskexecutor_observation = taskexecutor_observation + first_taskexecutor_pod_names = _pod_names(first_taskexecutor_observation) + blocked_pod_snapshot = _json_list_snapshot(taskexecutor_pods_result) + if capacity_cleanup_proof: + before_cleanup_secrets_result = run_command( + _taskexecutor_secrets_args(profile, taskexecutor_selector), + "TaskExecutor credential Secret before cleanup observation", + record_failure=False, + ) + before_cleanup_secret_snapshot = _json_list_snapshot( + before_cleanup_secrets_result + ) + before_cleanup_secret_observation = _secret_observation( + before_cleanup_secrets_result + ) + _redact_secret_observation_stdout(before_cleanup_secrets_result) + before_cleanup_secret_evidence = _taskexecutor_secret_evidence( + before_cleanup_secret_snapshot, + selector=taskexecutor_selector, + command=_command_record(before_cleanup_secrets_result), + ) + + capacity_wait_observation: dict[str, object] = { + "observed": False, + "markers": [], + } + capacity_wait_result = CommandResult(args=[], returncode=0, dry_run=not execute) + capacity_deadline = time.monotonic() + profile.timeout_seconds + while True: + capacity_wait_result = run_command( + _kubectl_args( + profile, + [ + "logs", + f"pod/{profile.superexec_name}", + "-n", + profile.namespace, + "--tail=400", + ], + ), + "SuperExec capacity wait observation", + record_failure=False, + ) + capacity_wait_results.append(capacity_wait_result) + capacity_wait_observation = _superexec_capacity_wait_observation( + capacity_wait_result + ) + if ( + capacity_wait_observation["observed"] + or not execute + or capacity_wait_result.dry_run + ): + break + if time.monotonic() >= capacity_deadline: + failures.append("SuperExec capacity wait was not observed.") + break + time.sleep( + min( + _TASKEXECUTOR_POD_POLL_INTERVAL_SECONDS, + max(0.0, capacity_deadline - time.monotonic()), + ) + ) + write_event( + "capacity.wait_observed", + _observation_status( + capacity_wait_result, capacity_wait_observation["observed"] + ), + "SuperExec logs inspected for Kubernetes capacity waiting.", + { + "active_pod_budget": profile.active_pod_budget, + "seed_run_ids": seed_run_ids, + "first_pods": first_taskexecutor_observation["items"], + "markers": capacity_wait_observation["markers"], + "commands": [ + _command_record(result) for result in capacity_wait_results + ], + }, + ) + + observed_pod_names = set(first_taskexecutor_pod_names) + second_pod_attempts: list[CommandResult] = [] + second_deadline = time.monotonic() + profile.timeout_seconds + while True: + taskexecutor_pods_result = run_command( + _taskexecutor_pods_args(profile, taskexecutor_selector), + "Second TaskExecutor Pod observation", + record_failure=False, + ) + taskexecutor_pod_attempts.append(taskexecutor_pods_result) + second_pod_attempts.append(taskexecutor_pods_result) + taskexecutor_observation = _pod_observation(taskexecutor_pods_result) + current_pod_names = _pod_names(taskexecutor_observation) + observed_pod_names.update(current_pod_names) + new_pod_names = [ + name + for name in current_pod_names + if name not in first_taskexecutor_pod_names + ] + if ( + new_pod_names + or len(observed_pod_names) >= profile.seed_run_count + or not execute + or taskexecutor_pods_result.dry_run + ): + break + if time.monotonic() >= second_deadline: + failures.append( + "Second TaskExecutor Pod was not observed after capacity opened." + ) + break + time.sleep( + min( + _TASKEXECUTOR_POD_POLL_INTERVAL_SECONDS, + max(0.0, second_deadline - time.monotonic()), + ) + ) + + after_cleanup_secrets_result = run_command( + _taskexecutor_secrets_args(profile, taskexecutor_selector), + "TaskExecutor credential Secret after cleanup observation", + record_failure=False, + ) + after_cleanup_secret_snapshot = _json_list_snapshot( + after_cleanup_secrets_result + ) + after_cleanup_secret_observation = _secret_observation( + after_cleanup_secrets_result + ) + _redact_secret_observation_stdout(after_cleanup_secrets_result) + after_cleanup_secret_evidence = _taskexecutor_secret_evidence( + after_cleanup_secret_snapshot, + selector=taskexecutor_selector, + command=_command_record(after_cleanup_secrets_result), + ) + cleanup_observation = _cleanup_observation( + first_pod_names=first_taskexecutor_pod_names, + before_cleanup_secrets=before_cleanup_secret_observation, + after_cleanup_pods=taskexecutor_observation, + after_cleanup_secrets=after_cleanup_secret_observation, + ) + if execute and not cleanup_observation["observed"]: + failures.append( + "Completed TaskExecutor Pod and credential Secret cleanup was " + "not observed before namespace cleanup." + ) + writer.write_json("objects/capacity-blocked-pods.json", blocked_pod_snapshot) + writer.write_json( + "objects/secrets-before-cleanup.redacted.json", + before_cleanup_secret_evidence, + ) + writer.write_json("objects/cleanup-pods.json", taskexecutor_observation) + writer.write_json( + "objects/secrets-after-cleanup.redacted.json", + after_cleanup_secret_evidence, + ) + write_event( + "cleanup.observed", + ( + "planned" + if not execute or taskexecutor_pods_result.dry_run + else ("passed" if cleanup_observation["observed"] else "failed") + ), + "Completed TaskExecutor Pod and Secret cleanup inspected.", + { + "selector": taskexecutor_selector, + "observation": cleanup_observation, + "first_pods": first_taskexecutor_observation["items"], + "pods_after_cleanup": taskexecutor_observation["items"], + "secrets_before_cleanup": before_cleanup_secret_observation["items"], + "secrets_after_cleanup": after_cleanup_secret_observation["items"], + "second_pod_attempts": [ + _command_record(result) for result in second_pod_attempts + ], + }, + ) + taskexecutor_pod_names = _pod_names(taskexecutor_observation) if execute and taskexecutor_pod_names: for pod_name in taskexecutor_pod_names: @@ -566,13 +782,23 @@ def run_command( ) for pod_name in taskexecutor_pod_names ] + superexec_log_results = [superexec_logs_result, *capacity_wait_results] taskexecutor_pod_snapshot = _json_list_snapshot(taskexecutor_pods_result) + lineage_pod_snapshot = ( + _merge_object_list_snapshots(blocked_pod_snapshot, taskexecutor_pod_snapshot) + if capacity_cleanup_proof + else taskexecutor_pod_snapshot + ) writer.write_json("objects/pods.json", taskexecutor_observation) - writer.write_json("taskexecutor-pods.json", taskexecutor_pod_snapshot) + writer.write_json("taskexecutor-pods.json", lineage_pod_snapshot) writer.write_text( "diagnostics/taskexecutor-logs.txt", _format_taskexecutor_logs(taskexecutor_log_results), ) + writer.write_text( + "diagnostics/superexec-logs.txt", + _format_superexec_logs(superexec_log_results), + ) write_event( "kubernetes_executor.pod_created", _taskexecutor_status(taskexecutor_pods_result, taskexecutor_observation), @@ -610,19 +836,7 @@ def run_command( ) taskexecutor_secrets_result = run_command( - _kubectl_args( - profile, - [ - "get", - "secrets", - "-n", - profile.namespace, - "-l", - taskexecutor_selector, - "-o", - "json", - ], - ), + _taskexecutor_secrets_args(profile, taskexecutor_selector), "TaskExecutor credential Secret observation", ) taskexecutor_secret_snapshot = _json_list_snapshot(taskexecutor_secrets_result) @@ -632,6 +846,13 @@ def run_command( selector=taskexecutor_selector, command=_command_record(taskexecutor_secrets_result), ) + lineage_secret_evidence = ( + _merge_secret_evidence( + before_cleanup_secret_evidence, taskexecutor_secret_evidence + ) + if capacity_cleanup_proof + else taskexecutor_secret_evidence + ) writer.write_json( "taskexecutor-secrets.redacted.json", taskexecutor_secret_evidence ) @@ -648,16 +869,19 @@ def run_command( task_lineage = _task_lineage( profile=profile, run_id=run_id, + mode=mode_name, seed_run_id=seed_observation["run_id"], + seed_run_ids=seed_run_ids, selector=taskexecutor_selector, - pod_snapshot=taskexecutor_pod_snapshot, - secret_evidence=taskexecutor_secret_evidence, + pod_snapshot=lineage_pod_snapshot, + secret_evidence=lineage_secret_evidence, ) writer.write_json("task-lineage.json", task_lineage) final_state = _final_state_record( profile=profile, run_id=run_id, + mode=mode_name, cleanup_requested=cleanup, taskexecutor_selector=taskexecutor_selector, pod_snapshot=taskexecutor_pod_snapshot, @@ -675,7 +899,14 @@ def run_command( "harness namespace cleanup", ) - result = "local-k8s-launch-path" if execute else "local-k8s-launch-path-dry-run" + if capacity_cleanup_proof: + result = ( + "local-k8s-capacity-cleanup-proof" + if execute + else "local-k8s-capacity-cleanup-proof-dry-run" + ) + else: + result = "local-k8s-launch-path" if execute else "local-k8s-launch-path-dry-run" status = "failed" if failures else "passed" writer.write_json( "proof-checklist.json", @@ -684,6 +915,9 @@ def run_command( dry_run=not execute, run_id=run_id, cleanup_requested=cleanup, + capacity_cleanup_proof=capacity_cleanup_proof, + active_pod_budget=profile.active_pod_budget, + seed_run_count=profile.seed_run_count, ), ) write_event( @@ -704,11 +938,16 @@ def run_command( not_validated = [ "TaskExecutor AppIo RPC completion", - "capacity wait proof", - "completed Pod and Secret cleanup proof", "NetworkPolicy/CNI enforcement", "production RBAC posture", ] + if not capacity_cleanup_proof or not execute: + not_validated.extend( + [ + "capacity wait proof", + "completed Pod and Secret cleanup proof", + ] + ) if not execute: not_validated.append("host command execution") if tls_status != "passed": @@ -738,6 +977,9 @@ def run_command( }, "selector": taskexecutor_selector, "seed_run_id": seed_observation["run_id"], + "seed_run_ids": seed_run_ids, + "expected_seed_run_count": profile.seed_run_count, + "active_pod_budget": profile.active_pod_budget, "pods": taskexecutor_observation["items"], "credential_secrets": taskexecutor_secret_evidence["items"], "final_state_counts": final_state["counts"], @@ -746,8 +988,27 @@ def run_command( "task_lineage": "task-lineage.json", "taskexecutor_pods": "taskexecutor-pods.json", "taskexecutor_secrets": "taskexecutor-secrets.redacted.json", + "superexec_logs": "diagnostics/superexec-logs.txt", "final_state": "final-state.json", "proof_checklist": "proof-checklist.json", + "capacity_blocked_pods": ( + "objects/capacity-blocked-pods.json" + if capacity_cleanup_proof + else None + ), + "secrets_before_cleanup": ( + "objects/secrets-before-cleanup.redacted.json" + if capacity_cleanup_proof + else None + ), + "cleanup_pods": ( + "objects/cleanup-pods.json" if capacity_cleanup_proof else None + ), + "secrets_after_cleanup": ( + "objects/secrets-after-cleanup.redacted.json" + if capacity_cleanup_proof + else None + ), }, "rbac": rbac_check, "image_preflight": { @@ -763,6 +1024,24 @@ def run_command( "taskexecutor_logs": [ _command_record(result) for result in taskexecutor_log_results ], + "superexec_logs": [ + _command_record(result) for result in superexec_log_results + ], + "capacity_wait": { + "observed": bool(capacity_wait_results) + and any( + _superexec_capacity_wait_observation(result)["observed"] + for result in capacity_wait_results + ), + "commands": [ + _command_record(result) for result in capacity_wait_results + ], + }, + "cleanup_observed": cleanup_observation, + "secrets": { + "before_cleanup": before_cleanup_secret_evidence["items"], + "after_cleanup": after_cleanup_secret_evidence["items"], + }, "cleanup": { "requested": cleanup, "command": cleanup_plan["command"], @@ -786,12 +1065,18 @@ def _invocation_record( apply_manifests: bool, import_images: bool, cleanup: bool, + capacity_cleanup_proof: bool, ) -> dict[str, object]: """Return reviewer-facing inputs for one local k8s launch-path run.""" cwd = Path.cwd() + mode_name = ( + "local-k8s-capacity-cleanup-proof" + if capacity_cleanup_proof + else "local-k8s-launch-path" + ) return { "schema_version": SCHEMA_VERSION, - "mode": "local-k8s-launch-path", + "mode": mode_name, "run_id": run_id, "dry_run": not execute, "cwd": str(cwd), @@ -805,6 +1090,10 @@ def _invocation_record( "apply_manifests": apply_manifests, "import_images": import_images, "cleanup_requested": cleanup, + "capacity_cleanup_proof": capacity_cleanup_proof, + "active_pod_budget": profile.active_pod_budget, + "seed_run_count": profile.seed_run_count, + "probe_hold_seconds": profile.probe_hold_seconds, }, "equivalent_argv": _equivalent_argv( profile=profile, @@ -814,6 +1103,7 @@ def _invocation_record( apply_manifests=apply_manifests, import_images=import_images, cleanup=cleanup, + capacity_cleanup_proof=capacity_cleanup_proof, ), } @@ -827,12 +1117,17 @@ def _equivalent_argv( apply_manifests: bool, import_images: bool, cleanup: bool, + capacity_cleanup_proof: bool, ) -> list[str]: args = [ "python", "framework/dev/k8s/harness.py", "--mode", - "local-k8s-launch-path", + ( + "capacity-cleanup-proof" + if capacity_cleanup_proof + else "local-k8s-launch-path" + ), "--output-dir", str(output_dir), "--cluster-name", @@ -849,7 +1144,17 @@ def _equivalent_argv( profile.superexec_image, "--timeout-seconds", str(profile.timeout_seconds), + "--seed-run-count", + str(profile.seed_run_count), + "--probe-hold-seconds", + str(profile.probe_hold_seconds), ] + if profile.active_pod_budget is not None: + args.extend(["--active-pod-budget", str(profile.active_pod_budget)]) + if profile.capacity_poll_interval is not None: + args.extend(["--capacity-poll-interval", str(profile.capacity_poll_interval)]) + if profile.capacity_log_interval is not None: + args.extend(["--capacity-log-interval", str(profile.capacity_log_interval)]) if execute: args.append("--execute") if create_cluster: @@ -908,6 +1213,26 @@ def _json_snapshot(result: CommandResult) -> dict[str, object]: return {"items": [], "parse_error": "JSON output was not an object"} +def _merge_object_list_snapshots( + *snapshots: Mapping[str, object], +) -> dict[str, object]: + """Merge Kubernetes List snapshots by object name, preserving first-seen order.""" + merged: dict[str, object] = {"items": []} + item_by_name: dict[str, Mapping[str, object]] = {} + order: list[str] = [] + for snapshot in snapshots: + for item in _object_items(snapshot): + metadata = _mapping(item.get("metadata")) + name = _string_or_none(metadata.get("name")) + if name is None: + continue + if name not in item_by_name: + order.append(name) + item_by_name[name] = item + merged["items"] = [dict(item_by_name[name]) for name in order] + return merged + + def _taskexecutor_secret_evidence( secret_snapshot: Mapping[str, object], *, @@ -923,6 +1248,31 @@ def _taskexecutor_secret_evidence( } +def _merge_secret_evidence( + *evidence_records: Mapping[str, object], +) -> dict[str, object]: + """Merge Secret evidence summaries by name.""" + merged: dict[str, object] = { + "schema_version": SCHEMA_VERSION, + "redacted": True, + "items": [], + } + item_by_name: dict[str, Mapping[str, object]] = {} + order: list[str] = [] + for evidence in evidence_records: + if "selector" not in merged and evidence.get("selector") is not None: + merged["selector"] = evidence["selector"] + for item in _object_items(evidence): + name = _string_or_none(item.get("name")) + if name is None: + continue + if name not in item_by_name: + order.append(name) + item_by_name[name] = item + merged["items"] = [dict(item_by_name[name]) for name in order] + return merged + + def _redact_secret_observation_stdout(result: CommandResult) -> None: if result.stdout.strip(): result.stdout = f"{REDACTED} Secret list JSON; see summarized items" @@ -952,7 +1302,9 @@ def _task_lineage( *, profile: HarnessProfile, run_id: str, + mode: str, seed_run_id: object, + seed_run_ids: Sequence[int], selector: str, pod_snapshot: Mapping[str, object], secret_evidence: Mapping[str, object], @@ -970,18 +1322,18 @@ def _task_lineage( pod_name = _string_or_none(metadata.get("name")) secret_name = _pod_secret_name(pod) secret = secrets_by_name.get(secret_name or "") + task_id = labels.get(_TASK_ID_LABEL) if secret is None: secret = _secret_matching_task( secret_evidence, - task_id=labels.get(_TASK_ID_LABEL), + task_id=task_id, launch_attempt=labels.get(_LAUNCH_ATTEMPT_LABEL), ) if isinstance(secret.get("name"), str): secret_name = str(secret["name"]) tasks.append( { - "seeded_run_id": seed_run_id, - "task_id": labels.get(_TASK_ID_LABEL), + "task_id": task_id, "task_type": labels.get("flower.ai/task-type"), "pod_name": pod_name, "pod_uid": _string_or_none(metadata.get("uid")), @@ -995,9 +1347,17 @@ def _task_lineage( ) return { "schema_version": SCHEMA_VERSION, - "mode": "local-k8s-launch-path", + "mode": mode, "run_id": run_id, "seeded_run_id": seed_run_id, + "seeded_run_ids": list(seed_run_ids), + "seeded_task_count": len(seed_run_ids), + "observed_task_count": len(tasks), + "lineage_note": ( + "TaskExecutor Pod labels expose executor task IDs, not ServerApp run IDs; " + "this record captures seeded run IDs and observed TaskExecutor objects " + "without claiming a per-Pod run-ID mapping." + ), "resource_pool": profile.resource_pool, "selector": selector, "tasks": tasks, @@ -1008,6 +1368,7 @@ def _final_state_record( *, profile: HarnessProfile, run_id: str, + mode: str, cleanup_requested: bool, taskexecutor_selector: str, pod_snapshot: Mapping[str, object], @@ -1054,7 +1415,7 @@ def _final_state_record( namespace_snapshot = _json_snapshot(namespace_result) return { "schema_version": SCHEMA_VERSION, - "mode": "local-k8s-launch-path", + "mode": mode, "run_id": run_id, "captured_before_namespace_cleanup": True, "cleanup_requested": cleanup_requested, @@ -1092,81 +1453,153 @@ def _proof_checklist( dry_run: bool, run_id: str, cleanup_requested: bool, + capacity_cleanup_proof: bool, + active_pod_budget: int | None, + seed_run_count: int, ) -> dict[str, object]: proof_status = "planned" if dry_run else status + claims: list[dict[str, object]] = [ + { + "claim": "Harness invocation and selected inputs are inspectable.", + "status": "proved", + "artifact": "invocation.json", + "fields": ["equivalent_argv", "repo", "profile", "settings"], + }, + { + "claim": "SuperExec is configured to use the Kubernetes executor.", + "status": proof_status, + "artifact": "objects/real-launch.yaml", + "fields": [ + "SuperExec container args include --executor kubernetes", + "executor config is mounted at /etc/flower/executor-config.yaml", + ], + }, + { + "claim": ( + "Seeded ServerApp run count and observed TaskExecutor objects are " + "captured together." + ), + "status": proof_status, + "artifact": "task-lineage.json", + "fields": [ + "seeded_run_id", + "seeded_run_ids", + "seeded_task_count", + "observed_task_count", + "tasks[].pod_name", + "tasks[].credential_secret_name", + ], + }, + { + "claim": ( + "The executor-created TaskExecutor Pod is captured as a full " + "redacted object snapshot." + ), + "status": proof_status, + "artifact": "taskexecutor-pods.json", + "fields": ["items[].metadata", "items[].spec", "items[].status"], + }, + { + "claim": ( + "The per-task credential Secret existed without exposing " + "credential values." + ), + "status": proof_status, + "artifact": "taskexecutor-secrets.redacted.json", + "fields": [ + "items[].name", + "items[].data_keys", + "items[].data_byte_lengths", + "items[].redacted", + ], + }, + { + "claim": ( + "Pre-cleanup resource state is captured before namespace deletion." + ), + "status": proof_status, + "artifact": "final-state.json", + "fields": ["counts", "resources", "captured_before_namespace_cleanup"], + }, + ] + out_of_scope = [ + "budget-2/three-task cardinality behavior", + "AppIo TLS proof", + "production deployment readiness", + ] + if capacity_cleanup_proof: + claims.extend( + [ + { + "claim": "The Kubernetes executor active Pod budget is one.", + "status": proof_status, + "artifact": "objects/executor-config.yaml", + "fields": ["active-pod-budget"], + "expected": {"active-pod-budget": active_pod_budget}, + }, + { + "claim": "Two deterministic ServerApp tasks were seeded.", + "status": proof_status, + "artifact": "summary.json", + "fields": ["details.seed_run_ids"], + "expected": {"seed_run_count": seed_run_count}, + }, + { + "claim": "SuperExec waited because TaskExecutor capacity was full.", + "status": proof_status, + "artifact": "events.jsonl", + "fields": ["capacity.wait_observed"], + }, + { + "claim": ( + "SuperExec logs used for capacity wait evidence are " + "persisted for manual review." + ), + "status": proof_status, + "artifact": "diagnostics/superexec-logs.txt", + "fields": ["waiting for kubernetes taskexecutor capacity"], + }, + { + "claim": ( + "A second TaskExecutor Pod launched after capacity opened." + ), + "status": proof_status, + "artifact": "objects/cleanup-pods.json", + "fields": ["items[].name", "items[].phase"], + }, + { + "claim": ( + "The completed first TaskExecutor Pod and credential Secret " + "were removed before namespace cleanup." + ), + "status": proof_status, + "artifact": "summary.json", + "fields": [ + "details.cleanup_observed.removed_pods", + "details.cleanup_observed.removed_secrets", + ], + }, + ] + ) + else: + out_of_scope.extend( + [ + "active Pod budget behavior", + "two-task capacity waiting", + "executor-owned completed Pod cleanup proof", + "executor-owned per-task Secret cleanup proof", + ] + ) return { "schema_version": SCHEMA_VERSION, - "mode": "local-k8s-launch-path", + "mode": ( + "local-k8s-capacity-cleanup-proof" + if capacity_cleanup_proof + else "local-k8s-launch-path" + ), "run_id": run_id, - "claims": [ - { - "claim": "Harness invocation and selected inputs are inspectable.", - "status": "proved", - "artifact": "invocation.json", - "fields": ["equivalent_argv", "repo", "profile", "settings"], - }, - { - "claim": "SuperExec is configured to use the Kubernetes executor.", - "status": proof_status, - "artifact": "objects/real-launch.yaml", - "fields": [ - "SuperExec container args include --executor kubernetes", - "executor config is mounted at /etc/flower/executor-config.yaml", - ], - }, - { - "claim": ( - "One seeded ServerApp run maps to observed TaskExecutor objects." - ), - "status": proof_status, - "artifact": "task-lineage.json", - "fields": [ - "seeded_run_id", - "tasks[].pod_name", - "tasks[].credential_secret_name", - ], - }, - { - "claim": ( - "The executor-created TaskExecutor Pod is captured as a full " - "redacted object snapshot." - ), - "status": proof_status, - "artifact": "taskexecutor-pods.json", - "fields": ["items[].metadata", "items[].spec", "items[].status"], - }, - { - "claim": ( - "The per-task credential Secret existed without exposing " - "credential values." - ), - "status": proof_status, - "artifact": "taskexecutor-secrets.redacted.json", - "fields": [ - "items[].name", - "items[].data_keys", - "items[].data_byte_lengths", - "items[].redacted", - ], - }, - { - "claim": ( - "Pre-cleanup resource state is captured before namespace deletion." - ), - "status": proof_status, - "artifact": "final-state.json", - "fields": ["counts", "resources", "captured_before_namespace_cleanup"], - }, - ], - "out_of_scope": [ - "active Pod budget behavior", - "two-task capacity waiting", - "executor-owned completed Pod cleanup proof", - "executor-owned per-task Secret cleanup proof", - "F7e cardinality behavior", - "AppIo TLS proof", - "production deployment readiness", - ], + "claims": claims, + "out_of_scope": out_of_scope, "cleanup_requested": cleanup_requested, } @@ -1180,6 +1613,45 @@ def __call__( """Run a command and return its result.""" +def _taskexecutor_secrets_args(profile: HarnessProfile, selector: str) -> list[str]: + return _kubectl_args( + profile, + [ + "get", + "secrets", + "-n", + profile.namespace, + "-l", + selector, + "-o", + "json", + ], + ) + + +def _cleanup_observation( + *, + first_pod_names: Sequence[str], + before_cleanup_secrets: Mapping[str, object], + after_cleanup_pods: Mapping[str, object], + after_cleanup_secrets: Mapping[str, object], +) -> dict[str, object]: + after_pod_names = set(_pod_names(after_cleanup_pods)) + before_secret_names = set(_secret_names(before_cleanup_secrets)) + after_secret_names = set(_secret_names(after_cleanup_secrets)) + removed_pods = [ + pod_name for pod_name in first_pod_names if pod_name not in after_pod_names + ] + removed_secrets = sorted(before_secret_names - after_secret_names) + return { + "observed": bool(removed_pods) and bool(removed_secrets), + "removed_pods": removed_pods, + "removed_secrets": removed_secrets, + "remaining_pods": sorted(after_pod_names), + "remaining_secrets": sorted(after_secret_names), + } + + def _pod_summaries(snapshot: Mapping[str, object]) -> list[dict[str, object]]: summaries: list[dict[str, object]] = [] for pod in _object_items(snapshot): diff --git a/framework/dev/k8s/test-real-launch-path.sh b/framework/dev/k8s/test-real-launch-path.sh index 20336d40525c..fb4cec80fb96 100755 --- a/framework/dev/k8s/test-real-launch-path.sh +++ b/framework/dev/k8s/test-real-launch-path.sh @@ -35,6 +35,9 @@ python_image="${PYTHON_IMAGE:-}" kubernetes_package="${KUBERNETES_PACKAGE:-}" build_images=true cleanup=true +capacity_cleanup_proof=false +seed_run_count="${SEED_RUN_COUNT:-1}" +probe_hold_seconds="${PROBE_HOLD_SECONDS:-0.0}" usage() { cat < tuple[list[str], str]: """Verify a local k8s launch-path evidence bundle.""" evidence_path = Path(evidence_dir) @@ -57,8 +60,8 @@ def verify_evidence( _expect(summary.get("status") == "passed", "summary status is not passed", failures) _expect( - summary.get("result") == "local-k8s-launch-path", - "summary result is not local-k8s-launch-path", + summary.get("result") == expected_result, + f"summary result is not {expected_result}", failures, ) _expect(not summary.get("failures"), "summary contains failures", failures) @@ -67,9 +70,14 @@ def verify_evidence( "harness did not execute host commands", failures, ) + expected_mode = ( + "local-k8s-capacity-cleanup-proof" + if expected_result == "local-k8s-capacity-cleanup-proof" + else "local-k8s-launch-path" + ) _expect( - invocation.get("mode") == "local-k8s-launch-path", - "invocation.json mode is not local-k8s-launch-path", + invocation.get("mode") == expected_mode, + f"invocation.json mode is not {expected_mode}", failures, ) _expect( @@ -113,6 +121,22 @@ def verify_evidence( "task-lineage.json seeded_run_id does not match summary", failures, ) + lineage_seed_run_ids = _sequence(task_lineage.get("seeded_run_ids")) + _expect( + list(lineage_seed_run_ids) == list(_sequence(details.get("seed_run_ids"))), + "task-lineage.json seeded_run_ids does not match summary", + failures, + ) + _expect( + task_lineage.get("seeded_task_count") == len(lineage_seed_run_ids), + "task-lineage.json seeded_task_count does not match seeded_run_ids", + failures, + ) + _expect( + task_lineage.get("observed_task_count") == len(task_lineage_tasks), + "task-lineage.json observed_task_count does not match tasks", + failures, + ) _expect( bool(_sequence(taskexecutor_pods.get("items"))), "taskexecutor-pods.json contains no Pod items", @@ -166,11 +190,18 @@ def verify_evidence( str(item) for item in _sequence(proof_checklist.get("out_of_scope")) ] _expect(bool(checklist_claims), "proof-checklist.json contains no claims", failures) - _expect( - any("capacity" in item for item in out_of_scope), - "proof-checklist.json does not keep capacity claims out of scope", - failures, - ) + if expected_result == "local-k8s-capacity-cleanup-proof": + _expect( + not any("capacity wait proof" == item for item in out_of_scope), + "proof-checklist.json incorrectly keeps capacity wait proof out of scope", + failures, + ) + else: + _expect( + any("capacity" in item for item in out_of_scope), + "proof-checklist.json does not keep capacity claims out of scope", + failures, + ) taskexecutor_logs = _sequence(details.get("taskexecutor_logs")) _expect( @@ -205,6 +236,69 @@ def verify_evidence( failures, ) + capacity_wait = _mapping(details.get("capacity_wait")) + cleanup_observed = _mapping(details.get("cleanup_observed")) + if expected_result == "local-k8s-capacity-cleanup-proof": + removed_pods = _sequence(cleanup_observed.get("removed_pods")) + remaining_pods = _sequence(cleanup_observed.get("remaining_pods")) + removed_pod_names = {str(name) for name in removed_pods} + remaining_pod_names = {str(name) for name in remaining_pods} + final_pod_names = { + str(_mapping(pod).get("name")) + for pod in pods + if _mapping(pod).get("name") is not None + } + _expect( + details.get("active_pod_budget") == 1, + "active Pod budget is not 1", + failures, + ) + _expect( + len(lineage_seed_run_ids) >= 2, + "capacity proof did not record at least two seeded run IDs", + failures, + ) + _expect( + len(task_lineage_tasks) >= 2, + "capacity proof did not record at least two observed TaskExecutor tasks", + failures, + ) + _expect( + capacity_wait.get("observed") is True, + "capacity wait was not observed", + failures, + ) + _expect( + cleanup_observed.get("observed") is True, + "completed Pod/Secret cleanup was not observed", + failures, + ) + _expect( + bool(removed_pods), + "capacity proof did not record removed Pods", + failures, + ) + _expect( + bool(_sequence(cleanup_observed.get("removed_secrets"))), + "capacity proof did not record removed Secrets", + failures, + ) + _expect( + bool(remaining_pods), + "capacity proof did not record a remaining TaskExecutor Pod after cleanup", + failures, + ) + _expect( + remaining_pod_names.issubset(final_pod_names), + "remaining cleanup Pods are not present in final TaskExecutor pod records", + failures, + ) + _expect( + remaining_pod_names.isdisjoint(removed_pod_names), + "remaining cleanup Pods overlap removed Pods", + failures, + ) + return failures, _format_report( evidence_path=evidence_path, summary=summary, @@ -215,6 +309,8 @@ def verify_evidence( pod_phases=pod_phases, cleanup_result=cleanup_result, require_cleanup=require_cleanup, + capacity_wait=capacity_wait, + cleanup_observed=cleanup_observed, failures=failures, ) @@ -252,10 +348,18 @@ def _format_report( pod_phases: Sequence[str], cleanup_result: Mapping[str, object], require_cleanup: bool, + capacity_wait: Mapping[str, object], + cleanup_observed: Mapping[str, object], failures: Sequence[str], ) -> str: pods = [_mapping(pod) for pod in _sequence(details.get("pods"))] taskexecutor_logs = _sequence(details.get("taskexecutor_logs")) + removed_pods = ", ".join( + str(item) for item in _sequence(cleanup_observed.get("removed_pods")) + ) + removed_secrets = ", ".join( + str(item) for item in _sequence(cleanup_observed.get("removed_secrets")) + ) cleanup_status = ( f"returncode={cleanup_result.get('returncode')}" if cleanup_result @@ -283,6 +387,9 @@ def _format_report( ), f"TaskExecutor log captures: {len(taskexecutor_logs)}", f"TaskExecutor phases: {', '.join(pod_phases) or ''}", + f"Capacity wait observed: {capacity_wait.get('observed')}", + f"Removed Pods: {removed_pods or ''}", + f"Removed Secrets: {removed_secrets or ''}", f"Cleanup required: {str(require_cleanup).lower()}", f"Cleanup: {cleanup_status}", ] @@ -325,6 +432,12 @@ def _parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: action="store_true", help="Do not require the final namespace cleanup command to have run.", ) + parser.add_argument( + "--expected-result", + choices=("local-k8s-launch-path", "local-k8s-capacity-cleanup-proof"), + default="local-k8s-launch-path", + help="Expected harness result to verify.", + ) return parser.parse_args(argv) @@ -334,6 +447,7 @@ def main(argv: Sequence[str] | None = None) -> int: failures, report = verify_evidence( args.evidence_dir, require_cleanup=not args.no_require_cleanup, + expected_result=args.expected_result, ) print(report, end="") return 1 if failures else 0 diff --git a/framework/py/flwr/supercore/superexec/executor/factory.py b/framework/py/flwr/supercore/superexec/executor/factory.py index 81de158c44cb..a8fe0009ad00 100644 --- a/framework/py/flwr/supercore/superexec/executor/factory.py +++ b/framework/py/flwr/supercore/superexec/executor/factory.py @@ -28,7 +28,6 @@ from .subprocess_executor import SubprocessExecutor from .types import Executor - _KUBERNETES_CONFIG_FIELD_MAP = { "image-pull-policy": "image_pull_policy", "resource-pool": "resource_pool", diff --git a/framework/py/flwr/supercore/superexec/executor/kubernetes_executor.py b/framework/py/flwr/supercore/superexec/executor/kubernetes_executor.py index b39f89837c58..d276f961188a 100644 --- a/framework/py/flwr/supercore/superexec/executor/kubernetes_executor.py +++ b/framework/py/flwr/supercore/superexec/executor/kubernetes_executor.py @@ -187,6 +187,7 @@ def wait_for_capacity(self) -> None: return last_log_at: float | None = None + waited_for_capacity = False while True: try: active_pod_count = self._active_pod_count() @@ -200,6 +201,8 @@ def wait_for_capacity(self) -> None: ) return if active_pod_count < self._config.active_pod_budget: + if waited_for_capacity: + self._sweep_completed_pods() return if self._config.capacity_log_interval is not None: @@ -218,6 +221,7 @@ def wait_for_capacity(self) -> None: ) last_log_at = now + waited_for_capacity = True self._config.sleep(self._config.capacity_poll_interval) def _sweep_completed_pods_if_due(self) -> None: @@ -231,6 +235,10 @@ def _sweep_completed_pods_if_due(self) -> None: return self._last_completed_pod_sweep_at = now + self._sweep_completed_pods() + + def _sweep_completed_pods(self) -> None: + """Run best-effort completed Pod cleanup.""" try: self._completed_pod_sweeper.sweep() except Exception: # pylint: disable=broad-exception-caught diff --git a/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_harness_test.py b/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_harness_test.py index 74fc4ebdfa99..123f55b8eefb 100644 --- a/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_harness_test.py +++ b/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_harness_test.py @@ -16,8 +16,8 @@ from __future__ import annotations -import importlib.util import hashlib +import importlib.util import json import sys from pathlib import Path @@ -463,7 +463,9 @@ def test_render_appio_seed_manifests_create_control_api_job() -> None: assert "StartRunRequest" in seed_script assert "build_fab_from_disk(_PROBE_APP_DIR)" in seed_script assert "K8s launch seed created run_id=" in seed_script - assert "launch_probe.server_app:app" in seed_config["data"]["probe_pyproject.toml"] + probe_pyproject = seed_config["data"]["probe_pyproject.toml"] + assert "launch_probe.server_app:app" in probe_pyproject + assert "local-k8s.probe-hold-seconds = 0.0" in probe_pyproject assert ( "K8s launch probe ServerApp ran" in seed_config["data"]["launch_probe_server_app.py"] @@ -474,6 +476,10 @@ def test_render_appio_seed_manifests_create_control_api_job() -> None: "/opt/flower-local-k8s/seed_run.py", "--control-api-address", "flower-superlink:9093", + "--run-count", + "1", + "--probe-hold-seconds", + "0.0", ] assert container["volumeMounts"] == [ { @@ -493,6 +499,69 @@ def test_render_appio_seed_manifests_create_control_api_job() -> None: assert seed_job["spec"]["template"]["spec"]["automountServiceAccountToken"] is False +def test_render_kubernetes_executor_config_can_set_capacity_budget() -> None: + """Test rendered config includes optional active Pod budget knobs.""" + profile = harness_module.generic_k3d_profile() + profile.active_pod_budget = 1 + profile.capacity_poll_interval = 0.5 + profile.capacity_log_interval = 1.0 + + config = harness_module.render_kubernetes_executor_config(profile, "k8s-test") + + assert config["active-pod-budget"] == 1 + assert config["capacity-poll-interval"] == 0.5 + assert config["capacity-log-interval"] == 1.0 + + +@pytest.mark.parametrize( + ("log_text", "expected_observed"), + [ + ( + "Waiting for Kubernetes TaskExecutor capacity: " + "1 active Pods, budget 1, selector app.kubernetes.io/name=flower", + True, + ), + ("TaskExecutor capacity budget 1 reached", False), + ("1 active Pods currently running", False), + ], +) +def test_capacity_wait_observation_requires_wait_marker( + log_text: str, expected_observed: bool +) -> None: + """Test capacity wait evidence requires the explicit SuperExec wait marker.""" + result = harness_module.CommandResult( + args=["kubectl", "logs"], + returncode=0, + stdout=log_text, + stderr="", + dry_run=False, + ) + + observation = harness_module.real_launch._superexec_capacity_wait_observation( + result + ) + + assert observation["observed"] is expected_observed + + +def test_render_appio_seed_manifests_can_create_two_held_runs() -> None: + """Test seed manifests can create two deterministic held ServerApp tasks.""" + profile = harness_module.generic_k3d_profile() + profile.seed_run_count = 2 + profile.probe_hold_seconds = 5.0 + + manifests = harness_module.render_appio_seed_manifests(profile, "k8s-test") + + seed_config, seed_job = manifests + seed_script = seed_config["data"]["seed_run.py"] + container = seed_job["spec"]["template"]["spec"]["containers"][0] + assert "--run-count" in container["args"] + assert "2" in container["args"] + assert "--probe-hold-seconds" in container["args"] + assert "5.0" in container["args"] + assert "K8s launch seed created run_ids=" in seed_script + + def test_rendered_local_k8s_outputs_do_not_use_sprint_identifiers() -> None: """Test rendered launch objects do not expose sprint-local identifiers.""" profile = harness_module.generic_k3d_profile() @@ -579,6 +648,7 @@ def test_run_local_k8s_launch_path_dry_run_writes_evidence(tmp_path: Path) -> No assert "app.kubernetes.io/component=taskexecutor" in commands_text assert (output_dir / "diagnostics" / "image-preflight.txt").is_file() assert (output_dir / "diagnostics" / "cleanup.txt").is_file() + assert (output_dir / "diagnostics" / "superexec-logs.txt").is_file() assert (output_dir / "diagnostics" / "taskexecutor-logs.txt").is_file() cleanup_text = (output_dir / "diagnostics" / "cleanup.txt").read_text() assert "Cleanup requested for this run: no" in cleanup_text @@ -635,6 +705,9 @@ def test_run_local_k8s_launch_path_records_terminal_pod_logs_and_cleanup( lineage = json.loads((output_dir / "task-lineage.json").read_text()) assert lineage["seeded_run_id"] == 123 + assert lineage["seeded_task_count"] == 1 + assert lineage["observed_task_count"] == 1 + assert "without claiming a per-Pod run-ID mapping" in lineage["lineage_note"] assert lineage["tasks"] == [ { "credential_secret_name": "flwr-taskexecutor-123-abc-appio", @@ -644,7 +717,6 @@ def test_run_local_k8s_launch_path_records_terminal_pod_logs_and_cleanup( "pod_phase": "Succeeded", "pod_uid": "pod-uid-123", "resource_pool": "generic-k3d", - "seeded_run_id": 123, "task_id": "123", "task_type": "flwr-serverapp", "terminal_phase": "Succeeded", @@ -677,6 +749,9 @@ def test_run_local_k8s_launch_path_records_terminal_pod_logs_and_cleanup( ).read_text() assert "K8s launch probe ServerApp ran" in taskexecutor_logs + superexec_logs = (output_dir / "diagnostics" / "superexec-logs.txt").read_text() + assert "claim launch task_id taskexecutor" in superexec_logs + command_text = (output_dir / "diagnostics" / "commands.txt").read_text() assert "task-token" not in command_text assert "dGFzay10b2tlbg==" not in command_text @@ -701,6 +776,95 @@ def test_run_local_k8s_launch_path_records_terminal_pod_logs_and_cleanup( assert any("delete namespace flower-local-k8s" in command for command in commands) +def test_run_capacity_cleanup_proof_dry_run_writes_evidence(tmp_path: Path) -> None: + """Test capacity-cleanup dry-run writes proof events and config.""" + output_dir = tmp_path / "capacity-cleanup" + + summary = harness_module.run_local_k8s_launch_path( + output_dir, + create_cluster=True, + apply_manifests=True, + capacity_cleanup_proof=True, + ) + + assert summary.status == "passed" + assert summary.result == "local-k8s-capacity-cleanup-proof-dry-run" + assert summary.event_count == 17 + assert "capacity wait proof" in summary.not_validated + assert "completed Pod and Secret cleanup proof" in summary.not_validated + + config = yaml.safe_load( + (output_dir / "objects" / "executor-config.yaml").read_text() + ) + assert config["active-pod-budget"] == 1 + assert config["capacity-poll-interval"] == 1.0 + assert config["capacity-log-interval"] == 1.0 + + events = _read_jsonl(output_dir / "events.jsonl") + assert "capacity.wait_observed" in [event["event"] for event in events] + assert "cleanup.observed" in [event["event"] for event in events] + + +def test_run_capacity_cleanup_proof_records_wait_cleanup_and_second_launch( + tmp_path: Path, +) -> None: + """Test execute-mode evidence captures wait, cleanup, and second launch.""" + runner = _CapacityCleanupRunner() + output_dir = tmp_path / "capacity-cleanup-real" + + summary = harness_module.run_local_k8s_launch_path( + output_dir, + runner=runner, + execute=True, + apply_manifests=True, + import_images=True, + capacity_cleanup_proof=True, + ) + + assert summary.status == "passed" + assert summary.result == "local-k8s-capacity-cleanup-proof" + assert summary.details["seed_run_ids"] == [123, 456] + assert summary.details["active_pod_budget"] == 1 + assert summary.details["capacity_wait"]["observed"] is True + assert summary.details["artifacts"]["superexec_logs"] == ( + "diagnostics/superexec-logs.txt" + ) + assert summary.details["cleanup_observed"]["removed_pods"] == [ + "flwr-taskexecutor-123-abc" + ] + assert summary.details["cleanup_observed"]["removed_secrets"] == [ + "flwr-taskexecutor-123-abc-appio" + ] + assert summary.details["pods"][0]["name"] == "flwr-taskexecutor-456-def" + assert summary.details["pods"][0]["phase"] == "Succeeded" + + cleanup_pods = json.loads( + (output_dir / "objects" / "cleanup-pods.json").read_text() + ) + assert [pod["name"] for pod in cleanup_pods["items"]] == [ + "flwr-taskexecutor-456-def" + ] + lineage = json.loads((output_dir / "task-lineage.json").read_text()) + assert lineage["seeded_run_ids"] == [123, 456] + assert lineage["seeded_task_count"] == 2 + assert lineage["observed_task_count"] == 2 + assert [task["pod_name"] for task in lineage["tasks"]] == [ + "flwr-taskexecutor-123-abc", + "flwr-taskexecutor-456-def", + ] + superexec_logs = (output_dir / "diagnostics" / "superexec-logs.txt").read_text() + assert "claim launch task_id taskexecutor" in superexec_logs + assert "Waiting for Kubernetes TaskExecutor capacity" in superexec_logs + + checklist = json.loads((output_dir / "proof-checklist.json").read_text()) + assert not any(item == "capacity wait proof" for item in checklist["out_of_scope"]) + assert any("capacity was full" in claim["claim"] for claim in checklist["claims"]) + assert any( + claim["artifact"] == "diagnostics/superexec-logs.txt" + for claim in checklist["claims"] + ) + + def test_run_local_k8s_launch_path_polls_until_taskexecutor_pod_appears( tmp_path: Path, ) -> None: @@ -814,6 +978,67 @@ def test_verify_local_k8s_launch_evidence_rejects_missing_serverapp_marker( assert "Verification: FAILED" in report +def test_verify_capacity_cleanup_evidence_accepts_passing_bundle( + tmp_path: Path, +) -> None: + """Test the verifier accepts a passing capacity cleanup bundle.""" + output_dir = tmp_path / "evidence" + _write_verifier_evidence(output_dir, result="local-k8s-capacity-cleanup-proof") + + failures, report = verifier_module.verify_evidence( + output_dir, + expected_result="local-k8s-capacity-cleanup-proof", + ) + + assert failures == [] + assert "Verification: PASSED" in report + assert "Capacity wait observed: True" in report + assert "Removed Pods: flwr-taskexecutor-123-abc" in report + + +def test_verify_capacity_cleanup_evidence_rejects_single_task_record( + tmp_path: Path, +) -> None: + """Test capacity cleanup verification requires two observed task records.""" + output_dir = tmp_path / "evidence" + _write_verifier_evidence(output_dir, result="local-k8s-capacity-cleanup-proof") + lineage_path = output_dir / "task-lineage.json" + lineage = json.loads(lineage_path.read_text(encoding="utf-8")) + lineage["tasks"] = lineage["tasks"][:1] + lineage["observed_task_count"] = 1 + lineage_path.write_text(json.dumps(lineage), encoding="utf-8") + + failures, report = verifier_module.verify_evidence( + output_dir, + expected_result="local-k8s-capacity-cleanup-proof", + ) + + assert any( + "at least two observed TaskExecutor tasks" in failure for failure in failures + ) + assert "Verification: FAILED" in report + + +def test_verify_capacity_cleanup_evidence_rejects_missing_remaining_pod( + tmp_path: Path, +) -> None: + """Test capacity cleanup verification requires a remaining post-cleanup Pod.""" + output_dir = tmp_path / "evidence" + _write_verifier_evidence(output_dir, result="local-k8s-capacity-cleanup-proof") + summary_path = output_dir / "summary.json" + summary = json.loads(summary_path.read_text(encoding="utf-8")) + summary["details"]["cleanup_observed"]["remaining_pods"] = [] + summary_path.write_text(json.dumps(summary), encoding="utf-8") + + failures, report = verifier_module.verify_evidence( + output_dir, + expected_result="local-k8s-capacity-cleanup-proof", + ) + + assert any("remaining TaskExecutor Pod" in failure for failure in failures) + assert "Verification: FAILED" in report + + class _AllowEverythingRunner: """Fake command runner that reports yes for every RBAC can-i check.""" @@ -927,6 +1152,116 @@ def _rbac_allowed(args: list[str]) -> bool: return spec in allowed_specs and "-n" in args and "flower-local-k8s" in args +class _CapacityCleanupRunner: + """Fake command runner for execute-mode capacity cleanup proof evidence.""" + + def __init__(self) -> None: + self.commands: list[list[str]] = [] + self.pod_get_count = 0 + self.secret_get_count = 0 + self.superexec_log_count = 0 + + def run(self, args: list[str]) -> Any: + """Return realistic command output for the capacity proof.""" + self.commands.append(list(args)) + if args[:3] == ["docker", "image", "inspect"]: + return self._result(args) + if args[:3] == ["k3d", "cluster", "list"]: + return self._result(args, stdout="NAME\nflower-local-k8s\n") + if args[:3] == ["k3d", "image", "import"]: + return self._result(args, stdout="imported\n") + if "auth" in args and "can-i" in args: + allowed = _RealLaunchRunner._rbac_allowed(args) + return self._result( + args, + returncode=0 if allowed else 1, + stdout="yes\n" if allowed else "no\n", + ) + if "wait" in args and "--for=jsonpath={.status.phase}=Succeeded" in args: + return self._result(args) + if "get" in args and "pods" in args and "-o" in args and "json" in args: + self.pod_get_count += 1 + if self.pod_get_count == 1: + return self._result( + args, + stdout=json.dumps( + _pod_list_items( + _taskexecutor_pod("flwr-taskexecutor-123-abc", "Running") + ) + ), + ) + if self.pod_get_count == 2: + return self._result( + args, + stdout=json.dumps( + _pod_list_items( + _taskexecutor_pod("flwr-taskexecutor-456-def", "Running") + ) + ), + ) + return self._result( + args, + stdout=json.dumps( + _pod_list_items( + _taskexecutor_pod("flwr-taskexecutor-456-def", "Succeeded") + ) + ), + ) + if "get" in args and "secrets" in args and "-o" in args and "json" in args: + self.secret_get_count += 1 + if self.secret_get_count == 1: + return self._result( + args, + stdout=json.dumps( + _secret_list_for_name("flwr-taskexecutor-123-abc-appio") + ), + ) + return self._result( + args, + stdout=json.dumps( + _secret_list_for_name("flwr-taskexecutor-456-def-appio") + ), + ) + if "get" in args and "jobs" in args and "-o" in args and "json" in args: + return self._result(args, stdout=json.dumps(_object_list("Job"))) + if "get" in args and "services" in args and "-o" in args and "json" in args: + return self._result(args, stdout=json.dumps(_object_list("Service"))) + if "get" in args and "namespace" in args and "-o" in args and "json" in args: + return self._result(args, stdout=json.dumps(_namespace())) + if "logs" in args and "job/flower-local-k8s-seed-run" in args: + return self._result( + args, + stdout=( + "K8s launch seed created run_id=123\n" + "K8s launch seed created run_id=456\n" + "K8s launch seed created run_ids=123,456\n" + ), + ) + if "logs" in args and "pod/flower-superexec" in args: + self.superexec_log_count += 1 + stdout = "claim launch task_id taskexecutor\n" + if self.superexec_log_count > 1: + stdout += ( + "Waiting for Kubernetes TaskExecutor capacity: " + "1 active Pods, budget 1, selector app.kubernetes.io/name=flower\n" + ) + return self._result(args, stdout=stdout) + if "logs" in args and "pod/flwr-taskexecutor-456-def" in args: + return self._result(args, stdout="K8s launch probe ServerApp ran\n") + return self._result(args) + + @staticmethod + def _result( + args: list[str], *, returncode: int = 0, stdout: str = "", stderr: str = "" + ) -> Any: + return harness_module.CommandResult( + args=list(args), + returncode=returncode, + stdout=stdout, + stderr=stderr, + ) + + def _pod_list(phase: str) -> dict[str, Any]: return { "items": [ @@ -972,6 +1307,53 @@ def _pod_list(phase: str) -> dict[str, Any]: } +def _pod_list_items(*pods: dict[str, Any]) -> dict[str, Any]: + return {"items": list(pods)} + + +def _taskexecutor_pod(name: str, phase: str) -> dict[str, Any]: + task_id = name.split("-")[2] + launch_attempt = name.rsplit("-", maxsplit=1)[1] + return { + "metadata": { + "name": name, + "namespace": "flower-local-k8s", + "uid": f"pod-uid-{task_id}", + "labels": { + "app.kubernetes.io/name": "flower", + "app.kubernetes.io/component": "taskexecutor", + "flower.ai/harness-run": "k8s-launch-test", + "flower.ai/launch-attempt": launch_attempt, + "flower.ai/resource-pool": "generic-k3d", + "flower.ai/superexec-task-id": task_id, + "flower.ai/task-type": "flwr-serverapp", + }, + }, + "spec": { + "containers": [ + { + "name": "taskexecutor", + "image": "flwr/superexec:dev", + "args": [ + "--serverappio-api-address", + "flower-superlink:9091", + "--token-file", + "/run/flwr/appio/token", + "--insecure", + ], + } + ], + "volumes": [ + { + "name": "appio-credentials", + "secret": {"secretName": f"{name}-appio"}, + } + ], + }, + "status": {"phase": phase}, + } + + def _secret_list() -> dict[str, Any]: return { "items": [ @@ -998,6 +1380,34 @@ def _secret_list() -> dict[str, Any]: } +def _secret_list_for_name(name: str) -> dict[str, Any]: + task_id = name.split("-")[2] + launch_attempt = name.removesuffix("-appio").rsplit("-", maxsplit=1)[1] + return { + "items": [ + { + "kind": "Secret", + "metadata": { + "name": name, + "namespace": "flower-local-k8s", + "uid": f"secret-uid-{task_id}", + "labels": { + "app.kubernetes.io/name": "flower", + "app.kubernetes.io/component": "taskexecutor", + "flower.ai/harness-run": "k8s-launch-test", + "flower.ai/launch-attempt": launch_attempt, + "flower.ai/resource-pool": "generic-k3d", + "flower.ai/superexec-task-id": task_id, + "flower.ai/task-type": "flwr-serverapp", + }, + }, + "type": "Opaque", + "data": {"token": "dGFzay10b2tlbg=="}, + } + ] + } + + def _object_list(kind: str) -> dict[str, Any]: return { "items": [ @@ -1024,48 +1434,94 @@ def _namespace() -> dict[str, Any]: def _write_verifier_evidence( - output_dir: Path, *, taskexecutor_log_text: str = "K8s launch probe ServerApp ran\n" + output_dir: Path, + *, + taskexecutor_log_text: str = "K8s launch probe ServerApp ran\n", + result: str = "local-k8s-launch-path", ) -> None: (output_dir / "diagnostics").mkdir(parents=True) + capacity_proof = result == "local-k8s-capacity-cleanup-proof" + final_pod_name = ( + "flwr-taskexecutor-456-def" if capacity_proof else "flwr-taskexecutor-test" + ) + lineage_tasks = ( + [ + { + "pod_name": "flwr-taskexecutor-123-abc", + "credential_secret_name": "flwr-taskexecutor-123-abc-appio", + }, + { + "pod_name": final_pod_name, + "credential_secret_name": f"{final_pod_name}-appio", + }, + ] + if capacity_proof + else [ + { + "pod_name": final_pod_name, + "credential_secret_name": f"{final_pod_name}-appio", + } + ] + ) summary = { "status": "passed", - "result": "local-k8s-launch-path", + "result": result, "failures": [], "details": { "run_id": "k8s-launch-test", "seed_run_id": 123, + "seed_run_ids": [123, 456] if capacity_proof else [123], + "active_pod_budget": 1 if capacity_proof else None, "dry_run": False, "image_preflight": { "docker_inspect": {"returncode": 0}, "k3d_import": {"returncode": 0}, }, "rbac": {"status": "passed"}, - "pods": [{"name": "flwr-taskexecutor-test", "phase": "Succeeded"}], + "pods": [{"name": final_pod_name, "phase": "Succeeded"}], "taskexecutor_logs": [{"returncode": 0}], + "capacity_wait": {"observed": capacity_proof}, + "cleanup_observed": { + "observed": capacity_proof, + "removed_pods": ( + ["flwr-taskexecutor-123-abc"] if capacity_proof else [] + ), + "removed_secrets": ( + ["flwr-taskexecutor-123-abc-appio"] if capacity_proof else [] + ), + "remaining_pods": [final_pod_name] if capacity_proof else [], + }, "cleanup": {"requested": True, "result": {"returncode": 0}}, }, } (output_dir / "summary.json").write_text(json.dumps(summary), encoding="utf-8") (output_dir / "invocation.json").write_text( - json.dumps({"mode": "local-k8s-launch-path", "dry_run": False}), + json.dumps( + { + "mode": ( + "local-k8s-capacity-cleanup-proof" + if capacity_proof + else "local-k8s-launch-path" + ), + "dry_run": False, + } + ), encoding="utf-8", ) (output_dir / "task-lineage.json").write_text( json.dumps( { "seeded_run_id": 123, - "tasks": [ - { - "pod_name": "flwr-taskexecutor-test", - "credential_secret_name": "flwr-taskexecutor-test-appio", - } - ], + "seeded_run_ids": [123, 456] if capacity_proof else [123], + "seeded_task_count": 2 if capacity_proof else 1, + "observed_task_count": len(lineage_tasks), + "tasks": lineage_tasks, } ), encoding="utf-8", ) (output_dir / "taskexecutor-pods.json").write_text( - json.dumps({"items": [{"metadata": {"name": "flwr-taskexecutor-test"}}]}), + json.dumps({"items": [{"metadata": {"name": final_pod_name}}]}), encoding="utf-8", ) (output_dir / "taskexecutor-secrets.redacted.json").write_text( @@ -1074,7 +1530,7 @@ def _write_verifier_evidence( "redacted": True, "items": [ { - "name": "flwr-taskexecutor-test-appio", + "name": f"{final_pod_name}-appio", "data_keys": ["token"], "redacted": True, } @@ -1096,7 +1552,11 @@ def _write_verifier_evidence( json.dumps( { "claims": [{"claim": "TaskExecutor Pod observed"}], - "out_of_scope": ["capacity wait proof"], + "out_of_scope": ( + ["budget-2/three-task cardinality behavior"] + if capacity_proof + else ["capacity wait proof"] + ), } ), encoding="utf-8", diff --git a/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_test.py b/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_test.py index d9932e1071a4..3dcc846ed1b8 100644 --- a/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_test.py +++ b/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_test.py @@ -554,6 +554,7 @@ def test_wait_for_capacity_sleeps_and_polls_again_at_budget() -> None: {"items": []}, {"items": [_pod("Pending")]}, {"items": []}, + {"items": []}, ] client.list_namespaced_secret.return_value = {"items": []} sleep = Mock() @@ -566,7 +567,7 @@ def test_wait_for_capacity_sleeps_and_polls_again_at_budget() -> None: KubernetesExecutor(client=client, config=config).wait_for_capacity() - assert client.list_namespaced_pod.call_count == 3 + assert client.list_namespaced_pod.call_count == 4 sleep.assert_called_once_with(3.0) @@ -584,6 +585,7 @@ def test_wait_for_capacity_counts_pending_running_and_terminating_pods() -> None ] }, {"items": []}, + {"items": []}, ] client.list_namespaced_secret.return_value = {"items": []} sleep = Mock() @@ -643,6 +645,41 @@ def test_wait_for_capacity_sweeps_terminal_pods_before_capacity_check() -> None: ) +def test_wait_for_capacity_sweeps_after_blocked_wait_opens() -> None: + """Test cleanup runs again after a blocked wait opens capacity.""" + client = Mock() + labels = _task_labels(123) + client.list_namespaced_pod.side_effect = [ + {"items": []}, + {"items": [_pod("Running", labels=labels)]}, + {"items": [_pod("Succeeded", labels=labels)]}, + {"items": [_pod("Succeeded", labels=labels)]}, + ] + client.list_namespaced_secret.side_effect = [ + {"items": []}, + {"items": [_secret(_SECRET_NAME, labels)]}, + ] + sleep = Mock() + config = _executor_config( + resource_pool="gpu-pool", + active_pod_budget=1, + capacity_poll_interval=3.0, + sleep=sleep, + ) + + KubernetesExecutor(client=client, config=config).wait_for_capacity() + + sleep.assert_called_once_with(3.0) + client.delete_namespaced_pod.assert_called_once_with( + name=_POD_NAME, + namespace="flower-system", + grace_period_seconds=0, + ) + client.delete_namespaced_secret.assert_called_once_with( + name=_SECRET_NAME, namespace="flower-system" + ) + + def test_wait_for_capacity_throttles_completed_pod_sweeps() -> None: """Test capacity wait does not sweep more often than the internal interval.""" client = Mock()