Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Discriminated ISO probe cycle outcomes (`CycleResult`: success / empty / failed) with distinct logging and `/health` fields (`last_cycle_status`, `last_cycle_error`).
- Atomic `SchedulerSnapshot` for `/health` scheduler extras (`last_updated`, `poll_count`, probe stats) published under a lock from `Scheduler.health_snapshot()`.
- Post the same Slack **status** summary as the interactive command to `NOTIFICATION_CHANNEL` once when the process starts (when that channel is configured).
- Open-source hygiene: contributing guide, security policy, code of conduct, onboarding and handoff docs, pre-commit (Ruff), GitHub issue templates, Dependabot, CodeQL, CODEOWNERS template, and `.gitattributes`.

Expand Down
4 changes: 2 additions & 2 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ These components share one thread and the main event loop. They may await I/O bu

- **`Scheduler.run_forever` / `poll_once`** — orchestrates index refresh, probing, and notifications.
- **`WG21Index.refresh`** — fetches and parses wg21.link index (httpx async).
- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client.
- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client. `run_cycle` returns a discriminated `CycleResult` (success / empty / failed).
- **Slack Bolt handlers** — run on Bolt’s thread; they should not read mutable source state directly (use snapshots or health callbacks).

`ISOProber._stats` is updated from many coroutines in one `run_cycle()`. This is safe on the event loop because asyncio never preempts between awaits. A `threading.Lock` guards `_stats` as defense-in-depth if code is ever called from a worker thread by mistake.
Expand All @@ -17,7 +17,7 @@ These components share one thread and the main event loop. They may await I/O bu

| Thread | Role |
|--------|------|
| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler snapshot fields. |
| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler fields from `Scheduler.health_snapshot()` (immutable snapshot, lock-protected publish). |
| **MessageQueue sender** (`scout.py`) | Drains Slack post queue with rate limiting. |
| **`run_blocking_io` / `asyncio.to_thread`** | Runs blocking psycopg2 calls (e.g. `UserWatchlist.matches_for_users`) off the loop. |

Expand Down
16 changes: 2 additions & 14 deletions src/paperscout/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,20 +141,8 @@ def _pool_status(p) -> dict:
)

def _extra_health_fields() -> dict:
    """Collect the extra ``/health`` fields contributed by this process.

    Merges, in order, the scheduler's published health snapshot, the message
    queue's fields and the DB pool status into one flat dict. On a key
    collision the later source wins.

    Returns:
        A dict with the scheduler snapshot fields, the message-queue fields,
        and a ``"db_pool"`` entry.
    """
    # Scheduler fields come from ``health_snapshot()`` rather than from the
    # scheduler's private ``_last_*`` attributes. The earlier inline
    # computation returned before this code could run and read that private
    # state directly.
    # Fall back to the bare depth when the queue has no ``health_fields``.
    mq_extra = mq.health_fields() if hasattr(mq, "health_fields") else {"mq_depth": mq.depth()}
    return {**scheduler.health_snapshot(), **mq_extra, "db_pool": _pool_status(pool)}

register_handlers(app, user_watchlist, state, paper_count_fn, launch_time)

Expand Down
22 changes: 22 additions & 0 deletions src/paperscout/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,28 @@ class ProbeHit:
is_recent: bool = False


class CycleStatus(str, Enum):
    """How a single ``ISOProber.run_cycle()`` call ended.

    Members are also ``str`` instances, so each one compares equal to its
    lowercase value.
    """

    SUCCESS = "success"
    EMPTY = "empty"
    FAILED = "failed"


@dataclass(frozen=True, slots=True)
class CycleResult:
    """Probe cycle outcome tagged by ``status`` (success, empty or failed).

    ``results`` holds the probe hits and ``error`` an optional message; both
    default to "nothing" so a result can be built from a status alone.
    """

    status: CycleStatus
    results: tuple[ProbeHit, ...] = ()
    error: str | None = None

    @property
    def hits(self) -> list[ProbeHit]:
        """Return ``results`` as a new list for ``SUCCESS``, else an empty list."""
        if self.status == CycleStatus.SUCCESS:
            return list(self.results)
        return []


@dataclass
class PerUserMatches:
"""One user's watchlist hits: ``(paper|hit, 'author'|'paper')`` tuples."""
Expand Down
127 changes: 118 additions & 9 deletions src/paperscout/monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@
from __future__ import annotations

import asyncio
import dataclasses
import logging
import threading
import time
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Any

import httpx

from .concurrency import run_blocking_io
from .config import Settings, settings
from .errors import ConfigurationError, FailureCategory
from .models import Paper, PerUserMatches, ProbeHit
from .models import CycleResult, CycleStatus, Paper, PerUserMatches, ProbeHit
from .sources import ISOProber, WG21Index
from .storage import ProbeState, UserWatchlist

Expand Down Expand Up @@ -99,6 +102,40 @@ def __init__(
self.per_user_matches = per_user_matches or {}


# ── Health snapshot (issue 04) ───────────────────────────────────────────────


def _compute_probe_success_rate(stats: dict[str, int]) -> float | None:
"""HTTP 200 outcomes / non-skipped probe attempts."""
hits = stats.get("hit_recent", 0) + stats.get("hit_old", 0) + stats.get("hit_no_lm", 0)
attempted = hits + stats.get("miss", 0) + stats.get("error", 0)
return hits / attempted if attempted > 0 else None


@dataclass(frozen=True, slots=True)
class SchedulerSnapshot:
"""Immutable scheduler state published for the health endpoint."""

last_updated: str
poll_count: int
last_successful_poll: str | None
last_cycle_status: str | None
last_cycle_error: str | None
probe_stats: dict[str, int]
probe_success_rate: float | None


_HEALTH_SNAPSHOT_DEFAULTS: dict[str, Any] = {
"last_updated": None,
"poll_count": 0,
"last_successful_poll": None,
"last_cycle_status": None,
"last_cycle_error": None,
"probe_stats": {},
"probe_success_rate": None,
}
Comment thread
coderabbitai[bot] marked this conversation as resolved.


# ── Scheduler ────────────────────────────────────────────────────────────────


Expand Down Expand Up @@ -127,7 +164,60 @@ def __init__(
self._poll_count = 0
self._last_successful_poll: float | None = None
self._last_probe_stats: dict[str, int] = {}
self._last_cycle_status: CycleStatus | None = None
self._last_cycle_error: str | None = None
self._last_ops_alert: float | None = None
self._health_lock = threading.Lock()
self._health_snapshot: SchedulerSnapshot | None = None

def _probe_hits_from_cycle(self, cycle: CycleResult) -> list[ProbeHit]:
"""Extract hits and record last cycle status for health / staleness."""
self._last_cycle_status = cycle.status
self._last_cycle_error = cycle.error
match cycle.status:
case CycleStatus.SUCCESS:
return cycle.hits
case CycleStatus.EMPTY:
log.info("POLL probe cycle empty")
return []
case CycleStatus.FAILED:
log.error("POLL probe cycle failed: %s", cycle.error)
return []

def _record_probe_cycle_completion(self) -> None:
"""Update probe stats after any completed cycle (including FAILED)."""
self._last_probe_stats = self.prober.snapshot_stats()

def _mark_poll_successful_if_probe_ok(self) -> None:
    """Stamp ``_last_successful_poll`` with the current time unless the last cycle FAILED.

    A last status of ``None``, ``SUCCESS`` or ``EMPTY`` all advance the clock.
    """
    if self._last_cycle_status is CycleStatus.FAILED:
        # Leave the previous timestamp so staleness keeps accumulating.
        return
    self._last_successful_poll = time.time()

def _publish_health_snapshot(self) -> None:
    """Build a ``SchedulerSnapshot`` from current state and publish it.

    The snapshot is assembled outside the lock; only the final reference
    swap happens while holding ``_health_lock``. Meant to be called from the
    event loop.
    """
    # Copy so later updates to the live counters cannot alter the snapshot.
    probe_stats = dict(self._last_probe_stats)

    last_ok = self._last_successful_poll
    last_ok_iso = None
    if last_ok:
        last_ok_iso = datetime.fromtimestamp(last_ok, tz=timezone.utc).isoformat()

    cycle_status = self._last_cycle_status
    status_value = cycle_status.value if cycle_status else None

    published = SchedulerSnapshot(
        last_updated=datetime.now(timezone.utc).isoformat(),
        poll_count=self._poll_count,
        last_successful_poll=last_ok_iso,
        last_cycle_status=status_value,
        last_cycle_error=self._last_cycle_error,
        probe_stats=probe_stats,
        probe_success_rate=_compute_probe_success_rate(probe_stats),
    )
    with self._health_lock:
        self._health_snapshot = published

def health_snapshot(self) -> dict[str, Any]:
    """Return a consistent copy of scheduler fields for ``/health`` extras.

    Returns:
        A new dict on every call. Before any snapshot has been published it
        holds the default field values; afterwards it is the published
        ``SchedulerSnapshot`` converted with ``dataclasses.asdict``. In both
        cases nested dicts are copies, so callers may mutate the result.
    """
    # Hold the lock only long enough to read the current reference.
    with self._health_lock:
        snap = self._health_snapshot
    if snap is None:
        # A plain ``dict(...)`` copy would hand out the module-level
        # ``probe_stats`` dict itself; copy nested dicts so a caller's
        # mutation cannot leak into later default responses.
        return {
            key: dict(value) if isinstance(value, dict) else value
            for key, value in _HEALTH_SNAPSHOT_DEFAULTS.items()
        }
    return dataclasses.asdict(snap)

async def seed(self) -> SeedResult:
"""Gather current index and probe state.
Expand All @@ -147,7 +237,9 @@ async def seed(self) -> SeedResult:

hits: list[ProbeHit] = []
if self.cfg.enable_iso_probe:
hits = await self.prober.run_cycle()
cycle = await self.prober.run_cycle()
hits = self._probe_hits_from_cycle(cycle)
self._record_probe_cycle_completion()
log.info("SEED isocpp.org probe existing=%d", len(hits))

self._seeded = True
Expand All @@ -169,8 +261,13 @@ async def poll_once(self) -> PollResult:
if not self._seeded:
seed_result = await self.seed()
if not seed_result.had_prior_state:
self._last_successful_poll = time.time()
self._last_probe_stats = self.prober.snapshot_stats()
if self.cfg.enable_iso_probe:
self._mark_poll_successful_if_probe_ok()
self._record_probe_cycle_completion()
else:
self._last_successful_poll = time.time()
self._record_probe_cycle_completion()
self._publish_health_snapshot()
return PollResult(
diff=DiffResult(new_papers=[], updated_papers=[]),
probe_hits=[],
Expand Down Expand Up @@ -208,8 +305,13 @@ async def poll_once(self) -> PollResult:
)
if self.notify_callback:
self.notify_callback(result)
self._last_successful_poll = time.time()
self._last_probe_stats = self.prober.snapshot_stats()
if self.cfg.enable_iso_probe:
self._mark_poll_successful_if_probe_ok()
self._record_probe_cycle_completion()
else:
self._last_successful_poll = time.time()
self._record_probe_cycle_completion()
self._publish_health_snapshot()
return result

previous = dict(self._previous_papers)
Expand Down Expand Up @@ -239,7 +341,9 @@ async def poll_once(self) -> PollResult:

probe_hits: list[ProbeHit] = []
if self.cfg.enable_iso_probe:
probe_hits = await self.prober.run_cycle()
cycle = await self.prober.run_cycle()
probe_hits = self._probe_hits_from_cycle(cycle)
self._record_probe_cycle_completion()

recent_hits = [h for h in probe_hits if h.is_recent]
old_hits = [h for h in probe_hits if not h.is_recent]
Expand Down Expand Up @@ -324,8 +428,13 @@ async def poll_once(self) -> PollResult:
len(dp_transitions),
len(per_user_matches),
)
self._last_successful_poll = time.time()
self._last_probe_stats = self.prober.snapshot_stats()
if self.cfg.enable_iso_probe:
self._mark_poll_successful_if_probe_ok()
self._record_probe_cycle_completion()
else:
self._last_successful_poll = time.time()
self._record_probe_cycle_completion()
self._publish_health_snapshot()
return result

async def run_forever(self) -> None:
Expand Down
Loading