diff --git a/packages/core/src/core/geminiChat.test.ts b/packages/core/src/core/geminiChat.test.ts index fd25e8a220..ef636f3ce3 100644 --- a/packages/core/src/core/geminiChat.test.ts +++ b/packages/core/src/core/geminiChat.test.ts @@ -1060,13 +1060,15 @@ describe('GeminiChat', async () => { parts: [{ text: 'hello' }], }, ], - config: {}, + config: { + abortSignal: expect.any(AbortSignal), + }, }, 'prompt-id-1', ); // Verify that token counting is called when usageMetadata is present. - // The Footer-driving counter must reflect *prompt* size only — output + // The Footer-driving counter must reflect *prompt* size only ?output // tokens for the in-flight round are not yet in history. The mock // returns promptTokenCount=42, so that's what should be reported. expect(uiTelemetryService.setLastPromptTokenCount).toHaveBeenCalledWith( @@ -1266,7 +1268,7 @@ describe('GeminiChat', async () => { expect(subagentChat.getLastPromptTokenCount()).toBe(123_456); // The compression service receives the seeded count, so the threshold - // check sees the inherited size — not the constructor default of 0. + // check sees the inherited size ?not the constructor default of 0. const compressSpy = vi .spyOn(ChatCompressionService.prototype, 'compress') .mockResolvedValue({ @@ -1342,7 +1344,7 @@ describe('GeminiChat', async () => { 'compress', ); - // Step 1: auto-compression fails — latch is set on the chat. + // Step 1: auto-compression fails ?latch is set on the chat. compressSpy.mockResolvedValueOnce({ newHistory: null, info: { @@ -2315,7 +2317,7 @@ describe('GeminiChat', async () => { expect(first.value.type).toBe(StreamEventType.RETRY); const skipDelay = first.value.retryInfo!.skipDelay!; - // Resume generator — it's now awaiting the 60s delay. + // Resume generator ?it's now awaiting the 60s delay. // Call skipDelay() to resolve it immediately instead of advancing timers. const secondPromise = iterator.next(); skipDelay(); @@ -2366,7 +2368,7 @@ describe('GeminiChat', async () => { vi.mocked(mockContentGenerator.generateContentStream) .mockResolvedValueOnce(failingStreamGenerator()) - // Should never be called — abort should prevent the second attempt + // Should never be called ?abort should prevent the second attempt .mockResolvedValueOnce(failingStreamGenerator()); const stream = await chat.sendMessageStream( @@ -2394,7 +2396,7 @@ describe('GeminiChat', async () => { // Verify the next sendMessageStream is not blocked by the old delay. // If sendPromise were still pending, this would hang until the 60s - // timer fires — which never happens under fake timers, causing a timeout. + // timer fires ?which never happens under fake timers, causing a timeout. const nextStream = (async function* () { yield { candidates: [ @@ -2436,7 +2438,7 @@ describe('GeminiChat', async () => { try { const glmError = new StreamContentError( - '{"error":{"code":"1302","message":"您的账户已达到速率限制,请您控制请求频率"}}', + '{"error":{"code":"1302","message":"˻ѴﵽƣƵ"}}', ); async function* failingStreamGenerator() { throw glmError; @@ -3217,8 +3219,7 @@ describe('GeminiChat', async () => { } it('should enter recovery loop when escalated response is also truncated', async () => { - // Three streams: initial (MAX_TOKENS) → escalated (MAX_TOKENS) → - // recovery (STOP). + // Three streams: initial (MAX_TOKENS) ?escalated (MAX_TOKENS) ? // recovery (STOP). const streams = [ makeStream([makeChunk([{ text: 'Hello' }], 'MAX_TOKENS')]), makeStream([makeChunk([{ text: ' world' }], 'MAX_TOKENS')]), @@ -3472,7 +3473,7 @@ describe('GeminiChat', async () => { } const history = chat.getHistory(); - // Exactly one user turn + one model turn — the recovery pairs should + // Exactly one user turn + one model turn ?the recovery pairs should // be folded back into the preceding model entry. expect(history.length).toBe(2); expect(history[0]!.role).toBe('user'); @@ -3484,7 +3485,7 @@ describe('GeminiChat', async () => { expect(flattened).not.toContain('Output token limit hit'); // All escalation + recovery content must be preserved in the merged - // model turn, in order (B escalation → C recovery-1 → D recovery-2). + // model turn, in order (B escalation ?C recovery-1 ?D recovery-2). const mergedText = (history[1]!.parts ?? []) .map((p) => ('text' in p ? ((p as { text?: string }).text ?? '') : '')) .join(''); @@ -3492,6 +3493,181 @@ describe('GeminiChat', async () => { }); }); + describe('stream idle watchdog', () => { + function makeChunk( + parts: Array<{ text?: string; functionCall?: unknown }>, + finishReason?: string, + ): GenerateContentResponse { + return { + candidates: [ + { + content: { role: 'model', parts }, + ...(finishReason ? { finishReason } : {}), + }, + ], + } as unknown as GenerateContentResponse; + } + + function waitForDelayOrAbort( + ms: number, + signal?: AbortSignal, + ): Promise { + return new Promise((resolve, reject) => { + let timeoutId: ReturnType | undefined; + + const cleanup = () => { + if (timeoutId !== undefined) { + clearTimeout(timeoutId); + timeoutId = undefined; + } + signal?.removeEventListener('abort', onAbort); + }; + + const onAbort = () => { + cleanup(); + reject(new Error('aborted')); + }; + + if (signal?.aborted) { + onAbort(); + return; + } + + signal?.addEventListener('abort', onAbort, { once: true }); + timeoutId = setTimeout(() => { + cleanup(); + resolve(); + }, ms); + }); + } + + async function* makeSilentStream(signal?: AbortSignal) { + await new Promise((_, reject) => { + if (!signal) { + return; + } + if (signal.aborted) { + reject(new Error('aborted')); + return; + } + signal.addEventListener('abort', () => reject(new Error('aborted')), { + once: true, + }); + }); + + yield {} as GenerateContentResponse; + } + + it('should reject a silent stream after the configured idle timeout retries exhaust', async () => { + vi.useFakeTimers(); + vi.stubEnv('QWEN_CODE_STREAM_IDLE_TIMEOUT_MS', '50'); + + const abortController = new AbortController(); + vi.mocked(mockContentGenerator.generateContentStream).mockImplementation( + async (request) => makeSilentStream(request.config?.abortSignal), + ); + + const stream = await chat.sendMessageStream( + 'gemini-3-pro', + { + message: 'hangs forever', + config: { abortSignal: abortController.signal }, + }, + 'prompt-stream-idle-timeout', + ); + + try { + await expectStreamExhaustion(stream); + expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(3); + } finally { + abortController.abort(); + vi.unstubAllEnvs(); + vi.useRealTimers(); + } + }); + + it('should reset the idle timer whenever a new chunk arrives', async () => { + vi.useFakeTimers(); + vi.stubEnv('QWEN_CODE_STREAM_IDLE_TIMEOUT_MS', '50'); + + vi.mocked(mockContentGenerator.generateContentStream).mockImplementation( + async (request) => + (async function* () { + await waitForDelayOrAbort(30, request.config?.abortSignal); + yield makeChunk([{ text: 'Hel' }]); + await waitForDelayOrAbort(30, request.config?.abortSignal); + yield makeChunk([{ text: 'lo' }], 'STOP'); + })(), + ); + + try { + const stream = await chat.sendMessageStream( + 'gemini-3-pro', + { message: 'stream slowly but steadily' }, + 'prompt-stream-idle-progress', + ); + + const events = await collectStreamWithFakeTimers(stream, 100); + expect(events.map((event) => event.type)).toEqual([ + StreamEventType.CHUNK, + StreamEventType.CHUNK, + ]); + expect(chat.getHistory().at(-1)?.parts?.map((part) => part.text ?? '').join('')).toBe( + 'Hello', + ); + } finally { + vi.unstubAllEnvs(); + vi.useRealTimers(); + } + }); + + it('should allow disabling the idle watchdog via environment variable', async () => { + vi.useFakeTimers(); + vi.stubEnv('QWEN_CODE_STREAM_IDLE_TIMEOUT_MS', '50'); + vi.stubEnv('QWEN_CODE_DISABLE_STREAM_WATCHDOG', 'true'); + + const abortController = new AbortController(); + let internalSignal: AbortSignal | undefined; + let settled = false; + + vi.mocked(mockContentGenerator.generateContentStream).mockImplementation( + async (request) => { + internalSignal = request.config?.abortSignal; + return makeSilentStream(internalSignal); + }, + ); + + const stream = await chat.sendMessageStream( + 'gemini-3-pro', + { + message: 'keep waiting', + config: { abortSignal: abortController.signal }, + }, + 'prompt-stream-idle-disabled', + ); + + const consumePromise = (async () => { + for await (const _ of stream) { + /* consume */ + } + })().finally(() => { + settled = true; + }); + + try { + await vi.advanceTimersByTimeAsync(60); + await Promise.resolve(); + expect(settled).toBe(false); + expect(internalSignal?.aborted).toBe(false); + expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(1); + } finally { + abortController.abort(); + await consumePromise.catch(() => undefined); + vi.unstubAllEnvs(); + vi.useRealTimers(); + } + }); + }); describe('redactStructuredOutputArgsForRecording', () => { // The chat-recording JSONL persists assistant turns to disk and re-feeds // them on `--continue` / `--resume`. For `--json-schema` runs the @@ -3578,7 +3754,7 @@ describe('GeminiChat', async () => { /** * Mock a successful compression: the service returns COMPRESSED with a * fresh history. We don't go through the real - * `config.getContentGenerator().generateContent` path here — the service + * `config.getContentGenerator().generateContent` path here ?the service * is mocked at the boundary. */ function mockCompressionService( diff --git a/packages/core/src/core/geminiChat.ts b/packages/core/src/core/geminiChat.ts index c2fc71bbea..05a3a051ae 100644 --- a/packages/core/src/core/geminiChat.ts +++ b/packages/core/src/core/geminiChat.ts @@ -71,7 +71,7 @@ const HEAP_PRESSURE_COMPRESSION_COOLDOWN_MS = 30_000; * The chat-recording JSONL (`/chats/.jsonl`) * persists assistant turns to disk and re-feeds them on * `--continue` / `--resume`. For `--json-schema` runs the tool args - * ARE the user's structured payload — already emitted on stdout via + * ARE the user's structured payload ?already emitted on stdout via * `result` / `structured_result`. Recording them verbatim here would * mean the same payload (and every validation-failure retry along the * way) sits on disk indefinitely, contradicting the privacy contract @@ -167,6 +167,8 @@ const INVALID_STREAM_RETRY_CONFIG = { * message so the model can continue from where it left off. */ const MAX_OUTPUT_RECOVERY_ATTEMPTS = 3; +const DEFAULT_STREAM_IDLE_TIMEOUT_MS = 90_000; +const STREAM_IDLE_WARNING_FRACTION = 0.5; /** * Recovery message injected as a user turn when the model's output is @@ -174,7 +176,7 @@ const MAX_OUTPUT_RECOVERY_ATTEMPTS = 3; * without repeating itself and to break remaining work into smaller steps. */ const OUTPUT_RECOVERY_MESSAGE = - 'Output token limit hit. Resume directly — no apology, no recap of what ' + + 'Output token limit hit. Resume directly ?no apology, no recap of what ' + 'you were doing. Pick up mid-thought if that is where the cut happened. ' + 'Break remaining work into smaller pieces.'; @@ -369,27 +371,131 @@ function stripThoughtPartsFromContent(content: Content): Content | null { }; } +type InvalidStreamErrorType = + | 'NO_FINISH_REASON' + | 'NO_RESPONSE_TEXT' + | 'STREAM_IDLE_TIMEOUT'; + +interface StreamIdleWatchdog { + next(nextPromise: Promise>): Promise>; + cleanup(): void; +} + +function isStreamWatchdogDisabled(): boolean { + const value = process.env['QWEN_CODE_DISABLE_STREAM_WATCHDOG']; + return value === '1' || value === 'true'; +} + +function getStreamIdleTimeoutMs(): number | undefined { + if (isStreamWatchdogDisabled()) { + return undefined; + } + + const value = process.env['QWEN_CODE_STREAM_IDLE_TIMEOUT_MS']; + if (!value) { + return DEFAULT_STREAM_IDLE_TIMEOUT_MS; + } + + const parsed = Number(value); + if (!Number.isFinite(parsed) || parsed <= 0) { + debugLogger.warn( + `Ignoring invalid QWEN_CODE_STREAM_IDLE_TIMEOUT_MS value: ${value}`, + ); + return DEFAULT_STREAM_IDLE_TIMEOUT_MS; + } + + return parsed; +} + +function linkAbortSignal( + signal: AbortSignal | undefined, + controller: AbortController, +): () => void { + if (!signal) { + return () => {}; + } + + if (signal.aborted) { + controller.abort(signal.reason); + return () => {}; + } + + const onAbort = () => { + controller.abort(signal.reason); + }; + signal.addEventListener('abort', onAbort, { once: true }); + return () => signal.removeEventListener('abort', onAbort); +} + +function createStreamIdleWatchdog( + model: string, + abortController: AbortController, +): StreamIdleWatchdog | undefined { + const timeoutMs = getStreamIdleTimeoutMs(); + if (timeoutMs === undefined) { + return undefined; + } + + const warningMs = Math.max( + 1, + Math.floor(timeoutMs * STREAM_IDLE_WARNING_FRACTION), + ); + let timeoutId: ReturnType | undefined; + let warningId: ReturnType | undefined; + + const clearTimers = () => { + if (timeoutId !== undefined) { + clearTimeout(timeoutId); + timeoutId = undefined; + } + if (warningId !== undefined) { + clearTimeout(warningId); + warningId = undefined; + } + }; + + return { + next(nextPromise: Promise>): Promise> { + const timeoutPromise = new Promise((_, reject) => { + warningId = setTimeout(() => { + debugLogger.warn( + `Stream has been idle for ${warningMs}ms while waiting for the next chunk from ${model}.`, + ); + }, warningMs); + timeoutId = setTimeout(() => { + clearTimers(); + abortController.abort(); + reject( + new InvalidStreamError( + `Model stream went idle for ${timeoutMs}ms without receiving a chunk.`, + 'STREAM_IDLE_TIMEOUT', + ), + ); + }, timeoutMs); + }); + + return Promise.race([nextPromise, timeoutPromise]).finally(clearTimers); + }, + cleanup() { + clearTimers(); + }, + }; +} + /** * Custom error to signal that a stream completed with invalid content, * which should trigger a retry. */ export class InvalidStreamError extends Error { - readonly type: 'NO_FINISH_REASON' | 'NO_RESPONSE_TEXT'; + readonly type: InvalidStreamErrorType; - constructor(message: string, type: 'NO_FINISH_REASON' | 'NO_RESPONSE_TEXT') { + constructor(message: string, type: InvalidStreamErrorType) { super(message); this.name = 'InvalidStreamError'; this.type = type; } } -/** - * Chat session that enables sending messages to the model with previous - * conversation context. - * - * @remarks - * The session maintains all the turns between user and model. - */ const SESSION_START_CONTEXT_SENTINEL_START = ''; @@ -485,7 +591,7 @@ export class GeminiChat { /** * Seed the last-prompt-token-count for chats created with inherited * history (forks, subagents, speculation). Without this, the auto-compress - * threshold check sees `0` and refuses to compress — so the first API call + * threshold check sees `0` and refuses to compress ?so the first API call * can 400 from oversized history. Callers pass the parent chat's * `getLastPromptTokenCount()` here. */ @@ -555,7 +661,7 @@ export class GeminiChat { info, compressedHistory: newHistory, }); - // Auto-compaction replaces history in place — no env-context refresh + // Auto-compaction replaces history in place ?no env-context refresh // here. Manual /compress goes through GeminiClient.tryCompressChat, // which calls startChat() to re-prepend a fresh env snapshot. See // GeminiClient.sendMessageStream for the rationale behind the split. @@ -658,7 +764,7 @@ export class GeminiChat { * @return The model's response. * * @example - * ```ts + * ``ts * const chat = ai.chats.create({model: 'gemini-2.0-flash'}); * const response = await chat.sendMessageStream({ * message: 'Why is the sky blue?' @@ -666,7 +772,7 @@ export class GeminiChat { * for await (const chunk of response) { * console.log(chunk.text); * } - * ``` + * `` */ async sendMessageStream( model: string, @@ -946,7 +1052,7 @@ export class GeminiChat { await delay(delayMs, params.config?.abortSignal).promise; continue; } - // Transient budget exhausted — stop immediately. + // Transient budget exhausted ?stop immediately. if (isTransientStreamError) { break; } @@ -1157,40 +1263,66 @@ export class GeminiChat { params: SendMessageParameters, prompt_id: string, ): Promise> { + const streamAbortController = new AbortController(); + const cleanupAbortLink = linkAbortSignal( + params.config?.abortSignal, + streamAbortController, + ); + const streamWatchdog = createStreamIdleWatchdog( + model, + streamAbortController, + ); + const apiCall = () => this.config.getContentGenerator().generateContentStream( { model, contents: requestContents, - config: { ...this.generationConfig, ...params.config }, + config: { + ...this.generationConfig, + ...params.config, + abortSignal: streamAbortController.signal, + }, }, prompt_id, ); - const streamResponse = await retryWithBackoff(apiCall, { - shouldRetryOnError: (error: unknown) => { - if (error instanceof Error) { - if (isSchemaDepthError(error.message)) return false; - if (isInvalidArgumentError(error.message)) return false; - } - const status = getErrorStatus(error); - if (status === 400) return false; - if (status === 429) return true; - if (status && status >= 500 && status < 600) return true; + try { + const streamResponse = await retryWithBackoff(apiCall, { + shouldRetryOnError: (error: unknown) => { + if (error instanceof Error) { + if (isSchemaDepthError(error.message)) return false; + if (isInvalidArgumentError(error.message)) return false; + } - return false; - }, - authType: this.config.getContentGeneratorConfig()?.authType, - persistentMode: isUnattendedMode(), - signal: params.config?.abortSignal, - heartbeatFn: (info) => { - process.stderr.write( - `[qwen-code] Waiting for API capacity... attempt ${info.attempt}, retry in ${Math.ceil(info.remainingMs / 1000)}s\n`, - ); - }, - }); + const status = getErrorStatus(error); + if (status === 400) return false; + if (status === 429) return true; + if (status && status >= 500 && status < 600) return true; + + return false; + }, + authType: this.config.getContentGeneratorConfig()?.authType, + persistentMode: isUnattendedMode(), + signal: params.config?.abortSignal, + heartbeatFn: (info) => { + process.stderr.write( + `[qwen-code] Waiting for API capacity... attempt ${info.attempt}, retry in ${Math.ceil(info.remainingMs / 1000)}s\n`, + ); + }, + }); - return this.processStreamResponse(model, streamResponse); + return this.processStreamResponse( + model, + streamResponse, + streamWatchdog, + cleanupAbortLink, + ); + } catch (error) { + streamWatchdog?.cleanup(); + cleanupAbortLink(); + throw error; + } } /** @@ -1248,7 +1380,7 @@ export class GeminiChat { /** * Returns the number of entries in the raw chat history. O(1) and - * does not clone — use this when you only need the count and would + * does not clone ?use this when you only need the count and would * otherwise pay the {@link getHistory} `structuredClone` cost. */ getHistoryLength(): number { @@ -1339,6 +1471,8 @@ export class GeminiChat { private async *processStreamResponse( model: string, streamResponse: AsyncGenerator, + streamWatchdog?: StreamIdleWatchdog, + cleanupAbortLink?: () => void, ): AsyncGenerator { // Collect ALL parts from the model response (including thoughts for recording) const allModelParts: Part[] = []; @@ -1347,149 +1481,161 @@ export class GeminiChat { let hasToolCall = false; let hasFinishReason = false; - for await (const chunk of streamResponse) { - // Use ||= to avoid later usage-only chunks (no candidates) overwriting - // a finishReason that was already seen in an earlier chunk. - hasFinishReason ||= - chunk?.candidates?.some((candidate) => candidate.finishReason) ?? false; - - if (isValidResponse(chunk)) { - const content = chunk.candidates?.[0]?.content; - if (content?.parts) { - if (content.parts.some((part) => part.functionCall)) { - hasToolCall = true; + try { + while (true) { + const result = streamWatchdog + ? await streamWatchdog.next(streamResponse.next()) + : await streamResponse.next(); + if (result.done) { + break; + } + const chunk = result.value; + // Use ||= to avoid later usage-only chunks (no candidates) overwriting + // a finishReason that was already seen in an earlier chunk. + hasFinishReason ||= + chunk?.candidates?.some((candidate) => candidate.finishReason) ?? false; + + if (isValidResponse(chunk)) { + const content = chunk.candidates?.[0]?.content; + if (content?.parts) { + if (content.parts.some((part) => part.functionCall)) { + hasToolCall = true; + } + + // Collect all parts for recording + allModelParts.push(...content.parts); } + } - // Collect all parts for recording - allModelParts.push(...content.parts); + // Collect token usage for consolidated recording + if (chunk.usageMetadata) { + usageMetadata = chunk.usageMetadata; + // Context usage tracks prompt size; output isn't in history yet. + const lastPromptTokenCount = + usageMetadata.promptTokenCount || usageMetadata.totalTokenCount; + if (lastPromptTokenCount) { + // Always update the per-chat counter so this chat (including + // subagents) can make its own compaction decisions. + this.lastPromptTokenCount = lastPromptTokenCount; + // Mirror to the global telemetry only when wired ?subagents + // pass `telemetryService=undefined` to keep their context usage + // out of the main session's UI counters. + this.telemetryService?.setLastPromptTokenCount(lastPromptTokenCount); + } + if (usageMetadata.cachedContentTokenCount && this.telemetryService) { + this.telemetryService.setLastCachedContentTokenCount( + usageMetadata.cachedContentTokenCount, + ); + } } + + yield chunk; // Yield every chunk to the UI immediately. } - // Collect token usage for consolidated recording - if (chunk.usageMetadata) { - usageMetadata = chunk.usageMetadata; - // Context usage tracks prompt size; output isn't in history yet. - const lastPromptTokenCount = - usageMetadata.promptTokenCount || usageMetadata.totalTokenCount; - if (lastPromptTokenCount) { - // Always update the per-chat counter so this chat (including - // subagents) can make its own compaction decisions. - this.lastPromptTokenCount = lastPromptTokenCount; - // Mirror to the global telemetry only when wired — subagents - // pass `telemetryService=undefined` to keep their context usage - // out of the main session's UI counters. - this.telemetryService?.setLastPromptTokenCount(lastPromptTokenCount); - } - if (usageMetadata.cachedContentTokenCount && this.telemetryService) { - this.telemetryService.setLastCachedContentTokenCount( - usageMetadata.cachedContentTokenCount, - ); + let thoughtContentPart: Part | undefined; + const thoughtText = allModelParts + .filter((part) => part.thought) + .map((part) => part.text) + .join('') + .trim(); + + if (thoughtText !== '') { + thoughtContentPart = { + text: thoughtText, + thought: true, + }; + + const thoughtSignature = allModelParts.filter( + (part) => part.thoughtSignature && part.thought, + )?.[0]?.thoughtSignature; + if (thoughtContentPart && thoughtSignature) { + thoughtContentPart.thoughtSignature = thoughtSignature; } } - yield chunk; // Yield every chunk to the UI immediately. - } + const contentParts = allModelParts.filter((part) => !part.thought); + const consolidatedHistoryParts: Part[] = []; + for (const part of contentParts) { + const lastPart = + consolidatedHistoryParts[consolidatedHistoryParts.length - 1]; + if ( + lastPart?.text && + isValidNonThoughtTextPart(lastPart) && + isValidNonThoughtTextPart(part) + ) { + lastPart.text += part.text; + } else if (isValidContentPart(part)) { + consolidatedHistoryParts.push(part); + } + } - let thoughtContentPart: Part | undefined; - const thoughtText = allModelParts - .filter((part) => part.thought) - .map((part) => part.text) - .join('') - .trim(); - - if (thoughtText !== '') { - thoughtContentPart = { - text: thoughtText, - thought: true, - }; - - const thoughtSignature = allModelParts.filter( - (part) => part.thoughtSignature && part.thought, - )?.[0]?.thoughtSignature; - if (thoughtContentPart && thoughtSignature) { - thoughtContentPart.thoughtSignature = thoughtSignature; + const contentText = consolidatedHistoryParts + .filter((part) => part.text) + .map((part) => part.text) + .join('') + .trim(); + + // Record assistant turn with raw Content and metadata + if (thoughtContentPart || contentText || hasToolCall || usageMetadata) { + const contextWindowSize = + this.config.getContentGeneratorConfig()?.contextWindowSize; + this.chatRecordingService?.recordAssistantTurn({ + model, + message: [ + ...(thoughtContentPart ? [thoughtContentPart] : []), + ...(contentText ? [{ text: contentText }] : []), + ...(hasToolCall + ? contentParts + .map(redactStructuredOutputArgsForRecording) + .filter( + ( + p, + ): p is { functionCall: NonNullable } => + p !== null, + ) + : []), + ], + tokens: usageMetadata, + contextWindowSize, + }); } - } - const contentParts = allModelParts.filter((part) => !part.thought); - const consolidatedHistoryParts: Part[] = []; - for (const part of contentParts) { - const lastPart = - consolidatedHistoryParts[consolidatedHistoryParts.length - 1]; - if ( - lastPart?.text && - isValidNonThoughtTextPart(lastPart) && - isValidNonThoughtTextPart(part) - ) { - lastPart.text += part.text; - } else if (isValidContentPart(part)) { - consolidatedHistoryParts.push(part); + // Stream validation logic: A stream is considered successful if: + // 1. There's a tool call (tool calls can end without explicit finish reasons), OR + // 2. There's a finish reason AND we have non-empty response text or thought text + // + // We throw an error only when there's no tool call AND: + // - No finish reason, OR + // - Empty response text (e.g., no actual content and no thoughts) + // + // Note: Thoughts-only responses are valid for models that use thinking modes + // These models may send only reasoning content without explicit text output. + const hasAnyContent = contentText || thoughtText; + if (!hasToolCall && (!hasFinishReason || !hasAnyContent)) { + if (!hasFinishReason) { + throw new InvalidStreamError( + 'Model stream ended without a finish reason.', + 'NO_FINISH_REASON', + ); + } else { + throw new InvalidStreamError( + 'Model stream ended with empty response text.', + 'NO_RESPONSE_TEXT', + ); + } } - } - const contentText = consolidatedHistoryParts - .filter((part) => part.text) - .map((part) => part.text) - .join('') - .trim(); - - // Record assistant turn with raw Content and metadata - if (thoughtContentPart || contentText || hasToolCall || usageMetadata) { - const contextWindowSize = - this.config.getContentGeneratorConfig()?.contextWindowSize; - this.chatRecordingService?.recordAssistantTurn({ - model, - message: [ + this.history.push({ + role: 'model', + parts: [ ...(thoughtContentPart ? [thoughtContentPart] : []), - ...(contentText ? [{ text: contentText }] : []), - ...(hasToolCall - ? contentParts - .map(redactStructuredOutputArgsForRecording) - .filter( - ( - p, - ): p is { functionCall: NonNullable } => - p !== null, - ) - : []), + ...consolidatedHistoryParts, ], - tokens: usageMetadata, - contextWindowSize, }); + } finally { + streamWatchdog?.cleanup(); + cleanupAbortLink?.(); } - - // Stream validation logic: A stream is considered successful if: - // 1. There's a tool call (tool calls can end without explicit finish reasons), OR - // 2. There's a finish reason AND we have non-empty response text or thought text - // - // We throw an error only when there's no tool call AND: - // - No finish reason, OR - // - Empty response text (e.g., no actual content and no thoughts) - // - // Note: Thoughts-only responses are valid for models that use thinking modes - // These models may send only reasoning content without explicit text output. - const hasAnyContent = contentText || thoughtText; - if (!hasToolCall && (!hasFinishReason || !hasAnyContent)) { - if (!hasFinishReason) { - throw new InvalidStreamError( - 'Model stream ended without a finish reason.', - 'NO_FINISH_REASON', - ); - } else { - throw new InvalidStreamError( - 'Model stream ended with empty response text.', - 'NO_RESPONSE_TEXT', - ); - } - } - - this.history.push({ - role: 'model', - parts: [ - ...(thoughtContentPart ? [thoughtContentPart] : []), - ...consolidatedHistoryParts, - ], - }); } /**