diff --git a/framework/dev/k8s/README.md b/framework/dev/k8s/README.md index 5d74216a6c7e..c5812516ba59 100644 --- a/framework/dev/k8s/README.md +++ b/framework/dev/k8s/README.md @@ -5,12 +5,15 @@ cluster, starts SuperLink and SuperExec, seeds deterministic ServerApp runs through the Control API, and verifies that the Kubernetes executor creates TaskExecutor Pods that reach `Succeeded`. -It has two main modes: +It has three common modes: - the default one-task launch-path proof; and - the `--capacity-cleanup-proof` mode, which uses active Pod budget `1`, seeds two tasks, observes SuperExec waiting for capacity, and verifies completed - TaskExecutor Pod/Secret cleanup before broad namespace cleanup. + TaskExecutor Pod/Secret cleanup before broad namespace cleanup; and +- the `--demo` preset, which uses active Pod budget `4`, seeds eight tasks, + keeps probe ServerApps active for inspection, leaves resources in place, and + proves the capacity cardinality case. ## Prerequisites @@ -57,6 +60,26 @@ python framework/dev/k8s/verify_evidence.py "${output_dir}" \ --expected-result local-k8s-capacity-cleanup-proof ``` +To run the demo-friendly budget-4/eight-task cardinality proof: + +```bash +output_dir=/private/tmp/f7e-demo-cardinality-proof-$(date +%Y%m%d-%H%M%S) +./framework/dev/k8s/test-real-launch-path.sh \ + --demo \ + --output-dir "${output_dir}" +``` + +The demo preset leaves namespace resources in place for live inspection. Verify +the saved bundle with the explicit demo expectations: + +```bash +python framework/dev/k8s/verify_evidence.py "${output_dir}" \ + --expected-result local-k8s-capacity-cleanup-proof \ + --expected-active-pod-budget 4 \ + --expected-seed-run-count 8 \ + --no-require-cleanup +``` + `/private/tmp` is only an example local scratch location. For handoff or review, choose a durable writable directory, or archive the completed evidence directory after saving the verifier report. @@ -98,6 +121,9 @@ python framework/dev/k8s/verify_evidence.py "${output_dir}" \ | Capacity-proof seeded runs | `2` | | Capacity-proof active Pod budget | `1` | | Capacity-proof probe hold | `5.0` seconds | +| Demo seeded runs | `8` | +| Demo active Pod budget | `4` | +| Demo probe hold | `45` seconds | | ServerApp marker | `K8s launch probe ServerApp ran` | ## Output @@ -180,8 +206,9 @@ map in machine-readable form. For the default proof, `seed_run_id` and `seeded_run_id` should be present and should match. For the capacity cleanup proof, `summary.json` should list - two `seed_run_ids`, `task-lineage.json` should list the same - `seeded_run_ids`, and `seeded_task_count` should be `2`. + the expected `seed_run_ids`, `task-lineage.json` should list the same + `seeded_run_ids`, and `seeded_task_count` should match the expected run + count. The `--demo` preset expects three seeded runs. 4. Confirm the Kubernetes executor created the TaskExecutor Pod. @@ -230,9 +257,10 @@ map in machine-readable form. For `--capacity-cleanup-proof`, additionally confirm: -1. `objects/executor-config.yaml` sets `active-pod-budget: 1`. -2. `summary.json` lists two `seed_run_ids`, and `task-lineage.json` records at - least two observed TaskExecutor task records. +1. `objects/executor-config.yaml` sets the selected `active-pod-budget`. +2. `summary.json` lists the expected `seed_run_ids`, and + `task-lineage.json` records at least that many observed TaskExecutor task + records. 3. `events.jsonl` has a passing `capacity.wait_observed` event. Also open `diagnostics/superexec-logs.txt`; it should include the specific SuperExec wait marker @@ -249,9 +277,23 @@ For `--capacity-cleanup-proof`, additionally confirm: `Capacity wait observed: True`, include non-empty removed Pod/Secret lines, and end with `Verification: PASSED`. - In this mode, `TaskExecutor Pods: 1` in the verifier report is expected - after cleanup: it is the remaining/new TaskExecutor Pod. The two-task - evidence comes from `Task lineage records: 2` and `task-lineage.json`. + In the budget-1/two-task mode, `TaskExecutor Pods: 1` in the verifier report + is expected after cleanup: it is the remaining/new TaskExecutor Pod. The + full task-count evidence comes from `Task lineage records` and + `task-lineage.json`. + +For `--demo`, additionally confirm: + +1. `objects/executor-config.yaml` sets `active-pod-budget: 4`. +2. `summary.json` has `expected_seed_run_count: 8`, + `active_pod_budget: 4`, and `cardinality.observed: true`. +3. `summary.json` lists four `cardinality.first_active_pods`, proving the + budget was full before additional launches. +4. `summary.json` lists four `cardinality.launched_after_capacity_opened` + entries, proving waiting TaskExecutors launched after capacity opened. +5. `diagnostics/superexec-logs.txt` includes the capacity wait marker with + `4 active Pods` and `budget 4`. +6. `proof-checklist.json` does not list capacity cardinality as out of scope. ## What Is Tested @@ -267,13 +309,13 @@ For `--capacity-cleanup-proof`, additionally confirm: | ServerApp execution marker | Yes | Verifies `K8s launch probe ServerApp ran` in TaskExecutor logs. | | Capacity wait | Optional | `--capacity-cleanup-proof` seeds two runs with active Pod budget `1` and requires SuperExec wait evidence. | | Sweeper cleanup | Optional | `--capacity-cleanup-proof` requires the first completed TaskExecutor Pod and Secret to be removed before namespace cleanup. | +| Cardinality proof | Optional | `--demo` seeds three runs with active Pod budget `2` and requires two active Pods before the third waits and launches after a slot opens. | | Wrapper cleanup | Yes | Default wrapper behavior deletes the namespace and verifies cleanup evidence. | ## Out Of Scope | Area | Tested | Notes | | --- | --- | --- | -| Cardinality proof | No | The capacity proof uses budget `1` and two tasks; budget `2`/three-task cardinality is a later slice. | | AppIo result completion semantics | No | This slice observes launch and Pod success, not full result semantics. | | ClientApp execution | No | The probe includes a minimal ClientApp file only because the FAB schema expects it. | | TLS, CNI/NetworkPolicy, production RBAC | No | This is local/dev-only and uses insecure local AppIo. | @@ -290,6 +332,26 @@ kubectl --context k3d-flower-local-k8s logs pod/flower-superlink -n flower-local kubectl --context k3d-flower-local-k8s logs pod/flower-superexec -n flower-local-k8s ``` +Live demo watch commands: + +```bash +watch -n 1 'kubectl get pods -n flower-local-k8s -o wide --sort-by=.metadata.creationTimestamp' +``` + +```bash +watch -n 1 'kubectl get pods -n flower-local-k8s -l app.kubernetes.io/component=taskexecutor -L flower.ai/resource-pool,flower.ai/superexec-task-id,flower.ai/launch-attempt --sort-by=.metadata.creationTimestamp' +``` + +```bash +kubectl logs -n flower-local-k8s -f pod/flower-superexec --tail=200 +``` + +On macOS without `watch`, use a loop: + +```bash +while true; do clear; date; kubectl get pods -n flower-local-k8s -o wide --sort-by=.metadata.creationTimestamp; sleep 1; done +``` + Verify an existing default launch-path bundle: ```bash diff --git a/framework/dev/k8s/assets/probe_app/launch_probe/server_app.py b/framework/dev/k8s/assets/probe_app/launch_probe/server_app.py index aa7ad2509ef5..5a565cd59a9c 100644 --- a/framework/dev/k8s/assets/probe_app/launch_probe/server_app.py +++ b/framework/dev/k8s/assets/probe_app/launch_probe/server_app.py @@ -25,7 +25,15 @@ @app.main() def main(grid, context): """Run the probe ServerApp and optionally stay active for capacity tests.""" - print("K8s launch probe ServerApp ran") + run_id = context.run_id + print(f"K8s launch probe ServerApp starting run_id={run_id}", flush=True) + print(f"K8s launch probe ServerApp ran run_id={run_id}", flush=True) hold_seconds = context.run_config.get(_PROBE_HOLD_SECONDS_CONFIG_KEY, 0.0) if isinstance(hold_seconds, (float, int)) and hold_seconds > 0: + print( + f"K8s launch probe ServerApp sleeping run_id={run_id} " + f"seconds={float(hold_seconds)}", + flush=True, + ) time.sleep(float(hold_seconds)) + print(f"K8s launch probe ServerApp exiting run_id={run_id}", flush=True) diff --git a/framework/dev/k8s/harness.py b/framework/dev/k8s/harness.py index ea74702e4e30..391239b79178 100644 --- a/framework/dev/k8s/harness.py +++ b/framework/dev/k8s/harness.py @@ -29,13 +29,13 @@ import real_launch # noqa: E402,F401 from common import ( # noqa: E402,F401 + REDACTED, CommandResult, EvidenceBundleWriter, HarnessEvent, HarnessProfile, HarnessSummary, HostCommandRunner, - REDACTED, _command_error, _command_record, _kubectl_args, @@ -394,7 +394,7 @@ def _parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: "the contract scaffold; infra-proof mode writes " "infra/TLS/RBAC evidence; local-k8s-launch-path mode writes " "SuperLink/SuperExec/TaskExecutor evidence; capacity-cleanup-proof " - "mode writes the budget-1/two-task capacity and cleanup proof. " + "mode writes capacity and cleanup proof evidence. " "Host commands only run with --execute." ) ) diff --git a/framework/dev/k8s/real_launch.py b/framework/dev/k8s/real_launch.py index cf8434a4efc2..0d1089e15989 100644 --- a/framework/dev/k8s/real_launch.py +++ b/framework/dev/k8s/real_launch.py @@ -26,17 +26,17 @@ from uuid import uuid4 from common import ( + REDACTED, + SCHEMA_VERSION, CommandResult, EvidenceBundleWriter, HarnessEvent, HarnessProfile, HarnessSummary, HostCommandRunner, - REDACTED, - SCHEMA_VERSION, + _combined_status, _command_error, _command_record, - _combined_status, _format_cleanup_plan, _format_image_preflight, _format_superexec_logs, @@ -68,9 +68,9 @@ _pod_names, _pod_observation, _pod_phases, - _seed_observation, _secret_names, _secret_observation, + _seed_observation, _superexec_capacity_wait_observation, _superexec_claim_observation, _taskexecutor_phase_status, @@ -506,6 +506,11 @@ def run_command( taskexecutor_observation: dict[str, object] = {"items": [], "phases": []} taskexecutor_wait_results: list[CommandResult] = [] capacity_wait_results: list[CommandResult] = [] + capacity_wait_observation: dict[str, object] = { + "observed": False, + "markers": [], + } + launched_after_capacity_opened: list[str] = [] before_cleanup_secret_evidence: dict[str, object] = { "schema_version": SCHEMA_VERSION, "redacted": True, @@ -525,7 +530,15 @@ def run_command( "remaining_pods": [], "remaining_secrets": [], } + cardinality_observation: dict[str, object] = { + "required": _requires_cardinality_proof(profile), + "observed": False, + "reason": "not evaluated", + } taskexecutor_deadline = time.monotonic() + profile.timeout_seconds + required_initial_active_pods = _required_initial_active_pod_count( + profile, capacity_cleanup_proof=capacity_cleanup_proof + ) while True: taskexecutor_pods_result = run_command( _taskexecutor_pods_args(profile, taskexecutor_selector), @@ -534,8 +547,14 @@ def run_command( ) taskexecutor_pod_attempts.append(taskexecutor_pods_result) taskexecutor_observation = _pod_observation(taskexecutor_pods_result) + initial_capacity_ready = ( + len(_active_pod_names(taskexecutor_observation)) + >= required_initial_active_pods + if capacity_cleanup_proof + else bool(taskexecutor_observation["items"]) + ) if ( - taskexecutor_observation["items"] + initial_capacity_ready or not execute or taskexecutor_pods_result.dry_run ): @@ -547,8 +566,11 @@ def run_command( f"{_command_error(taskexecutor_pods_result)}" ) failures.append( - "No TaskExecutor Pod was observed through the local k8s selector " - "before timeout." + _taskexecutor_initial_observation_failure( + taskexecutor_observation, + required_active_pods=required_initial_active_pods, + capacity_cleanup_proof=capacity_cleanup_proof, + ) ) break time.sleep( @@ -579,10 +601,6 @@ def run_command( command=_command_record(before_cleanup_secrets_result), ) - capacity_wait_observation: dict[str, object] = { - "observed": False, - "markers": [], - } capacity_wait_result = CommandResult(args=[], returncode=0, dry_run=not execute) capacity_deadline = time.monotonic() + profile.timeout_seconds while True: @@ -650,21 +668,23 @@ def run_command( taskexecutor_observation = _pod_observation(taskexecutor_pods_result) current_pod_names = _pod_names(taskexecutor_observation) observed_pod_names.update(current_pod_names) - new_pod_names = [ + launched_after_capacity_opened = sorted( name - for name in current_pod_names + for name in observed_pod_names if name not in first_taskexecutor_pod_names - ] + ) if ( - new_pod_names - or len(observed_pod_names) >= profile.seed_run_count + len(observed_pod_names) >= profile.seed_run_count or not execute or taskexecutor_pods_result.dry_run ): break if time.monotonic() >= second_deadline: + observed_count = len(observed_pod_names) failures.append( - "Second TaskExecutor Pod was not observed after capacity opened." + "Expected " + f"{profile.seed_run_count} TaskExecutor Pods to launch after " + f"capacity opened, but observed {observed_count}." ) break time.sleep( @@ -697,11 +717,29 @@ def run_command( after_cleanup_pods=taskexecutor_observation, after_cleanup_secrets=after_cleanup_secret_observation, ) + cardinality_observation = _cardinality_observation( + profile=profile, + seed_run_ids=seed_run_ids, + first_taskexecutor_observation=first_taskexecutor_observation, + after_capacity_observation=taskexecutor_observation, + capacity_wait_observation=capacity_wait_observation, + cleanup_observation=cleanup_observation, + launched_after_capacity_opened=launched_after_capacity_opened, + ) if execute and not cleanup_observation["observed"]: failures.append( "Completed TaskExecutor Pod and credential Secret cleanup was " "not observed before namespace cleanup." ) + if ( + execute + and cardinality_observation["required"] + and not cardinality_observation["observed"] + ): + failures.append( + "TaskExecutor capacity cardinality was not observed: " + f"{cardinality_observation['reason']}" + ) writer.write_json("objects/capacity-blocked-pods.json", blocked_pod_snapshot) writer.write_json( "objects/secrets-before-cleanup.redacted.json", @@ -727,6 +765,7 @@ def run_command( "pods_after_cleanup": taskexecutor_observation["items"], "secrets_before_cleanup": before_cleanup_secret_observation["items"], "secrets_after_cleanup": after_cleanup_secret_observation["items"], + "cardinality": cardinality_observation, "second_pod_attempts": [ _command_record(result) for result in second_pod_attempts ], @@ -1038,6 +1077,7 @@ def run_command( ], }, "cleanup_observed": cleanup_observation, + "cardinality": cardinality_observation, "secrets": { "before_cleanup": before_cleanup_secret_evidence["items"], "after_cleanup": after_cleanup_secret_evidence["items"], @@ -1522,23 +1562,30 @@ def _proof_checklist( "fields": ["counts", "resources", "captured_before_namespace_cleanup"], }, ] - out_of_scope = [ - "budget-2/three-task cardinality behavior", - "AppIo TLS proof", - "production deployment readiness", - ] + cardinality_proof = _requires_cardinality_numbers( + active_pod_budget=active_pod_budget, + seed_run_count=seed_run_count, + ) + out_of_scope = ["AppIo TLS proof", "production deployment readiness"] + if not cardinality_proof: + out_of_scope.insert(0, "capacity cardinality behavior") if capacity_cleanup_proof: claims.extend( [ { - "claim": "The Kubernetes executor active Pod budget is one.", + "claim": ( + "The Kubernetes executor active Pod budget is " + f"{active_pod_budget}." + ), "status": proof_status, "artifact": "objects/executor-config.yaml", "fields": ["active-pod-budget"], "expected": {"active-pod-budget": active_pod_budget}, }, { - "claim": "Two deterministic ServerApp tasks were seeded.", + "claim": ( + f"{seed_run_count} deterministic ServerApp tasks were seeded." + ), "status": proof_status, "artifact": "summary.json", "fields": ["details.seed_run_ids"], @@ -1561,7 +1608,7 @@ def _proof_checklist( }, { "claim": ( - "A second TaskExecutor Pod launched after capacity opened." + "Additional TaskExecutor Pods launched after capacity opened." ), "status": proof_status, "artifact": "objects/cleanup-pods.json", @@ -1581,6 +1628,40 @@ def _proof_checklist( }, ] ) + if cardinality_proof: + claims.extend( + [ + { + "claim": ( + f"{active_pod_budget} TaskExecutor Pods were active " + "concurrently while the Kubernetes executor active " + f"Pod budget was {active_pod_budget}." + ), + "status": proof_status, + "artifact": "summary.json", + "fields": [ + "details.cardinality.first_active_pods", + "details.cardinality.active_pod_budget", + ], + "expected": { + "active-pod-budget": active_pod_budget, + "seed_run_count": seed_run_count, + }, + }, + { + "claim": ( + "After capacity opened, waiting TaskExecutor Pods " + "launched." + ), + "status": proof_status, + "artifact": "summary.json", + "fields": [ + "details.cardinality.removed_pods", + "details.cardinality.launched_after_capacity_opened", + ], + }, + ] + ) else: out_of_scope.extend( [ @@ -1629,6 +1710,141 @@ def _taskexecutor_secrets_args(profile: HarnessProfile, selector: str) -> list[s ) +def _required_initial_active_pod_count( + profile: HarnessProfile, *, capacity_cleanup_proof: bool +) -> int: + if not capacity_cleanup_proof or profile.active_pod_budget is None: + return 1 + return max(1, min(profile.active_pod_budget, profile.seed_run_count)) + + +def _taskexecutor_initial_observation_failure( + observation: Mapping[str, object], + *, + required_active_pods: int, + capacity_cleanup_proof: bool, +) -> str: + if not _sequence(observation.get("items")): + return ( + "No TaskExecutor Pod was observed through the local k8s selector " + "before timeout." + ) + if capacity_cleanup_proof: + return ( + f"Expected {required_active_pods} active TaskExecutor Pod(s) before " + "capacity waiting, but observed " + f"{len(_active_pod_names(observation))}." + ) + return ( + "TaskExecutor Pods were observed through the local k8s selector, but " + "the observation did not satisfy the harness readiness condition." + ) + + +def _active_pod_names(observation: Mapping[str, object]) -> list[str]: + names: list[str] = [] + raw_items = observation.get("items", []) + if not isinstance(raw_items, Sequence) or isinstance(raw_items, str): + return names + for item in raw_items: + if not isinstance(item, Mapping): + continue + phase = item.get("phase") + if phase in {"Succeeded", "Failed"}: + continue + name = item.get("name") + if isinstance(name, str) and name: + names.append(name) + return names + + +def _requires_cardinality_proof(profile: HarnessProfile) -> bool: + return _requires_cardinality_numbers( + active_pod_budget=profile.active_pod_budget, + seed_run_count=profile.seed_run_count, + ) + + +def _requires_cardinality_numbers( + *, active_pod_budget: int | None, seed_run_count: int +) -> bool: + return ( + active_pod_budget is not None + and active_pod_budget >= 2 + and seed_run_count > active_pod_budget + ) + + +def _cardinality_observation( + *, + profile: HarnessProfile, + seed_run_ids: Sequence[int], + first_taskexecutor_observation: Mapping[str, object], + after_capacity_observation: Mapping[str, object], + capacity_wait_observation: Mapping[str, object], + cleanup_observation: Mapping[str, object], + launched_after_capacity_opened: Sequence[str], +) -> dict[str, object]: + required = _requires_cardinality_proof(profile) + first_active_pods = _active_pod_names(first_taskexecutor_observation) + first_observed_pods = _pod_names(first_taskexecutor_observation) + post_capacity_pods = _pod_names(after_capacity_observation) + removed_pods = list(_sequence(cleanup_observation.get("removed_pods"))) + launched_pods = list(launched_after_capacity_opened) + expected_launched_pods = 0 + if profile.active_pod_budget is not None: + expected_launched_pods = max( + 1, profile.seed_run_count - profile.active_pod_budget + ) + observation: dict[str, object] = { + "required": required, + "observed": False, + "active_pod_budget": profile.active_pod_budget, + "seed_run_count": profile.seed_run_count, + "seed_run_ids": list(seed_run_ids), + "first_active_pods": first_active_pods, + "first_observed_pods": first_observed_pods, + "post_capacity_pods": post_capacity_pods, + "removed_pods": removed_pods, + "launched_after_capacity_opened": launched_pods, + "expected_launched_after_capacity_opened": expected_launched_pods, + } + if not required: + observation["reason"] = ( + "active Pod budget and seed count do not request cardinality" + ) + return observation + missing: list[str] = [] + if profile.active_pod_budget is None: + missing.append("active Pod budget") + elif len(first_active_pods) < profile.active_pod_budget: + missing.append( + f"{profile.active_pod_budget} first active Pods " + f"(observed {len(first_active_pods)})" + ) + if len(seed_run_ids) < profile.seed_run_count: + missing.append( + f"{profile.seed_run_count} seeded run IDs " + f"(observed {len(seed_run_ids)})" + ) + if capacity_wait_observation.get("observed") is not True: + missing.append("SuperExec capacity wait marker") + if not removed_pods: + missing.append("removed completed Pod") + if len(launched_pods) < expected_launched_pods: + missing.append( + f"{expected_launched_pods} new Pods after capacity opened " + f"(observed {len(launched_pods)})" + ) + observation["observed"] = not missing + observation["reason"] = "observed" if not missing else "; ".join(missing) + return observation + + +def _sequence(value: object) -> Sequence[object]: + return value if isinstance(value, list) else [] + + def _cleanup_observation( *, first_pod_names: Sequence[str], diff --git a/framework/dev/k8s/test-real-launch-path.sh b/framework/dev/k8s/test-real-launch-path.sh index fb4cec80fb96..20353d6e59f4 100755 --- a/framework/dev/k8s/test-real-launch-path.sh +++ b/framework/dev/k8s/test-real-launch-path.sh @@ -36,6 +36,7 @@ kubernetes_package="${KUBERNETES_PACKAGE:-}" build_images=true cleanup=true capacity_cleanup_proof=false +active_pod_budget="${ACTIVE_POD_BUDGET:-}" seed_run_count="${SEED_RUN_COUNT:-1}" probe_hold_seconds="${PROBE_HOLD_SECONDS:-0.0}" @@ -62,12 +63,17 @@ Options: --cluster-name NAME k3d cluster name (default: ${cluster_name}) --namespace NAME Kubernetes namespace (default: ${namespace}) --timeout-seconds SECS Harness wait timeout (default: ${timeout_seconds}) - --capacity-cleanup-proof Run the budget-1/two-task capacity and cleanup proof + --capacity-cleanup-proof Run the capacity and cleanup proof instead of the one-task launch-path proof + --active-pod-budget COUNT Kubernetes executor active Pod budget for + --capacity-cleanup-proof (default: 1) --seed-run-count COUNT Deterministic ServerApp runs to seed (default: ${seed_run_count}) --probe-hold-seconds SECS Seconds each probe ServerApp should stay active (default: ${probe_hold_seconds}) + --demo Demo preset: --capacity-cleanup-proof, + --active-pod-budget 4, --seed-run-count 8, + --probe-hold-seconds 45, and --skip-cleanup --platform PLATFORM Optional docker build platform, for example linux/arm64 --python-image IMAGE Optional Python base image passed to the image builder --kubernetes-package SPEC Optional Kubernetes package spec passed to the image @@ -141,14 +147,13 @@ while [[ "$#" -gt 0 ]]; do ;; --capacity-cleanup-proof) capacity_cleanup_proof=true - if [[ "${seed_run_count}" == "1" ]]; then - seed_run_count="2" - fi - if [[ "${probe_hold_seconds}" == "0.0" ]]; then - probe_hold_seconds="5.0" - fi shift ;; + --active-pod-budget) + require_value "$1" "${2:-}" + active_pod_budget="$2" + shift 2 + ;; --seed-run-count) require_value "$1" "${2:-}" seed_run_count="$2" @@ -159,6 +164,14 @@ while [[ "$#" -gt 0 ]]; do probe_hold_seconds="$2" shift 2 ;; + --demo) + capacity_cleanup_proof=true + active_pod_budget="4" + seed_run_count="8" + probe_hold_seconds="45" + cleanup=false + shift + ;; --platform) require_value "$1" "${2:-}" platform="$2" @@ -192,6 +205,18 @@ while [[ "$#" -gt 0 ]]; do esac done +if [[ "${capacity_cleanup_proof}" == true ]]; then + if [[ -z "${active_pod_budget}" ]]; then + active_pod_budget="1" + fi + if [[ "${seed_run_count}" == "1" ]]; then + seed_run_count="2" + fi + if [[ "${probe_hold_seconds}" == "0.0" ]]; then + probe_hold_seconds="5.0" + fi +fi + for command in docker k3d kubectl uv python; do if ! command -v "${command}" >/dev/null 2>&1; then die "${command} is required. Install dependencies, then rerun this script." @@ -205,6 +230,11 @@ echo "Evidence: ${output_dir}" echo "SuperLink image: ${superlink_image}" echo "SuperExec/TaskExecutor image: ${superexec_image}" echo "Capacity cleanup proof: ${capacity_cleanup_proof}" +if [[ "${capacity_cleanup_proof}" == true ]]; then + echo "Active Pod budget: ${active_pod_budget}" + echo "Seed run count: ${seed_run_count}" + echo "Probe hold seconds: ${probe_hold_seconds}" +fi echo if [[ "${build_images}" == true ]]; then @@ -250,10 +280,12 @@ verify_args=("${output_dir}") if [[ "${capacity_cleanup_proof}" == true ]]; then harness_args[1]="capacity-cleanup-proof" - harness_args+=(--active-pod-budget "1") + harness_args+=(--active-pod-budget "${active_pod_budget}") harness_args+=(--capacity-poll-interval "1.0") harness_args+=(--capacity-log-interval "1.0") verify_args+=(--expected-result "local-k8s-capacity-cleanup-proof") + verify_args+=(--expected-active-pod-budget "${active_pod_budget}") + verify_args+=(--expected-seed-run-count "${seed_run_count}") fi if [[ "${cleanup}" == true ]]; then diff --git a/framework/dev/k8s/verify_evidence.py b/framework/dev/k8s/verify_evidence.py index d31ed01224e1..7b117a316d3d 100755 --- a/framework/dev/k8s/verify_evidence.py +++ b/framework/dev/k8s/verify_evidence.py @@ -31,6 +31,8 @@ def verify_evidence( *, require_cleanup: bool = True, expected_result: str = "local-k8s-launch-path", + expected_active_pod_budget: int | None = None, + expected_seed_run_count: int | None = None, ) -> tuple[list[str], str]: """Verify a local k8s launch-path evidence bundle.""" evidence_path = Path(evidence_dir) @@ -239,6 +241,14 @@ def verify_evidence( capacity_wait = _mapping(details.get("capacity_wait")) cleanup_observed = _mapping(details.get("cleanup_observed")) if expected_result == "local-k8s-capacity-cleanup-proof": + required_active_pod_budget = ( + expected_active_pod_budget + if expected_active_pod_budget is not None + else 1 + ) + required_seed_run_count = ( + expected_seed_run_count if expected_seed_run_count is not None else 2 + ) removed_pods = _sequence(cleanup_observed.get("removed_pods")) remaining_pods = _sequence(cleanup_observed.get("remaining_pods")) removed_pod_names = {str(name) for name in removed_pods} @@ -249,18 +259,27 @@ def verify_evidence( if _mapping(pod).get("name") is not None } _expect( - details.get("active_pod_budget") == 1, - "active Pod budget is not 1", + details.get("active_pod_budget") == required_active_pod_budget, + f"active Pod budget is not {required_active_pod_budget}", failures, ) + recorded_seed_run_count = details.get("expected_seed_run_count") + if recorded_seed_run_count is not None: + _expect( + recorded_seed_run_count == required_seed_run_count, + f"expected seed run count is not {required_seed_run_count}", + failures, + ) _expect( - len(lineage_seed_run_ids) >= 2, - "capacity proof did not record at least two seeded run IDs", + len(lineage_seed_run_ids) >= required_seed_run_count, + "capacity proof did not record at least " + f"{required_seed_run_count} seeded run IDs", failures, ) _expect( - len(task_lineage_tasks) >= 2, - "capacity proof did not record at least two observed TaskExecutor tasks", + len(task_lineage_tasks) >= required_seed_run_count, + "capacity proof did not record at least " + f"{required_seed_run_count} observed TaskExecutor tasks", failures, ) _expect( @@ -298,6 +317,17 @@ def verify_evidence( "remaining cleanup Pods overlap removed Pods", failures, ) + if ( + required_active_pod_budget >= 2 + and required_seed_run_count > required_active_pod_budget + ): + _verify_cardinality_evidence( + details=details, + capacity_wait=capacity_wait, + expected_active_pod_budget=required_active_pod_budget, + expected_seed_run_count=required_seed_run_count, + failures=failures, + ) return failures, _format_report( evidence_path=evidence_path, @@ -315,6 +345,61 @@ def verify_evidence( ) +def _verify_cardinality_evidence( + *, + details: Mapping[str, object], + capacity_wait: Mapping[str, object], + expected_active_pod_budget: int, + expected_seed_run_count: int, + failures: list[str], +) -> None: + cardinality = _mapping(details.get("cardinality")) + first_active_pods = _sequence(cardinality.get("first_active_pods")) + launched_after_capacity_opened = _sequence( + cardinality.get("launched_after_capacity_opened") + ) + expected_launched_after_capacity_opened = max( + 1, expected_seed_run_count - expected_active_pod_budget + ) + _expect( + cardinality.get("observed") is True, + "TaskExecutor capacity cardinality was not observed", + failures, + ) + _expect( + len(first_active_pods) >= expected_active_pod_budget, + "cardinality proof did not record the active Pod budget full before wait", + failures, + ) + _expect( + len(launched_after_capacity_opened) + >= expected_launched_after_capacity_opened, + "cardinality proof did not record " + f"{expected_launched_after_capacity_opened} new Pod(s) after capacity opened", + failures, + ) + wait_text = _command_text(_sequence(capacity_wait.get("commands"))).lower() + _expect( + ( + f"{expected_active_pod_budget} active pods" in wait_text + and f"budget {expected_active_pod_budget}" in wait_text + ), + "capacity wait logs do not show the expected active Pod budget was full", + failures, + ) + + +def _command_text(records: Sequence[object]) -> str: + parts: list[str] = [] + for record in records: + mapping = _mapping(record) + for key in ("stdout", "stderr"): + value = mapping.get(key) + if isinstance(value, str): + parts.append(value) + return "\n".join(parts) + + def _read_json(path: Path, failures: list[str]) -> dict[str, object]: try: value = json.loads(path.read_text(encoding="utf-8")) @@ -354,6 +439,10 @@ def _format_report( ) -> str: pods = [_mapping(pod) for pod in _sequence(details.get("pods"))] taskexecutor_logs = _sequence(details.get("taskexecutor_logs")) + seed_run_ids = ", ".join( + str(item) for item in _sequence(details.get("seed_run_ids")) + ) + cardinality = _mapping(details.get("cardinality")) removed_pods = ", ".join( str(item) for item in _sequence(cleanup_observed.get("removed_pods")) ) @@ -372,6 +461,8 @@ def _format_report( f"Result: {summary.get('result')}", f"Run ID: {details.get('run_id')}", f"Seed run ID: {details.get('seed_run_id')}", + f"Seed run IDs: {seed_run_ids or ''}", + f"Active Pod budget: {details.get('active_pod_budget')}", f"TaskExecutor Pods: {len(pods)}", ] for pod in pods: @@ -388,6 +479,7 @@ def _format_report( f"TaskExecutor log captures: {len(taskexecutor_logs)}", f"TaskExecutor phases: {', '.join(pod_phases) or ''}", f"Capacity wait observed: {capacity_wait.get('observed')}", + f"Cardinality observed: {cardinality.get('observed')}", f"Removed Pods: {removed_pods or ''}", f"Removed Secrets: {removed_secrets or ''}", f"Cleanup required: {str(require_cleanup).lower()}", @@ -438,6 +530,18 @@ def _parse_args(argv: Sequence[str] | None = None) -> argparse.Namespace: default="local-k8s-launch-path", help="Expected harness result to verify.", ) + parser.add_argument( + "--expected-active-pod-budget", + type=int, + default=None, + help="Expected active Pod budget for capacity cleanup proof evidence.", + ) + parser.add_argument( + "--expected-seed-run-count", + type=int, + default=None, + help="Expected seeded ServerApp run count for capacity cleanup proof evidence.", + ) return parser.parse_args(argv) @@ -448,6 +552,8 @@ def main(argv: Sequence[str] | None = None) -> int: args.evidence_dir, require_cleanup=not args.no_require_cleanup, expected_result=args.expected_result, + expected_active_pod_budget=args.expected_active_pod_budget, + expected_seed_run_count=args.expected_seed_run_count, ) print(report, end="") return 1 if failures else 0 diff --git a/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_harness_test.py b/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_harness_test.py index 123f55b8eefb..cfa021c3be16 100644 --- a/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_harness_test.py +++ b/framework/py/flwr/supercore/superexec/executor/kubernetes_executor_harness_test.py @@ -562,6 +562,24 @@ def test_render_appio_seed_manifests_can_create_two_held_runs() -> None: assert "K8s launch seed created run_ids=" in seed_script +def test_render_appio_seed_manifests_can_create_three_demo_runs() -> None: + """Test seed manifests can create three deterministic held ServerApp tasks.""" + profile = harness_module.generic_k3d_profile() + profile.seed_run_count = 3 + profile.probe_hold_seconds = 30.0 + + manifests = harness_module.render_appio_seed_manifests(profile, "k8s-test") + + seed_config, seed_job = manifests + seed_script = seed_config["data"]["seed_run.py"] + container = seed_job["spec"]["template"]["spec"]["containers"][0] + assert "--run-count" in container["args"] + assert "3" in container["args"] + assert "--probe-hold-seconds" in container["args"] + assert "30.0" in container["args"] + assert "K8s launch seed created run_ids=" in seed_script + + def test_rendered_local_k8s_outputs_do_not_use_sprint_identifiers() -> None: """Test rendered launch objects do not expose sprint-local identifiers.""" profile = harness_module.generic_k3d_profile() @@ -865,6 +883,128 @@ def test_run_capacity_cleanup_proof_records_wait_cleanup_and_second_launch( ) +def test_run_capacity_cleanup_proof_records_budget_two_cardinality( + tmp_path: Path, +) -> None: + """Test budget-2/three-task evidence captures cardinality.""" + runner = _CardinalityRunner() + output_dir = tmp_path / "cardinality-real" + profile = harness_module.generic_k3d_profile() + profile.active_pod_budget = 2 + profile.seed_run_count = 3 + profile.probe_hold_seconds = 30.0 + + summary = harness_module.run_local_k8s_launch_path( + output_dir, + profile=profile, + runner=runner, + execute=True, + apply_manifests=True, + import_images=True, + capacity_cleanup_proof=True, + ) + + assert summary.status == "passed" + assert summary.result == "local-k8s-capacity-cleanup-proof" + assert summary.details["seed_run_ids"] == [123, 456, 789] + assert summary.details["active_pod_budget"] == 2 + assert summary.details["capacity_wait"]["observed"] is True + assert summary.details["cardinality"] == { + "required": True, + "observed": True, + "active_pod_budget": 2, + "seed_run_count": 3, + "seed_run_ids": [123, 456, 789], + "first_active_pods": [ + "flwr-taskexecutor-123-abc", + "flwr-taskexecutor-456-def", + ], + "first_observed_pods": [ + "flwr-taskexecutor-123-abc", + "flwr-taskexecutor-456-def", + ], + "post_capacity_pods": [ + "flwr-taskexecutor-456-def", + "flwr-taskexecutor-789-ghi", + ], + "removed_pods": ["flwr-taskexecutor-123-abc"], + "launched_after_capacity_opened": ["flwr-taskexecutor-789-ghi"], + "expected_launched_after_capacity_opened": 1, + "reason": "observed", + } + + lineage = json.loads((output_dir / "task-lineage.json").read_text()) + assert lineage["seeded_run_ids"] == [123, 456, 789] + assert lineage["seeded_task_count"] == 3 + assert lineage["observed_task_count"] == 3 + assert [task["pod_name"] for task in lineage["tasks"]] == [ + "flwr-taskexecutor-123-abc", + "flwr-taskexecutor-456-def", + "flwr-taskexecutor-789-ghi", + ] + checklist = json.loads((output_dir / "proof-checklist.json").read_text()) + assert not any("cardinality" in item for item in checklist["out_of_scope"]) + assert any("active concurrently" in claim["claim"] for claim in checklist["claims"]) + assert any( + "waiting TaskExecutor Pods launched" in claim["claim"] + for claim in checklist["claims"] + ) + + +def test_run_capacity_cleanup_proof_waits_for_all_seeded_large_cardinality( + tmp_path: Path, +) -> None: + """Test a 4/8 proof polls until all seeded TaskExecutor Pods are observed.""" + seed_run_ids = [101, 202, 303, 404, 505, 606, 707, 808] + pod_names = [_pod_name_for_run_id(run_id) for run_id in seed_run_ids] + runner = _CardinalityRunner( + active_pod_budget=4, + seed_run_ids=seed_run_ids, + pod_waves=[ + pod_names[:4], + pod_names[1:5], + pod_names[1:6], + pod_names[1:7], + pod_names[1:8], + ], + final_pod_names=pod_names[1:], + ) + output_dir = tmp_path / "large-cardinality-real" + profile = harness_module.generic_k3d_profile() + profile.active_pod_budget = 4 + profile.seed_run_count = 8 + profile.probe_hold_seconds = 30.0 + + summary = harness_module.run_local_k8s_launch_path( + output_dir, + profile=profile, + runner=runner, + execute=True, + apply_manifests=True, + import_images=True, + capacity_cleanup_proof=True, + ) + + assert summary.status == "passed" + assert summary.details["seed_run_ids"] == seed_run_ids + assert summary.details["cardinality"]["first_active_pods"] == pod_names[:4] + assert summary.details["cardinality"]["launched_after_capacity_opened"] == ( + pod_names[4:] + ) + assert ( + summary.details["cardinality"]["expected_launched_after_capacity_opened"] == 4 + ) + assert summary.details["cardinality"]["post_capacity_pods"] == pod_names[1:] + assert summary.details["cleanup_observed"]["removed_pods"] == [pod_names[0]] + assert [pod["phase"] for pod in summary.details["pods"]] == ["Succeeded"] * 7 + + lineage = json.loads((output_dir / "task-lineage.json").read_text()) + assert lineage["seeded_run_ids"] == seed_run_ids + assert lineage["seeded_task_count"] == 8 + assert lineage["observed_task_count"] == 8 + assert [task["pod_name"] for task in lineage["tasks"]] == pod_names + + def test_run_local_k8s_launch_path_polls_until_taskexecutor_pod_appears( tmp_path: Path, ) -> None: @@ -996,6 +1136,85 @@ def test_verify_capacity_cleanup_evidence_accepts_passing_bundle( assert "Removed Pods: flwr-taskexecutor-123-abc" in report +def test_verify_capacity_cleanup_evidence_accepts_cardinality_bundle( + tmp_path: Path, +) -> None: + """Test the verifier accepts budget-2/three-task cardinality evidence.""" + output_dir = tmp_path / "evidence" + _write_verifier_evidence( + output_dir, + result="local-k8s-capacity-cleanup-proof", + active_pod_budget=2, + seed_run_ids=[123, 456, 789], + ) + + failures, report = verifier_module.verify_evidence( + output_dir, + expected_result="local-k8s-capacity-cleanup-proof", + expected_active_pod_budget=2, + expected_seed_run_count=3, + ) + + assert failures == [] + assert "Verification: PASSED" in report + assert "Active Pod budget: 2" in report + assert "Cardinality observed: True" in report + + +def test_verify_capacity_cleanup_evidence_accepts_large_cardinality_bundle( + tmp_path: Path, +) -> None: + """Test the verifier accepts budget-4/eight-task cardinality evidence.""" + output_dir = tmp_path / "evidence" + _write_verifier_evidence( + output_dir, + result="local-k8s-capacity-cleanup-proof", + active_pod_budget=4, + seed_run_ids=[101, 202, 303, 404, 505, 606, 707, 808], + ) + + failures, report = verifier_module.verify_evidence( + output_dir, + expected_result="local-k8s-capacity-cleanup-proof", + expected_active_pod_budget=4, + expected_seed_run_count=8, + ) + + assert failures == [] + assert "Verification: PASSED" in report + assert "Active Pod budget: 4" in report + assert "Task lineage records: 8" in report + + +def test_verify_capacity_cleanup_evidence_rejects_incomplete_cardinality_count( + tmp_path: Path, +) -> None: + """Test larger cardinality verification counts post-capacity launches.""" + output_dir = tmp_path / "evidence" + _write_verifier_evidence( + output_dir, + result="local-k8s-capacity-cleanup-proof", + active_pod_budget=4, + seed_run_ids=[101, 202, 303, 404, 505, 606, 707, 808], + ) + summary_path = output_dir / "summary.json" + summary = json.loads(summary_path.read_text(encoding="utf-8")) + summary["details"]["cardinality"]["launched_after_capacity_opened"] = [ + _pod_name_for_run_id(505) + ] + summary_path.write_text(json.dumps(summary), encoding="utf-8") + + failures, report = verifier_module.verify_evidence( + output_dir, + expected_result="local-k8s-capacity-cleanup-proof", + expected_active_pod_budget=4, + expected_seed_run_count=8, + ) + + assert any("4 new Pod(s) after capacity opened" in failure for failure in failures) + assert "Verification: FAILED" in report + + def test_verify_capacity_cleanup_evidence_rejects_single_task_record( tmp_path: Path, ) -> None: @@ -1014,7 +1233,7 @@ def test_verify_capacity_cleanup_evidence_rejects_single_task_record( ) assert any( - "at least two observed TaskExecutor tasks" in failure for failure in failures + "at least 2 observed TaskExecutor tasks" in failure for failure in failures ) assert "Verification: FAILED" in report @@ -1262,6 +1481,134 @@ def _result( ) +class _CardinalityRunner: + """Fake command runner for execute-mode cardinality evidence.""" + + def __init__( + self, + *, + active_pod_budget: int = 2, + seed_run_ids: list[int] | None = None, + pod_waves: list[list[str]] | None = None, + final_pod_names: list[str] | None = None, + ) -> None: + self.commands: list[list[str]] = [] + self.pod_get_count = 0 + self.secret_get_count = 0 + self.superexec_log_count = 0 + self.active_pod_budget = active_pod_budget + self.seed_run_ids = seed_run_ids or [123, 456, 789] + self.pod_names = [_pod_name_for_run_id(run_id) for run_id in self.seed_run_ids] + self.final_pod_names = final_pod_names or self.pod_names[1:] + self.pod_waves = pod_waves or [ + self.pod_names[:active_pod_budget], + self.final_pod_names, + ] + + def run(self, args: list[str]) -> Any: + """Return realistic command output for the cardinality proof.""" + self.commands.append(list(args)) + if args[:3] == ["docker", "image", "inspect"]: + return self._result(args) + if args[:3] == ["k3d", "cluster", "list"]: + return self._result(args, stdout="NAME\nflower-local-k8s\n") + if args[:3] == ["k3d", "image", "import"]: + return self._result(args, stdout="imported\n") + if "auth" in args and "can-i" in args: + allowed = _RealLaunchRunner._rbac_allowed(args) + return self._result( + args, + returncode=0 if allowed else 1, + stdout="yes\n" if allowed else "no\n", + ) + if "wait" in args and "--for=jsonpath={.status.phase}=Succeeded" in args: + return self._result(args) + if "get" in args and "pods" in args and "-o" in args and "json" in args: + self.pod_get_count += 1 + if self.pod_get_count <= len(self.pod_waves): + pod_names = self.pod_waves[self.pod_get_count - 1] + phase = "Running" + else: + pod_names = self.final_pod_names + phase = "Succeeded" + return self._result( + args, + stdout=json.dumps( + _pod_list_items( + *[ + _taskexecutor_pod(pod_name, phase) + for pod_name in pod_names + ] + ) + ), + ) + if "get" in args and "secrets" in args and "-o" in args and "json" in args: + self.secret_get_count += 1 + if self.secret_get_count == 1: + secret_names = self.pod_names[: self.active_pod_budget] + else: + secret_names = self.final_pod_names + return self._result( + args, + stdout=json.dumps( + _secret_list_for_names( + *[f"{pod_name}-appio" for pod_name in secret_names] + ) + ), + ) + if "get" in args and "jobs" in args and "-o" in args and "json" in args: + return self._result(args, stdout=json.dumps(_object_list("Job"))) + if "get" in args and "services" in args and "-o" in args and "json" in args: + return self._result(args, stdout=json.dumps(_object_list("Service"))) + if "get" in args and "namespace" in args and "-o" in args and "json" in args: + return self._result(args, stdout=json.dumps(_namespace())) + if "logs" in args and "job/flower-local-k8s-seed-run" in args: + seed_log_lines = [ + f"K8s launch seed created run_id={run_id}\n" + for run_id in self.seed_run_ids + ] + seed_log_lines.append( + "K8s launch seed created run_ids=" + f"{','.join(str(run_id) for run_id in self.seed_run_ids)}\n" + ) + return self._result( + args, + stdout="".join(seed_log_lines), + ) + if "logs" in args and "pod/flower-superexec" in args: + self.superexec_log_count += 1 + stdout = "claim launch task_id taskexecutor\n" + if self.superexec_log_count > 1: + stdout += ( + "Waiting for Kubernetes TaskExecutor capacity: " + f"{self.active_pod_budget} active Pods, " + f"budget {self.active_pod_budget}, " + "selector app.kubernetes.io/name=flower\n" + ) + return self._result(args, stdout=stdout) + matching_pod_names = [ + pod_name for pod_name in self.final_pod_names if f"pod/{pod_name}" in args + ] + if "logs" in args and matching_pod_names: + task_id = matching_pod_names[0].split("-")[2] + return self._result( + args, + stdout=f"K8s launch probe ServerApp ran run_id={task_id}\n", + ) + return self._result(args) + + @staticmethod + def _result( + args: list[str], *, returncode: int = 0, stdout: str = "", stderr: str = "" + ) -> Any: + return harness_module.CommandResult( + args=list(args), + returncode=returncode, + stdout=stdout, + stderr=stderr, + ) + + def _pod_list(phase: str) -> dict[str, Any]: return { "items": [ @@ -1408,6 +1755,13 @@ def _secret_list_for_name(name: str) -> dict[str, Any]: } +def _secret_list_for_names(*names: str) -> dict[str, Any]: + items: list[dict[str, Any]] = [] + for name in names: + items.extend(_secret_list_for_name(name)["items"]) + return {"items": items} + + def _object_list(kind: str) -> dict[str, Any]: return { "items": [ @@ -1433,35 +1787,44 @@ def _namespace() -> dict[str, Any]: } +def _pod_name_for_run_id(run_id: int) -> str: + suffixes = {123: "abc", 456: "def", 789: "ghi"} + return f"flwr-taskexecutor-{run_id}-{suffixes.get(run_id, 'xyz')}" + + def _write_verifier_evidence( output_dir: Path, *, taskexecutor_log_text: str = "K8s launch probe ServerApp ran\n", result: str = "local-k8s-launch-path", + active_pod_budget: int | None = None, + seed_run_ids: list[int] | None = None, ) -> None: (output_dir / "diagnostics").mkdir(parents=True) capacity_proof = result == "local-k8s-capacity-cleanup-proof" - final_pod_name = ( - "flwr-taskexecutor-456-def" if capacity_proof else "flwr-taskexecutor-test" - ) - lineage_tasks = ( - [ - { - "pod_name": "flwr-taskexecutor-123-abc", - "credential_secret_name": "flwr-taskexecutor-123-abc-appio", - }, - { - "pod_name": final_pod_name, - "credential_secret_name": f"{final_pod_name}-appio", - }, - ] - if capacity_proof - else [ - { - "pod_name": final_pod_name, - "credential_secret_name": f"{final_pod_name}-appio", - } - ] + if capacity_proof: + active_pod_budget = active_pod_budget if active_pod_budget is not None else 1 + seed_run_ids = seed_run_ids or [123, 456] + else: + seed_run_ids = seed_run_ids or [123] + pod_names = [_pod_name_for_run_id(run_id) for run_id in seed_run_ids] + final_pod_name = pod_names[-1] if capacity_proof else "flwr-taskexecutor-test" + if not capacity_proof: + pod_names = [final_pod_name] + lineage_tasks = [ + { + "pod_name": pod_name, + "credential_secret_name": f"{pod_name}-appio", + } + for pod_name in pod_names + ] + first_active_pods = pod_names[: active_pod_budget or 1] + launched_after_capacity_opened = pod_names[active_pod_budget or 1 :] + cardinality_observed = ( + capacity_proof + and active_pod_budget is not None + and active_pod_budget >= 2 + and len(pod_names) > active_pod_budget ) summary = { "status": "passed", @@ -1470,8 +1833,9 @@ def _write_verifier_evidence( "details": { "run_id": "k8s-launch-test", "seed_run_id": 123, - "seed_run_ids": [123, 456] if capacity_proof else [123], - "active_pod_budget": 1 if capacity_proof else None, + "seed_run_ids": seed_run_ids, + "expected_seed_run_count": len(seed_run_ids), + "active_pod_budget": active_pod_budget if capacity_proof else None, "dry_run": False, "image_preflight": { "docker_inspect": {"returncode": 0}, @@ -1480,7 +1844,22 @@ def _write_verifier_evidence( "rbac": {"status": "passed"}, "pods": [{"name": final_pod_name, "phase": "Succeeded"}], "taskexecutor_logs": [{"returncode": 0}], - "capacity_wait": {"observed": capacity_proof}, + "capacity_wait": { + "observed": capacity_proof, + "commands": ( + [ + { + "stdout": ( + "Waiting for Kubernetes TaskExecutor capacity: " + f"{active_pod_budget} active Pods, " + f"budget {active_pod_budget}\n" + ) + } + ] + if capacity_proof + else [] + ), + }, "cleanup_observed": { "observed": capacity_proof, "removed_pods": ( @@ -1491,6 +1870,17 @@ def _write_verifier_evidence( ), "remaining_pods": [final_pod_name] if capacity_proof else [], }, + "cardinality": { + "required": cardinality_observed, + "observed": cardinality_observed, + "active_pod_budget": active_pod_budget, + "seed_run_count": len(seed_run_ids), + "first_active_pods": first_active_pods, + "launched_after_capacity_opened": launched_after_capacity_opened, + "expected_launched_after_capacity_opened": max( + 1, len(seed_run_ids) - (active_pod_budget or 1) + ), + }, "cleanup": {"requested": True, "result": {"returncode": 0}}, }, } @@ -1512,8 +1902,8 @@ def _write_verifier_evidence( json.dumps( { "seeded_run_id": 123, - "seeded_run_ids": [123, 456] if capacity_proof else [123], - "seeded_task_count": 2 if capacity_proof else 1, + "seeded_run_ids": seed_run_ids, + "seeded_task_count": len(seed_run_ids), "observed_task_count": len(lineage_tasks), "tasks": lineage_tasks, } @@ -1553,9 +1943,13 @@ def _write_verifier_evidence( { "claims": [{"claim": "TaskExecutor Pod observed"}], "out_of_scope": ( - ["budget-2/three-task cardinality behavior"] - if capacity_proof - else ["capacity wait proof"] + [] + if cardinality_observed + else ( + ["capacity cardinality behavior"] + if capacity_proof + else ["capacity wait proof"] + ) ), } ),