Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions autoptz/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ class ThemeConfig(BaseModel, frozen=True):


class SourceConfig(BaseModel, frozen=True):
type: Literal["usb", "rtsp", "onvif", "ndi"] = "usb"
address: str = "" # index (USB), URL (RTSP/ONVIF), name (NDI)
type: Literal["usb", "rtsp", "onvif", "ndi", "synthetic"] = "usb"
address: str = "" # index (USB), URL (RTSP/ONVIF), name (NDI), content path/keyword (synthetic)
unique_id: str | None = None # stable device id (USB: AVFoundation uniqueID)
source_label: str = "" # friendly kind ("Built-in"/"Continuity Camera"/…)
username: str = ""
Expand Down
12 changes: 12 additions & 0 deletions autoptz/config/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import json
import logging
import os
import sqlite3
import sys
import threading
Expand Down Expand Up @@ -60,6 +61,17 @@ def default_config_dir() -> Path:


def default_db_path() -> Path:
    """Resolve the location of the config database.

    A non-blank ``AUTOPTZ_DB_PATH`` environment variable wins (surrounding
    whitespace is stripped and ``~`` is expanded). That lets an alternate
    profile — or an isolated synthetic-camera setup for scaling/CPU tests — be
    selected without touching the default
    ``~/Library/Application Support/AutoPTZ/autoptz.db``. Models and caches
    resolve independently of this path, so only the camera/identity/settings
    store is redirected.

    Returns:
        The overridden path when the variable is set, otherwise ``autoptz.db``
        inside :func:`default_config_dir`.
    """
    env_value = os.environ.get("AUTOPTZ_DB_PATH", "").strip()
    if not env_value:
        return default_config_dir() / "autoptz.db"
    return Path(env_value).expanduser()


Expand Down
94 changes: 87 additions & 7 deletions autoptz/engine/pipeline/identify.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@

from __future__ import annotations

import contextlib
import logging
import os
import threading
from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any
Expand All @@ -44,6 +47,82 @@
# Default cosine-similarity floor for a confident identity match.
DEFAULT_MATCH_THRESHOLD = 0.35


def _face_intra_threads() -> int:
"""Per-session intra-op thread budget for the insightface sessions.

Follows the supervisor's published per-camera budget
(``AUTOPTZ_ORT_INTRA_THREADS``) so face stays in lockstep with detector/pose;
falls back to a small fixed cap when nothing is published. Face runs only a
few Hz on one target, so a tight cap costs little latency and avoids a
cores-wide CPU pool per session.
"""
raw = os.environ.get("AUTOPTZ_ORT_INTRA_THREADS", "").strip()
if raw:
try:
return max(1, int(raw))
except ValueError:
pass
cores = os.cpu_count() or 4
return max(1, min(4, cores - 1))


# The monkeypatch below swaps a process-global method (``ort.InferenceSession``).
# The normal path (shared pool) serialises face builds via the pool lock, but the
# degraded *per-worker* fallback (``worker.stacks._build_face_stack``) can run on
# all camera threads at once. Two overlapping patch/restore cycles would corrupt
# the saved original, so we serialise entry here.
_PATCH_LOCK = threading.Lock()


@contextlib.contextmanager
def _capped_insightface_sessions(intra_threads: int) -> Iterator[None]:
"""Force every ORT session insightface builds to be capped + non-spinning.

insightface's ``model_zoo.get_model`` only forwards ``providers`` /
``provider_options`` to ``onnxruntime.InferenceSession`` — it gives no hook to
pass ``SessionOptions``. Left to its defaults, each buffalo_l sub-model
(SCRFD detector + ArcFace embedder) spins up a **cores-wide, busy-spinning**
intra-op pool, which on a multi-camera box is a dominant, mostly-idle CPU sink
(profiling: ORT ``WorkerLoop`` was the single largest consumer).

We therefore wrap construction in a scoped patch of
``onnxruntime.InferenceSession.__init__`` that injects a tuned ``SessionOptions``
(small intra-op pool, single sequential inter-op pool, spinning disabled) when
the caller didn't supply one. Patching ``__init__`` on the class — not
rebinding the name — is required so insightface's ``PickableInferenceSession``
subclass (whose ``super().__init__`` resolves to the live method) picks it up.
Always restored, even on failure; a no-op if onnxruntime is unavailable.
"""
try:
import onnxruntime as ort # noqa: PLC0415
except Exception: # noqa: BLE001 — no ORT → nothing to cap
yield
return

orig_init = ort.InferenceSession.__init__

def patched_init(self: Any, *args: Any, **kwargs: Any) -> None:
if not kwargs.get("sess_options"):
so = ort.SessionOptions()
so.intra_op_num_threads = max(1, int(intra_threads))
so.inter_op_num_threads = 1
with contextlib.suppress(Exception):
so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
with contextlib.suppress(Exception):
so.add_session_config_entry("session.intra_op.allow_spinning", "0")
so.add_session_config_entry("session.inter_op.allow_spinning", "0")
kwargs["sess_options"] = so
orig_init(self, *args, **kwargs)

with _PATCH_LOCK:
ort.InferenceSession.__init__ = patched_init
try:
yield
finally:
ort.InferenceSession.__init__ = orig_init


# Logged-once guard so a missing dependency is reported a single time.
_WARNED_UNAVAILABLE = False

Expand Down Expand Up @@ -268,13 +347,14 @@ def _try_init(self) -> None:
# keypoints) + the ArcFace embedding. The default pack also loads 2D/3D
# landmark + gender/age models we never use; skipping them trims memory
# and load time.
app = FaceAnalysis(
name=self._model_name,
root=insightface_root(),
providers=providers,
allowed_modules=["detection", "recognition"],
)
app.prepare(ctx_id=ctx_id, det_size=(self._det_size, self._det_size))
with _capped_insightface_sessions(_face_intra_threads()):
app = FaceAnalysis(
name=self._model_name,
root=insightface_root(),
providers=providers,
allowed_modules=["detection", "recognition"],
)
app.prepare(ctx_id=ctx_id, det_size=(self._det_size, self._det_size))
self._app = app
self._available = True
self.last_error = None
Expand Down
180 changes: 180 additions & 0 deletions autoptz/engine/pipeline/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@
from __future__ import annotations

import logging
import os
import platform
import threading
import time
from abc import ABC, abstractmethod
from collections.abc import Iterator
from dataclasses import dataclass
from enum import Enum
from pathlib import Path

import cv2
import numpy as np
Expand Down Expand Up @@ -1093,3 +1095,181 @@ def _close(self) -> None:
self._receiver = None
self._finder = None
self._video_frame = None


# ── Synthetic Adapter (test / scaling, no camera or OS permission needed) ────────


def _find_people_sample() -> str | None:
"""Best-effort path to a bundled people image (ultralytics asset), else None.

Used only when present at runtime; AutoPTZ never redistributes these samples.
"""
try:
import importlib.util # noqa: PLC0415

spec = importlib.util.find_spec("ultralytics")
if spec and spec.submodule_search_locations:
base = Path(next(iter(spec.submodule_search_locations)))
for rel in ("assets/zidane.jpg", "assets/bus.jpg"):
cand = base / rel
if cand.exists():
return str(cand)
except Exception: # noqa: BLE001
pass
return None


class SyntheticAdapter(SourceAdapter):
    """Procedural / file-backed synthetic source — needs no camera or OS permission.

    Built for scaling tests and headless CPU validation: it fabricates a moving
    BGR scene at the pipeline resolution so the *whole* pipeline (capture handoff →
    detection → tracking → pose → face → ego-motion → paint) runs exactly as for a
    real camera, but deterministically and on any machine (no AVFoundation/USB/NDI,
    no camera permission). ``address`` selects the content:

    * a path to a **video** file → looped frame-by-frame (real motion + people);
    * a path to an **image** file → panned/zoomed to synthesise motion;
    * ``""`` / ``"anim"`` → a procedurally generated animated scene;
    * ``"people"`` → a bundled ultralytics people sample if present.

    For image-backed and procedural content the scene is panned every frame so
    motion-dependent stages (tracking, pose, ego-motion optical flow) get genuine
    work rather than a static image.
    """

    def __init__(
        self,
        camera_id: str,
        *,
        address: str = "",
        width: int = 1280,
        height: int = 720,
        target_fps: float = 30.0,
        shm_writer: ShmWriter | None = None,
        stall_timeout: float = _STALL_TIMEOUT_DEFAULT,
    ) -> None:
        """Configure the source; no file is opened until :meth:`_open`.

        Args:
            camera_id: Identifier used in log lines and to derive the motion phase.
            address: Content selector (see the class docstring).
            width: Frame width, used only when ``shm_writer`` is ``None``.
            height: Frame height, used only when ``shm_writer`` is ``None``.
            target_fps: Nominal frame rate; also the animation time base.
            shm_writer: Optional writer whose ``width``/``height`` take precedence.
            stall_timeout: Forwarded unchanged to the base adapter.
        """
        import zlib  # noqa: PLC0415 — stdlib; only needed for the phase seed

        super().__init__(camera_id, shm_writer, target_fps, stall_timeout)
        self._address = (address or "").strip()
        self._w = int(shm_writer.width) if shm_writer is not None else int(width)
        self._h = int(shm_writer.height) if shm_writer is not None else int(height)
        self._base: NDArray[np.uint8] | None = None
        self._video: object | None = None
        self._idx = 0
        # Effective delivered-fps telemetry (logged every few seconds): when the
        # downstream pipeline can't keep up, the worker pulls slower than target
        # and this drops below the configured fps — the frame-drop signal.
        self._fps_t0 = 0.0
        self._fps_n0 = 0
        # Per-camera phase offset so N synthetic cameras don't move in lockstep
        # (keeps detection/tracking/ego work uncorrelated across cameras).
        # Seeded from a CRC rather than ``hash()``: str hashes are salted per
        # process, which would make the motion differ from run to run.
        seed = zlib.crc32(camera_id.encode("utf-8")) % 997
        self._phase = seed / 997.0 * 2.0 * float(np.pi)

    def _resolve_path(self) -> str | None:
        """Map ``address`` to a candidate file path, or ``None`` for no file."""
        addr = self._address
        if addr in ("", "anim", "synthetic"):
            return None
        if addr == "people":
            return _find_people_sample()
        return addr

    def _open(self) -> bool:
        """Select the content backing this source; always returns ``True``.

        Order of preference: a multi-frame video at the resolved path, then a
        still image at that path, then a people sample, then the procedural
        scene.
        """
        self._idx = 0
        path = self._resolve_path()
        if path and os.path.exists(path):
            cap = cv2.VideoCapture(path)
            if cap.isOpened():
                ok, _ = cap.read()
                count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
                if ok and count and count > 1.5:  # a real (multi-frame) video
                    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
                    self._video = cap
                    log.info(
                        "camera_id=%s SyntheticAdapter looping video %s",
                        self.camera_id,
                        path,
                    )
                    return True
            # Not a usable video: drop the handle and retry as a still image.
            cap.release()
            img = cv2.imread(path)
            if img is not None:
                self._base = cv2.resize(img, (self._w, self._h))
                log.info(
                    "camera_id=%s SyntheticAdapter animating image %s",
                    self.camera_id,
                    path,
                )
                return True
        if path:
            # A file was requested but is missing or undecodable; say so instead
            # of silently serving different content.
            log.warning(
                "camera_id=%s SyntheticAdapter cannot use %s; falling back to a built-in scene",
                self.camera_id,
                path,
            )
        # Nothing usable supplied → fall back to a people sample (so detection-driven
        # stages still fire) and finally to a pure procedural scene.
        ps = _find_people_sample()
        if ps:
            img = cv2.imread(ps)
            if img is not None:
                self._base = cv2.resize(img, (self._w, self._h))
        log.info(
            "camera_id=%s SyntheticAdapter %s scene %dx%d@%.0ffps",
            self.camera_id,
            "people-sample" if self._base is not None else "procedural",
            self._w,
            self._h,
            self._target_fps,
        )
        return True

    def _read_frame(self) -> NDArray[np.uint8] | None:
        """Produce the next frame, or ``None`` if a looped video cannot be read.

        Also logs the effective delivered frame rate roughly every five seconds.
        """
        self._idx += 1
        now = time.monotonic()
        if self._fps_t0 == 0.0:
            self._fps_t0, self._fps_n0 = now, self._idx
        elif now - self._fps_t0 >= 5.0:
            eff = (self._idx - self._fps_n0) / (now - self._fps_t0)
            # INFO normally (shows in the in-app log panel); WARNING under the test
            # debug flag so headless scaling runs capture it on stderr.
            emit = (
                log.warning
                if os.environ.get("AUTOPTZ_SYNTH_DEBUG", "").strip().lower()
                in ("1", "true", "yes", "on")
                else log.info
            )
            emit(
                "camera_id=%s synthetic effective fps=%.1f (target=%.0f)",
                self.camera_id,
                eff,
                self._target_fps,
            )
            self._fps_t0, self._fps_n0 = now, self._idx
        cap = self._video
        if cap is not None:
            ok, frm = cap.read()  # type: ignore[union-attr]
            if not ok:
                # End of file: rewind and read again to loop the clip.
                cap.set(cv2.CAP_PROP_POS_FRAMES, 0)  # type: ignore[union-attr]
                ok, frm = cap.read()  # type: ignore[union-attr]
            if not ok or frm is None:
                return None
            if frm.shape[1] != self._w or frm.shape[0] != self._h:
                frm = cv2.resize(frm, (self._w, self._h))
            return np.ascontiguousarray(frm)
        return self._compose()

    def _compose(self) -> NDArray[np.uint8]:
        """Render one animated frame from the base image or a procedural gradient."""
        w, h, i = self._w, self._h, self._idx
        t = i / max(1.0, self._target_fps)
        # Pan (simulated camera/ego motion): slow sinusoid + tiny high-freq jitter.
        dx = int(round(0.04 * w * np.sin(t * 0.8 + self._phase) + 2.0 * np.sin(t * 11.0)))
        dy = int(round(0.03 * h * np.cos(t * 0.6 + self._phase)))
        if self._base is not None:
            # ``np.roll`` returns a new array, so the cached base stays untouched.
            frame = np.roll(self._base, (dy, dx), axis=(0, 1))
        else:
            ramp = ((np.arange(w) + i * 2) % 256).astype(np.uint8)
            grad = np.tile(ramp, (h, 1))
            frame = np.empty((h, w, 3), dtype=np.uint8)
            frame[..., 0] = grad
            frame[..., 1] = np.uint8(80)
            frame[..., 2] = 255 - grad
        # A moving foreground block: extra motion for the tracker to chase.
        bx = int((0.5 + 0.35 * np.sin(t * 1.3 + self._phase)) * w)
        by = int((0.5 + 0.25 * np.cos(t * 1.1)) * h)
        cv2.rectangle(frame, (bx - 60, by - 120), (bx + 60, by + 120), (30, 30, 220), -1)
        return np.ascontiguousarray(frame)

    def _close(self) -> None:
        """Release the video handle, if any; release errors are ignored."""
        cap = self._video
        if cap is not None:
            try:
                cap.release()  # type: ignore[union-attr]
            except Exception:  # noqa: BLE001
                pass
        self._video = None
30 changes: 30 additions & 0 deletions autoptz/engine/runtime/inference.py
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,35 @@ def _provider_options(ep: EP, prefs: HardwarePrefs | None) -> dict[str, object]:
return {}


def _apply_low_idle_threading(so: ort.SessionOptions) -> None:
"""Stop ORT worker threads from busy-spinning while idle.

AutoPTZ runs each model only intermittently (detect every Nth frame, pose/face
a few Hz), so an ORT session spends most of its life *idle between runs*. By
default ORT's intra-op worker threads **busy-spin** (~200 ms) before parking,
which on a multi-camera box turns several mostly-idle sessions into a wall of
CPU — profiling showed ``ThreadPoolTempl::WorkerLoop`` as the single largest
consumer. Disabling spinning makes workers block immediately on a condition
variable instead, collapsing idle/intermittent CPU at the cost of a few hundred
microseconds of wake-up latency per run (negligible next to a 10-40 ms model).

We also force a single, sequential inter-op pool: AutoPTZ runs one graph per
call, so a parallel inter-op pool only adds another idle spinner.
"""
# Keys are stable ORT session-config entries (>=1.6); unknown keys are ignored
# by older runtimes, so this stays safe across the EP matrix.
try:
so.add_session_config_entry("session.intra_op.allow_spinning", "0")
so.add_session_config_entry("session.inter_op.allow_spinning", "0")
except Exception: # noqa: BLE001 — a tuning hint must never block session build
pass
try:
so.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL
so.inter_op_num_threads = 1
except Exception: # noqa: BLE001
pass


def _build_session_options(
prefs: HardwarePrefs | None,
base: ort.SessionOptions | None,
Expand All @@ -303,6 +332,7 @@ def _build_session_options(
so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
if prefs and prefs.intra_op_threads:
so.intra_op_num_threads = max(1, int(prefs.intra_op_threads))
_apply_low_idle_threading(so)
return so


Expand Down
Loading
Loading