diff --git a/.changeset/beige-forks-judge.md b/.changeset/beige-forks-judge.md new file mode 100644 index 00000000..410bd68c --- /dev/null +++ b/.changeset/beige-forks-judge.md @@ -0,0 +1,6 @@ +--- +"chat": patch +--- + +Fix Slack structured streaming when `thread.post(stream)` is called from a handler created by an interactive (`block_actions`) payload. +The team ID is now resolved from `team.id` in addition to `team_id` / `team`. diff --git a/packages/adapter-slack/README.md b/packages/adapter-slack/README.md index a433b26a..90281686 100644 --- a/packages/adapter-slack/README.md +++ b/packages/adapter-slack/README.md @@ -436,10 +436,7 @@ settings: When streaming in an assistant thread, you can attach Block Kit elements to the final message: ```typescript -const raw = message.raw as { team_id?: string; team?: string }; -await thread.adapter.stream(thread.id, textStream, { - recipientUserId: message.author.userId, - recipientTeamId: raw.team_id ?? raw.team, +await thread.post(textStream, { stopBlocks: [ { type: "actions", elements: [{ type: "button", text: { type: "plain_text", text: "Retry" }, action_id: "retry" }] }, ], diff --git a/packages/adapter-slack/src/index.test.ts b/packages/adapter-slack/src/index.test.ts index 04cee9fc..9540a1c5 100644 --- a/packages/adapter-slack/src/index.test.ts +++ b/packages/adapter-slack/src/index.test.ts @@ -2632,6 +2632,7 @@ interface MockableClient { update: ReturnType; delete: ReturnType; }; + chatStream: ReturnType; conversations: { open: ReturnType; replies: ReturnType; diff --git a/packages/chat/src/thread.test.ts b/packages/chat/src/thread.test.ts index bd40caab..6a213e41 100644 --- a/packages/chat/src/thread.test.ts +++ b/packages/chat/src/thread.test.ts @@ -1,5 +1,6 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { Card } from "./cards"; +import type { Message } from "./message"; import { createMockAdapter, createMockState, @@ -9,7 +10,7 @@ import { import { Plan } from "./plan"; import { StreamingPlan } from "./streaming-plan"; import { ThreadImpl } from "./thread"; -import type { Adapter, Message, ScheduledMessage, StreamChunk } from "./types"; +import type { Adapter, ScheduledMessage, StreamChunk } from "./types"; import { NotImplementedError } from "./types"; describe("ThreadImpl", () => { @@ -595,7 +596,34 @@ describe("ThreadImpl", () => { } }); - it("should pass stream options from current message context", async () => { + it.each([ + { + expectedTeamId: "T123", + label: "team_id", + raw: { team_id: "T123", type: "app_mention" }, + }, + { + expectedTeamId: "T234", + label: "team string", + raw: { team: "T234", type: "message" }, + }, + { + expectedTeamId: "T345", + label: "team.id", + raw: { team: { id: "T345" }, type: "block_actions" }, + }, + { + expectedTeamId: "T456", + label: "user.team_id fallback", + raw: { + type: "block_actions", + user: { team_id: "T456" }, + }, + }, + ])("should pass stream options from Slack current message context via $label", async ({ + raw, + expectedTeamId, + }) => { const mockStream = vi.fn().mockResolvedValue({ id: "msg-stream", threadId: "t1", @@ -603,18 +631,13 @@ describe("ThreadImpl", () => { }); mockAdapter.stream = mockStream; - // Create thread with current message context const threadWithContext = new ThreadImpl({ id: "slack:C123:1234.5678", adapter: mockAdapter, channelId: "C123", stateAdapter: mockState, - currentMessage: { - id: "original-msg", - threadId: "slack:C123:1234.5678", - text: "test", - formatted: { type: "root", children: [] }, - raw: { team_id: "T123" }, + currentMessage: createTestMessage("original-msg", "test", { + raw, author: { userId: "U456", userName: "user", @@ -622,9 +645,7 @@ describe("ThreadImpl", () => { isBot: false, isMe: false, }, - metadata: { dateSent: new Date(), edited: false }, - attachments: [], - }, + }), }); const textStream = createTextStream(["Hello"]); @@ -635,11 +656,66 @@ describe("ThreadImpl", () => { expect.any(Object), expect.objectContaining({ recipientUserId: "U456", - recipientTeamId: "T123", + recipientTeamId: expectedTeamId, }) ); }); + it("should forward structured stream chunks to adapter.stream from an action-created thread", async () => { + const mockStream = vi.fn().mockResolvedValue({ + id: "msg-stream", + threadId: "t1", + raw: "Hello", + }); + mockAdapter.stream = mockStream; + + const threadWithActionContext = new ThreadImpl({ + id: "slack:C123:1234.5678", + adapter: mockAdapter, + channelId: "C123", + stateAdapter: mockState, + currentMessage: createTestMessage("action-msg", "", { + raw: { + team: { domain: "workspace", id: "T123" }, + type: "block_actions", + }, + author: { + userId: "U456", + userName: "user", + fullName: "Test User", + isBot: false, + isMe: false, + }, + }), + }); + + const taskChunk: StreamChunk = { + id: "task-1", + status: "pending", + title: "Thinking", + type: "task_update", + }; + async function* structuredStream(): AsyncIterable { + yield "Picking option..."; + yield taskChunk; + } + + await threadWithActionContext.post( + structuredStream() as unknown as AsyncIterable + ); + + expect(mockStream).toHaveBeenCalledTimes(1); + const [, passedStream] = mockStream.mock.calls[0]; + const collected: Array = []; + for await (const chunk of passedStream as AsyncIterable< + string | StreamChunk + >) { + collected.push(chunk); + } + expect(collected).toContain("Picking option..."); + expect(collected).toContainEqual(taskChunk); + }); + it("should pass StreamingPlan PostableObject options to adapter.stream", async () => { const mockStream = vi.fn().mockResolvedValue({ id: "msg-stream", diff --git a/packages/chat/src/thread.ts b/packages/chat/src/thread.ts index cd99cfc1..6df13d1f 100644 --- a/packages/chat/src/thread.ts +++ b/packages/chat/src/thread.ts @@ -574,12 +574,12 @@ export class ThreadImpl> const options: StreamOptions = { ...callerOptions }; if (this._currentMessage) { options.recipientUserId = this._currentMessage.author.userId; - // Extract teamId from raw Slack payload - const raw = this._currentMessage.raw as { - team_id?: string; - team?: string; - }; - options.recipientTeamId = raw?.team_id ?? raw?.team; + // recipientTeamId is only consumed by the Slack adapter; other adapters + // ignore it. Derivation is Slack-specific because `currentMessage.raw` + // shape varies across Slack webhook types (message events vs block_actions). + options.recipientTeamId = this.extractSlackRecipientTeamId( + this._currentMessage.raw + ); } // Use native streaming if adapter supports it @@ -650,6 +650,47 @@ export class ThreadImpl> return this.fallbackStream(textOnlyStream, options); } + /** + * Slack payloads carry the workspace ID in a few different shapes depending on + * the webhook type: + * - Message events: `team_id` or `team` as a string + * - `block_actions` payloads: `team.id` (object), with `user.team_id` as a fallback + */ + private extractSlackRecipientTeamId(raw: unknown): string | undefined { + if (!raw || typeof raw !== "object") { + return undefined; + } + + const payload = raw as { + team?: { id?: unknown } | string; + team_id?: unknown; + user?: { team_id?: unknown }; + }; + + if (typeof payload.team_id === "string" && payload.team_id) { + return payload.team_id; + } + + if (typeof payload.team === "string" && payload.team) { + return payload.team; + } + + if ( + payload.team && + typeof payload.team === "object" && + typeof payload.team.id === "string" && + payload.team.id + ) { + return payload.team.id; + } + + if (typeof payload.user?.team_id === "string" && payload.user.team_id) { + return payload.user.team_id; + } + + return undefined; + } + async startTyping(status?: string): Promise { await this.adapter.startTyping(this.id, status); } diff --git a/packages/integration-tests/fixtures/replay/streaming/slack.json b/packages/integration-tests/fixtures/replay/streaming/slack.json index 10af1486..5bf273fd 100644 --- a/packages/integration-tests/fixtures/replay/streaming/slack.json +++ b/packages/integration-tests/fixtures/replay/streaming/slack.json @@ -203,5 +203,221 @@ ], "is_ext_shared_channel": false, "event_context": "4-eyJldCI6ImFwcF9tZW50aW9uIiwidGlkIjoiVDBBNlI5WUxTS0EiLCJhaWQiOiJBMEE2MEswOENMQyIsImNpZCI6IkMwQUNFTENRQkFCIn0" + }, + "promptMessage": { + "token": "sanitized-verification-token", + "team_id": "T08REALTEAM1", + "context_team_id": "T08REALTEAM1", + "context_enterprise_id": null, + "api_app_id": "A08REALAPP1", + "event": { + "type": "message", + "user": "U08REALBOT1", + "ts": "1775407827.773439", + "bot_id": "B08REALBOT1", + "app_id": "A08REALAPP1", + "text": "*Which option should I use?* Select an option to continue:", + "team": "T08REALTEAM1", + "thread_ts": "1775407823.782829", + "parent_user_id": "U08REALUSER1", + "blocks": [ + { + "type": "header", + "block_id": "BP/k8", + "text": { + "type": "plain_text", + "text": "Which option should I use?", + "emoji": true + } + }, + { + "type": "section", + "block_id": "wIM5b", + "text": { + "type": "mrkdwn", + "text": "Select an option to continue:", + "verbatim": false + } + }, + { + "type": "actions", + "block_id": "AXMQo", + "elements": [ + { + "type": "button", + "action_id": "option-select:option-a", + "text": { + "type": "plain_text", + "text": "Option A", + "emoji": true + }, + "value": "option-a" + } + ] + } + ], + "channel": "C08REALCHAN1", + "event_ts": "1775407827.773439", + "channel_type": "channel" + }, + "type": "event_callback", + "event_id": "Ev08REALPROMPT1", + "event_time": 1775407827, + "authorizations": [ + { + "enterprise_id": null, + "team_id": "T08REALTEAM1", + "user_id": "U08REALBOT1", + "is_bot": true, + "is_enterprise_install": false + } + ], + "is_ext_shared_channel": false, + "event_context": "4-sanitized-button-prompt" + }, + "buttonAction": { + "type": "block_actions", + "user": { + "id": "U08REALUSER1", + "username": "testuser", + "name": "testuser", + "team_id": "T08REALTEAM1" + }, + "api_app_id": "A08REALAPP1", + "token": "sanitized-verification-token", + "container": { + "type": "message", + "message_ts": "1775407827.773439", + "channel_id": "C08REALCHAN1", + "is_ephemeral": false, + "thread_ts": "1775407823.782829" + }, + "trigger_id": "10847034803718.8832297784567.sanitized", + "team": { + "id": "T08REALTEAM1", + "domain": "example-workspace" + }, + "enterprise": null, + "is_enterprise_install": false, + "channel": { + "id": "C08REALCHAN1", + "name": "general" + }, + "message": { + "user": "U08REALBOT1", + "type": "message", + "ts": "1775407827.773439", + "bot_id": "B08REALBOT1", + "app_id": "A08REALAPP1", + "text": "*Which option should I use?* Select an option to continue:", + "team": "T08REALTEAM1", + "thread_ts": "1775407823.782829", + "parent_user_id": "U08REALUSER1", + "blocks": [ + { + "type": "header", + "block_id": "BP/k8", + "text": { + "type": "plain_text", + "text": "Which option should I use?", + "emoji": true + } + }, + { + "type": "section", + "block_id": "wIM5b", + "text": { + "type": "mrkdwn", + "text": "Select an option to continue:", + "verbatim": false + } + }, + { + "type": "actions", + "block_id": "AXMQo", + "elements": [ + { + "type": "button", + "action_id": "option-select:option-a", + "text": { + "type": "plain_text", + "text": "Option A", + "emoji": true + }, + "value": "option-a" + } + ] + } + ] + }, + "state": { + "values": {} + }, + "response_url": "https://hooks.slack.com/actions/T08REALTEAM1/10849095105106/sanitized", + "actions": [ + { + "action_id": "option-select:option-a", + "block_id": "AXMQo", + "text": { + "type": "plain_text", + "text": "Option A", + "emoji": true + }, + "value": "option-a", + "type": "button", + "action_ts": "1775407849.087100" + } + ] + }, + "threadFollowUp": { + "token": "sanitized-verification-token", + "team_id": "T08REALTEAM1", + "context_team_id": "T08REALTEAM1", + "context_enterprise_id": null, + "api_app_id": "A08REALAPP1", + "event": { + "type": "message", + "user": "U08REALUSER1", + "ts": "1775407883.655979", + "client_msg_id": "23bf75d5-4b7f-4caa-88ff-b9510ae177f4", + "text": "ping?", + "team": "T08REALTEAM1", + "thread_ts": "1775407823.782829", + "parent_user_id": "U08REALUSER1", + "blocks": [ + { + "type": "rich_text", + "block_id": "v6Z74", + "elements": [ + { + "type": "rich_text_section", + "elements": [ + { + "type": "text", + "text": "ping?" + } + ] + } + ] + } + ], + "channel": "C08REALCHAN1", + "event_ts": "1775407883.655979", + "channel_type": "channel" + }, + "type": "event_callback", + "event_id": "Ev08REALFOLLOWUP1", + "event_time": 1775407883, + "authorizations": [ + { + "enterprise_id": null, + "team_id": "T08REALTEAM1", + "user_id": "U08REALBOT1", + "is_bot": true, + "is_enterprise_install": false + } + ], + "is_ext_shared_channel": false, + "event_context": "4-sanitized-thread-follow-up" } } diff --git a/packages/integration-tests/src/replay-streaming.test.ts b/packages/integration-tests/src/replay-streaming.test.ts index e3b98b42..23c13082 100644 --- a/packages/integration-tests/src/replay-streaming.test.ts +++ b/packages/integration-tests/src/replay-streaming.test.ts @@ -8,6 +8,7 @@ * See fixtures/replay/README.md for instructions on updating fixtures. */ +import type { ActionEvent, StreamChunk } from "chat"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import gchatFixtures from "../fixtures/replay/streaming/gchat.json"; import slackFixtures from "../fixtures/replay/streaming/slack.json"; @@ -37,6 +38,17 @@ async function* createTextStream(chunks: string[]): AsyncIterable { } } +async function* createStructuredStream(): AsyncIterable { + yield "Starting structured reply..."; + yield { + id: "task-1", + status: "pending", + title: "Looking up selected option", + type: "task_update", + }; + yield "Done."; +} + describe("Streaming Replay Tests", () => { describe("Slack", () => { let ctx: SlackTestContext; @@ -134,6 +146,95 @@ describe("Streaming Replay Tests", () => { expect(aiModeEnabled).toBe(true); expect(ctx.mockClient.chatStream).toHaveBeenCalled(); }); + + it("should ignore a prompt message posted by the bot", async () => { + await ctx.sendWebhook(slackFixtures.promptMessage); + + expect(ctx.captured.mentionMessage).toBeNull(); + expect(ctx.captured.followUpMessage).toBeNull(); + expect(ctx.mockClient.chat.postMessage).not.toHaveBeenCalled(); + expect(ctx.mockClient.chatStream).not.toHaveBeenCalled(); + }); + + it("should stream structured chunks for a block_actions continuation", async () => { + const actionHandler = vi.fn(async (event: ActionEvent) => { + if (event.actionId !== "option-select:option-a") { + return; + } + await event.thread?.post(createStructuredStream()); + }); + ctx.chat.onAction(actionHandler); + + await ctx.sendWebhook(slackFixtures.promptMessage); + ctx.mockClient.clearMocks(); + + await ctx.sendSlackAction(slackFixtures.buttonAction); + + const capturedAction = actionHandler.mock.calls[0]?.[0]; + expect(capturedAction).not.toBeNull(); + if (!capturedAction) { + throw new Error("Expected block action to be captured"); + } + expect(capturedAction.actionId).toBe("option-select:option-a"); + expect(capturedAction.user.userId).toBe("U08REALUSER1"); + expect(capturedAction.user.userName).toBe("testuser"); + expect(capturedAction.thread?.id).toBe( + "slack:C08REALCHAN1:1775407823.782829" + ); + expect(capturedAction.thread?.channelId).toBe("slack:C08REALCHAN1"); + + expect(ctx.mockClient.chatStream).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "C08REALCHAN1", + recipient_team_id: "T08REALTEAM1", + recipient_user_id: "U08REALUSER1", + thread_ts: "1775407823.782829", + }) + ); + + const streamer = ctx.mockClient.chatStream.mock.results.at(-1)?.value as { + append: ReturnType; + }; + const hasStructuredAppend = streamer.append.mock.calls.some((call) => { + const [payload] = call as [{ chunks?: Array<{ type?: string }> }]; + return ( + Array.isArray(payload.chunks) && + payload.chunks.some((chunk) => chunk.type === "task_update") + ); + }); + + expect(hasStructuredAppend).toBe(true); + }); + + it("should stream follow-up replies for a subscribed message payload", async () => { + await ctx.state.connect(); + await ctx.state.subscribe("slack:C08REALCHAN1:1775407823.782829"); + + ctx.chat.onSubscribedMessage(async (thread, message) => { + if (message.text !== "ping?") { + return; + } + + await thread.post(createTextStream(["pong"])); + }); + + await ctx.sendWebhook(slackFixtures.threadFollowUp); + + expectValidFollowUp(ctx.captured, { + text: "ping?", + adapterName: "slack", + }); + expect(ctx.captured.followUpMessage?.author.userId).toBe("U08REALUSER1"); + + expect(ctx.mockClient.chatStream).toHaveBeenCalledWith( + expect.objectContaining({ + channel: "C08REALCHAN1", + recipient_team_id: "T08REALTEAM1", + recipient_user_id: "U08REALUSER1", + thread_ts: "1775407823.782829", + }) + ); + }); }); describe("Teams", () => {