Skip to content

Commit f69e68a

Browse files
committed
feat(cloud-agent): add server-side cloud prompt handling
1 parent dbf0ca1 commit f69e68a

File tree

6 files changed

+167
-33
lines changed

6 files changed

+167
-33
lines changed

packages/agent/src/server/agent-server.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,30 @@ describe("AgentServer HTTP Mode", () => {
252252
const body = await response.json();
253253
expect(body.error).toBe("No active session for this run");
254254
});
255+
256+
it("accepts structured user_message content", async () => {
257+
await createServer().start();
258+
const token = createToken({ run_id: "different-run-id" });
259+
260+
const response = await fetch(`http://localhost:${port}/command`, {
261+
method: "POST",
262+
headers: {
263+
Authorization: `Bearer ${token}`,
264+
"Content-Type": "application/json",
265+
},
266+
body: JSON.stringify({
267+
jsonrpc: "2.0",
268+
method: "user_message",
269+
params: {
270+
content: [{ type: "text", text: "test" }],
271+
},
272+
}),
273+
});
274+
275+
expect(response.status).toBe(400);
276+
const body = await response.json();
277+
expect(body.error).toBe("No active session for this run");
278+
});
255279
});
256280

257281
describe("404 handling", () => {

packages/agent/src/server/agent-server.ts

Lines changed: 60 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import type { ContentBlock } from "@agentclientprotocol/sdk";
12
import {
23
ClientSideConnection,
34
ndJsonStream,
@@ -30,6 +31,11 @@ import type {
3031
import { AsyncMutex } from "../utils/async-mutex";
3132
import { getLlmGatewayUrl } from "../utils/gateway";
3233
import { Logger } from "../utils/logger";
34+
import {
35+
deserializeCloudPrompt,
36+
normalizeCloudPromptContent,
37+
promptBlocksToText,
38+
} from "./cloud-prompt";
3339
import { type JwtPayload, JwtValidationError, validateJwt } from "./jwt";
3440
import { jsonRpcRequestSchema, validateCommandParams } from "./schemas";
3541
import type { AgentServerConfig } from "./types";
@@ -487,17 +493,20 @@ export class AgentServer {
487493
switch (method) {
488494
case POSTHOG_NOTIFICATIONS.USER_MESSAGE:
489495
case "user_message": {
490-
const content = params.content as string;
496+
const prompt = normalizeCloudPromptContent(
497+
params.content as string | ContentBlock[],
498+
);
499+
const promptPreview = promptBlocksToText(prompt);
491500

492501
this.logger.info(
493-
`Processing user message (detectedPrUrl=${this.detectedPrUrl ?? "none"}): ${content.substring(0, 100)}...`,
502+
`Processing user message (detectedPrUrl=${this.detectedPrUrl ?? "none"}): ${promptPreview.substring(0, 100)}...`,
494503
);
495504

496505
this.session.logWriter.resetTurnMessages(this.session.payload.run_id);
497506

498507
const result = await this.session.clientConnection.prompt({
499508
sessionId: this.session.acpSessionId,
500-
prompt: [{ type: "text", text: content }],
509+
prompt,
501510
...(this.detectedPrUrl && {
502511
_meta: {
503512
prContext:
@@ -837,24 +846,33 @@ export class AgentServer {
837846
const initialPromptOverride = taskRun
838847
? this.getInitialPromptOverride(taskRun)
839848
: null;
840-
const initialPrompt = initialPromptOverride ?? task.description;
849+
const pendingUserPrompt = this.getPendingUserPrompt(taskRun);
850+
let initialPrompt: ContentBlock[] = [];
851+
if (pendingUserPrompt?.length) {
852+
initialPrompt = pendingUserPrompt;
853+
} else if (initialPromptOverride) {
854+
initialPrompt = [{ type: "text", text: initialPromptOverride }];
855+
} else if (task.description) {
856+
initialPrompt = [{ type: "text", text: task.description }];
857+
}
841858

842-
if (!initialPrompt) {
859+
if (initialPrompt.length === 0) {
843860
this.logger.warn("Task has no description, skipping initial message");
844861
return;
845862
}
846863

847864
this.logger.info("Sending initial task message", {
848865
taskId: payload.task_id,
849-
descriptionLength: initialPrompt.length,
866+
descriptionLength: promptBlocksToText(initialPrompt).length,
850867
usedInitialPromptOverride: !!initialPromptOverride,
868+
usedPendingUserMessage: !!pendingUserPrompt?.length,
851869
});
852870

853871
this.session.logWriter.resetTurnMessages(payload.run_id);
854872

855873
const result = await this.session.clientConnection.prompt({
856874
sessionId: this.session.acpSessionId,
857-
prompt: [{ type: "text", text: initialPrompt }],
875+
prompt: initialPrompt,
858876
});
859877

860878
this.logger.info("Initial task message completed", {
@@ -886,38 +904,49 @@ export class AgentServer {
886904
this.resumeState.conversation,
887905
);
888906

889-
// Read the pending user message from TaskRun state (set by the workflow
907+
// Read the pending user prompt from TaskRun state (set by the workflow
890908
// when the user sends a follow-up message that triggers a resume).
891-
const pendingUserMessage = this.getPendingUserMessage(taskRun);
909+
const pendingUserPrompt = this.getPendingUserPrompt(taskRun);
892910

893911
const sandboxContext = this.resumeState.snapshotApplied
894912
? `The workspace environment (all files, packages, and code changes) has been fully restored from where you left off.`
895913
: `The workspace files from the previous session were not restored (the file snapshot may have expired), so you are starting with a fresh environment. Your conversation history is fully preserved below.`;
896914

897-
let resumePrompt: string;
898-
if (pendingUserMessage) {
899-
// Include the pending message as the user's new question so the agent
900-
// responds to it directly instead of the generic resume context.
901-
resumePrompt =
902-
`You are resuming a previous conversation. ${sandboxContext}\n\n` +
903-
`Here is the conversation history from the previous session:\n\n` +
904-
`${conversationSummary}\n\n` +
905-
`The user has sent a new message:\n\n` +
906-
`${pendingUserMessage}\n\n` +
907-
`Respond to the user's new message above. You have full context from the previous session.`;
915+
let resumePromptBlocks: ContentBlock[];
916+
if (pendingUserPrompt?.length) {
917+
resumePromptBlocks = [
918+
{
919+
type: "text",
920+
text:
921+
`You are resuming a previous conversation. ${sandboxContext}\n\n` +
922+
`Here is the conversation history from the previous session:\n\n` +
923+
`${conversationSummary}\n\n` +
924+
`The user has sent a new message:\n\n`,
925+
},
926+
...pendingUserPrompt,
927+
{
928+
type: "text",
929+
text: "\n\nRespond to the user's new message above. You have full context from the previous session.",
930+
},
931+
];
908932
} else {
909-
resumePrompt =
910-
`You are resuming a previous conversation. ${sandboxContext}\n\n` +
911-
`Here is the conversation history from the previous session:\n\n` +
912-
`${conversationSummary}\n\n` +
913-
`Continue from where you left off. The user is waiting for your response.`;
933+
resumePromptBlocks = [
934+
{
935+
type: "text",
936+
text:
937+
`You are resuming a previous conversation. ${sandboxContext}\n\n` +
938+
`Here is the conversation history from the previous session:\n\n` +
939+
`${conversationSummary}\n\n` +
940+
`Continue from where you left off. The user is waiting for your response.`,
941+
},
942+
];
914943
}
915944

916945
this.logger.info("Sending resume message", {
917946
taskId: payload.task_id,
918947
conversationTurns: this.resumeState.conversation.length,
919-
promptLength: resumePrompt.length,
920-
hasPendingUserMessage: !!pendingUserMessage,
948+
promptLength: promptBlocksToText(resumePromptBlocks).length,
949+
hasPendingUserMessage: !!pendingUserPrompt?.length,
921950
snapshotApplied: this.resumeState.snapshotApplied,
922951
});
923952

@@ -928,7 +957,7 @@ export class AgentServer {
928957

929958
const result = await this.session.clientConnection.prompt({
930959
sessionId: this.session.acpSessionId,
931-
prompt: [{ type: "text", text: resumePrompt }],
960+
prompt: resumePromptBlocks,
932961
});
933962

934963
this.logger.info("Resume message completed", {
@@ -1013,16 +1042,16 @@ export class AgentServer {
10131042
return trimmed.length > 0 ? trimmed : null;
10141043
}
10151044

1016-
private getPendingUserMessage(taskRun: TaskRun | null): string | null {
1045+
private getPendingUserPrompt(taskRun: TaskRun | null): ContentBlock[] | null {
10171046
if (!taskRun) return null;
10181047
const state = taskRun.state as Record<string, unknown> | undefined;
10191048
const message = state?.pending_user_message;
10201049
if (typeof message !== "string") {
10211050
return null;
10221051
}
10231052

1024-
const trimmed = message.trim();
1025-
return trimmed.length > 0 ? trimmed : null;
1053+
const prompt = deserializeCloudPrompt(message);
1054+
return prompt.length > 0 ? prompt : null;
10261055
}
10271056

10281057
private getResumeRunId(taskRun: TaskRun | null): string | null {
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import type { ContentBlock } from "@agentclientprotocol/sdk";
2+
import { deserializeCloudPrompt, promptBlocksToText } from "@posthog/shared";
3+
4+
export { deserializeCloudPrompt, promptBlocksToText };
5+
6+
export function normalizeCloudPromptContent(
7+
content: string | ContentBlock[],
8+
): ContentBlock[] {
9+
if (typeof content === "string") {
10+
return deserializeCloudPrompt(content);
11+
}
12+
return content;
13+
}

packages/agent/src/server/question-relay.test.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,53 @@ describe("Question relay", () => {
371371
});
372372

373373
describe("sendInitialTaskMessage prompt source", () => {
374+
it("uses pending user prompt blocks when present", async () => {
375+
vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue({
376+
id: "test-task-id",
377+
title: "t",
378+
description: "original task description",
379+
} as unknown as Task);
380+
vi.spyOn(server.posthogAPI, "getTaskRun").mockResolvedValue({
381+
id: "test-run-id",
382+
task: "test-task-id",
383+
state: {
384+
pending_user_message:
385+
'__twig_cloud_prompt_v1__:{"blocks":[{"type":"text","text":"read this attachment"},{"type":"resource","resource":{"uri":"attachment://test.txt","text":"hello from file","mimeType":"text/plain"}}]}',
386+
},
387+
} as unknown as TaskRun);
388+
389+
const promptSpy = vi.fn().mockResolvedValue({ stopReason: "max_tokens" });
390+
server.session = {
391+
payload: TEST_PAYLOAD,
392+
acpSessionId: "acp-session",
393+
clientConnection: { prompt: promptSpy },
394+
logWriter: {
395+
flushAll: vi.fn().mockResolvedValue(undefined),
396+
getFullAgentResponse: vi.fn().mockReturnValue(null),
397+
resetTurnMessages: vi.fn(),
398+
flush: vi.fn().mockResolvedValue(undefined),
399+
isRegistered: vi.fn().mockReturnValue(true),
400+
},
401+
};
402+
403+
await server.sendInitialTaskMessage(TEST_PAYLOAD);
404+
405+
expect(promptSpy).toHaveBeenCalledWith({
406+
sessionId: "acp-session",
407+
prompt: [
408+
{ type: "text", text: "read this attachment" },
409+
{
410+
type: "resource",
411+
resource: {
412+
uri: "attachment://test.txt",
413+
text: "hello from file",
414+
mimeType: "text/plain",
415+
},
416+
},
417+
],
418+
});
419+
});
420+
374421
it("uses run state initial_prompt_override when present", async () => {
375422
vi.spyOn(server.posthogAPI, "getTask").mockResolvedValue({
376423
id: "test-task-id",

packages/agent/src/server/schemas.test.ts

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { describe, expect, it } from "vitest";
2-
import { mcpServersSchema } from "./schemas";
2+
import { mcpServersSchema, validateCommandParams } from "./schemas";
33

44
describe("mcpServersSchema", () => {
55
it("accepts a valid HTTP server", () => {
@@ -115,3 +115,21 @@ describe("mcpServersSchema", () => {
115115
expect(result.success).toBe(false);
116116
});
117117
});
118+
119+
describe("validateCommandParams", () => {
120+
it("accepts structured user_message content arrays", () => {
121+
const result = validateCommandParams("user_message", {
122+
content: [{ type: "text", text: "hello" }],
123+
});
124+
125+
expect(result.success).toBe(true);
126+
});
127+
128+
it("rejects empty content array", () => {
129+
const result = validateCommandParams("user_message", {
130+
content: [],
131+
});
132+
133+
expect(result.success).toBe(false);
134+
});
135+
});

packages/agent/src/server/schemas.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,10 @@ export const jsonRpcRequestSchema = z.object({
4242
export type JsonRpcRequest = z.infer<typeof jsonRpcRequestSchema>;
4343

4444
export const userMessageParamsSchema = z.object({
45-
content: z.string().min(1, "Content is required"),
45+
content: z.union([
46+
z.string().min(1, "Content is required"),
47+
z.array(z.record(z.string(), z.unknown())).min(1, "Content is required"),
48+
]),
4649
});
4750

4851
export const commandParamsSchemas = {

0 commit comments

Comments
 (0)