Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Discriminated ISO probe cycle outcomes (`CycleResult`: success / empty / failed) with distinct logging and `/health` fields (`last_cycle_status`, `last_cycle_error`).
- Atomic `SchedulerSnapshot` for `/health` scheduler extras (`last_updated`, `poll_count`, probe stats) published under a lock from `Scheduler.health_snapshot()`.
- Post the same Slack **status** summary as the interactive command to `NOTIFICATION_CHANNEL` once when the process starts (when that channel is configured).
- Open-source hygiene: contributing guide, security policy, code of conduct, onboarding and handoff docs, pre-commit (Ruff), GitHub issue templates, Dependabot, CodeQL, CODEOWNERS template, and `.gitattributes`.

Expand Down
4 changes: 2 additions & 2 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ These components share one thread and the main event loop. They may await I/O bu

- **`Scheduler.run_forever` / `poll_once`** — orchestrates index refresh, probing, and notifications.
- **`WG21Index.refresh`** — fetches and parses wg21.link index (httpx async).
- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client.
- **`ISOProber.run_cycle` / `_probe_one`** — concurrent HEAD probes via `asyncio.gather` and an httpx async client. `run_cycle` returns a discriminated `CycleResult` (success / empty / failed).
- **Slack Bolt handlers** — run on Bolt’s thread; they should not read mutable source state directly (use snapshots or health callbacks).

`ISOProber._stats` is updated from many coroutines in one `run_cycle()`. This is safe on the event loop because asyncio never preempts between awaits. A `threading.Lock` guards `_stats` as defense-in-depth if code is ever called from a worker thread by mistake.
Expand All @@ -17,7 +17,7 @@ These components share one thread and the main event loop. They may await I/O bu

| Thread | Role |
|--------|------|
| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler snapshot fields. |
| **Health server** (`health.py`) | Serves `GET /health`; reads `len(index.papers)` via a callback and scheduler fields from `Scheduler.health_snapshot()` (immutable snapshot, lock-protected publish). |
| **MessageQueue sender** (`scout.py`) | Drains Slack post queue with rate limiting. |
| **`run_blocking_io` / `asyncio.to_thread`** | Runs blocking psycopg2 calls (e.g. `UserWatchlist.matches_for_users`) off the loop. |

Expand Down
87 changes: 73 additions & 14 deletions src/paperscout/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,74 @@

log = logging.getLogger("paperscout")

# MessageQueue keys allowed in /health extras (must not overlap scheduler.health_snapshot()).
_MQ_HEALTH_FIELD_NAMES = frozenset(
{
"mq_depth",
"mq_max_size",
"mq_utilization",
"mq_circuit_state",
}
)


def _mq_health_fields(mq: MessageQueue) -> dict:
    """Collect MessageQueue metrics for the ``/health`` payload.

    When ``mq`` exposes ``health_fields()`` and that call returns a dict, the
    dict is returned unchanged. If the method is missing, raises, or returns a
    non-dict, the result degrades to ``{"mq_depth": mq.depth()}``. If
    ``mq.depth()`` raises as well, a warning is logged and ``{}`` is returned.
    Exceptions from ``health_fields()`` and ``depth()`` are logged, never
    propagated.
    """

    def _depth_only(failure_message: str) -> dict:
        # Last-resort metric; logs ``failure_message`` if even depth() fails.
        try:
            return {"mq_depth": mq.depth()}
        except Exception:
            log.warning(failure_message, exc_info=True)
            return {}

    if not hasattr(mq, "health_fields"):
        return _depth_only("health: mq.depth() failed; omitting MQ fields")

    try:
        payload = mq.health_fields()
    except Exception as err:
        log.warning(
            "health: mq.health_fields() failed for %s id=%s: %s",
            type(mq).__name__,
            id(mq),
            err,
            exc_info=True,
        )
        # Fall back while still inside the handler so a depth() failure is
        # reported with the health_fields() failure as its context.
        return _depth_only("health: mq.depth() fallback failed; omitting MQ fields")

    if isinstance(payload, dict):
        return payload

    log.warning("health: mq.health_fields() returned non-dict, using mq_depth only")
    return _depth_only("health: mq.depth() failed; omitting MQ fields")


def _merge_extra_health_fields(
    scheduler_snap: dict,
    mq_extra: dict,
    db_pool: dict,
) -> dict:
    """Build the extra ``/health`` fields from scheduler, MQ and DB pool data.

    Only MQ keys listed in ``_MQ_HEALTH_FIELD_NAMES`` are copied, and only when
    the scheduler snapshot does not already define the same key (the scheduler
    value wins). Every rejected MQ key is reported at debug level. The
    ``"db_pool"`` entry is written last, so it replaces any ``"db_pool"`` key
    present in ``scheduler_snap``.
    """
    accepted: dict = {}
    for name, val in mq_extra.items():
        in_snapshot = name in scheduler_snap
        if name not in _MQ_HEALTH_FIELD_NAMES:
            if in_snapshot:
                log.debug(
                    "health: mq_extra key %r not allow-listed; scheduler snapshot kept",
                    name,
                )
            else:
                log.debug("health: mq_extra key %r not allow-listed, dropping", name)
            continue
        if in_snapshot:
            log.debug(
                "health: mq_extra key %r conflicts with scheduler snapshot; scheduler wins",
                name,
            )
            continue
        accepted[name] = val

    merged = dict(scheduler_snap)
    merged.update(accepted)
    merged["db_pool"] = db_pool
    return merged


def _setup_logging(data_dir: Path, console_level: str = "INFO", retention_days: int = 7) -> None:
"""Console + daily rotating file logging; third-party loggers capped at WARNING."""
Expand Down Expand Up @@ -141,20 +209,11 @@ def _pool_status(p) -> dict:
)

def _extra_health_fields() -> dict:
lsp = scheduler._last_successful_poll
s = scheduler._last_probe_stats
# HTTP 200 outcomes / non-skipped probe attempts (excludes skipped_discovered, skipped_in_index).
hits = s.get("hit_recent", 0) + s.get("hit_old", 0) + s.get("hit_no_lm", 0)
attempted = hits + s.get("miss", 0) + s.get("error", 0)
probe_success_rate = hits / attempted if attempted > 0 else None
return {
"last_successful_poll": (
datetime.fromtimestamp(lsp, tz=timezone.utc).isoformat() if lsp else None
),
"probe_success_rate": probe_success_rate,
"mq_depth": mq.depth(),
"db_pool": _pool_status(pool),
}
return _merge_extra_health_fields(
scheduler.health_snapshot(),
_mq_health_fields(mq),
_pool_status(pool),
)

register_handlers(app, user_watchlist, state, paper_count_fn, launch_time)

Expand Down
6 changes: 5 additions & 1 deletion src/paperscout/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,12 @@ class Settings(BaseSettings):
notification_channel: str = ""
# Slack channel ID for ops alerts (stale poll). Empty = disabled.
ops_alert_channel: str = ""
# Log a warning when MessageQueue depth reaches or exceeds this (unbounded queue).
# Log a warning when MessageQueue depth reaches or exceeds this (legacy threshold).
mq_backpressure_threshold: int = Field(default=100, ge=1)
mq_max_size: int = Field(default=1000, ge=1)
mq_max_retries: int = Field(default=10, ge=0)
mq_circuit_breaker_threshold: int = Field(default=5, ge=1)
mq_circuit_breaker_cooldown_seconds: int = Field(default=60, ge=1)
notify_on_frontier_hit: bool = True
notify_on_any_draft: bool = True
# Alert when a D-paper we previously probed appears in the wg21.link index
Expand Down
4 changes: 3 additions & 1 deletion src/paperscout/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,9 @@ def do_GET(self) -> None:
except Exception:
log.exception("health: extra_fields_fn failed")
extra = {}
body = json.dumps({**base, **extra}).encode()
# Base handler fields win if extra_fields_fn returns overlapping keys.
safe_extra = {k: v for k, v in extra.items() if k not in base}
body = json.dumps({**base, **safe_extra}).encode()

self.send_response(200)
self.send_header("Content-Type", "application/json")
Expand Down
32 changes: 32 additions & 0 deletions src/paperscout/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,38 @@ class ProbeHit:
is_recent: bool = False


class CycleStatus(str, Enum):
    """Possible outcomes of a single ``ISOProber.run_cycle()`` call.

    Members are also ``str`` instances, so each compares equal to its
    lowercase value.
    """

    SUCCESS = "success"
    EMPTY = "empty"
    FAILED = "failed"


@dataclass(frozen=True, slots=True)
class CycleResult:
    """Immutable probe-cycle outcome tagged by ``status``.

    Invariants enforced at construction (``ValueError`` on violation):

    * ``FAILED`` requires a non-empty ``error``.
    * ``SUCCESS`` requires at least one entry in ``results``.
    * ``EMPTY`` requires ``results`` to be empty.
    * ``error`` must be ``None`` unless ``status`` is ``FAILED``.
    """

    status: CycleStatus
    results: tuple[ProbeHit, ...] = ()
    error: str | None = None

    def __post_init__(self) -> None:
        # Checks run in a fixed order so the first violated invariant decides
        # which message is raised.
        if self.status == CycleStatus.FAILED:
            if not self.error:
                raise ValueError("CycleResult FAILED must carry a non-empty error string")
            return
        if self.status == CycleStatus.SUCCESS and not self.results:
            raise ValueError("CycleResult SUCCESS must carry at least one ProbeHit")
        if self.status == CycleStatus.EMPTY and self.results:
            raise ValueError("CycleResult EMPTY must not carry results")
        if self.error is not None:
            raise ValueError("CycleResult error is only valid for FAILED status")

    @property
    def hits(self) -> list[ProbeHit]:
        """Return a new list of ``results`` for ``SUCCESS``; otherwise ``[]``."""
        if self.status == CycleStatus.SUCCESS:
            return list(self.results)
        return []


@dataclass
class PerUserMatches:
"""One user's watchlist hits: ``(paper|hit, 'author'|'paper')`` tuples."""
Expand Down
Loading