Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions autoptz/config/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,19 @@ class TrackingConfig(BaseModel, frozen=True):
# Never run the detector and the pose pass on the same inference frame, so a
# heavy detect tick and a heavy pose tick don't stack into a 200ms frame.
stage_spread: bool = True
# Center Stage multi-person *group framing* (digital crop path only). When ON
# and more than one confident person is present WITHOUT an explicit locked
# target, the digital framer frames the UNION of everyone's boxes (auto-widens
# to keep the group in shot) instead of a single subject. An explicitly locked
# target (by id or identity) always wins — it keeps following that one person.
# Off by default → no behaviour change.
group_framing: bool = False
# Subtle digital lead-room ("nose room") for the Center Stage crop: bias the
# crop centre toward the framed subject's motion so a walking subject sits a
# touch back-of-centre. The offset is this gain × the EMA subject-centre
# velocity, capped to a small fraction of the crop so it can't destabilise
# framing. Conservative default (0.0 = off / centred, exactly as before).
lead_room: float = Field(default=0.0, ge=0.0, le=1.0)


# Vertical aim point as a fraction of the person-box height measured from the TOP
Expand Down
111 changes: 93 additions & 18 deletions autoptz/engine/camera_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,20 @@ def _push_due(now: float, last: float, min_period: float) -> bool:


# Center Stage crop tightness per "Framing" preset → (subject fill of crop,
# max crop as a fraction of the frame). A smaller ``max_frac`` forces a tighter
# zoom even on a close subject that already fills the sensor; the live "Framing"
# dropdown (tracking.framing) picks the preset, so the user dials the shot
# without a restart. ``upper_body`` is the default head-and-shoulders look.
_CENTERSTAGE_FRAMING: dict[str, tuple[float, float]] = {
"face": (0.86, 0.50), # tight head/face closeup (~2.0x on a close subject)
"head_shoulders": (0.80, 0.62), # head + shoulders (~1.6x)
"upper_body": (0.70, 0.74), # head + chest (~1.35x) — default
"full_body": (0.58, 0.94), # whole person, gentle crop
# max crop as a fraction of the frame, headroom). A smaller ``max_frac`` forces a
# tighter zoom even on a close subject that already fills the sensor; the live
# "Framing" dropdown (tracking.framing) picks the preset, so the user dials the
# shot without a restart. ``upper_body`` (head + chest) is the default look.
#
# ``headroom`` is shot-size-aware: a tight face/closeup wants only a sliver of
# space above the head, while a full-body shot wants more margin so the subject
# isn't jammed against the top. Closer shots → less headroom, wider shots → more.
# The old fixed 0.10 is kept for ``upper_body`` as the midpoint.
_CENTERSTAGE_FRAMING: dict[str, tuple[float, float, float]] = {
"face": (0.86, 0.50, 0.06), # tight head/face closeup (~2.0x), minimal headroom
"head_shoulders": (0.80, 0.62, 0.08), # head + shoulders (~1.6x)
"upper_body": (0.70, 0.74, 0.10), # head + chest (~1.35x) — default midpoint
"full_body": (0.58, 0.94, 0.14), # whole person, more margin above the head
}

_DEFAULT_TELEMETRY_HZ = 10.0
Expand Down Expand Up @@ -608,6 +613,10 @@ def __init__(
self._shm: ShmWriter | None = None
self._vcam: Any | None = None # VirtualCamSink (lazily created when vcam_out enabled)
self._digital_framer: Any | None = None # Center Stage auto-framer (lazy)
# True when the last _current_digital_target() returned a multi-person group
# UNION box (which must fit-width); False for a single locked person (which
# keeps the prior height-only sizing). Read by the Center Stage crop path.
self._digital_target_is_group: bool = False
self._cs_diag_t: float = 0.0 # throttle for the Center Stage diagnostic log
self._detect: _DetectStack | None = None
self._pooled_detector = False
Expand Down Expand Up @@ -3491,14 +3500,21 @@ def _framed_output(self, frame: NDArray[np.uint8]) -> NDArray[np.uint8]:
from autoptz.engine.pipeline.digital_framer import DigitalFramer

framer = self._digital_framer = DigitalFramer(out_aspect=aspect)
# Crop tightness follows the live "Framing" dropdown.
# Crop tightness AND shot-size-aware headroom follow the live "Framing"
# dropdown (set live so a preset change re-composes without a restart).
framing = getattr(self.config.tracking, "framing", "upper_body")
framer.fill, framer.max_frac = _CENTERSTAGE_FRAMING.get(
framer.fill, framer.max_frac, framer.headroom = _CENTERSTAGE_FRAMING.get(
framing, _CENTERSTAGE_FRAMING["upper_body"]
)
# Subtle digital lead-room ("nose room"): bias the crop toward the
# subject's motion. Conservative default; 0 reproduces centred framing.
framer.lead = float(getattr(self.config.tracking, "lead_room", 0.0))
target = self._current_digital_target()
if target is not None:
x, y, cw, ch = framer.frame_for(target, w, h)
# fit_width only for a multi-person group UNION (so it auto-widens to
# keep everyone in shot); a single locked/standalone person stays on
# the prior height-only sizing.
x, y, cw, ch = framer.frame_for(target, w, h, fit_width=self._digital_target_is_group)
else:
x, y, cw, ch = framer.full_frame(w, h)
nowm = time.monotonic()
Expand All @@ -3525,15 +3541,28 @@ def _framed_output(self, frame: NDArray[np.uint8]) -> NDArray[np.uint8]:
return cv2.resize(crop, (ow, oh), interpolation=interp)

def _current_digital_target(self) -> tuple[float, float, float, float] | None:
"""The selected target's bbox (x1,y1,x2,y2) for Center Stage, or None.
"""The bbox (x1,y1,x2,y2) Center Stage should frame this tick, or None.

Runs on the capture thread; a slightly stale box is fine for smooth
framing. Prefers the live track for the current target id, but falls back
to the maintained *trusted* target box so Center Stage keeps framing
through track-id churn / identity re-binding (when ``_target_track_id``
momentarily points at a track not in the latest ``_last_tracks``).
framing.

**Explicit lock wins.** When the user has locked a specific person (by
track id OR by configured identity), Center Stage follows *that* single
person even with group framing on — explicit selection always beats the
crowd. It prefers the live track for the current id but falls back to the
maintained *trusted* box so the crop holds through track-id churn /
identity re-binding (when ``_target_track_id`` momentarily points at a
track not in the latest ``_last_tracks``).

**Group framing** (``tracking.group_framing``, default off) only applies
when NO explicit target is locked: with more than one confident, non-lost
person present the crop frames the UNION of their boxes (auto-widening,
capped by ``max_frac``). One person, or the toggle off, is the single
target's box exactly as before.
"""
self._digital_target_is_group = False
tid = self._target_track_id
explicit_lock = tid is not None or self._target_identity_id is not None
if tid is not None:
for t in self._last_tracks or ():
if (
Expand All @@ -3545,12 +3574,58 @@ def _current_digital_target(self) -> tuple[float, float, float, float] | None:
return (bb.x1, bb.y1, bb.x2, bb.y2)
# Fallback: the last trusted target box (set whenever a target is locked,
# by track id OR by identity), so the crop holds through brief track gaps.
if self._target_track_id is not None or self._target_identity_id is not None:
if explicit_lock:
tb = getattr(self._target_lock, "trusted_bbox", None)
if tb is not None:
return (tb.x1, tb.y1, tb.x2, tb.y2)
# An explicit lock ALWAYS wins: never fall through to the group union
# just because the locked track is momentarily absent (transient — e.g.
# the first frame(s) after selecting a target, before trusted_bbox is
# populated). Returning None holds the prior crop / full frame instead.
return None

# No explicit lock: optionally frame the whole confident group as a union.
if bool(getattr(self.config.tracking, "group_framing", False)):
boxes = self._confident_person_boxes(self._last_tracks)
if not boxes:
return None
# fit-width only when the union actually spans MORE THAN ONE person; a
# single confident person keeps the prior height-only single-target feel.
self._digital_target_is_group = len(boxes) > 1
from autoptz.engine.pipeline.digital_framer import union_bbox

return union_bbox(boxes)
return None

@staticmethod
def _confident_person_boxes(
    tracks: list[TrackInfo],
) -> list[tuple[float, float, float, float]]:
    """Collect the ``(x1, y1, x2, y2)`` box of every usable track in *tracks*.

    A track is used when its ``lost`` attribute is falsy (a missing attribute
    counts as not lost) and its ``bbox`` attribute is present and not ``None``.
    A falsy *tracks* (``None`` or empty) yields an empty list. Input order is
    preserved. Pure: reads no instance state.

    NOTE(review): despite the name, no confidence threshold is applied here --
    presumably the tracks are already confidence-filtered upstream; confirm.
    """
    # Look at ``bbox`` only for tracks that are not flagged lost.
    live_boxes = (
        getattr(trk, "bbox", None)
        for trk in tracks or ()
        if not getattr(trk, "lost", False)
    )
    return [
        (box.x1, box.y1, box.x2, box.y2)
        for box in live_boxes
        if box is not None
    ]

@staticmethod
def _group_union_bbox(
    tracks: list[TrackInfo],
) -> tuple[float, float, float, float] | None:
    """Bounding box enclosing every usable person box in *tracks*, or ``None``.

    Gathers the boxes with ``CameraWorker._confident_person_boxes`` and hands
    them to ``union_bbox``. With no usable box the result is ``None`` (the
    caller then has nothing to frame); with exactly one usable box the result
    is that box. Pure: touches no instance state, so it can be called directly
    in tests.
    """
    # Imported at call time, as elsewhere in this class.
    from autoptz.engine.pipeline.digital_framer import union_bbox

    usable_boxes = CameraWorker._confident_person_boxes(tracks)
    return union_bbox(usable_boxes)

def _push_frame(self, frame: NDArray[np.uint8]) -> None:
if self._shm is None:
return
Expand Down
110 changes: 107 additions & 3 deletions autoptz/engine/pipeline/digital_framer.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ def _clamp(v: float, lo: float, hi: float) -> float:
return lo if v < lo else hi if v > hi else v


def union_bbox(
boxes: list[tuple[float, float, float, float]],
) -> tuple[float, float, float, float] | None:
"""The smallest ``(x1, y1, x2, y2)`` box covering every box in *boxes*.

Used for multi-person *group framing*: pass the bboxes of the confident
people and frame the union so the auto-zoom widens to keep everyone in shot.
Returns ``None`` for an empty list (the caller falls back to the single
target / full-frame path). A single box round-trips unchanged.
"""
if not boxes:
return None
x1 = min(b[0] for b in boxes)
y1 = min(b[1] for b in boxes)
x2 = max(b[2] for b in boxes)
y2 = max(b[3] for b in boxes)
return (float(x1), float(y1), float(x2), float(y2))


def desired_crop(
bbox: tuple[float, float, float, float],
frame_w: int,
Expand All @@ -31,6 +50,7 @@ def desired_crop(
min_frac: float,
max_frac: float,
headroom: float = 0.10,
fit_width: bool = False,
) -> tuple[float, float, float, float]:
"""The crop ``(x, y, w, h)`` (pixels) that frames *bbox*.

Expand All @@ -40,15 +60,32 @@ def desired_crop(
window (never the whole frame) and never an extreme zoom. ``out_aspect`` keeps
the crop matching the output so the resize doesn't distort. ``headroom`` lifts
the centre so the head sits a little below the top.

By default the crop is sized *height-only* from the subject height (a tall
single person frames exactly as before, and an arms-spread / T-pose single box
is NOT zoomed out). ``fit_width=True`` additionally grows the crop so its
aspect-locked width covers the subject *width* too — used only for the
multi-person *group framing* union, so the crop auto-widens to keep everyone
in shot (still aspect-locked and capped at ``max_frac``). Single-person /
non-group framing keeps ``fit_width=False`` for byte-identical prior behaviour.
"""
bx1, by1, bx2, by2 = (float(v) for v in bbox)
subj_h = max(1.0, by2 - by1)
subj_w = max(1.0, bx2 - bx1)
cx = (bx1 + bx2) * 0.5
cy = (by1 + by2) * 0.5
fw, fh = float(frame_w), float(frame_h)

# Size the crop to the subject, then constrain it to a window of the frame.
ch = subj_h / _clamp(fill, 0.1, 1.0)
# Height from the subject height. Only when ``fit_width`` is set (the group
# union path) do we ALSO grow the crop height so its aspect-locked width covers
# a wide subject — the single-person default stays strictly height-driven so it
# never zooms out more than before. ``max_frac`` then caps the result.
fill_c = _clamp(fill, 0.1, 1.0)
ch = subj_h / fill_c
if fit_width:
ch_for_width = (subj_w / fill_c) / out_aspect
ch = max(ch, ch_for_width)
ch = _clamp(ch, min_frac * fh, max_frac * fh)
cw = ch * out_aspect
# If that is wider than the frame, cap width (keeps aspect; only happens for
Expand Down Expand Up @@ -93,21 +130,44 @@ class DigitalFramer:
deadzone: float = 0.04 # hold centre while desired moves < this frac of crop w/h
size_deadband: float = 0.03 # ignore size changes under this fraction
headroom: float = 0.10
# Digital lead-room ("nose room"): bias the crop CENTRE in the direction of the
# subject's motion so a walking subject sits a touch back-of-centre rather than
# trailing the edge. The offset is ``lead`` × the EMA subject-centre velocity
# (px/frame), capped to ``_LEAD_MAX_FRAC`` of the crop so it can never
# destabilise framing. ``lead=0.0`` reproduces the prior centred behaviour.
lead: float = 0.0 # default OFF/very subtle; 0 = no lead-room
lead_smooth: float = 0.2 # EMA weight for the subject-centre velocity estimate
_crop: tuple[float, float, float, float] | None = None
_following: bool = False # hysteresis: True once the centre is being tracked
_prev_subj_c: tuple[float, float] | None = None # last subject centre (for velocity)
_subj_vel: tuple[float, float] = (0.0, 0.0) # EMA of subject-centre velocity

# Once moving, keep following until the desired centre is back inside this
# (tighter) fraction of the dead-zone band — prevents boundary chatter.
_INNER_BAND_FRAC: float = 0.5
# Lead-room offset is capped to this fraction of the crop width/height so a
# fast subject can never shove the framing more than a gentle nudge off-centre.
_LEAD_MAX_FRAC: float = 0.12

def reset(self) -> None:
    """Forget all smoothing state so the next call starts from scratch.

    Clears the current crop, the follow-hysteresis flag, the last seen
    subject centre, and the subject-velocity estimate.
    """
    # Crop + hysteresis state.
    self._crop, self._following = None, False
    # Lead-room (velocity) state.
    self._prev_subj_c, self._subj_vel = None, (0.0, 0.0)

def frame_for(
self, bbox: tuple[float, float, float, float], frame_w: int, frame_h: int
self,
bbox: tuple[float, float, float, float],
frame_w: int,
frame_h: int,
*,
fit_width: bool = False,
) -> tuple[int, int, int, int]:
"""Smoothed integer crop framing *bbox*."""
"""Smoothed integer crop framing *bbox*.

``fit_width=True`` widens the crop to cover a wide subject (the group-union
box); the default keeps the prior height-only sizing for single people.
"""
tgt = desired_crop(
bbox,
frame_w,
Expand All @@ -117,13 +177,57 @@ def frame_for(
min_frac=self.min_frac,
max_frac=self.max_frac,
headroom=self.headroom,
fit_width=fit_width,
)
tgt = self._apply_lead(bbox, tgt, frame_w, frame_h)
return self._step(tgt)

def full_frame(self, frame_w: int, frame_h: int) -> tuple[int, int, int, int]:
    """Step the crop toward the entire frame; used when nothing is targeted.

    The lead-room state (last subject centre and velocity estimate) is
    cleared first, so a later acquisition does not inherit stale motion.
    The whole-frame rectangle ``(0, 0, frame_w, frame_h)`` is then passed to
    ``_step`` and its result is returned.
    """
    # With no subject there is nothing to lead: drop the velocity tracking.
    self._prev_subj_c, self._subj_vel = None, (0.0, 0.0)
    whole_frame = (0.0, 0.0, float(frame_w), float(frame_h))
    return self._step(whole_frame)

def _apply_lead(
    self,
    bbox: tuple[float, float, float, float],
    tgt: tuple[float, float, float, float],
    frame_w: int,
    frame_h: int,
) -> tuple[float, float, float, float]:
    """Nudge the desired crop in the direction the subject is moving.

    Each call updates an exponential moving average of the subject-centre
    displacement since the previous call (weight ``lead_smooth``, clamped to
    ``[0, 1]``). The estimate is updated even when lead-room is disabled.

    Args:
        bbox: Subject box ``(x1, y1, x2, y2)``; only its centre is used.
        tgt: Desired crop ``(x, y, w, h)`` before lead-room is applied.
        frame_w: Frame width, used to keep the shifted crop inside the frame.
        frame_h: Frame height, used likewise.

    Returns:
        *tgt* itself when ``lead <= 0``. Otherwise a crop of the same size
        whose top-left is shifted by ``lead * velocity``, with each axis
        capped to ``_LEAD_MAX_FRAC`` of the crop's width/height and then
        clamped inside the frame.
    """
    bx1, by1, bx2, by2 = bbox
    centre = ((bx1 + bx2) * 0.5, (by1 + by2) * 0.5)

    last_centre = self._prev_subj_c
    if last_centre is not None:
        # Displacement since the previous call, blended into the EMA.
        move_x = centre[0] - last_centre[0]
        move_y = centre[1] - last_centre[1]
        weight = _clamp(self.lead_smooth, 0.0, 1.0)
        vel = self._subj_vel
        self._subj_vel = (
            vel[0] + weight * (move_x - vel[0]),
            vel[1] + weight * (move_y - vel[1]),
        )
    self._prev_subj_c = centre

    # Lead-room disabled: hand back the crop untouched.
    if self.lead <= 0.0:
        return tgt

    crop_x, crop_y, crop_w, crop_h = tgt
    # Cap the shift per axis so a fast subject only nudges the framing.
    shift_x = _clamp(
        self.lead * self._subj_vel[0],
        -self._LEAD_MAX_FRAC * crop_w,
        self._LEAD_MAX_FRAC * crop_w,
    )
    shift_y = _clamp(
        self.lead * self._subj_vel[1],
        -self._LEAD_MAX_FRAC * crop_h,
        self._LEAD_MAX_FRAC * crop_h,
    )
    # Keep the shifted crop fully inside the frame.
    crop_x = _clamp(crop_x + shift_x, 0.0, max(0.0, float(frame_w) - crop_w))
    crop_y = _clamp(crop_y + shift_y, 0.0, max(0.0, float(frame_h) - crop_h))
    return (crop_x, crop_y, crop_w, crop_h)

def _step(self, tgt: tuple[float, float, float, float]) -> tuple[int, int, int, int]:
if self._crop is None:
self._crop = tgt
Expand Down
Loading
Loading