diff --git a/docs/UPSTREAM_SYNC.md b/docs/UPSTREAM_SYNC.md index a905793..46f0ebc 100644 --- a/docs/UPSTREAM_SYNC.md +++ b/docs/UPSTREAM_SYNC.md @@ -479,14 +479,19 @@ stay explicit instead of being rediscovered in code review. | Chat resolver | 3-level: explicit → ContextVar → global | Process-global singleton | See [DECISIONS.md](DECISIONS.md#why-3-level-chat-resolver) | | PostableObject history | Cached in message history with real message ID | Not cached (skips history) | Upstream gap — posted messages should appear in thread/channel history | | Teams `msteams` transport key | Stripped from action values | Not stripped | Upstream gap — SDK-injected metadata should not leak to handlers | -| Fallback streaming with whitespace-only streams | Placeholder cleared to `" "` on final edit | Placeholder left visible (`"..."` stuck) | Upstream 4.26 guards against empty edits but leaves the placeholder stranded on the message. We issue one final `edit_message(" ")` so the placeholder disappears when no real content was produced. | +| Fallback streaming with whitespace-only streams (non-Teams adapters) | Placeholder cleared to `" "` on final edit | Placeholder left visible (`"..."` stuck) | Upstream 4.26 guards against empty edits but leaves the placeholder stranded on the message. We issue one final `edit_message(" ")` so the placeholder disappears when no real content was produced. Teams no longer routes through `_fallback_stream` after vercel/chat#416 (DMs use native streaming, group chats accumulate-and-post), so this divergence applies only to Slack / Discord / GitHub / Telegram / Google Chat / Linear / WhatsApp. | | Google Chat `<url\|label>` round-trip | `to_ast()` / `extract_plain_text()` parse the custom-label syntax back to a link node / bare label | `toAst()` / `extractPlainText()` leave `<url\|label>` as raw text (or parse the whole string as an autolink with a malformed URL) | Upstream 4.26 emits `<url\|label>` in `from_ast` but never taught the reverse direction to parse it. 
A message posted with `[label](url)` then read back through `fetch_messages` comes back as unstructured text (or worse, a link node with the full `url\|text` as its URL) in upstream. We close the round-trip via an AST placeholder substitution: each `<url\|label>` is extracted to a private-use sentinel, Markdown is parsed on the rest, and link nodes are injected where the sentinels landed. This avoids the Markdown parser's incomplete handling of balanced-parens link destinations, so URLs like `https://en.wikipedia.org/wiki/Foo_(bar)` round-trip intact. | | `from_json(data, adapter=X)` → `_adapter_name` | Updated to `X.name` so `to_json()` reflects the bound adapter | Kept at `json.adapterName`, so re-serialization can emit a name that no longer matches the actual adapter | Upstream TS has the same gap but only exposes it via the `fromJSON(json, adapter?)` overload. In Python we lean on this API more (explicit `chat=` / explicit `adapter=` is preferred over the singleton). We sync the name on rebind so runtime and serialize agree. | | Google Chat link labels with `\|` / `>` / `]` / newline, empty labels, URLs without a scheme, or URLs containing `\|` / `>` | Fall back to `text (url)` (or bare URL for empty labels) when the `<url\|label>` form can't round-trip safely | Always `<url\|label>`, producing malformed or un-parseable output | Google Chat's `<url\|label>` has no escape for `\|` or `>`; `]` breaks our own `to_ast()` regex (which converts `<url\|label>` to Markdown `[text](url)`, and Markdown closes the label at the first `]`); newline breaks the single-line form; schemeless URLs and URLs containing `\|`/`>` don't match our reverse parser. Upstream emits the malformed form regardless; we fall back to the pre-4.26 `text (url)` form (or the bare URL for empty labels) so the label/URL stays intact and Google Chat's auto-link detection still fires for http(s). 
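The sentinel substitution can be sketched in a few lines. This is a simplified illustration, not the SDK's `to_ast` implementation — the sentinel range, regex, and function names are assumptions, and the real code re-injects AST link nodes rather than Markdown text:

```python
import re

# Private Use Area code points as one-character sentinels; assumes real
# messages never contain these characters themselves.
_SENTINEL_BASE = 0xE000

# Hypothetical reverse parser for Google Chat's <url|label> form.
_LINK = re.compile(r"<(https?://[^|>\n]+)\|([^|>\]\n]*)>")


def extract_links(text: str) -> tuple[str, list[tuple[str, str]]]:
    """Swap each <url|label> for a sentinel before Markdown parsing."""
    links: list[tuple[str, str]] = []

    def _sub(match: re.Match) -> str:
        links.append((match.group(1), match.group(2)))
        return chr(_SENTINEL_BASE + len(links) - 1)

    return _LINK.sub(_sub, text), links


def inject_links(parsed: str, links: list[tuple[str, str]]) -> str:
    """After parsing, swap sentinels back — shown here as Markdown links."""
    for i, (url, label) in enumerate(links):
        parsed = parsed.replace(chr(_SENTINEL_BASE + i), f"[{label}]({url})")
    return parsed
```

Because the URL never passes through the Markdown parser, balanced parens such as `https://en.wikipedia.org/wiki/Foo_(bar)` survive untouched.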
| | Google Chat heading rendering | `#`-headings emit as `*text*` (bold) so they're visually distinct | Falls through to default node-to-text (plain concatenation) | Google Chat has no heading syntax; emitting plain text loses the visual hierarchy. Bold is the closest approximation the platform supports. | | Google Chat image rendering | Images emit as `{alt} ({url})` or bare `url` | No image branch — falls through to default which concatenates children only, dropping the URL | Upstream silently drops image URLs when rendering to Google Chat text. We preserve the URL so the message content isn't lost. | -| Fallback streaming stream-exception capture | `_fallback_stream` captures exceptions from the stream iterator, flushes whatever content was already rendered, awaits `pending_edit`, and re-raises after cleanup | `try/finally` only — exception propagates immediately, `pendingEdit` is un-awaited, and the placeholder is stranded as `"..."` | Upstream leaves a hard UX failure when streams crash mid-flight (common: LLM connection drops): placeholder visible forever, orphan background task. We flush + clean up before re-raising so the caller still sees the original error and users see the partial content instead of a spinner. | -| Fallback streaming final SentMessage content | SentMessage + final edit carry `final_content` (remend'd — inline markers auto-closed) | SentMessage + final edit carry raw `accumulated` | Narrow UX refinement. If a stream ends with an unclosed `*`/`~~`/etc., upstream ships the unclosed marker; we run `_remend` so the user sees a clean final message. Not observable in the common case where streams close their own markers. 
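The two Google Chat rendering fallbacks above (headings → bold, images → `{alt} ({url})`) boil down to a small branch in the node-to-text renderer. A minimal sketch assuming simplified dict-shaped nodes — the real renderer walks full AST node objects:

```python
def render_node(node: dict) -> str:
    """Illustrative Google Chat text fallbacks (hypothetical node shape)."""
    if node["type"] == "heading":
        # Google Chat has no heading syntax; bold is the closest approximation.
        return f"*{node['text']}*"
    if node["type"] == "image":
        # Preserve the URL instead of silently dropping it.
        alt, url = node.get("alt"), node["url"]
        return f"{alt} ({url})" if alt else url
    return node.get("text", "")
```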
| +| Fallback streaming stream-exception capture (non-Teams adapters) | `_fallback_stream` captures exceptions from the stream iterator, flushes whatever content was already rendered, awaits `pending_edit`, and re-raises after cleanup | `try/finally` only — exception propagates immediately, `pendingEdit` is un-awaited, and the placeholder is stranded as `"..."` | Upstream leaves a hard UX failure when streams crash mid-flight (common: LLM connection drops): placeholder visible forever, orphan background task. We flush + clean up before re-raising so the caller still sees the original error and users see the partial content instead of a spinner. As above, this divergence no longer applies to Teams after vercel/chat#416: `_stream_via_emit` cancels the session on iterator exception so `_close_stream_session` skips the final-message activity, and the original exception still propagates to the caller. | +| Fallback streaming final SentMessage content (non-Teams adapters) | SentMessage + final edit carry `final_content` (remend'd — inline markers auto-closed) | SentMessage + final edit carry raw `accumulated` | Narrow UX refinement. If a stream ends with an unclosed `*`/`~~`/etc., upstream ships the unclosed marker; we run `_remend` so the user sees a clean final message. Not observable in the common case where streams close their own markers. Teams native streaming and the Teams accumulate-and-post path both ship raw `accumulated`, matching upstream after #416; this divergence applies only to the remaining adapters that still route through `_fallback_stream`. 
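The marker auto-closing that `_remend` performs can be approximated with a parity check per inline marker. This is a naive sketch — the actual `_remend` handles marker ordering and nesting that this ignores:

```python
def remend(text: str, markers: tuple[str, ...] = ("~~", "*", "_")) -> str:
    """Append closers for inline markers left unbalanced at end of stream.

    Parity check per marker only; a sketch, not the SDK's `_remend`.
    """
    closers = []
    for marker in markers:
        if text.count(marker) % 2 == 1:
            closers.append(marker)
    return text + "".join(closers)
```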
| +| Teams group-chat / channel streaming via accumulate-and-post | `TeamsAdapter.stream` accumulates the full text and issues a single `post_message` instead of post+edit, even for group chats and channel threads | Same after vercel/chat#416 (`if (activeStream && !activeStream.canceled) … else { accumulate; postMessage }`) — no divergence at the adapter level | Documented for clarity: the Python port matches upstream's post-#416 behavior of avoiding the post+edit flicker where Teams doesn't support native streaming. The adapter no longer touches `_teams_update` from the streaming path. | +| Teams native streaming hand-rolled wire format (DMs) — **transitional** | `_stream_via_emit` and `_close_stream_session` build Bot Framework streaming payloads (`channelData`/`entities`/`streamSequence`/`streamId`) by hand and post them via `_teams_send`, including a per-emit throttle gate (`native_stream_min_emit_interval_ms`, default 1500ms) honoring caller-supplied `StreamOptions.update_interval_ms` | `IStreamer.emit(text)` from `@microsoft/teams.apps` (npm) handles the wire format and throttling under the hood. The equivalent Python SDK (`microsoft-teams-apps`) only went GA on 2026-05-01 — too late for the 4.27 sync window. | Hand-rolled streaming primitives are tracked for migration to `microsoft-teams-apps` (Python) in a follow-up release (0.4.28 / future Python-only). Once we migrate, this row, the divider-rendering row, and the `_active_streams` accounting all simplify or disappear. Until then we own the wire format ourselves — including the 1500ms emit throttle that protects against Teams' ~1 req/sec streaming-endpoint quota — and surface send failures by re-raising (so `Thread.stream`'s outer accumulator can't record text Teams rejected). 
| +| Teams native streaming `streamId` placement on `streaminfo` entity (DMs) | `streamId` is added to BOTH `channelData` and the `streaminfo` entity on subsequent and final activities (it is omitted from both on the first chunk because the server has not yet assigned an id) | `IStreamer.emit` handles the wire shape; the upstream JS SDK includes `streamId` on both sites as part of its `streaminfo` entity contract per [MS Bot Framework streaming docs](https://learn.microsoft.com/microsoftteams/platform/bots/streaming-ux#continue-streaming) | Bot Framework REST contract requires the entity-level `streamId` for continuation activities — earlier Python versions set it only on `channelData`, which Teams may treat as a malformed continuation and detach from the original stream. This row documents the contract, not a divergence; both Python and the JS SDK comply. | +| Teams native streaming final-send when first chunk's `id` was empty (DMs) | `_close_stream_session` sends the final `message` activity whenever `text` is non-empty, even if `stream_id` is `None` (Bot Framework REST response returned `{"id": ""}` on the first chunk). The final activity omits `streamId` from `channelData` rather than serializing `None`. | Upstream's `streamViaEmit` awaits the `chunk` event for the first activity's `id`; if Teams returns an empty id, `messageId` becomes `""` and the SDK's auto-close emits the final activity through `IStreamer` regardless | Without the final `message` activity, the Teams client's streaming UI keeps spinning until the platform times the session out — a stuck-loading-state UX failure with no user workaround. We mirror upstream's looser check (text non-empty → ship the final) so the streaming indicator clears even when the Bot Framework REST response surface returns an empty `id`. | +| `RawMessage.text` override field — **transitional** | New optional `text: str \| None = None` field on `RawMessage` (`src/chat_sdk/types.py`). 
When set by an adapter, `Thread._handle_stream` MUST prefer it over its own local accumulator when constructing the recorded `SentMessage` body / message-history entry. `None` falls back to the local accumulator (backward-compatible default for adapters that don't need the override). Set by `_stream_via_emit` so the recorded message matches what Teams actually shipped, even when chunks were buffered into the throttle window and cancellation skipped the flush emit. | Upstream's `RawMessage` (`packages/chat/src/types.ts`) is `{ id; raw; threadId }` only. Cancellation-text reconciliation lives inside `@microsoft/teams.apps`'s `IStreamer.emit` (the npm SDK owns the buffer and never surfaces a buffered-but-unsent suffix to `chat`). | Direct consequence of the hand-rolled Teams native streaming row above. Without the override, the SDK's local accumulator (which captures every chunk yielded to the adapter, including chunks coalesced into the throttle window) would diverge from what Teams actually accepted whenever a session is canceled with buffered text pending — recording text the user never saw. Disappears alongside the hand-rolled wire format once we migrate to `microsoft-teams-apps` (Python). Regression coverage: `tests/test_thread_faithful.py::test_should_prefer_raw_message_text_override_over_local_accumulator` (would fail if someone "fixes" Thread.stream back to upstream's local-accumulator-only behavior) and `tests/test_teams_native_streaming.py::test_canceled_stream_sets_raw_message_text_override`. | | Teams divider rendering | `card_to_adaptive_card` hoists `separator: True` onto the next sibling (or emits a non-empty Container for a trailing divider) | `convertDividerToElement` emits an empty `Container` with `separator: True` | Upstream shares the same bug: Microsoft Teams renders an empty Container at zero height, so the separator line is effectively invisible. Python port fixes locally (issue #45) rather than blocking on upstream. 
| | `SlackAdapter.current_token` / `current_client` | Public `@property` accessors that return the request-context-bound token and a preconfigured `AsyncWebClient` | Not exposed (`getToken()` is private on the TS `SlackAdapter`) | Python-only addition (issue #47). Downstream code that calls Slack Web APIs from inside a handler — email resolution, user profile fetches, reaction bookkeeping — otherwise depends on underscore-prefixed helpers. | | `ConcurrencyConfig.max_concurrent` | Enforced via `asyncio.Semaphore` in the `"concurrent"` strategy path; rejects non-integer or `<= 0` values, and rejects any non-`None` `max_concurrent` paired with a non-`"concurrent"` strategy | Accepted into the config type with docstring "Default: Infinity" but never read (3 writes, 0 reads) | Silent correctness bug upstream — consumers setting `max_concurrent=N` with `strategy="concurrent"` reasonably expect an N-way bound on in-flight handlers. We honor the documented contract via a semaphore and fail-fast on misconfiguration so it's never silent. `max_concurrent=None` stays compatible with every strategy (unbounded default). | diff --git a/src/chat_sdk/adapters/teams/adapter.py b/src/chat_sdk/adapters/teams/adapter.py index 33bc85f..dbcc8f1 100644 --- a/src/chat_sdk/adapters/teams/adapter.py +++ b/src/chat_sdk/adapters/teams/adapter.py @@ -59,6 +59,77 @@ _parse_iso, ) + +class _TeamsStreamSession: + """Bookkeeping for a single in-flight native streaming activity. + + Mirrors the upstream Teams SDK ``IStreamer`` surface (``emit``, ``canceled``) + that ``streamViaEmit`` uses in ``packages/adapter-teams/src/index.ts``. The + Python adapter constructs one of these per DM message-handler invocation, + registers it in ``TeamsAdapter._active_streams`` so ``stream()`` can find + it, and closes it after the handler completes. 
+ + Carries the running ``stream_id`` (allocated by Teams on the first + ``streaming`` activity) and an incrementing ``stream_sequence`` so the + Bot Framework streaming protocol's wire shape stays valid. + """ + + __slots__ = ( + "stream_id", + "sequence", + "canceled", + "first_chunk_id", + "_text", + "last_emit_at_ms", + "emit_interval_ms", + ) + + def __init__(self) -> None: + self.stream_id: str | None = None + # Per Bot Framework streaming protocol: streamSequence starts at 1 + # for the first informative/streaming activity and increments by 1. + self.sequence: int = 0 + self.canceled: bool = False + # Captured from the first activity returned by the Bot Framework REST + # API; this becomes the ``streamId`` for subsequent chunks and the + # final message. + self.first_chunk_id: str = "" + self._text: str = "" + # Timestamp (ms, same frame as ``TeamsAdapter._stream_clock_ms``) of + # the most recent successful emit. Read by ``_close_stream_session`` + # so the final ``message`` activity honors the same 1-req/sec quota + # as the streaming ``typing`` activities — Teams' streaming endpoint + # rate-limits ALL activities on a given streamId together, so a close + # immediately after a one-chunk emit would 429 without this throttle. + # ``-inf`` means "no emit happened" → close path skips the wait. + self.last_emit_at_ms: float = float("-inf") + # Per-stream emit interval (ms). Cached on the session by + # ``_stream_via_emit`` so ``_close_stream_session`` honors caller- + # supplied ``StreamOptions.update_interval_ms`` overrides without + # the close path needing access to ``options``. ``None`` means + # "use the adapter default" (e.g. for sessions constructed outside + # the normal stream path, like tests). + self.emit_interval_ms: int | None = None + + def cancel(self) -> None: + """Mark the session canceled. ``stream()`` checks this each chunk.""" + self.canceled = True + + @property + def text(self) -> str: + """Read-only view of the cumulative streamed text. 
+ + External callers (tests, other adapter helpers) should read this + instead of poking at the private ``_text`` attribute. Writes go + through ``_stream_via_emit`` which owns the buffer. + """ + return self._text + + +# Bot Framework streaming protocol values for ``channelData.streamType``. +_STREAM_TYPE_STREAMING = "streaming" +_STREAM_TYPE_FINAL = "final" + MESSAGEID_CAPTURE_PATTERN = re.compile(r"messageid=(\d+)") MESSAGEID_STRIP_PATTERN = re.compile(r";messageid=\d+") CACHE_TTL_MS = 30 * 24 * 60 * 60 * 1000 # 30 days @@ -188,6 +259,29 @@ def __init__(self, config: TeamsAdapterConfig | None = None) -> None: # Shared aiohttp session for connection pooling self._http_session: Any | None = None + # In-flight native streaming sessions, keyed by thread_id. Populated + # by ``_handle_message_activity`` for DMs (which awaits the handler + # so the session stays alive); consulted by ``stream()`` to decide + # between native streaming via emit and the accumulate-and-post + # fallback path. + self._active_streams: dict[str, _TeamsStreamSession] = {} + # Throttle for native DM streaming — Bot Framework streaming is + # ~1 request/second; Microsoft recommends 1.5-2s buffering. See the + # field docstring on TeamsAdapterConfig for full context. + self._native_stream_min_emit_interval_ms: int = config.native_stream_min_emit_interval_ms + # Monotonic-clock callable returning milliseconds since some epoch. + # Injectable so tests can drive throttle behavior without real sleeps. + # Default reads the running event loop's clock — matches what + # ``asyncio.sleep`` would observe. The lazy lambda is intentional: + # there is no running loop at ``__init__`` time. + self._stream_clock_ms: Callable[[], float] = lambda: asyncio.get_running_loop().time() * 1000.0 + # Awaitable sleep keyed by milliseconds. Pairs with + # ``_stream_clock_ms`` so the end-of-stream flush can honor the + # throttle window without forcing real ``asyncio.sleep`` calls in + # tests. 
Default = real ``asyncio.sleep``; tests substitute an + # AsyncMock so they don't actually wait the configured interval. + self._stream_sleep_ms: Callable[[float], Awaitable[None]] = lambda ms: asyncio.sleep(ms / 1000.0) + @property def name(self) -> str: return self._name @@ -319,7 +413,19 @@ async def _handle_message_activity( activity: dict[str, Any], options: WebhookOptions | None = None, ) -> None: - """Handle message activities.""" + """Handle message activities. + + For DMs we register a :class:`_TeamsStreamSession` and ``await`` the + chat-handler task so :meth:`stream` can dispatch through the native + Bot Framework streaming protocol while the session is live. Group + chats remain fire-and-forget — Teams doesn't support native streaming + in channels/group threads, so :meth:`stream` falls through to the + accumulate-and-post path. + + Mirrors upstream ``handleMessageActivity`` in + ``packages/adapter-teams/src/index.ts``: capture ``ctx.stream`` for + DMs, block until processing completes, then drop the session. + """ if not self._chat: self._logger.warn("Chat instance not initialized, ignoring event") return @@ -354,7 +460,123 @@ async def _handle_message_activity( if is_mention: message.is_mention = True - self._chat.process_message(self, thread_id, message, options) + if not self.is_dm(thread_id): + # Group chat / channel — fire-and-forget. ``stream()`` will see no + # active session and accumulate-and-post. + self._chat.process_message(self, thread_id, message, options) + return + + # DM path: register a streaming session, then block on the handler so + # ``stream()`` can dispatch through the native streaming protocol + # while the session stays alive. We chain a ``waitUntil`` shim on + # top of the caller-supplied one (if any) so a hosting webhook + # framework that respects ``waitUntil`` still gets the underlying + # task — the local ``await`` is purely so we know when to reap the + # session. 
+ session = _TeamsStreamSession() + # Keyed by ``thread_id`` to match upstream ``activeStreams.set(threadId, …)`` + # in ``packages/chat-teams/src/index.ts``. Safe because the default + # per-thread concurrency strategy in ``Chat.handle_incoming_message`` + # serialises DM handlers for the same thread (overlapping webhooks are + # deduped or dropped before they reach a handler, so two ``stream()`` + # calls cannot share a session). A per-handler ``ContextVar`` would + # decouple this from the concurrency strategy but would be a Python-only + # divergence — tracked as a follow-up rather than landed inside the + # parity sync. + self._active_streams[thread_id] = session + loop = asyncio.get_running_loop() + processing_done: asyncio.Future[None] = loop.create_future() + + def _resolve_processing(task: Awaitable[Any]) -> None: + # ``WebhookOptions.wait_until`` receives the chat task; we hook + # done so we can release ``processing_done`` regardless of + # success/failure (mirrors the upstream ``task.then(resolve, + # resolve)`` pattern). + if isinstance(task, asyncio.Task): + + def _on_done(_t: asyncio.Task[Any]) -> None: + if not processing_done.done(): + processing_done.set_result(None) + + task.add_done_callback(_on_done) + elif not processing_done.done(): + # Non-Task awaitables are uncommon on this path, but if we + # ever get one we still need to unblock — resolve eagerly + # so we don't deadlock the webhook handler. + processing_done.set_result(None) + + upstream_wait_until = options.wait_until if options is not None else None + # Track whether the chained wait_until fired synchronously during + # ``process_message``. Used below to detect deduped/dropped + # messages where no chat task was scheduled and we'd otherwise + # hang on ``await processing_done``. 
+ wait_until_invoked = False + + def _chained_wait_until(task: Awaitable[Any]) -> None: + nonlocal wait_until_invoked + wait_until_invoked = True + # Resolve our own gate FIRST, before invoking the upstream + # ``wait_until`` callback. This way, even if the upstream + # callback raises, blocks, or never fires, ``processing_done`` + # is still wired up — making the deadlock-immunity argument + # trivially obvious: the await on ``processing_done`` below + # cannot starve due to a misbehaving caller-supplied hook. + _resolve_processing(task) + if upstream_wait_until is not None: + # Catch synchronous failures in the caller's hook. If we + # let it escape, ``Chat.process_message`` propagates the + # exception, the outer ``try`` skips ``await processing_done``, + # and the ``finally`` tears down the session while the + # underlying chat task is still scheduled — handlers that + # later call ``thread.stream()`` would then miss native + # streaming and fall back to a normal post. Logging keeps + # the failure visible without breaking the streaming path. + try: + upstream_wait_until(task) + except Exception as exc: + self._logger.warn( + "Caller-supplied WebhookOptions.wait_until raised", + {"threadId": thread_id, "error": str(exc)}, + ) + + chained_options = WebhookOptions(wait_until=_chained_wait_until) + + try: + self._chat.process_message(self, thread_id, message, chained_options) + # If ``process_message`` returned without invoking + # ``wait_until`` synchronously, no chat task was scheduled + # (deduped, dropped by the concurrency strategy, or the + # message wasn't admitted for handling). Resolve the gate + # immediately so ``await processing_done`` doesn't hang + # forever — there is no in-flight handler to wait on. + # Note: we check ``wait_until_invoked`` rather than + # ``processing_done.done()`` because the latter is set via + # an ``add_done_callback`` on task COMPLETION; the task is + # scheduled but has not run yet at this point. 
+ if not wait_until_invoked and not processing_done.done(): + processing_done.set_result(None) + try: + await processing_done + except asyncio.CancelledError: + # Caller cancelled the webhook handler — propagate cancel + # into the streaming session so any in-flight ``stream()`` + # exits cleanly without sending more chunks. + session.cancel() + raise + finally: + # Always close the session — sending a final activity if any + # chunks were emitted — and drop the registry entry so a + # subsequent message can register fresh. + current = self._active_streams.get(thread_id) + if current is session: + self._active_streams.pop(thread_id, None) + try: + await self._close_stream_session(thread_id, session) + except Exception as exc: # pragma: no cover — diagnostic-only + self._logger.warn( + "Teams stream finalization failed", + {"threadId": thread_id, "error": str(exc)}, + ) # Keys injected by the SDK's card renderer or Teams transport — not user input. _ACTION_TRANSPORT_KEYS = frozenset({"actionId", "msteams"}) @@ -913,13 +1135,26 @@ async def stream( self, thread_id: str, text_stream: Any, - _options: StreamOptions | None = None, + options: StreamOptions | None = None, ) -> RawMessage: - """Stream responses via post+edit.""" - decoded = self.decode_thread_id(thread_id) - accumulated = "" - message_id: str | None = None + """Stream responses to a Teams conversation. + + DMs use the Bot Framework streaming protocol via :meth:`_stream_via_emit` + when an active streaming session exists (set up by + :meth:`_handle_message_activity`). Group chats / channels accumulate + the stream and post a single message — matching upstream's + post-#416 behavior of avoiding the post+edit flicker where Teams + doesn't support native streaming. See + ``packages/adapter-teams/src/index.ts`` ``stream`` and + ``streamViaEmit`` at upstream commit ``ed46bae``. 
+ """ + session = self._active_streams.get(thread_id) + if session is not None and not session.canceled: + return await self._stream_via_emit(thread_id, text_stream, session, options) + # No native streamer (group chats, proactive messages, or DMs whose + # session was already canceled). Accumulate and post once. + accumulated = "" async for chunk in text_stream: text = "" if isinstance(chunk, str): @@ -928,27 +1163,416 @@ async def stream( text = chunk.get("text", "") if not text: continue - accumulated += text - activity_payload = { - "type": "message", - "text": accumulated, - "textFormat": "markdown", - } - - if message_id: - await self._teams_update(decoded, message_id, activity_payload) - else: - result = await self._teams_send(decoded, activity_payload) - message_id = result.get("id", "") + if not accumulated: + return RawMessage(id="", thread_id=thread_id, raw={"text": ""}) + decoded = self.decode_thread_id(thread_id) + activity_payload = { + "type": "message", + "text": accumulated, + "textFormat": "markdown", + } + result = await self._teams_send(decoded, activity_payload) return RawMessage( - id=message_id or "", + id=result.get("id", ""), thread_id=thread_id, raw={"text": accumulated}, ) + async def _stream_via_emit( + self, + thread_id: str, + text_stream: Any, + session: _TeamsStreamSession, + options: StreamOptions | None = None, + ) -> RawMessage: + """Native Bot Framework streaming: typing chunks + final message. + + Wire format (per Bot Framework streaming protocol): + + - Each non-empty chunk is a ``typing`` activity with + ``channelData = {streamType: "streaming", streamSequence: N, + streamId?: }`` and a parallel ``streaminfo`` entity. Per + the Bot Framework streaming contract, ``streamId`` MUST appear + on the ``streaminfo`` entity (not just ``channelData``) for + subsequent and final activities; the first chunk omits it + everywhere because the server hasn't assigned an id yet. 
+ - On stream completion, a final ``message`` activity is sent by + :meth:`_close_stream_session` (it carries ``streamType: "final"``). + + Throttling: Teams' streaming endpoint enforces ~1 request/second + and Microsoft recommends 1.5-2s buffering. We accumulate every + non-empty chunk locally but only ship a ``typing`` activity once + the emit interval has elapsed since the previous send; in-window + chunks are coalesced into the next eligible emit. The interval + defaults to ``TeamsAdapterConfig.native_stream_min_emit_interval_ms`` + (1500ms) and is overridden per-call by + ``StreamOptions.update_interval_ms`` when provided. + + End-of-stream flush: when the iterator ends, any text that was + buffered (coalesced into the throttle window) but never emitted + as a ``typing`` activity is shipped now via one final forced + ``typing`` emit before this method returns. Without that flush, + buffered text would only ship in :meth:`_close_stream_session`'s + final ``message`` activity — and if THAT send fails (429 / network + blip), ``Thread.stream`` would have already built a ``SentMessage`` + from this method's return value containing text Teams never + accepted (the chat handler returns and ``SentMessage`` is created + before the close runs from the handler's finally block). With the + flush, ``accumulated`` is confirmed-accepted by Teams before we + return, so the close-path final ``message`` activity becomes a + UI-clearing marker whose failure is a stale-streaming-UI cost + rather than a recording inconsistency. + + We never emit a chunk after :attr:`_TeamsStreamSession.canceled` is + set, and we surface stream-iterator and send exceptions to the + caller (after canceling the session) so the close path won't post + a final message that doesn't reflect what the user saw. + """ + decoded = self.decode_thread_id(thread_id) + accumulated = "" + # The cumulative-text snapshot last confirmed-accepted by Teams + # via a successful ``_teams_send``. 
After the loop we compare + # against ``accumulated`` to decide whether the throttle window + # buffered text that needs an end-of-stream flush. + last_committed_text = "" + + emit_interval_ms: int = ( + options.update_interval_ms + if options is not None and options.update_interval_ms is not None + else self._native_stream_min_emit_interval_ms + ) + # Persist the resolved interval on the session so + # ``_close_stream_session`` can honor the same caller-supplied + # override when throttling the final ``message`` activity. + session.emit_interval_ms = emit_interval_ms + # Tracks when the most recent successful emit landed, in the same + # ms-since-arbitrary-epoch frame as ``self._stream_clock_ms()``. + # ``-inf`` so the first chunk always passes the interval gate + # regardless of what value the clock returns on its first call. + last_emit_at_ms: float = float("-inf") + + try: + async for chunk in text_stream: + if session.canceled: + self._logger.debug("Teams stream canceled by user", {"threadId": thread_id}) + break + + text = "" + if isinstance(chunk, str): + text = chunk + elif isinstance(chunk, dict) and chunk.get("type") == "markdown_text": + text = chunk.get("text", "") + if not text: + continue + + # Always accumulate locally so the close-path final + # ``message`` activity carries the full user-visible + # text. The decision below only governs whether THIS + # chunk triggers an intermediate ``typing`` send. + accumulated += text + + now_ms = self._stream_clock_ms() + if now_ms - last_emit_at_ms < emit_interval_ms: + # Inside the throttle window — coalesce. Buffered text + # ships in the next eligible emit, or in the + # end-of-stream flush below if the iterator ends + # before another window opens. 
+ continue + + result = await self._emit_streaming_activity( + decoded=decoded, + thread_id=thread_id, + session=session, + text=accumulated, + ) + last_committed_text = accumulated + last_emit_at_ms = now_ms + # Persist for the close-path throttle (see + # ``_close_stream_session``). Same value as the local, + # mirrored on the session so close has access without + # plumbing an extra parameter through. + session.last_emit_at_ms = now_ms + + if session.stream_id is None: + chunk_id = result.get("id") or "" + session.first_chunk_id = chunk_id + if chunk_id: + session.stream_id = chunk_id + except (asyncio.CancelledError, KeyboardInterrupt, SystemExit, GeneratorExit): + # Control-flow exceptions: cancel the session so close() doesn't + # post a final message, then re-raise so the caller's cancellation + # propagates correctly. Hazard #5 — orphaned tasks here would + # leave a half-finished streaming activity visible to the user. + session.cancel() + raise + except Exception: + # Iterator raised mid-stream. Cancel so the close path doesn't + # ship a final message. The exception surfaces to the caller + # (the chat handler), which will propagate to the message + # processing task — same shape as the fallback path's + # stream-exception capture, sized for native streaming. + session.cancel() + raise + + # End-of-stream flush — see method docstring for the data-corruption + # rationale. Only runs when the iterator ended normally AND the + # throttle window buffered text since the last successful emit. + # ``session.canceled`` is checked because mid-stream cancellation + # via ``session.cancel()`` (e.g. user-initiated abort) breaks the + # loop above without an exception, and we shouldn't flush text the + # user explicitly canceled out of. 
+        if not session.canceled and accumulated != last_committed_text:
+            # Wrap the throttle wait + emit in the same control-flow guard
+            # as the in-loop try/except: ``_stream_sleep_ms`` (and the
+            # clock read above it) can raise — most importantly
+            # ``asyncio.CancelledError`` when the chat task is canceled by
+            # a supervisor mid-wait. Without this guard the session is
+            # left ``canceled=False`` while the exception propagates,
+            # and the finally-block close path would proceed as if the
+            # stream ran to completion. Mirroring the in-loop pattern
+            # keeps the close path's invariant consistent: any exception
+            # leaving this method implies ``session.canceled`` is True.
+            try:
+                # Honor the throttle even for the end-of-stream flush — a
+                # fast LLM stream that finishes inside the throttle window
+                # after the last successful emit would 429 the Bot Framework
+                # streaming endpoint otherwise (1 req/sec quota), canceling
+                # the stream mid-flight. Wait for the remaining window
+                # before shipping.
+                elapsed_ms = self._stream_clock_ms() - last_emit_at_ms
+                if elapsed_ms < emit_interval_ms:
+                    await self._stream_sleep_ms(emit_interval_ms - elapsed_ms)
+                # Re-check cancellation after the wait — the chat handler can
+                # call ``session.cancel()`` from another task while we sleep.
+                # If we're canceled, skip the emit entirely; the bottom return
+                # block will surface only ``last_committed_text`` so the
+                # ``SentMessage`` matches what Teams actually shipped to the
+                # user (not the buffered suffix the user explicitly canceled
+                # out of). Same shape as in-loop cancellation.
+                if not session.canceled:
+                    result = await self._emit_streaming_activity(
+                        decoded=decoded,
+                        thread_id=thread_id,
+                        session=session,
+                        text=accumulated,
+                    )
+                    last_committed_text = accumulated
+                    # Update the session timestamp post-emit so the
+                    # close-path throttle sees the FLUSH as the most
+                    # recent activity (not the prior in-loop emit).
+                    # Same frame as ``_stream_clock_ms``.
+ session.last_emit_at_ms = self._stream_clock_ms() + if session.stream_id is None: + chunk_id = result.get("id") or "" + session.first_chunk_id = chunk_id + if chunk_id: + session.stream_id = chunk_id + except (asyncio.CancelledError, KeyboardInterrupt, SystemExit, GeneratorExit): + # Control-flow exceptions during the flush wait or emit: + # cancel the session so the finally-block close path skips + # its final ``message`` activity. ``_emit_streaming_activity`` + # already calls ``session.cancel()`` on its own failures + # before re-raising, so this is a no-op there; the cancel + # here is what covers the throttle-wait path. + session.cancel() + raise + except Exception: + # Same shape as the in-loop ``except Exception``: cancel + # the session before propagating so the close path doesn't + # ship a final ``message`` containing text Teams never + # accepted via this flush emit. + session.cancel() + raise + + # Pick the cumulative text that Teams actually accepted: when + # canceled (in-loop break or during-wait cancellation), some + # text may have been buffered locally but never shipped — return + # only ``last_committed_text`` so ``Thread.stream``'s outer + # accumulator records what the user actually saw. When the stream + # ran to completion, ``last_committed_text`` and ``accumulated`` + # are equal (the flush above committed the final batch), so this + # collapses to ``accumulated`` in the happy path. + final_text = last_committed_text if session.canceled else accumulated + # Persist accumulated text on the session so close() can emit the + # final ``message`` activity with the same content the user saw. + # Direct ``_text`` write is the canonical mutator (the public + # ``text`` property is read-only by design); both classes live in + # the same module so this isn't a cross-module SLF001. + session._text = final_text # noqa: SLF001 + # Set ``text`` (the adapter-authoritative override) so + # ``Thread.stream`` records only what Teams actually shipped. 
+ # Without this, ``Thread._handle_stream``'s local accumulator + # would still include the buffered suffix on cancellation. + return RawMessage( + id=session.first_chunk_id, + thread_id=thread_id, + raw={"text": final_text}, + text=final_text, + ) + + async def _emit_streaming_activity( + self, + *, + decoded: TeamsThreadId, + thread_id: str, + session: _TeamsStreamSession, + text: str, + ) -> dict[str, Any]: + """Send one ``typing`` activity carrying the cumulative ``text`` snapshot. + + Increments ``session.sequence`` on success. Raises (after canceling + the session and logging) if ``_teams_send`` fails — ``Thread.stream`` + accumulates each chunk locally BEFORE yielding to the adapter, so + swallowing the failure here would let the SDK record a SentMessage + / append a message-history entry containing text Teams never + accepted. Returns the REST response dict on success so the caller + can capture the server-assigned ``streamId`` for the first chunk. + + Hazard #7 — only include ``streamId`` once the server has assigned + one. Sending ``"streamId": None`` (or ``""``) on the first chunk + would cause Teams to reject the activity. The Bot Framework REST + contract requires ``streamId`` on BOTH ``channelData`` and the + ``streaminfo`` entity for subsequent activities; setting it only + on ``channelData`` may cause Teams to detach the chunk from the + initial stream. 
+ """ + next_sequence = session.sequence + 1 + channel_data: dict[str, Any] = { + "streamType": _STREAM_TYPE_STREAMING, + "streamSequence": next_sequence, + } + stream_info_entity: dict[str, Any] = { + "type": "streaminfo", + "streamType": _STREAM_TYPE_STREAMING, + "streamSequence": next_sequence, + } + if session.stream_id is not None: + channel_data["streamId"] = session.stream_id + stream_info_entity["streamId"] = session.stream_id + + activity_payload: dict[str, Any] = { + "type": "typing", + "text": text, + "channelData": channel_data, + "entities": [stream_info_entity], + } + + try: + result = await self._teams_send(decoded, activity_payload) + except Exception as exc: + self._logger.warn( + "Teams stream emit failed; canceling stream", + {"threadId": thread_id, "error": str(exc)}, + ) + session.cancel() + raise + + session.sequence = next_sequence + return result + + async def _close_stream_session( + self, + thread_id: str, + session: _TeamsStreamSession, + ) -> None: + """Send the final ``message`` activity to close out a stream. + + No-op if the session was canceled, or if no chunks were ever + emitted (empty ``text``). Otherwise we send the final activity — + even if the server never returned an ``id`` for the first chunk + (i.e. ``stream_id`` is ``None``), in which case we omit + ``streamId`` from ``channelData``. Mirrors upstream's looser + check: as long as the user saw streamed text, ship the final + ``message`` so the Teams streaming UI clears, instead of leaving + it spinning until Teams times the session out client-side. + """ + if session.canceled: + return + # ``text`` is the cumulative buffer; empty means nothing was ever + # emitted (empty stream, or stream canceled before first send). + if not session.text: + return + + # Throttle the final ``message`` activity against the same + # ~1 req/sec quota the streaming ``typing`` activities honor. 
+ # Teams' streaming endpoint rate-limits ALL activities sharing a + # ``streamId`` together — a close immediately after a successful + # one-chunk emit would 429 without this wait, and because the + # exception is swallowed below (fail-soft) Teams would never + # receive the final message while the SDK records the response + # as sent. ``last_emit_at_ms == -inf`` means no in-stream emit + # happened (shouldn't reach here in that case because + # ``session.text`` would be empty, but the guard is cheap). + if session.last_emit_at_ms != float("-inf"): + interval_ms = ( + session.emit_interval_ms + if session.emit_interval_ms is not None + else self._native_stream_min_emit_interval_ms + ) + elapsed_ms = self._stream_clock_ms() - session.last_emit_at_ms + if elapsed_ms < interval_ms: + try: + await self._stream_sleep_ms(interval_ms - elapsed_ms) + except (asyncio.CancelledError, KeyboardInterrupt, SystemExit, GeneratorExit): + # Supervisor-initiated cancellation during the wait. + # Mark the session canceled so any subsequent dispatch + # (none in normal flow, but cheap defense) sees the + # right state, then re-raise so the caller's + # cancellation propagates correctly. + session.cancel() + raise + # Cancellation may have arrived during the wait via + # ``session.cancel()`` from another task — check before + # shipping the final. + if session.canceled: + return + + decoded = self.decode_thread_id(thread_id) + channel_data: dict[str, Any] = { + "streamType": _STREAM_TYPE_FINAL, + } + stream_info_entity: dict[str, Any] = { + "type": "streaminfo", + "streamType": _STREAM_TYPE_FINAL, + } + # Hazard #7 — only include ``streamId`` when we actually have one. + # The Bot Framework REST response can return ``id=""`` even on a + # 200, in which case ``stream_id`` stays ``None`` (see emit guard + # in ``_stream_via_emit``); ship the final without a ``streamId`` + # rather than skipping the send. 
When present, ``streamId`` must + # appear on BOTH ``channelData`` and the ``streaminfo`` entity + # per the Bot Framework streaming contract for the final activity. + if session.stream_id is not None: + channel_data["streamId"] = session.stream_id + stream_info_entity["streamId"] = session.stream_id + + final_activity: dict[str, Any] = { + "type": "message", + "text": session.text, + "textFormat": "markdown", + "channelData": channel_data, + "entities": [stream_info_entity], + } + try: + await self._teams_send(decoded, final_activity) + except Exception as exc: + # Logged at warn — by the time we get here, ``_stream_via_emit`` + # has already done an end-of-stream flush so every byte of + # ``session.text`` was confirmed-accepted by Teams via a prior + # ``typing`` activity. The user has seen the full text. The + # final ``message`` activity exists to switch the streaming UI + # from typing indicator to message bubble; if that send fails + # the streaming UI may stay until Teams times the session out + # client-side, but the recorded ``SentMessage`` and + # ``_message_history`` entry still match what the user saw. + self._logger.warn( + "Teams stream final activity failed", + {"threadId": thread_id, "error": str(exc)}, + ) + def encode_thread_id(self, platform_data: TeamsThreadId) -> str: """Encode platform data into a thread ID string. diff --git a/src/chat_sdk/adapters/teams/types.py b/src/chat_sdk/adapters/teams/types.py index cda4e2e..61ddae0 100644 --- a/src/chat_sdk/adapters/teams/types.py +++ b/src/chat_sdk/adapters/teams/types.py @@ -72,6 +72,17 @@ class TeamsAdapterConfig: logger: Logger | None = None # Override bot username (optional). user_name: str | None = None + # Minimum interval between native DM streaming activities, in + # milliseconds. Bot Framework's streaming endpoint is throttled to + # roughly 1 request/second; Microsoft recommends buffering tokens + # for 1.5-2 seconds to avoid 429s mid-response. 
We default to 1500ms + # per https://learn.microsoft.com/microsoftteams/platform/bots/streaming-ux. + # Chunks that arrive within this window after the previous emit are + # accumulated locally and shipped together on the next emit (or in + # the final ``message`` activity if the stream ends inside the + # window). A caller-supplied ``StreamOptions.update_interval_ms`` + # overrides this default for a single stream. + native_stream_min_emit_interval_ms: int = 1500 # ============================================================================= diff --git a/src/chat_sdk/thread.py b/src/chat_sdk/thread.py index 24eba7b..7b0e613 100644 --- a/src/chat_sdk/thread.py +++ b/src/chat_sdk/thread.py @@ -644,9 +644,21 @@ async def _wrapped_stream() -> AsyncIterator[str | StreamChunk]: yield chunk raw_result = await self.adapter.stream(self._id, _wrapped_stream(), options) # type: ignore[union-attr] + # Adapters can override the recorded text via the optional + # ``text`` field on ``RawMessage`` when their internal state + # (cancellation, throttling, partial commits) makes the local + # ``accumulated`` buffer diverge from what the platform + # actually accepted. Default ``None`` falls back to the local + # buffer — backward-compatible for adapters that don't need + # the override (Slack, Discord, GitHub, Google Chat, + # Telegram, Linear, WhatsApp). The Teams native streaming + # path sets it on cancellation to short-circuit the buffered + # suffix that was coalesced into the throttle window but + # never emitted. 
+ recorded_text = raw_result.text if raw_result.text is not None else accumulated sent = self._create_sent_message( raw_result.id, - PostableMarkdown(markdown=accumulated), + PostableMarkdown(markdown=recorded_text), raw_result.thread_id, ) if self._message_history is not None: diff --git a/src/chat_sdk/types.py b/src/chat_sdk/types.py index 6cdfe29..361c9f3 100644 --- a/src/chat_sdk/types.py +++ b/src/chat_sdk/types.py @@ -649,6 +649,26 @@ class RawMessage: id: str thread_id: str raw: Any + # Optional adapter-authoritative text snapshot. When set, callers + # like ``Thread.stream`` MUST prefer this over their own local + # accumulator when constructing the recorded ``SentMessage`` body / + # message-history entry. Used by adapters whose internal state + # (cancellation, throttling, partial commits) makes the local + # accumulator diverge from what the platform actually accepted — + # the Teams native streaming path sets this when a session is + # canceled mid-flight so ``Thread.stream`` records only the text + # Teams shipped, not the buffered suffix the user canceled out of. + # ``None`` means "use the caller's existing logic" — backward + # compatible for adapters that don't need this override. + # + # Divergence from upstream — see docs/UPSTREAM_SYNC.md. Upstream's + # ``RawMessage`` interface (packages/chat/src/types.ts) has only + # ``id``, ``raw``, ``threadId``; the override is Python-only because + # we hand-roll Teams native streaming (upstream uses + # ``@microsoft/teams.apps``'s ``IStreamer.emit`` which owns the + # cancellation-text reconciliation internally). Will simplify or + # disappear once we migrate to ``microsoft-teams-apps`` (Python). 
+ text: str | None = None @dataclass diff --git a/tests/test_teams_coverage.py b/tests/test_teams_coverage.py index b857325..5553fe2 100644 --- a/tests/test_teams_coverage.py +++ b/tests/test_teams_coverage.py @@ -1047,8 +1047,10 @@ async def text_stream(): yield {"type": "markdown_text", "text": "World"} result = await adapter.stream(tid, text_stream()) + # Group chat: accumulate → single send. assert result.id == "msg-1" assert result.raw["text"] == "Hello World" + assert send_call_count == 1 async def test_stream_string_chunks(self): adapter = _make_adapter(logger=_make_logger()) @@ -1068,6 +1070,9 @@ async def text_stream(): result = await adapter.stream(tid, text_stream()) assert "Hello World" in result.raw["text"] + # Group chat: single accumulate-and-post send, no edits. + assert adapter._teams_send.call_count == 1 + assert adapter._teams_update.call_count == 0 async def test_stream_empty_chunks_skipped(self): adapter = _make_adapter(logger=_make_logger()) @@ -1087,6 +1092,7 @@ async def text_stream(): result = await adapter.stream(tid, text_stream()) assert result.id == "" # nothing sent + assert adapter._teams_send.call_count == 0 # --------------------------------------------------------------------------- diff --git a/tests/test_teams_extended.py b/tests/test_teams_extended.py index 15a091b..a40a423 100644 --- a/tests/test_teams_extended.py +++ b/tests/test_teams_extended.py @@ -593,7 +593,12 @@ def test_minimal_certificate_only_requires_private_key(self): class TestStream: @pytest.mark.asyncio - async def test_stream_posts_then_edits(self): + async def test_group_chat_stream_accumulates_and_posts_single_message(self): + """Group chats / channels accumulate the stream and post one message. + + Mirrors upstream after vercel/chat#416: ``streamViaEmit`` is reserved + for DMs; non-DM threads no longer post+edit (which produced flicker). 
+ """ adapter = _make_adapter(logger=_make_logger()) adapter._teams_send = AsyncMock(return_value={"id": "stream-msg-1"}) adapter._teams_update = AsyncMock() @@ -611,12 +616,16 @@ async def text_gen(): result = await adapter.stream(tid, text_gen()) assert result.id == "stream-msg-1" - # First chunk creates, second updates + # Single send carrying the full accumulated text — no edits. assert adapter._teams_send.call_count == 1 - assert adapter._teams_update.call_count == 1 + assert adapter._teams_update.call_count == 0 + sent_payload = adapter._teams_send.await_args.args[1] + assert sent_payload["text"] == "Hello world" + assert sent_payload["type"] == "message" @pytest.mark.asyncio - async def test_stream_empty_chunks_skipped(self): + async def test_group_chat_stream_empty_returns_empty(self): + """Empty streams in a group chat skip the post entirely.""" adapter = _make_adapter(logger=_make_logger()) adapter._teams_send = AsyncMock(return_value={"id": "stream-msg-2"}) adapter._teams_update = AsyncMock() @@ -630,10 +639,11 @@ async def test_stream_empty_chunks_skipped(self): async def text_gen(): yield "" - yield "Hello" yield "" result = await adapter.stream(tid, text_gen()) - assert result.id == "stream-msg-2" - assert adapter._teams_send.call_count == 1 + # No real text → no send, returned RawMessage carries empty content. + assert result.id == "" + assert result.raw["text"] == "" + assert adapter._teams_send.call_count == 0 assert adapter._teams_update.call_count == 0 diff --git a/tests/test_teams_native_streaming.py b/tests/test_teams_native_streaming.py new file mode 100644 index 0000000..f1f468c --- /dev/null +++ b/tests/test_teams_native_streaming.py @@ -0,0 +1,1814 @@ +"""Behavioral tests for Teams native streaming via Bot Framework streaming protocol. 
+ +Port of upstream vercel/chat#416 (commit ed46bae): for DMs, the Teams +adapter dispatches stream chunks through ``IStreamer.emit`` (in TS) — in +Python, through cumulative typing activities tagged with +``channelData.streamType = "streaming"`` and a final ``message`` activity +tagged ``streamType = "final"``. + +These tests pin the wire-level shape (streamSequence increments, streamId +threading, no streamId on the first chunk) and the lifecycle behavior +(typing indicator clears on close, no orphan streams, cancellation drains +cleanly, mid-stream errors surface to the caller). +""" + +from __future__ import annotations + +import asyncio +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from chat_sdk.adapters.teams.adapter import ( + _STREAM_TYPE_FINAL, + _STREAM_TYPE_STREAMING, + TeamsAdapter, + _TeamsStreamSession, +) +from chat_sdk.adapters.teams.types import TeamsAdapterConfig, TeamsThreadId + + +def _make_adapter( + *, + native_stream_min_emit_interval_ms: int | None = None, + clock_step_ms: float = 2000.0, +) -> TeamsAdapter: + """Build a TeamsAdapter with a deterministic native-stream clock. + + The native streaming path throttles emits via + ``_stream_clock_ms()`` (defaults to ``loop.time() * 1000``). Tests + can't rely on real elapsed time without sleeping, so we substitute + a counter-based clock that advances by ``clock_step_ms`` per call. + With the default 2000ms step (> the 1500ms throttle), every chunk + clears the interval gate — matching the pre-throttle test + expectations of "one emit per chunk." Tests that want to exercise + coalescing pass ``clock_step_ms=0`` (or a value below the configured + interval) so chunks land within the same throttle window. 
+ """ + config_kwargs: dict[str, Any] = { + "app_id": "test-app-id", + "app_password": "test-password", + "logger": MagicMock( + debug=MagicMock(), + info=MagicMock(), + warn=MagicMock(), + error=MagicMock(), + ), + } + if native_stream_min_emit_interval_ms is not None: + config_kwargs["native_stream_min_emit_interval_ms"] = native_stream_min_emit_interval_ms + + adapter = TeamsAdapter(TeamsAdapterConfig(**config_kwargs)) + adapter._stream_clock_ms = _advancing_clock(step_ms=clock_step_ms) + # Default sleep is an AsyncMock so the throttle wait in + # ``_stream_via_emit``'s end-of-stream flush returns instantly. Tests + # that care about the wait amount can introspect via + # ``adapter._stream_sleep_ms.await_args``; tests that don't care just + # get the no-op behavior. + adapter._stream_sleep_ms = AsyncMock() + return adapter + + +def _advancing_clock(*, start_ms: float = 0.0, step_ms: float = 2000.0): + """Returns a deterministic ms-clock that advances by ``step_ms`` per call. + + With ``step_ms`` greater than the throttle interval, every call to + ``_stream_clock_ms`` reports enough elapsed time to clear the gate. + With ``step_ms == 0``, every call returns the same value so all + chunks land inside a single throttle window. 
+ """ + state = {"now": start_ms} + + def clock() -> float: + state["now"] += step_ms + return state["now"] + + return clock + + +def _dm_thread_id(adapter: TeamsAdapter) -> str: + """Encode a DM-shaped (non-``19:`` prefix) conversation id.""" + return adapter.encode_thread_id( + TeamsThreadId( + conversation_id="a:1Abc-DM-conversation-id", + service_url="https://smba.trafficmanager.net/teams/", + ) + ) + + +def _channel_thread_id(adapter: TeamsAdapter) -> str: + """Encode a channel-shaped (``19:`` prefix) conversation id.""" + return adapter.encode_thread_id( + TeamsThreadId( + conversation_id="19:abc@thread.tacv2", + service_url="https://smba.trafficmanager.net/teams/", + ) + ) + + +# --------------------------------------------------------------------------- +# Wire-format invariants +# --------------------------------------------------------------------------- + + +class TestNativeStreamingWireFormat: + """Pin the Bot Framework streaming protocol payload shapes.""" + + @pytest.mark.asyncio + async def test_first_chunk_omits_stream_id(self): + """The first chunk must NOT carry a ``streamId`` — the server assigns it. + + Hazard #7: serializing ``"streamId": None`` would cause Teams to + reject the activity. Only emit the key once we have a real id from + the Bot Framework REST response. 
+ """ + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "stream-id-from-server"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "first" + + await adapter._stream_via_emit(tid, text_gen(), session) + + first_payload = adapter._teams_send.await_args_list[0].args[1] + assert first_payload["type"] == "typing" + assert first_payload["text"] == "first" + assert first_payload["channelData"]["streamType"] == _STREAM_TYPE_STREAMING + assert first_payload["channelData"]["streamSequence"] == 1 + assert "streamId" not in first_payload["channelData"] + # ``streamId`` must also be absent from the streaminfo entity on + # the first chunk — there is no server-assigned id yet and + # sending ``"streamId": None`` (or "") would cause Teams to + # reject the activity. + assert "streamId" not in first_payload["entities"][0] + # Subsequent chunks (none here) would inherit streamId from the + # server response; verify the session captured it. 
+ assert session.stream_id == "stream-id-from-server" + assert session.first_chunk_id == "stream-id-from-server" + + @pytest.mark.asyncio + async def test_chunks_include_streaminfo_entity(self): + """Each streaming chunk includes a ``streaminfo`` entity for the protocol.""" + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "hello" + + await adapter._stream_via_emit(tid, text_gen(), session) + + payload = adapter._teams_send.await_args_list[0].args[1] + assert payload["entities"] == [ + { + "type": "streaminfo", + "streamType": _STREAM_TYPE_STREAMING, + "streamSequence": 1, + } + ] + + @pytest.mark.asyncio + async def test_subsequent_chunks_carry_stream_id_and_increment_sequence(self): + """After the first send, every chunk carries the assigned streamId. + + ``streamSequence`` increments by 1 per chunk (1, 2, 3, ...). Each + chunk's ``text`` is the cumulative content (Teams clients render the + latest snapshot — not deltas). + """ + adapter = _make_adapter() + # Server assigns id on the first send; subsequent sends echo back + # arbitrary ids that we ignore (we keep the first one as streamId). + adapter._teams_send = AsyncMock( + side_effect=[ + {"id": "first-server-id"}, + {"id": "ignored-1"}, + {"id": "ignored-2"}, + ] + ) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "Hel" + yield "lo " + yield "world" + + await adapter._stream_via_emit(tid, text_gen(), session) + + payloads = [c.args[1] for c in adapter._teams_send.await_args_list] + assert [p["text"] for p in payloads] == ["Hel", "Hello ", "Hello world"] + assert [p["channelData"]["streamSequence"] for p in payloads] == [1, 2, 3] + # First chunk has no streamId; later chunks carry the captured one. 
+ assert "streamId" not in payloads[0]["channelData"] + assert payloads[1]["channelData"]["streamId"] == "first-server-id" + assert payloads[2]["channelData"]["streamId"] == "first-server-id" + + @pytest.mark.asyncio + async def test_close_session_sends_final_message(self): + """Closing the session sends a ``message`` activity with ``streamType: final``. + + This is what clears the streaming UI on the Teams client. + """ + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "final-server-id"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + session.stream_id = "running-stream-id" + session._text = "Hello world" + + await adapter._close_stream_session(tid, session) + + assert adapter._teams_send.await_count == 1 + payload = adapter._teams_send.await_args.args[1] + assert payload["type"] == "message" + assert payload["text"] == "Hello world" + assert payload["channelData"]["streamType"] == _STREAM_TYPE_FINAL + assert payload["channelData"]["streamId"] == "running-stream-id" + # Bot Framework streaming contract requires ``streamId`` on the + # ``streaminfo`` entity (not just ``channelData``) for the final + # activity. Earlier versions of this adapter omitted it from the + # entity, which Teams may treat as a malformed close — leaving + # the streaming UI spinning until client-side timeout. + assert payload["entities"] == [ + { + "type": "streaminfo", + "streamType": _STREAM_TYPE_FINAL, + "streamId": "running-stream-id", + } + ] + + +# --------------------------------------------------------------------------- +# Throttling (Bot Framework streaming endpoint is ~1 req/sec) +# --------------------------------------------------------------------------- + + +class TestNativeStreamingThrottle: + """Pin the chunk-coalescing behavior that protects against Teams 429s. 
+ + Microsoft's Bot Framework streaming endpoint throttles to roughly + 1 request/second and recommends buffering tokens for 1.5-2 seconds + before sending the next ``streaming`` activity. ``_stream_via_emit`` + coalesces in-window chunks into the cumulative-text snapshot that + ships with the next eligible emit (or in the final ``message`` + activity if the iterator ends inside the window). + """ + + @pytest.mark.asyncio + async def test_intermediate_chunks_within_window_are_coalesced(self): + """Mid-stream chunks in the same throttle window collapse to one emit. + + Without coalescing, a typical LLM token stream (10+ tokens/s) would + rate-limit on the Bot Framework streaming endpoint within the + first second and the response would be cancelled mid-flight. + + Two sends total: the first chunk's intermediate emit + the + end-of-stream flush that ships everything else (see + ``test_buffered_text_flushed_at_end_of_stream`` for why the + flush exists). + """ + # ``clock_step_ms=0`` means every clock check returns the same + # value, so every chunk after the first lands inside the throttle + # window. Only the first chunk emits intermediately; the rest + # ride out in the end-of-stream flush. + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "Hel" + yield "lo " + yield "world" + yield "!" + + result = await adapter._stream_via_emit(tid, text_gen(), session) + + # Two ``typing`` activities: the first chunk's intermediate emit + # plus the end-of-stream flush. The middle two chunks did not + # each get their own emit — they were coalesced. + assert adapter._teams_send.await_count == 2, ( + "Throttle should coalesce intermediate chunks within the same window. 
" + "Without this, real LLM streams (10+ tokens/s) would 429 the Bot " + "Framework streaming endpoint within the first second." + ) + payloads = [c.args[1] for c in adapter._teams_send.await_args_list] + # Initial intermediate emit: just the first chunk's text. + assert payloads[0]["text"] == "Hel" + # End-of-stream flush: the full cumulative text. + assert payloads[1]["text"] == "Hello world!" + # Both are streaming activities (not the final ``message``). + assert payloads[0]["type"] == "typing" + assert payloads[1]["type"] == "typing" + assert session.text == "Hello world!" + assert result.raw["text"] == "Hello world!" + + @pytest.mark.asyncio + async def test_chunks_beyond_throttle_interval_emit_individually(self): + """When time advances past the interval, each chunk gets its own send.""" + adapter = _make_adapter(clock_step_ms=2000.0) + adapter._teams_send = AsyncMock(side_effect=[{"id": "first"}, {"id": "ignored-1"}, {"id": "ignored-2"}]) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "one " + yield "two " + yield "three" + + await adapter._stream_via_emit(tid, text_gen(), session) + # All three chunks emitted because each clock check reports + # 2000ms elapsed (> the default 1500ms interval). + assert adapter._teams_send.await_count == 3 + texts = [c.args[1]["text"] for c in adapter._teams_send.await_args_list] + assert texts == ["one ", "one two ", "one two three"] + + @pytest.mark.asyncio + async def test_caller_update_interval_ms_overrides_default(self): + """``StreamOptions.update_interval_ms`` overrides the adapter default. + + A caller (e.g. a ``StreamingPlan``) that asks for ``update_interval_ms=0`` + gets one emit per chunk regardless of the adapter's configured + default. Mirrors how the fallback path treats the same field. 
+ """ + from chat_sdk.types import StreamOptions + + # Even with a real (non-zero) default, a caller-supplied 0 should + # disable coalescing. + adapter = _make_adapter( + native_stream_min_emit_interval_ms=1500, + clock_step_ms=10.0, # tiny steps so coalescing WOULD happen at 1500ms default + ) + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "a" + yield "b" + yield "c" + + # Without the override, the 1500ms throttle + 10ms clock steps + # would coalesce everything into one emit. With override=0, + # every chunk should emit. + opts = StreamOptions() + opts.update_interval_ms = 0 + await adapter._stream_via_emit(tid, text_gen(), session, opts) + assert adapter._teams_send.await_count == 3, ( + "Caller-supplied StreamOptions.update_interval_ms=0 should disable coalescing entirely for this stream" + ) + + @pytest.mark.asyncio + async def test_buffered_text_flushed_at_end_of_stream(self): + """End-of-stream flush guarantees Teams accepted every byte before return. + + **What this prevents (Codex P2):** without the flush, chunks coalesced + in a throttle window would only ship in the close-path ``message`` + activity — and if THAT send fails (429, network blip), ``Thread.stream`` + would already have built a ``SentMessage`` from this method's return + value containing text Teams never accepted. The chat handler returns + and ``SentMessage`` is created BEFORE the close runs from the + handler's finally block, so a swallowed close failure produces a + message-history entry the user never saw. + + With the flush, ``accumulated`` is confirmed-accepted via a forced + ``typing`` emit before ``_stream_via_emit`` returns, so the + ``SentMessage`` matches reality even if the close fails. 
+ """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "first-id"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "Hello" + yield " coalesced" + yield " world" + + result = await adapter._stream_via_emit(tid, text_gen(), session) + + # Two typing sends: the first chunk + the end-of-stream flush. + assert adapter._teams_send.await_count == 2 + payloads = [c.args[1] for c in adapter._teams_send.await_args_list] + assert payloads[0]["text"] == "Hello" + assert payloads[1]["text"] == "Hello coalesced world", ( + "End-of-stream flush must carry the full accumulated text. " + "Without this, Thread.stream would record a SentMessage with " + "text Teams never accepted on a close-path send failure." + ) + # Both are streaming typing activities, sequence increments. + assert payloads[0]["channelData"]["streamSequence"] == 1 + assert payloads[1]["channelData"]["streamSequence"] == 2 + # The flush is what Thread.stream's SentMessage will be built from. + assert result.raw["text"] == "Hello coalesced world" + assert session.text == "Hello coalesced world" + + @pytest.mark.asyncio + async def test_flush_failure_propagates_and_cancels_session(self): + """If the end-of-stream flush fails, re-raise — same shape as in-loop emits. + + A close-path failure is now logged at warn (the user already saw + the text via the flush), but a flush failure means buffered text + was never accepted by Teams. Swallowing it would let + ``Thread.stream`` record the buffered text in ``SentMessage`` / + ``_message_history`` even though the user never saw it. Re-raise + so the outer ``Thread.stream`` short-circuits the history append. 
+ """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock( + side_effect=[ + {"id": "first-id"}, # initial chunk emit succeeds + RuntimeError("429 on flush"), # end-of-stream flush fails + ] + ) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "first" + yield " buffered" + + with pytest.raises(RuntimeError, match="429 on flush"): + await adapter._stream_via_emit(tid, text_gen(), session) + # Session canceled so the close path skips its final-message activity. + assert session.canceled is True + # Two attempted sends: the first chunk + the flush attempt. + assert adapter._teams_send.await_count == 2 + + @pytest.mark.asyncio + async def test_no_flush_when_iterator_ended_at_window_boundary(self): + """No redundant flush when the last chunk already triggered an emit. + + Regression: the flush should ONLY run when there's buffered text + that hasn't been emitted. If every chunk landed beyond the + throttle window (each got its own emit), there's nothing to + flush and we shouldn't add a redundant duplicate-text send. + """ + adapter = _make_adapter(clock_step_ms=2000.0) + adapter._teams_send = AsyncMock(side_effect=[{"id": "first"}, {"id": "ignored"}]) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "one " + yield "two" + + await adapter._stream_via_emit(tid, text_gen(), session) + # Exactly two sends — one per chunk. No flush because each chunk + # was already emitted intermediately. + assert adapter._teams_send.await_count == 2 + + @pytest.mark.asyncio + async def test_no_flush_after_session_canceled_mid_stream(self): + """Mid-stream cancellation skips the end-of-stream flush. + + ``session.cancel()`` may be set by a user-initiated abort or by + an upstream supervisor; either way, we shouldn't ship buffered + text the user explicitly canceled out of. 
+ """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "first-id"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "first" + session.cancel() + # These two are buffered (within window) but session is now + # canceled — the flush should NOT ship them. + yield " buffered" + yield " more" + + await adapter._stream_via_emit(tid, text_gen(), session) + # Only the pre-cancel emit. No flush of buffered text since + # session.canceled is True at end-of-loop. + assert adapter._teams_send.await_count == 1 + assert adapter._teams_send.await_args_list[0].args[1]["text"] == "first" + + @pytest.mark.asyncio + async def test_close_path_final_message_carries_full_accumulated_text(self): + """The close-path final ``message`` activity carries the full text. + + This is what switches the Teams streaming UI from typing indicator + to message bubble. After the end-of-stream flush, this text is + redundant content-wise (already confirmed via the flush typing + emit), but the activity-type change is what the client UI needs. + """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "first-id"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "Hello" + yield " coalesced" + yield " world" + + await adapter._stream_via_emit(tid, text_gen(), session) + # Two typing sends so far (initial + end-of-stream flush). + assert adapter._teams_send.await_count == 2 + assert adapter._teams_send.await_args_list[0].args[1]["text"] == "Hello" + assert adapter._teams_send.await_args_list[1].args[1]["text"] == "Hello coalesced world" + + # Now close: the final ``message`` activity carries the full + # accumulated text — switching the streaming UI from typing + # indicator to message bubble. 
Index [2] because the flush
+ # already emitted at index [1].
+ await adapter._close_stream_session(tid, session)
+ assert adapter._teams_send.await_count == 3
+ final_payload = adapter._teams_send.await_args_list[2].args[1]
+ assert final_payload["type"] == "message"
+ assert final_payload["text"] == "Hello coalesced world"
+
+
+# ---------------------------------------------------------------------------
+# Flush / close-path throttling (Bot Framework streaming endpoint: 1 req/sec)
+# ---------------------------------------------------------------------------
+
+
+class TestFlushThrottle:
+ """Pin that the end-of-stream flush honors the throttle window.
+
+ Regression test for the case where a fast LLM stream finishes inside
+ the throttle window after the last successful emit. Without this
+ wait, the flush force-sends a ``typing`` activity immediately and
+ risks a 429 from the Bot Framework streaming endpoint (1 req/sec
+ quota), canceling the stream mid-flight.
+ """
+
+ @pytest.mark.asyncio
+ async def test_flush_waits_for_throttle_window_when_iterator_ends_in_window(self):
+ # ``clock_step_ms=0`` keeps every clock call at 0, so the flush
+ # sees ``elapsed_ms = 0`` after the first emit and must wait the
+ # full default interval (1500ms).
+ adapter = _make_adapter(clock_step_ms=0.0)
+ adapter._teams_send = AsyncMock(return_value={"id": "first-id"})
+ tid = _dm_thread_id(adapter)
+ session = _TeamsStreamSession()
+ adapter._active_streams[tid] = session
+
+ async def text_gen():
+ yield "Hello"
+ yield " world" # coalesced into the throttle window
+
+ await adapter._stream_via_emit(tid, text_gen(), session)
+
+ # Two emits (initial + flush) — the flush still ships, but
+ # only after waiting the throttle window.
+ assert adapter._teams_send.await_count == 2
+ # Sleep was awaited exactly once (the flush's throttle wait),
+ # for the full default interval since elapsed_ms = 0.
+ assert adapter._stream_sleep_ms.await_count == 1
+ wait_arg = adapter._stream_sleep_ms.await_args.args[0]
+ assert wait_arg == 1500.0, (
+ f"Expected the flush to wait the full default interval (1500ms) "
+ f"when the iterator ends with elapsed_ms=0, got {wait_arg}ms. "
+ f"Without this wait, the flush would 429 on a real Bot Framework "
+ f"streaming endpoint."
+ )
+
+ @pytest.mark.asyncio
+ async def test_flush_does_not_wait_when_window_already_elapsed(self):
+ """If enough time passed since the last emit, the flush ships immediately."""
+ # With 2000ms clock steps the first chunk emits at t=2000 and the
+ # second at t=4000; both land past the 1500ms window, so nothing
+ # is buffered at end-of-stream and the flush block never runs.
+ # The ideal shape (a last chunk coalesced into the window, with
+ # the window already elapsed by end-of-stream) needs a
+ # variable-step clock, which is out of scope for this regression.
+ # The elapsed-vs-interval arithmetic itself is covered by the
+ # inverse test above (waits the full interval when elapsed_ms=0).
+ # + # Instead, pin the inverse: when nothing was buffered (all chunks + # already shipped intermediately), the flush block doesn't run + # at all, so no extra sleep call. This catches a regression that + # would always-wait at end-of-stream regardless of state. + adapter = _make_adapter(clock_step_ms=2000.0) + adapter._teams_send = AsyncMock(side_effect=[{"id": "first"}, {"id": "ignored"}]) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "one " + yield "two" + + await adapter._stream_via_emit(tid, text_gen(), session) + # Both chunks emitted in-loop (clock_step > interval). + assert adapter._teams_send.await_count == 2 + # No flush body ran (accumulated == last_committed at end), so + # the flush throttle wait was not invoked. + assert adapter._stream_sleep_ms.await_count == 0 + + @pytest.mark.asyncio + async def test_flush_skips_emit_if_session_canceled_during_wait(self): + """Cancellation during the throttle wait suppresses the flush emit. + + The returned ``RawMessage`` carries ONLY the text Teams actually + accepted (the pre-cancel in-loop emit), not the buffered suffix + the user canceled out of. ``Thread.stream``'s outer accumulator + builds the ``SentMessage`` body from this value, so the SDK's + recorded history must match what the user actually saw — not the + local buffer that never shipped. + """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "first-id"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + # When the wait fires, cancel the session before the flush emits. 
+ async def cancel_during_wait(_ms): + session.cancel() + + adapter._stream_sleep_ms = AsyncMock(side_effect=cancel_during_wait) + + async def text_gen(): + yield "Hello" + yield " world" # coalesced into the throttle window, then canceled + + result = await adapter._stream_via_emit(tid, text_gen(), session) + + # Only the initial chunk emit — the flush bailed because the + # session was canceled during the throttle wait. + assert adapter._teams_send.await_count == 1 + # ``Hello`` was committed by the first in-loop emit. `` world`` was + # buffered in the throttle window and the flush was canceled + # before shipping it. Return only what Teams accepted. + assert result.raw["text"] == "Hello", ( + "RawMessage must carry only ``last_committed_text`` when the " + "session is canceled during the flush throttle wait. Returning " + "the buffered suffix would let Thread.stream record text the " + "user canceled out of." + ) + # session._text mirrors the RawMessage so the (skipped) close + # path would also see only the accepted text. + assert session.text == "Hello" + assert session.canceled is True + + @pytest.mark.asyncio + async def test_canceled_stream_sets_raw_message_text_override(self): + """``RawMessage.text`` carries the adapter-authoritative snapshot. + + ``Thread.stream`` builds the recorded ``SentMessage`` from its + own local accumulator, which includes every chunk yielded to + the adapter — even chunks that were coalesced into the throttle + window and never shipped. When the session is canceled + mid-flight the adapter must surface the corrected text via the + explicit ``RawMessage.text`` override so ``Thread.stream`` can + prefer it over the local buffer; without this, the SDK records + text the user never saw. 
+ """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "first-id"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def cancel_during_wait(_ms): + session.cancel() + + adapter._stream_sleep_ms = AsyncMock(side_effect=cancel_during_wait) + + async def text_gen(): + yield "Hello" + yield " buffered" + + result = await adapter._stream_via_emit(tid, text_gen(), session) + + # ``RawMessage.text`` MUST be set (not None) on cancellation so + # ``Thread.stream`` can override its local accumulator. + assert result.text == "Hello", ( + "Adapter must populate RawMessage.text on cancellation so " + "Thread.stream's recorded SentMessage matches what Teams " + "shipped. Returning None would silently fall back to " + "Thread.stream's local accumulator (which still contains " + "the buffered suffix the user canceled out of)." + ) + # raw["text"] mirrors for backward-compat with callers that + # introspect raw directly. + assert result.raw["text"] == "Hello" + + @pytest.mark.asyncio + async def test_happy_path_stream_also_sets_raw_message_text(self): + """Non-canceled streams also set ``RawMessage.text`` (== accumulated). + + Symmetry: the adapter always sets the override, so + ``Thread.stream`` always prefers it. Callers don't have to + special-case "is text set or not" — when the adapter ran to + completion the override equals the local accumulator anyway. 
+ """ + adapter = _make_adapter(clock_step_ms=2000.0) + adapter._teams_send = AsyncMock(side_effect=[{"id": "first"}, {"id": "ignored"}]) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "Hello " + yield "world" + + result = await adapter._stream_via_emit(tid, text_gen(), session) + assert result.text == "Hello world" + assert session.canceled is False + + @pytest.mark.asyncio + async def test_in_loop_cancellation_returns_only_committed_text(self): + """Mid-stream cancellation after a coalesced chunk returns only-emitted text. + + Companion to the during-wait test above. When a chunk has been + accumulated locally but coalesced (inside the throttle window), + and then the session is canceled before the next emit, the loop's + ``if session.canceled: break`` exits without flushing. The bottom + return block must surface ``last_committed_text``, not the + locally-buffered ``accumulated``, for the same reason: the + ``SentMessage`` recorded by ``Thread.stream`` must match what + Teams actually accepted (which is what the user saw). + """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "first-id"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "Hello" + yield " buffered" # coalesced (inside window) + session.cancel() + yield " never-sent" # cancel-check at top of next iter breaks + + result = await adapter._stream_via_emit(tid, text_gen(), session) + + # Only the first chunk emit. The second chunk was coalesced + # (still inside the window) but the cancel-check at the top of + # iteration 3 broke the loop before another emit. The flush is + # gated on ``not session.canceled`` so it's skipped too. 
+ assert adapter._teams_send.await_count == 1 + assert result.raw["text"] == "Hello", ( + "RawMessage must carry only what Teams actually shipped " + "(``last_committed_text``) when the session is canceled " + "with buffered text still pending. Returning ``accumulated`` " + "would let Thread.stream record `` buffered`` even though " + "the user never saw it." + ) + assert session.text == "Hello" + assert session.canceled is True + + @pytest.mark.asyncio + async def test_flush_wait_cancelled_error_cancels_session(self): + """``asyncio.CancelledError`` during the flush throttle wait cancels the session. + + Without this guard, a supervisor-initiated task cancellation that + fires while ``_stream_sleep_ms`` is awaiting would propagate + ``CancelledError`` out of ``_stream_via_emit`` while + ``session.canceled`` is still ``False``. The adapter's finally- + block close path would then see a non-canceled session and the + invariant "any exception leaving _stream_via_emit implies + ``session.canceled``" would be violated. Mirrors the in-loop + try/except shape so the close path can safely rely on the + invariant when deciding whether to ship a final ``message`` + activity. + """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "first-id"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def cancel_raising_sleep(_ms): + raise asyncio.CancelledError + + adapter._stream_sleep_ms = AsyncMock(side_effect=cancel_raising_sleep) + + async def text_gen(): + yield "Hello" + yield " buffered" # coalesced into the throttle window + + with pytest.raises(asyncio.CancelledError): + await adapter._stream_via_emit(tid, text_gen(), session) + + # Invariant: any exception leaving ``_stream_via_emit`` MUST + # leave the session canceled. Without the flush-block try/except, + # ``session.canceled`` would still be False here. 
+ assert session.canceled is True, ( + "Flush-wait CancelledError must cancel the session so the " + "close path's final-message activity is skipped. Otherwise " + "an external task cancellation could leak past the in-loop " + "guard and leave the session in an inconsistent state." + ) + # Only the in-loop emit landed; the flush emit never ran. + assert adapter._teams_send.await_count == 1 + + @pytest.mark.asyncio + async def test_flush_wait_generic_exception_cancels_session(self): + """A non-CancelledError raised by the flush sleep also cancels the session. + + Defense-in-depth for unusual ``_stream_sleep_ms`` implementations + (e.g. a custom sleep that raises a timeout/IO error). Mirrors the + ``except Exception`` branch in the in-loop try/except: cancel the + session before propagating so the close path doesn't ship a + final message containing buffered text Teams never accepted. + """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "first-id"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def raising_sleep(_ms): + raise RuntimeError("sleep impl failure") + + adapter._stream_sleep_ms = AsyncMock(side_effect=raising_sleep) + + async def text_gen(): + yield "Hello" + yield " buffered" + + with pytest.raises(RuntimeError, match="sleep impl failure"): + await adapter._stream_via_emit(tid, text_gen(), session) + + assert session.canceled is True + assert adapter._teams_send.await_count == 1 + + +class TestCloseStreamThrottle: + """Pin that the final ``message`` activity honors the 1 req/sec quota. + + Teams' streaming endpoint rate-limits ALL activities sharing a + ``streamId`` together — the streaming ``typing`` activities AND the + final ``message`` activity. 
A short DM response (one chunk, emitted
+ immediately) followed by ``_close_stream_session`` would land two
+ requests in the same second without this throttle, risking a 429.
+ The close path swallows send failures (it fails soft by design),
+ so that 429 would leave Teams' streaming UI stuck while the SDK
+ records the response as sent.
+ """
+
+ @pytest.mark.asyncio
+ async def test_close_waits_for_throttle_window_when_emit_was_recent(self):
+ adapter = _make_adapter(clock_step_ms=0.0)
+ adapter._teams_send = AsyncMock(return_value={"id": "final-id"})
+ tid = _dm_thread_id(adapter)
+
+ # Simulate a session whose last in-stream emit happened at clock=0
+ # (just now, since the test clock returns 0 every call). The
+ # close-path should wait the full 1500ms default interval.
+ session = _TeamsStreamSession()
+ session._text = "Hello" # noqa: SLF001
+ session.stream_id = "stream-1"
+ session.last_emit_at_ms = 0.0
+ session.emit_interval_ms = 1500
+
+ await adapter._close_stream_session(tid, session)
+
+ # The close path called sleep once with the full interval before
+ # sending the final activity.
+ assert adapter._stream_sleep_ms.await_count == 1
+ wait_arg = adapter._stream_sleep_ms.await_args.args[0]
+ assert wait_arg == 1500.0, (
+ f"Close path must throttle the final activity against the "
+ f"1 req/sec quota — got wait of {wait_arg}ms, expected 1500. "
+ f"Without this wait, a fast LLM stream followed by an "
+ f"immediate close would 429 and the swallowed exception "
+ f"would leave Teams' streaming UI stuck."
+ )
+ # The final activity DID ship after the wait.
+ assert adapter._teams_send.await_count == 1 + final_payload = adapter._teams_send.await_args.args[1] + assert final_payload["channelData"]["streamType"] == _STREAM_TYPE_FINAL + assert final_payload["text"] == "Hello" + + @pytest.mark.asyncio + async def test_close_does_not_throttle_when_window_already_elapsed(self): + """If enough time passed since the last emit, close ships immediately.""" + adapter = _make_adapter(clock_step_ms=2000.0) + adapter._teams_send = AsyncMock(return_value={"id": "final-id"}) + tid = _dm_thread_id(adapter) + + session = _TeamsStreamSession() + session._text = "Hello" # noqa: SLF001 + session.stream_id = "stream-1" + # Mark the last emit at clock=0; the clock advances by 2000ms each + # call so by the time the close-path checks, elapsed >= 2000ms > 1500ms. + session.last_emit_at_ms = 0.0 + session.emit_interval_ms = 1500 + + await adapter._close_stream_session(tid, session) + + # No sleep — window had already elapsed. + assert adapter._stream_sleep_ms.await_count == 0 + # Final activity shipped. + assert adapter._teams_send.await_count == 1 + + @pytest.mark.asyncio + async def test_close_honors_session_interval_override(self): + """``StreamOptions.update_interval_ms`` override propagates to close. + + The interval is cached on the session by ``_stream_via_emit`` + when the stream runs. The close path reads it instead of falling + back to the adapter default — so a caller that asked for + ``update_interval_ms=0`` (no throttle) gets immediate close even + when the last emit was recent. + """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "final-id"}) + tid = _dm_thread_id(adapter) + + session = _TeamsStreamSession() + session._text = "Hello" # noqa: SLF001 + session.stream_id = "stream-1" + session.last_emit_at_ms = 0.0 + # Caller asked for no throttle — close should skip the wait even + # though the last emit was at clock=0 (same instant). 
+ session.emit_interval_ms = 0 + + await adapter._close_stream_session(tid, session) + + assert adapter._stream_sleep_ms.await_count == 0 + assert adapter._teams_send.await_count == 1 + + @pytest.mark.asyncio + async def test_close_falls_back_to_adapter_default_when_session_interval_unset(self): + """If ``session.emit_interval_ms`` was never set (defensive path), use the adapter default.""" + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "final-id"}) + tid = _dm_thread_id(adapter) + + session = _TeamsStreamSession() + session._text = "Hello" # noqa: SLF001 + session.stream_id = "stream-1" + session.last_emit_at_ms = 0.0 + # session.emit_interval_ms left at default (None) + assert session.emit_interval_ms is None + + await adapter._close_stream_session(tid, session) + + # Used the adapter default (1500ms). + assert adapter._stream_sleep_ms.await_count == 1 + assert adapter._stream_sleep_ms.await_args.args[0] == 1500.0 + + @pytest.mark.asyncio + async def test_close_does_not_throttle_when_no_emit_happened(self): + """If no in-stream emit happened (last_emit_at_ms == -inf), close skips the wait. + + Shouldn't reach the throttle code in practice because + ``session.text`` would be empty and the close early-returns, but + the guard is cheap defense for sessions constructed manually + (e.g. tests) that set ``_text`` directly without an emit. + """ + adapter = _make_adapter(clock_step_ms=0.0) + adapter._teams_send = AsyncMock(return_value={"id": "final-id"}) + tid = _dm_thread_id(adapter) + + session = _TeamsStreamSession() + session._text = "Hello" # noqa: SLF001 + session.stream_id = "stream-1" + # last_emit_at_ms left at default (-inf) + assert session.last_emit_at_ms == float("-inf") + + await adapter._close_stream_session(tid, session) + + # No sleep — no emit ever happened, no throttle to honor. 
+ assert adapter._stream_sleep_ms.await_count == 0
+ assert adapter._teams_send.await_count == 1
+
+ @pytest.mark.asyncio
+ async def test_close_cancellation_during_wait_skips_final_emit(self):
+ """If ``session.cancel()`` fires during the close-path wait, skip the final.
+
+ Mirrors the in-loop / flush cancellation invariant: text that
+ Teams never received shouldn't be marked as shipped.
+ """
+ adapter = _make_adapter(clock_step_ms=0.0)
+ adapter._teams_send = AsyncMock(return_value={"id": "final-id"})
+ tid = _dm_thread_id(adapter)
+
+ session = _TeamsStreamSession()
+ session._text = "Hello" # noqa: SLF001
+ session.stream_id = "stream-1"
+ session.last_emit_at_ms = 0.0
+ session.emit_interval_ms = 1500
+
+ async def cancel_during_wait(_ms):
+ session.cancel()
+
+ adapter._stream_sleep_ms = AsyncMock(side_effect=cancel_during_wait)
+
+ await adapter._close_stream_session(tid, session)
+
+ # Sleep was called (entered the wait) but the final never shipped.
+ assert adapter._stream_sleep_ms.await_count == 1
+ assert adapter._teams_send.await_count == 0
+
+
+# ---------------------------------------------------------------------------
+# streamInfo entity contract (Bot Framework REST: streamId on entity + channelData)
+# ---------------------------------------------------------------------------
+
+
+class TestStreamInfoEntityContract:
+ """Pin the wire-format requirement that ``streamId`` lives on the entity too.
+
+ Per the Bot Framework streaming contract, the ``streaminfo`` entity
+ must carry ``streamId`` on subsequent and final activities, not just
+ ``channelData``. Earlier versions of this adapter only set it on
+ ``channelData``, which Teams treats as a malformed continuation
+ and may detach from the original stream.
+ """ + + @pytest.mark.asyncio + async def test_subsequent_chunk_streaminfo_entity_carries_stream_id(self): + adapter = _make_adapter(clock_step_ms=2000.0) + adapter._teams_send = AsyncMock(side_effect=[{"id": "stream-id-1"}, {"id": "ignored"}]) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "first" + yield " second" + + await adapter._stream_via_emit(tid, text_gen(), session) + + # First chunk's entity has no streamId (server hasn't assigned). + first_entity = adapter._teams_send.await_args_list[0].args[1]["entities"][0] + assert "streamId" not in first_entity + + # Second chunk's entity MUST carry the captured streamId. + second_entity = adapter._teams_send.await_args_list[1].args[1]["entities"][0] + assert second_entity["streamId"] == "stream-id-1", ( + "Subsequent streaminfo entity must include streamId per Bot " + "Framework streaming contract. Setting it only on channelData " + "may cause Teams to detach the chunk from the initial stream." + ) + # And the channelData level still has it too — both sites required. + second_channel_data = adapter._teams_send.await_args_list[1].args[1]["channelData"] + assert second_channel_data["streamId"] == "stream-id-1" + + # Final-activity streaminfo+streamId is covered by + # ``TestNativeStreamingWireFormat.test_close_session_sends_final_message``; + # we don't duplicate it here. Subsequent-chunk coverage above is unique + # because the streaming-vs-final wire shapes diverge (different + # ``streamType``, the streaming chunks also carry ``streamSequence``), + # so each test targets a distinct activity type. 
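The throttle tests above all pin the same two pieces of arithmetic: a remaining-wait computation against the 1 req/sec window, and a coalesce-then-flush loop driven by a fixed-step clock. A minimal standalone sketch of that arithmetic, for reference while reading the tests — the names `flush_wait_ms` and `coalesce` are illustrative only (not the adapter's real API); only the 1500ms default, the `-inf` "never emitted" sentinel, and the end-of-stream flush mirror behavior the tests assert:

```python
import math

DEFAULT_INTERVAL_MS = 1500.0

def flush_wait_ms(now_ms: float, last_emit_at_ms: float,
                  interval_ms: float = DEFAULT_INTERVAL_MS) -> float:
    # Remaining throttle wait before the next send may go out.
    # Zero when the window has already elapsed, and zero when no emit
    # ever happened (last_emit_at_ms == -inf makes elapsed infinite).
    return max(0.0, interval_ms - (now_ms - last_emit_at_ms))

def coalesce(chunks, clock_step_ms: float,
             interval_ms: float = DEFAULT_INTERVAL_MS) -> list[str]:
    # Replay a chunk stream against a fixed-step clock and return the
    # cumulative payload texts that would be sent, end-of-stream flush
    # included. The first chunk always emits (last_emit starts at -inf).
    now, last_emit = 0.0, -math.inf
    accumulated, committed, sends = "", "", []
    for chunk in chunks:
        accumulated += chunk
        now += clock_step_ms
        if now - last_emit >= interval_ms:  # window elapsed: emit now
            sends.append(accumulated)
            committed, last_emit = accumulated, now
    if accumulated != committed:  # buffered text left: flush it
        sends.append(accumulated)
    return sends

# A frozen clock coalesces everything after the first emit into one flush:
print(coalesce(["Hel", "lo ", "world!"], 0.0))    # ['Hel', 'Hello world!']
# Steps past the window emit every chunk individually, so no flush runs:
print(coalesce(["one ", "two ", "three"], 2000.0))
```

The same model explains the close-path tests: `flush_wait_ms(0.0, 0.0)` is the full 1500ms (recent emit, must wait), `flush_wait_ms(6000.0, 4000.0)` is 0 (window elapsed), and `flush_wait_ms(0.0, -math.inf)` is 0 (no emit ever happened, nothing to throttle against).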
+ + +# --------------------------------------------------------------------------- +# Stream lifecycle / dispatch +# --------------------------------------------------------------------------- + + +class TestStreamDispatch: + """Verify the DM vs non-DM routing decision.""" + + @pytest.mark.asyncio + async def test_dm_thread_with_active_session_uses_native_streaming(self): + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "ping" + + await adapter.stream(tid, text_gen()) + + payload = adapter._teams_send.await_args.args[1] + # Native streaming uses ``typing`` (not ``message``) for chunks. + assert payload["type"] == "typing" + assert payload["channelData"]["streamType"] == _STREAM_TYPE_STREAMING + + @pytest.mark.asyncio + async def test_dm_thread_without_active_session_falls_through(self): + """A DM thread with no registered session uses accumulate-and-post. + + This is the proactive-message case — the bot is sending a message + that wasn't triggered by an inbound webhook, so there's no live + streaming context. + """ + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "post-id"}) + tid = _dm_thread_id(adapter) + # No session registered. + + async def text_gen(): + yield "proactive" + + result = await adapter.stream(tid, text_gen()) + payload = adapter._teams_send.await_args.args[1] + assert payload["type"] == "message" + # Single accumulate-and-post send. 
+ assert adapter._teams_send.await_count == 1 + assert result.id == "post-id" + + @pytest.mark.asyncio + async def test_channel_thread_uses_accumulate_and_post(self): + """Channels (``19:`` prefix) accumulate and post — never native streaming.""" + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "chan-id"}) + tid = _channel_thread_id(adapter) + # Even if a session were somehow registered for a channel thread, + # _handle_message_activity wouldn't do that — verify the dispatcher + # behavior with no session, the realistic case. + + async def text_gen(): + yield "Hello " + yield "channel" + + await adapter.stream(tid, text_gen()) + + # Accumulate → single ``message`` send carrying all content. + assert adapter._teams_send.await_count == 1 + payload = adapter._teams_send.await_args.args[1] + assert payload["type"] == "message" + assert payload["text"] == "Hello channel" + + +# --------------------------------------------------------------------------- +# Cancellation and error handling +# --------------------------------------------------------------------------- + + +class TestStreamCancellation: + @pytest.mark.asyncio + async def test_canceled_session_skips_remaining_chunks(self): + """Once ``session.cancel()`` is called, no more typing activities go out.""" + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "first"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "first" + session.cancel() + yield "second" # should not be sent + yield "third" # should not be sent + + await adapter._stream_via_emit(tid, text_gen(), session) + # Only the pre-cancel chunk made it. 
+ assert adapter._teams_send.await_count == 1 + assert adapter._teams_send.await_args.args[1]["text"] == "first" + + @pytest.mark.asyncio + async def test_canceled_session_skips_final_message(self): + """A canceled session does NOT post a final ``message`` activity. + + This avoids "clearing the streaming UI with a fake completion" when + the user really did cancel. + """ + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + session.cancel() + session.stream_id = "stream-1" + session._text = "partial" + + await adapter._close_stream_session(tid, session) + adapter._teams_send.assert_not_called() + + @pytest.mark.asyncio + async def test_close_session_no_chunks_no_op(self): + """Closing a session that never emitted is a no-op (no orphan final).""" + adapter = _make_adapter() + adapter._teams_send = AsyncMock() + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() # never emitted, no stream_id + + await adapter._close_stream_session(tid, session) + adapter._teams_send.assert_not_called() + + @pytest.mark.asyncio + async def test_close_session_sends_final_when_first_chunk_returned_empty_id( + self, + ): + """If Teams accepted chunks but never returned an ``id``, still send the final. + + Regression for the empty-``id`` edge case: the Bot Framework REST + response can be 200 with ``{"id": ""}`` even on a successful + ``typing`` activity send. ``stream_id`` stays ``None`` (the + first-chunk guard skips assignment for the empty string), but + ``text`` is non-empty because the user already saw the streamed + chunks. Without a final ``message`` activity the Teams streaming + UI would spin until Teams times the session out client-side — + ship the final ``message`` anyway, omitting ``streamId`` from + ``channelData``. Mirrors upstream's looser check. + """ + adapter = _make_adapter() + # First call (chunk): returns an empty id. 
Second call (final):
+        # succeeds.
+        adapter._teams_send = AsyncMock(side_effect=[{"id": ""}, {"id": "final-id"}])
+        tid = _dm_thread_id(adapter)
+        session = _TeamsStreamSession()
+        adapter._active_streams[tid] = session
+
+        async def text_gen():
+            yield "hello world"
+
+        await adapter._stream_via_emit(tid, text_gen(), session)
+
+        # Sanity: the chunk send went through, but stream_id is unset
+        # because the server didn't hand us one.
+        assert session.stream_id is None
+        assert session.text == "hello world"
+
+        # Now close: the final ``message`` activity must still be sent
+        # (omitting ``streamId``).
+        await adapter._close_stream_session(tid, session)
+
+        assert adapter._teams_send.await_count == 2
+        final_payload = adapter._teams_send.await_args_list[1].args[1]
+        assert final_payload["type"] == "message"
+        assert final_payload["text"] == "hello world"
+        assert final_payload["channelData"]["streamType"] == _STREAM_TYPE_FINAL
+        # Critical: the streamId key must be absent when the server never
+        # assigned one, rather than serialized as ``"streamId": None``.
+        assert "streamId" not in final_payload["channelData"]
+        assert final_payload["entities"] == [{"type": "streaminfo", "streamType": _STREAM_TYPE_FINAL}]
+
+
+class TestStreamErrors:
+    @pytest.mark.asyncio
+    async def test_iterator_exception_cancels_and_reraises(self):
+        """If the source stream raises mid-iteration, cancel and re-raise.
+
+        Mirrors the fallback-stream exception-capture divergence: native
+        streaming's analog is to mark the session canceled (so close()
+        won't post a final message that misrepresents what the user
+        actually saw) and propagate the original error.
+ """ + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + class StreamBoom(RuntimeError): + pass + + async def text_gen(): + yield "good" + raise StreamBoom("LLM connection dropped") + + with pytest.raises(StreamBoom, match="LLM connection dropped"): + await adapter._stream_via_emit(tid, text_gen(), session) + + assert session.canceled is True + # The pre-error chunk was still sent. + assert adapter._teams_send.await_count == 1 + + @pytest.mark.asyncio + async def test_emit_send_failure_propagates_and_cancels_session(self): + """A 429 / network error mid-stream re-raises and cancels the session. + + What to fix if this fails: ``_stream_via_emit`` must propagate the + send exception (not soft-cancel + return a partial RawMessage). + ``Thread.stream`` accumulates each chunk locally BEFORE yielding to + the adapter, so swallowing the failure here would let the SDK + record a SentMessage / append a message-history entry containing + text Teams never accepted. Re-raising short-circuits the + post-stream history append in ``Thread.stream`` so the recorded + message matches what the user actually saw. See + ``src/chat_sdk/adapters/teams/adapter.py`` around the + ``_teams_send`` ``except`` block in ``_stream_via_emit``. + """ + adapter = _make_adapter() + adapter._teams_send = AsyncMock( + side_effect=[ + {"id": "id-1"}, + RuntimeError("429 Too Many Requests"), + ] + ) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "hello" + yield "world" + yield "should-not-send" + + with pytest.raises(RuntimeError, match="429 Too Many Requests"): + await adapter._stream_via_emit(tid, text_gen(), session) + assert session.canceled is True + # Two attempted sends (first ok, second failed); no third. 
+ assert adapter._teams_send.await_count == 2 + # session.sequence stays at 1 (first send incremented it; second + # didn't because it failed before commit). + assert session.sequence == 1 + + @pytest.mark.asyncio + async def test_cancelled_error_propagates_and_marks_session_canceled(self): + """asyncio.CancelledError propagates and cancels the session.""" + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "before-cancel" + raise asyncio.CancelledError + + with pytest.raises(asyncio.CancelledError): + await adapter._stream_via_emit(tid, text_gen(), session) + assert session.canceled is True + + +# --------------------------------------------------------------------------- +# Edge cases +# --------------------------------------------------------------------------- + + +class TestStreamEdgeCases: + @pytest.mark.asyncio + async def test_empty_string_chunks_skipped_in_native_streaming(self): + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "" + yield "real" + yield "" + + await adapter._stream_via_emit(tid, text_gen(), session) + assert adapter._teams_send.await_count == 1 + assert session.sequence == 1 + + @pytest.mark.asyncio + async def test_one_chunk_stream_yields_id_and_text(self): + """Very-short streams (one chunk) round-trip correctly.""" + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "only-id"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield "lonely" + + result = await adapter._stream_via_emit(tid, text_gen(), session) + assert result.id == "only-id" + assert result.raw["text"] 
== "lonely" + assert session.stream_id == "only-id" + + @pytest.mark.asyncio + async def test_dict_chunks_extract_text(self): + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + tid = _dm_thread_id(adapter) + session = _TeamsStreamSession() + adapter._active_streams[tid] = session + + async def text_gen(): + yield {"type": "markdown_text", "text": "Hi"} + yield {"type": "markdown_text", "text": " there"} + yield {"type": "other", "data": "ignored"} + + result = await adapter._stream_via_emit(tid, text_gen(), session) + assert result.raw["text"] == "Hi there" + assert adapter._teams_send.await_count == 2 + + @pytest.mark.asyncio + async def test_stream_sequence_no_overflow_concern(self): + """``streamSequence`` is a Python ``int`` — overflow is not a concern. + + Adversarial check (per docs/SELF_REVIEW.md): TS uses a JS number, + which would lose precision past 2**53. Python ints are unbounded; + we don't add a saturation check here because Bot Framework streams + don't last long enough to approach a problematic count, and adding + one would silently change behavior. This test pins the assumption. + """ + session = _TeamsStreamSession() + session.sequence = 2**60 + session.sequence += 1 + # Still increments cleanly, no exceptions, exact value. + assert session.sequence == 2**60 + 1 + + +# --------------------------------------------------------------------------- +# Webhook-level lifecycle (end-to-end through _handle_message_activity) +# --------------------------------------------------------------------------- + + +class TestHandleMessageActivityLifecycle: + """Verify the message-activity → process_message → stream → close flow.""" + + @pytest.mark.asyncio + async def test_caller_wait_until_raise_does_not_kill_native_streaming(self): + """A caller-supplied ``WebhookOptions.wait_until`` that raises must + NOT tear down the DM streaming session before the chat task runs. 
+ + What to fix if this fails: in + ``src/chat_sdk/adapters/teams/adapter.py`` ``_chained_wait_until``, + the call to the upstream ``wait_until`` must be wrapped in + ``try/except`` (and logged). Otherwise the synchronous raise + escapes through ``Chat.process_message``, the outer ``try`` skips + ``await processing_done``, and the ``finally`` removes the session + while the chat task is still scheduled. The handler's later + ``thread.stream()`` call would then miss native streaming and + fall back to a normal post. + """ + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + + tid = _dm_thread_id(adapter) + + # Build a chat that schedules the streaming task AND invokes + # a deliberately-raising upstream wait_until. + chat = MagicMock() + chat.get_state = MagicMock(return_value=None) + stream_calls: list[str] = [] + + def process_message(adapter_arg, thread_id, message, options): + async def _do_stream(): + async def gen(): + yield "hi" + + # Snapshot whether native streaming is still wired up at + # the moment the chat task runs. + stream_calls.append("native" if thread_id in adapter_arg._active_streams else "fallback") + await adapter_arg.stream(thread_id, gen()) + + task = asyncio.get_running_loop().create_task(_do_stream()) + # Caller-supplied wait_until raises synchronously. The chained + # wrapper must swallow this so processing_done still resolves. + options.wait_until(task) + + chat.process_message = process_message + adapter._chat = chat + + # Inject a raising upstream wait_until via WebhookOptions. 
+ from chat_sdk.types import WebhookOptions + + def raising_wait_until(_task: Any) -> None: + raise RuntimeError("caller wait_until exploded") + + upstream_options = WebhookOptions(wait_until=raising_wait_until) + + activity = { + "type": "message", + "id": "incoming-1", + "text": "user said something", + "from": {"id": "user-1", "name": "User One"}, + "conversation": {"id": "a:1Abc-DM-conversation-id"}, + "serviceUrl": "https://smba.trafficmanager.net/teams/", + } + + # Should NOT raise — the chained wrapper logs and continues. + await adapter._handle_message_activity(activity, upstream_options) + + # The streaming task ran while the session was still registered. + assert stream_calls == ["native"], ( + "Caller wait_until raise tore down the session before the chat " + "task ran; the handler fell back to a normal post instead of " + "native Teams streaming" + ) + # Session was cleaned up after the task finished. + assert tid not in adapter._active_streams + + @pytest.mark.asyncio + async def test_dm_message_activity_registers_session_and_finalizes(self): + """A DM message activity registers a session, awaits processing, then drops it.""" + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + + tid = _dm_thread_id(adapter) + + # Build a fake chat that streams during processing. + chat = MagicMock() + chat.get_state = MagicMock(return_value=None) + + captured_session: dict[str, Any] = {} + + def process_message(adapter_arg, thread_id, message, options): + assert thread_id == tid + # The session should be registered by the time process_message + # is invoked, so the streaming dispatch sees it. 
+ captured_session["session"] = adapter_arg._active_streams[thread_id] + + async def _do_stream(): + async def gen(): + yield "hi" + + await adapter_arg.stream(thread_id, gen()) + + task = asyncio.get_running_loop().create_task(_do_stream()) + options.wait_until(task) + + chat.process_message = process_message + adapter._chat = chat + + activity = { + "type": "message", + "id": "incoming-1", + "text": "user said something", + "from": {"id": "user-1", "name": "User One"}, + "conversation": {"id": "a:1Abc-DM-conversation-id"}, + "serviceUrl": "https://smba.trafficmanager.net/teams/", + } + + await adapter._handle_message_activity(activity) + + # After the handler returns, the session should have been removed. + assert tid not in adapter._active_streams + # And the session was closed: a final ``message`` activity went out + # in addition to the streaming chunk. + send_payloads = [c.args[1] for c in adapter._teams_send.await_args_list] + types = [p["type"] for p in send_payloads] + assert "typing" in types + assert "message" in types + final_payload = next(p for p in send_payloads if p["type"] == "message") + assert final_payload["channelData"]["streamType"] == _STREAM_TYPE_FINAL + + @pytest.mark.asyncio + async def test_channel_message_does_not_register_session(self): + """Channel/group messages skip session registration entirely.""" + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + + # Conversation id is constructed inside the activity dict below; + # no separate thread-id variable needed for assertions here. 
+ + chat = MagicMock() + chat.get_state = MagicMock(return_value=None) + seen_active_streams: dict[str, Any] = {} + + def process_message(adapter_arg, thread_id, message, options): + seen_active_streams["snapshot"] = dict(adapter_arg._active_streams) + + chat.process_message = process_message + adapter._chat = chat + + activity = { + "type": "message", + "id": "incoming-2", + "text": "channel message", + "from": {"id": "user-2", "name": "User Two"}, + "conversation": {"id": "19:abc@thread.tacv2"}, + "serviceUrl": "https://smba.trafficmanager.net/teams/", + } + + await adapter._handle_message_activity(activity) + + # No session was ever registered for the channel thread. + assert seen_active_streams["snapshot"] == {} + assert adapter._active_streams == {} + + @pytest.mark.asyncio + async def test_handler_exception_after_partial_stream_drops_session_and_closes(self): + """A handler that raises AFTER streaming still ships the final close + activity and drops the session. + + Streams one chunk, then raises. The session must be removed from the + registry, and ``_close_stream_session`` must still have shipped a + final ``message`` activity (``streamType: "final"``) carrying the + text the user already saw — otherwise the Teams streaming UI keeps + spinning until Teams times the session out client-side. 
+ """ + adapter = _make_adapter() + adapter._teams_send = AsyncMock(return_value={"id": "id-1"}) + + tid = _dm_thread_id(adapter) + + chat = MagicMock() + chat.get_state = MagicMock(return_value=None) + + def process_message(adapter_arg, thread_id, message, options): + async def _stream_then_fail(): + async def gen(): + yield "partial" + + await adapter_arg.stream(thread_id, gen()) + raise RuntimeError("handler boom") + + task = asyncio.get_running_loop().create_task(_stream_then_fail()) + options.wait_until(task) + + chat.process_message = process_message + adapter._chat = chat + + activity = { + "type": "message", + "id": "incoming-3", + "text": "user msg", + "from": {"id": "user-3", "name": "User Three"}, + "conversation": {"id": "a:1Abc-DM-conversation-id"}, + "serviceUrl": "https://smba.trafficmanager.net/teams/", + } + + await adapter._handle_message_activity(activity) + + # Registry was cleaned up. + assert tid not in adapter._active_streams + # And the close path actually fired: typing chunk + final message, + # in that order. Without the final message the Teams streaming UI + # would keep spinning until Teams times the session out. + send_payloads = [c.args[1] for c in adapter._teams_send.await_args_list] + types = [p["type"] for p in send_payloads] + assert "typing" in types, "Streaming chunk before the raise was never sent" + assert "message" in types, ( + "Final close activity was not sent after the handler raised — " + "_close_stream_session must run from the adapter's finally even " + "when the chat task raised" + ) + final_payload = next(p for p in send_payloads if p["type"] == "message") + assert final_payload["channelData"]["streamType"] == _STREAM_TYPE_FINAL + # And the final activity carries the text the user actually saw. 
+ assert final_payload["text"] == "partial" + + +# --------------------------------------------------------------------------- +# Pass-interaction: two simultaneous DM streams to the same user +# --------------------------------------------------------------------------- + + +class TestPassInteraction: + @pytest.mark.asyncio + async def test_distinct_dm_threads_each_have_isolated_session_state(self): + """Two DM threads streaming in parallel must not share session state. + + This pins the ISOLATION property when sessions are explicitly + passed to ``_stream_via_emit`` (the registry is bypassed). Two + DMs in flight from the same bot to the same user (one per + thread) must each carry their own ``streamId`` and + ``streamSequence``. + + Same-thread concurrency (the ``_active_streams`` race) is a + DIFFERENT property — see + ``test_same_thread_concurrent_handlers_clobber_active_stream``. + """ + adapter = _make_adapter() + # Distinct server ids per send so we can verify thread-to-id mapping. 
+ send_log: list[tuple[str, dict[str, Any]]] = [] + + async def fake_send(decoded, payload): + send_log.append((decoded.conversation_id, payload)) + return {"id": f"id-for-{decoded.conversation_id}"} + + adapter._teams_send = fake_send + + tid_a = adapter.encode_thread_id( + TeamsThreadId( + conversation_id="a:1Conv-A", + service_url="https://smba.trafficmanager.net/teams/", + ) + ) + tid_b = adapter.encode_thread_id( + TeamsThreadId( + conversation_id="a:1Conv-B", + service_url="https://smba.trafficmanager.net/teams/", + ) + ) + + session_a = _TeamsStreamSession() + session_b = _TeamsStreamSession() + adapter._active_streams[tid_a] = session_a + adapter._active_streams[tid_b] = session_b + + async def gen_a(): + yield "A1" + await asyncio.sleep(0) # yield control + yield "A2" + + async def gen_b(): + yield "B1" + await asyncio.sleep(0) + yield "B2" + + await asyncio.gather( + adapter._stream_via_emit(tid_a, gen_a(), session_a), + adapter._stream_via_emit(tid_b, gen_b(), session_b), + ) + + # Each session got its own server-assigned streamId. + assert session_a.stream_id == "id-for-a:1Conv-A" + assert session_b.stream_id == "id-for-a:1Conv-B" + # Each session's sequence counts only its own chunks. + assert session_a.sequence == 2 + assert session_b.sequence == 2 + # No bleed-through: A's payloads were posted to A's conversation, + # and B's to B's. + for conv_id, payload in send_log: + assert payload["text"].startswith("A" if "A" in conv_id else "B") + + @pytest.mark.asyncio + async def test_same_thread_concurrent_handlers_clobber_active_stream(self): + """Two near-simultaneous webhooks for the SAME DM thread. + + Realistic case: a user double-sends, or two webhooks land on the + same thread before the first finishes. 
``_active_streams`` is a + plain ``dict`` keyed by ``thread_id``, so the second registration + overwrites the first — pin that behavior here so a future change + to add per-thread queueing/locking is a deliberate decision, not + an accidental observable change. + + Upstream's ``activeStreams`` is also a plain ``Map`` with the + same overwrite semantics; this test mirrors that contract. + """ + adapter = _make_adapter() + # Track each session that gets registered, in the order of registration. + registered_sessions: list[_TeamsStreamSession] = [] + # Snapshot the registry contents immediately AFTER each handler's + # process_message call so we can pin the clobber. + post_registration_snapshots: list[_TeamsStreamSession] = [] + + # Block both handlers on a barrier so the second registration races + # the first while the first is still "in flight". This pins the + # registry behavior under genuine overlap, not just sequential calls. + first_registered = asyncio.Event() + release_handlers = asyncio.Event() + + adapter._teams_send = AsyncMock(return_value={"id": "send-id"}) + + chat = MagicMock() + chat.get_state = MagicMock(return_value=None) + + def process_message(adapter_arg, thread_id, message, options): + # Snapshot the session that THIS handler call registered. + registered_sessions.append(adapter_arg._active_streams[thread_id]) + + async def _handler(): + # Hold both handlers open across the barrier so they truly + # overlap. After release, snapshot the registry — by this + # point both handlers have registered, and the LATER + # registration must have won. 
+ if not first_registered.is_set(): + first_registered.set() + await release_handlers.wait() + post_registration_snapshots.append(adapter_arg._active_streams.get(thread_id)) + + task = asyncio.get_running_loop().create_task(_handler()) + options.wait_until(task) + + chat.process_message = process_message + adapter._chat = chat + + tid = _dm_thread_id(adapter) + activity = { + "type": "message", + "id": "incoming-same-thread", + "text": "user said something", + "from": {"id": "user-1", "name": "User One"}, + "conversation": {"id": "a:1Abc-DM-conversation-id"}, + "serviceUrl": "https://smba.trafficmanager.net/teams/", + } + + async def _drive_two_handlers(): + # Start the first; wait until it has registered before launching + # the second so the second observes (and clobbers) the first's + # registry entry. Then release both. + t1 = asyncio.create_task(adapter._handle_message_activity(activity)) + await first_registered.wait() + t2 = asyncio.create_task(adapter._handle_message_activity(activity)) + # Give the second handler a tick to register. + await asyncio.sleep(0) + release_handlers.set() + await asyncio.gather(t1, t2) + + await _drive_two_handlers() + + # Two distinct sessions were created. + assert len(registered_sessions) == 2 + first_session, second_session = registered_sessions + assert first_session is not second_session + # Pin upstream's plain-Map clobber semantics: BOTH in-flight + # handlers, when they look up the registry post-overlap, see the + # SECOND session — the first's entry was overwritten in place. + # If a future change adds per-thread queueing/locking it must be + # a deliberate decision (i.e. update this test). + assert post_registration_snapshots == [second_session, second_session] + # After both handlers exit, registry is empty. Handler 2's + # finally-block matches ``current is session_2`` and pops; handler + # 1's finally-block sees the entry already gone (or not its own) + # and skips the pop — either way the dict ends empty. 
+ assert tid not in adapter._active_streams diff --git a/tests/test_thread_faithful.py b/tests/test_thread_faithful.py index 693591d..b9647e1 100644 --- a/tests/test_thread_faithful.py +++ b/tests/test_thread_faithful.py @@ -286,6 +286,80 @@ async def test_should_use_adapter_native_streaming_when_available(self): # Should NOT call postMessage for fallback assert len(adapter._post_calls) == 0 + @pytest.mark.asyncio + async def test_should_prefer_raw_message_text_override_over_local_accumulator(self): + """When ``adapter.stream`` returns ``RawMessage`` with ``text`` set, the + recorded ``SentMessage`` body MUST come from that override, not from + Thread.stream's local accumulator. + + This is the contract that makes Teams native streaming's + cancellation path consistent: ``Thread._handle_stream`` + accumulates each chunk into a local buffer BEFORE yielding to + the adapter, so on cancellation the buffer includes text the + adapter never shipped to the platform. The adapter signals the + corrected snapshot via ``RawMessage.text``; without honoring it + the SDK's recorded ``SentMessage`` would diverge from what the + user actually saw. + """ + adapter = create_mock_adapter() + state = create_mock_state() + + # Adapter drains the iterator (matching the real-world contract — + # ``_wrapped_stream`` only populates Thread.stream's local + # accumulator if the adapter actually iterates it) and reports + # back via the override that only "Hello" was shipped even though + # "Hello world" was yielded. 
+ async def mock_stream(thread_id, text_stream, options=None): + async for _chunk in text_stream: + pass + return RawMessage( + id="msg-1", + thread_id="t1", + raw={"text": "Hello"}, + text="Hello", + ) + + adapter.stream = mock_stream # type: ignore[attr-defined] + + thread = _make_thread(adapter, state) + text_stream = _create_text_stream(["Hello", " world"]) + result = await thread.post(text_stream) + + assert result.text == "Hello", ( + "Thread.stream must prefer RawMessage.text over its local " + "accumulator. Returning the local 'Hello world' would let " + "Thread.stream record text the adapter said was never shipped." + ) + + @pytest.mark.asyncio + async def test_should_fall_back_to_local_accumulator_when_text_override_is_none(self): + """Default behavior: when ``RawMessage.text`` is ``None`` (the + backward-compatible default for adapters that don't set it), + Thread.stream falls back to its local accumulator. + + Critical for adapters like Slack/Discord/GitHub/Linear/etc. that + don't need the override — adding the new field must not change + their recorded text. + """ + adapter = create_mock_adapter() + state = create_mock_state() + + # No ``text`` set — defaults to None. Adapter drains the + # iterator so Thread.stream's local accumulator populates. + async def mock_stream(thread_id, text_stream, options=None): + async for _chunk in text_stream: + pass + return RawMessage(id="msg-1", thread_id="t1", raw={}) + + adapter.stream = mock_stream # type: ignore[attr-defined] + + thread = _make_thread(adapter, state) + text_stream = _create_text_stream(["Hello", " ", "world"]) + result = await thread.post(text_stream) + + # Local accumulator wins because the adapter didn't override. + assert result.text == "Hello world" + # it("should fall back to post+edit when adapter has no native streaming") @pytest.mark.asyncio async def test_should_fall_back_to_postedit_when_adapter_has_no_native_streaming(self):