diff --git a/CHANGELOG.md b/CHANGELOG.md index 86059fe..7464518 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Discriminated ISO probe cycle outcomes (`CycleResult`: success / empty / failed) with distinct logging and `/health` fields (`last_cycle_status`, `last_cycle_error`). +- Atomic `SchedulerSnapshot` for `/health` scheduler extras (`last_updated`, `poll_count`, probe stats) published under a lock from `Scheduler.health_snapshot()`. - Post the same Slack **status** summary as the interactive command to `NOTIFICATION_CHANNEL` once when the process starts (when that channel is configured). - Open-source hygiene: contributing guide, security policy, code of conduct, onboarding and handoff docs, pre-commit (Ruff), GitHub issue templates, Dependabot, CodeQL, CODEOWNERS template, and `.gitattributes`. diff --git a/docs/architecture.md b/docs/architecture.md index 719c64b..5c902fd 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -8,7 +8,7 @@ These components share one thread and the main event loop. They may await I/O bu - **`Scheduler.run_forever` / `poll_once`** — orchestrates index refresh, probing, and notifications. - **`WG21Index.refresh`** — fetches and parses wg21.link index (httpx async). -- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client. +- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client. `run_cycle` returns a discriminated `CycleResult` (success / empty / failed). - **Slack Bolt handlers** — run on Bolt’s thread; they should not read mutable source state directly (use snapshots or health callbacks). `ISOProber._stats` is updated from many coroutines in one `run_cycle()`. This is safe on the event loop because asyncio never preempts between awaits. A `threading.Lock` guards `_stats` as defense-in-depth if code is ever called from a worker thread by mistake. @@ -17,7 +17,7 @@ These components share one thread and the main event loop. They may await I/O bu | Thread | Role | |--------|------| -| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler snapshot fields. | +| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler fields from `Scheduler.health_snapshot()` (immutable snapshot, lock-protected publish). | | **MessageQueue sender** (`scout.py`) | Drains Slack post queue with rate limiting. | | **`run_blocking_io` / `asyncio.to_thread`** | Runs blocking psycopg2 calls (e.g. `UserWatchlist.matches_for_users`) off the loop. | diff --git a/src/paperscout/__main__.py b/src/paperscout/__main__.py index a318975..e466fe0 100644 --- a/src/paperscout/__main__.py +++ b/src/paperscout/__main__.py @@ -27,6 +27,74 @@ log = logging.getLogger("paperscout") +# MessageQueue keys allowed in /health extras (must not overlap scheduler.health_snapshot()). +_MQ_HEALTH_FIELD_NAMES = frozenset( + { + "mq_depth", + "mq_max_size", + "mq_utilization", + "mq_circuit_state", + } +) + + +def _mq_health_fields(mq: MessageQueue) -> dict: + """MQ metrics for /health; from health_fields() when present, else depth only.""" + if hasattr(mq, "health_fields"): + try: + raw = mq.health_fields() + except Exception as exc: + log.warning( + "health: mq.health_fields() failed for %s id=%s: %s", + type(mq).__name__, + id(mq), + exc, + exc_info=True, + ) + try: + return {"mq_depth": mq.depth()} + except Exception: + log.warning( + "health: mq.depth() fallback failed; omitting MQ fields", + exc_info=True, + ) + return {} + if isinstance(raw, dict): + return raw + log.warning("health: mq.health_fields() returned non-dict, using mq_depth only") + try: + return {"mq_depth": mq.depth()} + except Exception: + log.warning("health: mq.depth() failed; omitting MQ fields", exc_info=True) + return {} + + +def _merge_extra_health_fields( + scheduler_snap: dict, + mq_extra: dict, + db_pool: dict, +) -> dict: + """Merge health JSON with scheduler winning on key conflicts.""" + scheduler_keys = set(scheduler_snap) + mq_filtered: dict = {} + for key, value in mq_extra.items(): + if key in _MQ_HEALTH_FIELD_NAMES: + if key in scheduler_keys: + log.debug( + "health: mq_extra key %r conflicts with scheduler snapshot; scheduler wins", + key, + ) + else: + mq_filtered[key] = value + elif key in scheduler_keys: + log.debug( + "health: mq_extra key %r not allow-listed; scheduler snapshot kept", + key, + ) + else: + log.debug("health: mq_extra key %r not allow-listed, dropping", key) + return {**scheduler_snap, **mq_filtered, "db_pool": db_pool} + def _setup_logging(data_dir: Path, console_level: str = "INFO", retention_days: int = 7) -> None: """Console + daily rotating file logging; third-party loggers capped at WARNING.""" @@ -141,20 +209,11 @@ def _pool_status(p) -> dict: ) def _extra_health_fields() -> dict: - lsp = scheduler._last_successful_poll - s = scheduler._last_probe_stats - # HTTP 200 outcomes / non-skipped probe attempts (excludes skipped_discovered, skipped_in_index). - hits = s.get("hit_recent", 0) + s.get("hit_old", 0) + s.get("hit_no_lm", 0) - attempted = hits + s.get("miss", 0) + s.get("error", 0) - probe_success_rate = hits / attempted if attempted > 0 else None - return { - "last_successful_poll": ( - datetime.fromtimestamp(lsp, tz=timezone.utc).isoformat() if lsp else None - ), - "probe_success_rate": probe_success_rate, - "mq_depth": mq.depth(), - "db_pool": _pool_status(pool), - } + return _merge_extra_health_fields( + scheduler.health_snapshot(), + _mq_health_fields(mq), + _pool_status(pool), + ) register_handlers(app, user_watchlist, state, paper_count_fn, launch_time) diff --git a/src/paperscout/config.py b/src/paperscout/config.py index ac83213..c8cc622 100644 --- a/src/paperscout/config.py +++ b/src/paperscout/config.py @@ -84,8 +84,12 @@ class Settings(BaseSettings): notification_channel: str = "" # Slack channel ID for ops alerts (stale poll). Empty = disabled. ops_alert_channel: str = "" - # Log a warning when MessageQueue depth reaches or exceeds this (unbounded queue). + # Log a warning when MessageQueue depth reaches or exceeds this (legacy threshold). mq_backpressure_threshold: int = Field(default=100, ge=1) + mq_max_size: int = Field(default=1000, ge=1) + mq_max_retries: int = Field(default=10, ge=0) + mq_circuit_breaker_threshold: int = Field(default=5, ge=1) + mq_circuit_breaker_cooldown_seconds: int = Field(default=60, ge=1) notify_on_frontier_hit: bool = True notify_on_any_draft: bool = True # Alert when a D-paper we previously probed appears in the wg21.link index diff --git a/src/paperscout/health.py b/src/paperscout/health.py index 26597a2..18257dc 100644 --- a/src/paperscout/health.py +++ b/src/paperscout/health.py @@ -61,7 +61,9 @@ def do_GET(self) -> None: except Exception: log.exception("health: extra_fields_fn failed") extra = {} - body = json.dumps({**base, **extra}).encode() + # Base handler fields win if extra_fields_fn returns overlapping keys. + safe_extra = {k: v for k, v in extra.items() if k not in base} + body = json.dumps({**base, **safe_extra}).encode() self.send_response(200) self.send_header("Content-Type", "application/json") diff --git a/src/paperscout/models.py b/src/paperscout/models.py index cfdcc40..d137308 100644 --- a/src/paperscout/models.py +++ b/src/paperscout/models.py @@ -177,6 +177,38 @@ class ProbeHit: is_recent: bool = False +class CycleStatus(str, Enum): + """Outcome of one ``ISOProber.run_cycle()`` invocation.""" + + SUCCESS = "success" + EMPTY = "empty" + FAILED = "failed" + + +@dataclass(frozen=True, slots=True) +class CycleResult: + """Discriminated probe cycle result (success vs empty vs failed).""" + + status: CycleStatus + results: tuple[ProbeHit, ...] = () + error: str | None = None + + @property + def hits(self) -> list[ProbeHit]: + """Probe hits when ``status`` is ``SUCCESS``; otherwise empty.""" + return list(self.results) if self.status == CycleStatus.SUCCESS else [] + + def __post_init__(self) -> None: + if self.status == CycleStatus.FAILED and not self.error: + raise ValueError("CycleResult FAILED must carry a non-empty error string") + if self.status == CycleStatus.SUCCESS and not self.results: + raise ValueError("CycleResult SUCCESS must carry at least one ProbeHit") + if self.status == CycleStatus.EMPTY and self.results: + raise ValueError("CycleResult EMPTY must not carry results") + if self.status != CycleStatus.FAILED and self.error is not None: + raise ValueError("CycleResult error is only valid for FAILED status") + + @dataclass class PerUserMatches: """One user's watchlist hits: ``(paper|hit, 'author'|'paper')`` tuples.""" diff --git a/src/paperscout/monitor.py b/src/paperscout/monitor.py index 2632f54..4a8790f 100644 --- a/src/paperscout/monitor.py +++ b/src/paperscout/monitor.py @@ -3,18 +3,22 @@ from __future__ import annotations import asyncio +import copy import logging +import threading import time -from collections.abc import Callable +from collections.abc import Callable, Mapping from dataclasses import dataclass from datetime import datetime, timezone +from types import MappingProxyType +from typing import Any import httpx from .concurrency import run_blocking_io from .config import Settings, settings from .errors import ConfigurationError, FailureCategory -from .models import Paper, PerUserMatches, ProbeHit +from .models import CycleResult, CycleStatus, Paper, PerUserMatches, ProbeHit from .sources import ISOProber, WG21Index from .storage import ProbeState, UserWatchlist @@ -99,6 +103,44 @@ def __init__( self.per_user_matches = per_user_matches or {} +# ── Health snapshot (issue 04) ─────────────────────────────────────────────── + + +def _compute_probe_success_rate(stats: dict[str, int]) -> float | None: + """HTTP 200 outcomes / non-skipped probe attempts.""" + hits = stats.get("hit_recent", 0) + stats.get("hit_old", 0) + stats.get("hit_no_lm", 0) + attempted = hits + stats.get("miss", 0) + stats.get("error", 0) + return hits / attempted if attempted > 0 else None + + +@dataclass(frozen=True, slots=True) +class SchedulerSnapshot: + """Immutable scheduler state published for the health endpoint. + + ``probe_stats`` is a read-only ``MappingProxyType`` view so callers cannot + mutate stats through the frozen snapshot object. + """ + + last_updated: str + poll_count: int + last_successful_poll: str | None + last_cycle_status: str | None + last_cycle_error: str | None + probe_stats: Mapping[str, int] + probe_success_rate: float | None + + +_HEALTH_SNAPSHOT_DEFAULTS: dict[str, Any] = { + "last_updated": None, + "poll_count": 0, + "last_successful_poll": None, + "last_cycle_status": None, + "last_cycle_error": None, + "probe_stats": {}, + "probe_success_rate": None, +} + + # ── Scheduler ──────────────────────────────────────────────────────────────── @@ -127,7 +169,70 @@ def __init__( self._poll_count = 0 self._last_successful_poll: float | None = None self._last_probe_stats: dict[str, int] = {} + self._last_cycle_status: CycleStatus | None = None + self._last_cycle_error: str | None = None self._last_ops_alert: float | None = None + self._health_lock = threading.Lock() + self._health_snapshot: SchedulerSnapshot | None = None + + def _probe_hits_from_cycle(self, cycle: CycleResult) -> list[ProbeHit]: + """Extract hits and record last cycle status for health / staleness.""" + self._last_cycle_status = cycle.status + self._last_cycle_error = cycle.error + if cycle.status is CycleStatus.SUCCESS: + return cycle.hits + if cycle.status is CycleStatus.EMPTY: + log.info("POLL probe cycle empty") + return [] + if cycle.status is CycleStatus.FAILED: + log.error("POLL probe cycle failed: %s", cycle.error) + return [] + log.error("POLL unknown cycle status: %s", cycle.status) + return [] + + def _record_probe_cycle_completion(self) -> None: + """Update probe stats after any completed cycle (including FAILED).""" + self._last_probe_stats = self.prober.snapshot_stats() + + def _mark_poll_successful_if_probe_ok(self) -> None: + """Advance staleness clock only when the last probe cycle did not fail.""" + if self._last_cycle_status is not CycleStatus.FAILED: + self._last_successful_poll = time.time() + + def _publish_health_snapshot(self) -> None: + """Publish immutable snapshot for cross-thread health reads (event loop only).""" + lsp = self._last_successful_poll + stats = dict(self._last_probe_stats) + snap = SchedulerSnapshot( + last_updated=datetime.now(timezone.utc).isoformat(), + poll_count=self._poll_count, + last_successful_poll=( + datetime.fromtimestamp(lsp, tz=timezone.utc).isoformat() if lsp else None + ), + last_cycle_status=(self._last_cycle_status.value if self._last_cycle_status else None), + last_cycle_error=self._last_cycle_error, + probe_stats=MappingProxyType(stats), + probe_success_rate=_compute_probe_success_rate(stats), + ) + with self._health_lock: + self._health_snapshot = snap + + def health_snapshot(self) -> dict[str, Any]: + """Return a consistent copy of scheduler fields for ``/health`` extras.""" + with self._health_lock: + snap = self._health_snapshot + if snap is None: + return copy.deepcopy(_HEALTH_SNAPSHOT_DEFAULTS) + # Build explicitly: probe_stats is a MappingProxyType (not deepcopy-able via asdict). + return { + "last_updated": snap.last_updated, + "poll_count": snap.poll_count, + "last_successful_poll": snap.last_successful_poll, + "last_cycle_status": snap.last_cycle_status, + "last_cycle_error": snap.last_cycle_error, + "probe_stats": dict(snap.probe_stats), + "probe_success_rate": snap.probe_success_rate, + } async def seed(self) -> SeedResult: """Gather current index and probe state. @@ -147,7 +252,9 @@ async def seed(self) -> SeedResult: hits: list[ProbeHit] = [] if self.cfg.enable_iso_probe: - hits = await self.prober.run_cycle() + cycle = await self.prober.run_cycle() + hits = self._probe_hits_from_cycle(cycle) + self._record_probe_cycle_completion() log.info("SEED isocpp.org probe existing=%d", len(hits)) self._seeded = True @@ -169,8 +276,13 @@ async def poll_once(self) -> PollResult: if not self._seeded: seed_result = await self.seed() if not seed_result.had_prior_state: - self._last_successful_poll = time.time() - self._last_probe_stats = self.prober.snapshot_stats() + if self.cfg.enable_iso_probe: + # Stats already recorded in seed() after run_cycle. + self._mark_poll_successful_if_probe_ok() + else: + self._last_successful_poll = time.time() + self._record_probe_cycle_completion() + self._publish_health_snapshot() return PollResult( diff=DiffResult(new_papers=[], updated_papers=[]), probe_hits=[], @@ -208,8 +320,13 @@ async def poll_once(self) -> PollResult: ) if self.notify_callback: self.notify_callback(result) - self._last_successful_poll = time.time() - self._last_probe_stats = self.prober.snapshot_stats() + if self.cfg.enable_iso_probe: + # Stats already recorded in seed() after run_cycle. + self._mark_poll_successful_if_probe_ok() + else: + self._last_successful_poll = time.time() + self._record_probe_cycle_completion() + self._publish_health_snapshot() return result previous = dict(self._previous_papers) @@ -239,7 +356,9 @@ async def poll_once(self) -> PollResult: probe_hits: list[ProbeHit] = [] if self.cfg.enable_iso_probe: - probe_hits = await self.prober.run_cycle() + cycle = await self.prober.run_cycle() + probe_hits = self._probe_hits_from_cycle(cycle) + self._record_probe_cycle_completion() recent_hits = [h for h in probe_hits if h.is_recent] old_hits = [h for h in probe_hits if not h.is_recent] @@ -275,12 +394,16 @@ async def poll_once(self) -> PollResult: "D-TO-P id=%s draft=%s draft-lm=%s draft-discovered=%s", paper.id, d_url, - datetime.fromtimestamp(lm_ts, tz=timezone.utc).strftime("%Y-%m-%d") - if lm_ts - else "unknown", - datetime.fromtimestamp(disc_ts, tz=timezone.utc).strftime("%Y-%m-%d") - if disc_ts - else "unknown", + ( + datetime.fromtimestamp(lm_ts, tz=timezone.utc).strftime("%Y-%m-%d") + if lm_ts + else "unknown" + ), + ( + datetime.fromtimestamp(disc_ts, tz=timezone.utc).strftime("%Y-%m-%d") + if disc_ts + else "unknown" + ), ) break @@ -324,8 +447,13 @@ async def poll_once(self) -> PollResult: len(dp_transitions), len(per_user_matches), ) - self._last_successful_poll = time.time() - self._last_probe_stats = self.prober.snapshot_stats() + if self.cfg.enable_iso_probe: + # Stats already recorded above after run_cycle. + self._mark_poll_successful_if_probe_ok() + else: + self._last_successful_poll = time.time() + self._record_probe_cycle_completion() + self._publish_health_snapshot() return result async def run_forever(self) -> None: diff --git a/src/paperscout/scout.py b/src/paperscout/scout.py index 31e8e6f..50dff76 100644 --- a/src/paperscout/scout.py +++ b/src/paperscout/scout.py @@ -7,6 +7,7 @@ import threading import time from datetime import datetime, timezone +from typing import Any from slack_bolt import App from slack_sdk.errors import SlackApiError @@ -54,6 +55,19 @@ def depth(self) -> int: """Approximate number of messages waiting to be sent (see ``queue.Queue.qsize``).""" return self._q.qsize() + def health_fields(self) -> dict[str, Any]: + """Metrics for the ``/health`` endpoint (merged by ``__main__``).""" + d = self.depth() + m = settings.mq_max_size + utilization = (d / m) if m else 0.0 + utilization = min(1.0, max(0.0, utilization)) + return { + "mq_depth": d, + "mq_max_size": m, + "mq_utilization": round(utilization, 4), + "mq_circuit_state": "closed", + } + def enqueue(self, channel: str, text: str, **kwargs) -> None: """Queue a ``chat.postMessage`` for *channel* (or user id for DMs).""" from .config import settings diff --git a/src/paperscout/sources.py b/src/paperscout/sources.py index 50dfc3b..6596cf3 100644 --- a/src/paperscout/sources.py +++ b/src/paperscout/sources.py @@ -18,7 +18,7 @@ from .config import Settings, settings from .errors import FailureCategory -from .models import Paper, ProbeHit, Tier +from .models import CycleResult, CycleStatus, Paper, ProbeHit, Tier from .storage import PaperCache, ProbeState, UserWatchlist log = logging.getLogger(__name__) @@ -309,52 +309,69 @@ def snapshot_stats(self) -> dict[str, int]: # ── Public API ─────────────────────────────────────────────────────────── - async def run_cycle(self) -> list[ProbeHit]: - """HEAD all scheduled URLs; return recent hits and persist discovery state.""" + async def run_cycle(self) -> CycleResult: + """HEAD all scheduled URLs; return discriminated cycle outcome. + + Per-URL ``error`` stats alone do not fail the cycle if the cycle + finished and persisted state; only cycle-level failures return ``FAILED``. + """ self._cycle += 1 self._reset_stats() t0 = time.monotonic() + urls: list[_Entry] = [] + hot_count = 0 + cold_count = 0 - urls = self._build_probe_list() - known_ids = self.index.get_known_paper_ids() - hot_count = sum(1 for u in urls if u[1] in (Tier.WATCHLIST, Tier.FRONTIER, Tier.RECENT)) - cold_count = sum(1 for u in urls if u[1] == Tier.COLD) - slice_idx = (self._cycle - 1) % self.cfg.cold_cycle_divisor - log.info( - "PROBE-START cycle=%d total=%d hot=%d cold=%d slice=%d/%d", - self._cycle, - len(urls), - hot_count, - cold_count, - slice_idx, - self.cfg.cold_cycle_divisor, - ) + try: + urls = self._build_probe_list() + known_ids = self.index.get_known_paper_ids() + hot_count = sum(1 for u in urls if u[1] in (Tier.WATCHLIST, Tier.FRONTIER, Tier.RECENT)) + cold_count = sum(1 for u in urls if u[1] == Tier.COLD) + slice_idx = (self._cycle - 1) % self.cfg.cold_cycle_divisor + log.info( + "PROBE-START cycle=%d total=%d hot=%d cold=%d slice=%d/%d", + self._cycle, + len(urls), + hot_count, + cold_count, + slice_idx, + self.cfg.cold_cycle_divisor, + ) - sem = asyncio.Semaphore(self.cfg.http_concurrency) - hits: list[ProbeHit] = [] + sem = asyncio.Semaphore(self.cfg.http_concurrency) + hits: list[ProbeHit] = [] - async with httpx.AsyncClient( - http2=self.cfg.http_use_http2, - timeout=self.cfg.http_timeout_seconds, - follow_redirects=True, - ) as client: - tasks = [ - self._probe_one(client, sem, url, prefix, num, rev, ext, tier, known_ids) - for url, tier, prefix, num, rev, ext in urls - ] - results = await asyncio.gather(*tasks, return_exceptions=True) - for r in results: - if isinstance(r, ProbeHit): - hits.append(r) - elif isinstance(r, Exception): - log.debug("Unhandled exception from _probe_one: %s", r) - - for hit in hits: - lm_ts = hit.last_modified.timestamp() if hit.last_modified else None - self.state.mark_discovered(hit.url, last_modified_ts=lm_ts) - - self.state.touch_poll() - self.state.save() + async with httpx.AsyncClient( + http2=self.cfg.http_use_http2, + timeout=self.cfg.http_timeout_seconds, + follow_redirects=True, + ) as client: + tasks = [ + self._probe_one(client, sem, url, prefix, num, rev, ext, tier, known_ids) + for url, tier, prefix, num, rev, ext in urls + ] + results = await asyncio.gather(*tasks, return_exceptions=True) + for r in results: + if isinstance(r, ProbeHit): + hits.append(r) + elif isinstance(r, Exception): + log.debug("Unhandled exception from _probe_one: %s", r) + + for hit in hits: + lm_ts = hit.last_modified.timestamp() if hit.last_modified else None + self.state.mark_discovered(hit.url, last_modified_ts=lm_ts) + + self.state.touch_poll() + self.state.save() + except Exception as exc: + elapsed = time.monotonic() - t0 + log.error( + "PROBE-FAILED cycle=%d elapsed=%.1fs error=%s", + self._cycle, + elapsed, + exc, + ) + return CycleResult(CycleStatus.FAILED, error=str(exc)) elapsed = time.monotonic() - t0 s = self.snapshot_stats() @@ -375,11 +392,25 @@ async def run_cycle(self) -> list[ProbeHit]: s["skipped_in_index"], s["error"], ) + + if hits: + status = CycleStatus.SUCCESS + log.info("PROBE-SUCCESS cycle=%d hits=%d", self._cycle, len(hits)) + else: + status = CycleStatus.EMPTY + log.info( + "PROBE-EMPTY cycle=%d total=%d err=%d", + self._cycle, + len(urls), + s["error"], + ) + log.info( "PROBE-CYCLE-SUMMARY %s", json.dumps( { "cycle": self._cycle, + "cycle_status": status.value, "cycle_requests": len(urls), "cycle_duration_s": round(elapsed, 2), "hot_probes": hot_count, @@ -395,7 +426,9 @@ async def run_cycle(self) -> list[ProbeHit]: } ), ) - return hits + if status == CycleStatus.SUCCESS: + return CycleResult(CycleStatus.SUCCESS, results=tuple(hits)) + return CycleResult(CycleStatus.EMPTY) # ── Probe-list builders ────────────────────────────────────────────────── @@ -628,7 +661,6 @@ async def _probe_one( last_modified=last_modified, is_recent=is_recent, ) - return None # ═══════════════════════════════════════════════════════════════════════════ diff --git a/tests/conftest.py b/tests/conftest.py index a19f546..6db6308 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -238,6 +238,10 @@ def make_test_settings(**overrides) -> Settings: notify_on_frontier_hit=True, notify_on_any_draft=True, notify_on_dp_transition=True, + mq_max_size=100, + mq_max_retries=3, + mq_circuit_breaker_threshold=5, + mq_circuit_breaker_cooldown_seconds=60, data_dir=Path(tempfile.mkdtemp(prefix="paperscout-test-")), cache_ttl_hours=1, ) diff --git a/tests/test_health.py b/tests/test_health.py index fb14e10..c258d97 100644 --- a/tests/test_health.py +++ b/tests/test_health.py @@ -2,8 +2,12 @@ from __future__ import annotations +import dataclasses import json +import threading +import time import urllib.request +from dataclasses import dataclass from datetime import datetime, timezone import pytest @@ -39,7 +43,12 @@ def health_url_with_extras(): state, lambda: 42, extra_fields_fn=lambda: { + "last_updated": "2026-03-16T12:00:00+00:00", "last_successful_poll": "2026-03-16T12:00:00+00:00", + "last_cycle_status": "success", + "last_cycle_error": None, + "poll_count": 1, + "probe_stats": {}, "probe_success_rate": 0.5, "mq_depth": 3, "db_pool": {"max": 10, "in_use": 1, "available": 9}, @@ -106,8 +115,130 @@ def test_iso_probe_flag_follows_config_settings(self, health_url): def test_health_extra_fields_merged(self, health_url_with_extras): data = json.loads(urllib.request.urlopen(f"{health_url_with_extras}/health").read()) assert "version" in data + assert data["last_updated"] == "2026-03-16T12:00:00+00:00" + assert data["last_cycle_status"] == "success" assert "last_successful_poll" in data assert data["last_successful_poll"] == "2026-03-16T12:00:00+00:00" assert data["probe_success_rate"] == 0.5 assert data["mq_depth"] == 3 assert data["db_pool"] == {"max": 10, "in_use": 1, "available": 9} + + +@dataclass(frozen=True, slots=True) +class _TestSnapshot: + last_updated: str + poll_count: int + last_cycle_status: str + + +class _ConcurrentSnapshotPublisher: + """Minimal stand-in for Scheduler health_snapshot under concurrent updates.""" + + def __init__(self) -> None: + self._lock = threading.Lock() + self._snap: _TestSnapshot | None = None + self._stop = threading.Event() + + def publish(self, poll_count: int, status: str) -> None: + snap = _TestSnapshot( + last_updated=datetime.now(timezone.utc).isoformat(), + poll_count=poll_count, + last_cycle_status=status, + ) + with self._lock: + self._snap = snap + + def health_snapshot(self) -> dict: + with self._lock: + snap = self._snap + if snap is None: + return { + "last_updated": None, + "poll_count": 0, + "last_cycle_status": None, + } + return dataclasses.asdict(snap) + + def run_updates(self) -> None: + n = 0 + while not self._stop.is_set(): + self.publish(n, "success" if n % 2 == 0 else "empty") + n += 1 + time.sleep(0.005) + + def stop(self) -> None: + self._stop.set() + + +class TestHealthExtraFieldsSafety: + def test_extra_fields_cannot_overwrite_base_handler_keys(self): + port = _find_free_port() + launch = datetime(2026, 3, 16, 10, 0, 0, tzinfo=timezone.utc) + state = _FakeState() + server = start_health_server( + port, + launch, + state, + lambda: 42, + extra_fields_fn=lambda: { + "version": "evil", + "uptime_seconds": -1, + "mq_depth": 2, + }, + ) + try: + data = json.loads(urllib.request.urlopen(f"http://127.0.0.1:{port}/health").read()) + assert data["version"] != "evil" + assert data["uptime_seconds"] >= 0 + assert data["mq_depth"] == 2 + finally: + server.shutdown() + + def test_extra_fields_fn_failure_returns_base_only(self): + port = _find_free_port() + launch = datetime(2026, 3, 16, 10, 0, 0, tzinfo=timezone.utc) + + def _boom(): + raise RuntimeError("snapshot unavailable") + + server = start_health_server( + port, + launch, + _FakeState(), + lambda: 0, + extra_fields_fn=_boom, + ) + try: + data = json.loads(urllib.request.urlopen(f"http://127.0.0.1:{port}/health").read()) + assert "version" in data + assert "last_poll" in data + assert "last_updated" not in data + finally: + server.shutdown() + + +class TestHealthSnapshotConcurrency: + def test_health_snapshot_consistent_under_concurrent_updates(self): + port = _find_free_port() + launch = datetime(2026, 3, 16, 10, 0, 0, tzinfo=timezone.utc) + publisher = _ConcurrentSnapshotPublisher() + publisher.publish(0, "success") + updater = threading.Thread(target=publisher.run_updates, daemon=True) + updater.start() + server = start_health_server( + port, + launch, + _FakeState(), + lambda: 0, + extra_fields_fn=publisher.health_snapshot, + ) + try: + for _ in range(50): + data = json.loads(urllib.request.urlopen(f"http://127.0.0.1:{port}/health").read()) + assert data["last_updated"] is not None + assert data["poll_count"] >= 0 + assert data["last_cycle_status"] in ("success", "empty", None) + finally: + publisher.stop() + updater.join(timeout=2) + server.shutdown() diff --git a/tests/test_main_health_merge.py b/tests/test_main_health_merge.py new file mode 100644 index 0000000..ab376bc --- /dev/null +++ b/tests/test_main_health_merge.py @@ -0,0 +1,73 @@ +"""Tests for __main__ health field merge helpers.""" + +from __future__ import annotations + +import logging +from unittest.mock import MagicMock + +from paperscout.__main__ import _merge_extra_health_fields, _mq_health_fields +from paperscout.scout import MessageQueue + + +def test_merge_scheduler_wins_on_key_conflict(caplog): + scheduler = { + "last_updated": "2026-01-01T00:00:00+00:00", + "poll_count": 1, + "last_cycle_status": "empty", + } + mq_extra = { + "mq_depth": 5, + "last_updated": "should-not-win", + "poll_count": 99, + } + with caplog.at_level(logging.DEBUG, logger="paperscout"): + out = _merge_extra_health_fields(scheduler, mq_extra, {"max": 10}) + assert out["last_updated"] == "2026-01-01T00:00:00+00:00" + assert out["poll_count"] == 1 + assert out["mq_depth"] == 5 + assert out["db_pool"] == {"max": 10} + assert any("not allow-listed" in r.message for r in caplog.records) + + +def test_merge_drops_unknown_mq_keys(caplog): + scheduler = {"last_updated": None, "poll_count": 0} + mq_extra = {"mq_depth": 1, "evil_key": True} + with caplog.at_level(logging.DEBUG, logger="paperscout"): + out = _merge_extra_health_fields(scheduler, mq_extra, {}) + assert out["mq_depth"] == 1 + assert "evil_key" not in out + assert any("not allow-listed" in r.message for r in caplog.records) + + +def test_mq_health_fields_falls_back_on_exception(caplog): + mq = MagicMock() + mq.health_fields.side_effect = RuntimeError("boom") + mq.depth.return_value = 7 + with caplog.at_level(logging.WARNING, logger="paperscout"): + fields = _mq_health_fields(mq) + assert fields == {"mq_depth": 7} + assert any("health_fields() failed" in r.message for r in caplog.records) + + +def test_mq_health_fields_uses_health_fields_method(): + mq = MessageQueue(MagicMock()) + fields = _mq_health_fields(mq) + assert fields == mq.health_fields() + assert fields["mq_depth"] == 0 + assert fields["mq_circuit_state"] == "closed" + assert fields["mq_utilization"] == 0.0 + + +def test_merge_includes_allowlisted_mq_fields(): + scheduler = {"last_updated": None, "poll_count": 0} + mq_extra = { + "mq_depth": 2, + "mq_max_size": 1000, + "mq_utilization": 0.002, + "mq_circuit_state": "closed", + } + out = _merge_extra_health_fields(scheduler, mq_extra, {}) + assert out["mq_depth"] == 2 + assert out["mq_max_size"] == 1000 + assert out["mq_utilization"] == 0.002 + assert out["mq_circuit_state"] == "closed" diff --git a/tests/test_message_queue.py b/tests/test_message_queue.py index 9832ddd..7db6a3a 100644 --- a/tests/test_message_queue.py +++ b/tests/test_message_queue.py @@ -21,6 +21,26 @@ def _slack_error(status: int, headers: dict | None = None) -> SlackApiError: class TestMessageQueueDirect: """Exercise ``_throttle`` / ``_send_with_retry`` without starting the daemon thread.""" + def test_health_fields_reports_depth_and_utilization(self): + mq = MessageQueue(MagicMock()) + mq.enqueue("C1", "x") + fields = mq.health_fields() + assert fields["mq_depth"] == 1 + assert fields["mq_max_size"] >= 1 + assert 0.0 <= fields["mq_utilization"] <= 1.0 + assert fields["mq_circuit_state"] == "closed" + + def test_health_fields_clamps_utilization_when_depth_exceeds_max(self): + mq = MessageQueue(MagicMock()) + with patch("paperscout.scout.settings") as cfg: + cfg.mq_max_size = 2 + for i in range(5): + mq.enqueue(f"C{i}", "x") + fields = mq.health_fields() + assert fields["mq_depth"] == 5 + assert fields["mq_max_size"] == 2 + assert fields["mq_utilization"] == 1.0 + def test_send_success_updates_last_send(self): app = MagicMock() mq = MessageQueue(app) diff --git a/tests/test_models_cycle_result.py b/tests/test_models_cycle_result.py new file mode 100644 index 0000000..9b82299 --- /dev/null +++ b/tests/test_models_cycle_result.py @@ -0,0 +1,51 @@ +"""Tests for CycleResult invariants (w4_issue_01).""" + +from __future__ import annotations + +import pytest + +from paperscout.models import CycleResult, CycleStatus, ProbeHit, Tier + + +def _hit() -> ProbeHit: + return ProbeHit( + url="https://isocpp.org/files/papers/D0001R0.pdf", + prefix="D", + number=1, + revision=0, + extension=".pdf", + tier=Tier.COLD, + ) + + +def test_cycle_result_empty_valid(): + r = CycleResult(CycleStatus.EMPTY) + assert r.hits == [] + assert r.error is None + + +def test_cycle_result_success_valid(): + h = _hit() + r = CycleResult(CycleStatus.SUCCESS, results=(h,)) + assert r.hits == [h] + + +def test_cycle_result_failed_valid(): + r = CycleResult(CycleStatus.FAILED, error="timeout") + assert r.hits == [] + assert r.error == "timeout" + + +@pytest.mark.parametrize( + "status,results,error", + [ + (CycleStatus.FAILED, (), None), + (CycleStatus.FAILED, (), ""), + (CycleStatus.SUCCESS, (), None), + (CycleStatus.EMPTY, (_hit(),), None), + (CycleStatus.SUCCESS, (), "oops"), + ], +) +def test_cycle_result_invalid(status, results, error): + with pytest.raises(ValueError): + CycleResult(status, results=results, error=error) diff --git a/tests/test_monitor.py b/tests/test_monitor.py index 56bb870..f83edf8 100644 --- a/tests/test_monitor.py +++ b/tests/test_monitor.py @@ -11,7 +11,7 @@ import pytest from paperscout.errors import ConfigurationError -from paperscout.models import Paper, PerUserMatches, ProbeHit +from paperscout.models import CycleResult, CycleStatus, Paper, PerUserMatches, ProbeHit from paperscout.monitor import ( DiffResult, PollResult, @@ -37,6 +37,18 @@ def _recent_hit(**kwargs) -> ProbeHit: return ProbeHit(**defaults) +def _empty_cycle() -> CycleResult: + return CycleResult(CycleStatus.EMPTY) + + +def _success_cycle(hits: list[ProbeHit]) -> CycleResult: + return CycleResult(CycleStatus.SUCCESS, results=tuple(hits)) + + +def _failed_cycle(error: str = "probe failed") -> CycleResult: + return CycleResult(CycleStatus.FAILED, error=error) + + def _old_hit(**kwargs) -> ProbeHit: defaults = dict( url="https://isocpp.org/files/papers/D8888R0.pdf", @@ -182,7 +194,7 @@ def _make_scheduler(fake_pool, **cfg_overrides): index.refresh = AsyncMock() index.papers = {} prober = MagicMock(spec=ISOProber) - prober.run_cycle = AsyncMock(return_value=[]) + prober.run_cycle = AsyncMock(return_value=_empty_cycle()) prober.snapshot_stats = MagicMock(return_value={}) prober._stats = {} user_watchlist = MagicMock(spec=UserWatchlist) @@ -218,7 +230,7 @@ async def test_poll_once_detects_new_papers(self, fake_pool): new_paper = Paper(id="P9999R0", title="New", author="Author", date="2024-01-01") index.papers = {"P9999R0": new_paper} - prober.run_cycle = AsyncMock(return_value=[]) + prober.run_cycle = AsyncMock(return_value=_empty_cycle()) result = await scheduler.poll_once() assert len(result.diff.new_papers) == 1 @@ -229,7 +241,7 @@ async def test_poll_once_surfaces_only_recent_probe_hits(self, fake_pool): recent = _recent_hit() old = _old_hit() index.papers = {} - prober.run_cycle = AsyncMock(return_value=[recent, old]) + prober.run_cycle = AsyncMock(return_value=_success_cycle([recent, old])) result = await scheduler.poll_once() assert len(result.probe_hits) == 1 assert result.probe_hits[0].is_recent is True @@ -245,7 +257,7 @@ async def test_poll_once_detects_dp_transition(self, fake_pool): id="P9999R0", title="New Published Paper", author="Author", date="2025-01-01" ) index.papers = {"P9999R0": new_paper} - prober.run_cycle = AsyncMock(return_value=[]) + prober.run_cycle = AsyncMock(return_value=_empty_cycle()) result = await scheduler.poll_once() assert len(result.dp_transitions) == 1 @@ -260,7 +272,7 @@ async def test_poll_once_dp_skip_non_p_papers(self, fake_pool): n_paper = Paper(id="N4950", title="Working Draft", author="Ed", date="2025-01-01") index.papers = {"N4950": n_paper} - prober.run_cycle = AsyncMock(return_value=[]) + prober.run_cycle = AsyncMock(return_value=_empty_cycle()) result = await scheduler.poll_once() assert result.dp_transitions == [] @@ -271,7 +283,7 @@ async def test_poll_once_no_dp_transition_when_no_draft(self, fake_pool): new_paper = Paper(id="P8888R0", title="Entirely New", author="X", date="2025-01-01") index.papers = {"P8888R0": new_paper} - prober.run_cycle = AsyncMock(return_value=[]) + prober.run_cycle = AsyncMock(return_value=_empty_cycle()) result = await scheduler.poll_once() assert result.dp_transitions == [] @@ -286,7 +298,7 @@ async def test_poll_once_dp_transition_logged(self, fake_pool, caplog): state.mark_discovered(draft_url) new_paper = Paper(id="P7777R0", title="X", author="Y", date="2025-01-01") index.papers = {"P7777R0": new_paper} - prober.run_cycle = AsyncMock(return_value=[]) + prober.run_cycle = AsyncMock(return_value=_empty_cycle()) with caplog.at_level(logging.INFO): result = await scheduler.poll_once() @@ -311,7 +323,7 @@ async def test_poll_once_logs_updated_papers(self, fake_pool, caplog): scheduler._previous_papers = {"P9999R0": old_paper} updated_paper = Paper(id="P9999R0", title="New Title", author="A", date="2024-01-01") index.papers = {"P9999R0": updated_paper} - prober.run_cycle = AsyncMock(return_value=[]) + prober.run_cycle = AsyncMock(return_value=_empty_cycle()) with caplog.at_level(logging.DEBUG): await scheduler.poll_once() assert "INDEX-UPD" in caplog.text @@ -323,7 +335,7 @@ async def test_poll_old_hits_logged(self, fake_pool, caplog): await scheduler.poll_once() old = _old_hit() index.papers = {} - prober.run_cycle = AsyncMock(return_value=[old]) + prober.run_cycle = AsyncMock(return_value=_success_cycle([old])) with caplog.at_level(logging.INFO): result = await scheduler.poll_once() assert result.probe_hits == [] @@ -335,7 +347,7 @@ async def test_poll_once_populates_per_user_matches(self, fake_pool): new_paper = Paper(id="P9999R0", title="Senders", author="Eric Niebler", date="2024-01-01") index.papers = {"P9999R0": new_paper} - prober.run_cycle = AsyncMock(return_value=[]) + prober.run_cycle = AsyncMock(return_value=_empty_cycle()) user_watchlist.matches_for_users.return_value = { "U123": PerUserMatches(papers=[(new_paper, "author")], probe_hits=[]) @@ -349,7 +361,7 @@ async def test_poll_once_per_user_probe_hit(self, fake_pool): await scheduler.poll_once() hit = _recent_hit(front_text="written by eric niebler") - prober.run_cycle = AsyncMock(return_value=[hit]) + prober.run_cycle = AsyncMock(return_value=_success_cycle([hit])) index.papers = {} user_watchlist.matches_for_users.return_value = { @@ -381,7 +393,7 @@ async def test_restart_with_prior_poll_notifies_seed_hits(self, fake_pool): scheduler.notify_callback = notified.append state.touch_poll() hit = _recent_hit() - prober.run_cycle = AsyncMock(return_value=[hit]) + prober.run_cycle = AsyncMock(return_value=_success_cycle([hit])) user_watchlist.matches_for_users.return_value = { "U123": PerUserMatches(papers=[], probe_hits=[(hit, "author")]) } @@ -396,7 +408,7 @@ async def test_restart_with_discovered_urls_notifies(self, fake_pool): scheduler.notify_callback = notified.append state.mark_discovered("https://isocpp.org/files/papers/D1111R0.pdf") hit = _recent_hit() - prober.run_cycle = AsyncMock(return_value=[hit]) + prober.run_cycle = AsyncMock(return_value=_success_cycle([hit])) user_watchlist.matches_for_users.return_value = { "U123": PerUserMatches(papers=[], probe_hits=[(hit, "author")]) } @@ -412,7 +424,7 @@ async def test_restart_seed_old_hits_not_in_result(self, fake_pool, caplog): scheduler.notify_callback = notified.append state.touch_poll() old = _old_hit() - prober.run_cycle = AsyncMock(return_value=[old]) + prober.run_cycle = AsyncMock(return_value=_success_cycle([old])) with caplog.at_level(logging.INFO): result = await scheduler.poll_once() assert result.probe_hits == [] @@ -438,7 +450,7 @@ async def test_seed_marks_discovered(self, fake_pool): async def fake_run_cycle(): state.mark_discovered(hit.url) - return [hit] + return _success_cycle([hit]) prober.run_cycle = AsyncMock(side_effect=fake_run_cycle) seed_result = await scheduler.seed() @@ -515,6 +527,59 @@ async def mock_poll_once(): assert "failure_category=NETWORK" in caplog.text assert call_count == 2 + async def test_failed_probe_cycle_does_not_advance_last_successful_poll_normal_path( + self, fake_pool + ): + """Main poll path: FAILED cycle must not advance staleness clock.""" + scheduler, index, prober, _, _ = _make_scheduler(fake_pool) + await scheduler.poll_once() + before = scheduler._last_successful_poll + index.papers = {} + prober.run_cycle = AsyncMock(return_value=_failed_cycle("network down")) + await scheduler.poll_once() + assert scheduler._last_successful_poll == before + assert scheduler._last_cycle_status == CycleStatus.FAILED + + async def test_failed_probe_cycle_does_not_advance_last_successful_poll_seed_early_return( + self, fake_pool + ): + """Fresh deploy (line 172 path): FAILED seed probe must not set last_successful_poll.""" + scheduler, _, prober, _, state = _make_scheduler(fake_pool) + assert state.last_poll == 0 + assert len(state.get_all_discovered()) == 0 + prober.run_cycle = AsyncMock(return_value=_failed_cycle("connect error")) + await scheduler.poll_once() + assert scheduler._last_successful_poll is None + assert scheduler._last_cycle_status == CycleStatus.FAILED + + async def test_health_snapshot_after_poll(self, fake_pool): + scheduler, _, _, _, _ = _make_scheduler(fake_pool) + await scheduler.poll_once() + snap = scheduler.health_snapshot() + assert snap["last_updated"] is not None + assert snap["poll_count"] >= 1 + + def test_health_snapshot_defaults_are_independent_copies(self, fake_pool): + scheduler, _, _, _, _ = _make_scheduler(fake_pool) + a = scheduler.health_snapshot() + b = scheduler.health_snapshot() + a["probe_stats"]["miss"] = 999 + assert b["probe_stats"] == {} + assert a is not b + + async def test_publish_health_snapshot_probe_stats_readonly(self, fake_pool): + from paperscout.monitor import SchedulerSnapshot + + scheduler, _, prober, _, _ = _make_scheduler(fake_pool) + prober.snapshot_stats = MagicMock(return_value={"miss": 1, "error": 0}) + await scheduler.poll_once() + with scheduler._health_lock: + stored = scheduler._health_snapshot + assert isinstance(stored, SchedulerSnapshot) + with pytest.raises(TypeError): + stored.probe_stats["miss"] = 999 # type: ignore[index] + assert scheduler.health_snapshot()["probe_stats"]["miss"] == 1 + async def test_run_forever_halts_on_configuration_error(self, fake_pool, caplog): scheduler, _, _, _, _ = _make_scheduler(fake_pool, poll_interval_minutes=30) diff --git a/tests/test_sources.py b/tests/test_sources.py index d39cb30..6c1fe09 100644 --- a/tests/test_sources.py +++ b/tests/test_sources.py @@ -11,7 +11,7 @@ import httpx import pytest -from paperscout.models import Paper +from paperscout.models import CycleStatus, Paper from paperscout.sources import ( ISOProber, WG21Index, @@ -1089,9 +1089,10 @@ async def mock_get(url, **kwargs): with patch("paperscout.sources.httpx.AsyncClient") as mock_cls: mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_cls.return_value.__aexit__ = AsyncMock(return_value=False) - hits = await prober.run_cycle() + cycle = await prober.run_cycle() - assert any(h.number == 9999 for h in hits) + assert cycle.status == CycleStatus.SUCCESS + assert any(h.number == 9999 for h in cycle.hits) assert state.is_discovered(hit_url) async def test_run_cycle_non_recent_hit_still_discovered(self, fake_pool): @@ -1124,10 +1125,11 @@ async def mock_head(url, **_): with patch("paperscout.sources.httpx.AsyncClient") as mock_cls: mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) mock_cls.return_value.__aexit__ = AsyncMock(return_value=False) - hits = await prober.run_cycle() + cycle = await prober.run_cycle() # Hit is returned (for the discovered registry) but is_recent=False - old_hits = [h for h in hits if h.number == 9998] + assert cycle.status == CycleStatus.SUCCESS + old_hits = [h for h in cycle.hits if h.number == 9998] assert len(old_hits) == 1 assert old_hits[0].is_recent is False assert state.is_discovered(hit_url) @@ -1171,6 +1173,59 @@ async def test_run_cycle_stats_integrity_under_concurrency(self, fake_pool): assert accounted == len(urls) assert s["miss"] == len(urls) # all 404 in this mock + async def test_run_cycle_empty(self, fake_pool): + """All 404 responses → EMPTY, not FAILED.""" + index = WG21Index(fake_pool) + index._max_p = 100 + index._max_rev = {99: 0, 100: 0} + index._sorted_p_nums = [99, 100] + state = ProbeState(fake_pool) + cfg = make_test_settings( + hot_lookback_months=0, + hot_revision_depth=1, + frontier_window_above=0, + frontier_window_below=0, + gap_max_rev=0, + cold_cycle_divisor=100, + ) + prober = ISOProber(index, state, user_watchlist=_mock_wl([9999]), cfg=cfg) + mock_client = _make_async_client(head_resp=_make_response(404)) + with patch("paperscout.sources.httpx.AsyncClient") as mock_cls: + mock_cls.return_value.__aenter__ = AsyncMock(return_value=mock_client) + mock_cls.return_value.__aexit__ = AsyncMock(return_value=False) + cycle = await prober.run_cycle() + assert cycle.status == CycleStatus.EMPTY + assert cycle.hits == [] + assert cycle.error is None + + async def test_run_cycle_failed(self, fake_pool): + """Client context failure → FAILED with error detail.""" + index = WG21Index(fake_pool) + index._max_p = 100 + index._max_rev = {99: 0, 100: 0} + index._sorted_p_nums = [99, 100] + state = ProbeState(fake_pool) + cfg = make_test_settings( + hot_lookback_months=0, + hot_revision_depth=1, + frontier_window_above=0, + frontier_window_below=0, + gap_max_rev=0, + cold_cycle_divisor=100, + ) + prober = ISOProber(index, state, user_watchlist=_mock_wl([9999]), cfg=cfg) + + with patch("paperscout.sources.httpx.AsyncClient") as mock_cls: + mock_cls.return_value.__aenter__ = AsyncMock( + side_effect=httpx.ConnectError("connection refused") + ) + mock_cls.return_value.__aexit__ = AsyncMock(return_value=False) + cycle = await prober.run_cycle() + + assert cycle.status == CycleStatus.FAILED + assert cycle.error + assert "connection refused" in cycle.error + # ── open-std.org scraper ─────────────────────────────────────────────────────