Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 189 additions & 13 deletions packages/core/src/core/geminiChat.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1060,13 +1060,15 @@ describe('GeminiChat', async () => {
parts: [{ text: 'hello' }],
},
],
config: {},
config: {
abortSignal: expect.any(AbortSignal),
},
},
'prompt-id-1',
);

// Verify that token counting is called when usageMetadata is present.
// The Footer-driving counter must reflect *prompt* size only — output
// The Footer-driving counter must reflect *prompt* size only — output
// tokens for the in-flight round are not yet in history. The mock
// returns promptTokenCount=42, so that's what should be reported.
expect(uiTelemetryService.setLastPromptTokenCount).toHaveBeenCalledWith(
Expand Down Expand Up @@ -1266,7 +1268,7 @@ describe('GeminiChat', async () => {
expect(subagentChat.getLastPromptTokenCount()).toBe(123_456);

// The compression service receives the seeded count, so the threshold
// check sees the inherited size — not the constructor default of 0.
// check sees the inherited size — not the constructor default of 0.
const compressSpy = vi
.spyOn(ChatCompressionService.prototype, 'compress')
.mockResolvedValue({
Expand Down Expand Up @@ -1342,7 +1344,7 @@ describe('GeminiChat', async () => {
'compress',
);

// Step 1: auto-compression fails — latch is set on the chat.
// Step 1: auto-compression fails — latch is set on the chat.
compressSpy.mockResolvedValueOnce({
newHistory: null,
info: {
Expand Down Expand Up @@ -2315,7 +2317,7 @@ describe('GeminiChat', async () => {
expect(first.value.type).toBe(StreamEventType.RETRY);
const skipDelay = first.value.retryInfo!.skipDelay!;

// Resume generator — it's now awaiting the 60s delay.
// Resume generator — it's now awaiting the 60s delay.
// Call skipDelay() to resolve it immediately instead of advancing timers.
const secondPromise = iterator.next();
skipDelay();
Expand Down Expand Up @@ -2366,7 +2368,7 @@ describe('GeminiChat', async () => {

vi.mocked(mockContentGenerator.generateContentStream)
.mockResolvedValueOnce(failingStreamGenerator())
// Should never be called — abort should prevent the second attempt
// Should never be called — abort should prevent the second attempt
.mockResolvedValueOnce(failingStreamGenerator());

const stream = await chat.sendMessageStream(
Expand Down Expand Up @@ -2394,7 +2396,7 @@ describe('GeminiChat', async () => {

// Verify the next sendMessageStream is not blocked by the old delay.
// If sendPromise were still pending, this would hang until the 60s
// timer fires — which never happens under fake timers, causing a timeout.
// timer fires — which never happens under fake timers, causing a timeout.
const nextStream = (async function* () {
yield {
candidates: [
Expand Down Expand Up @@ -2436,7 +2438,7 @@ describe('GeminiChat', async () => {

try {
const glmError = new StreamContentError(
'{"error":{"code":"1302","message":"您的账户已达到速率限制,请您控制请求频率"}}',
'{"error":{"code":"1302","message":"ÄúµÄÕË»§ÒÑ´ïµ½ËÙÂÊÏÞÖÆ£¬ÇëÄú¿ØÖÆÇëÇóƵÂÊ"}}',
);
async function* failingStreamGenerator() {
throw glmError;
Expand Down Expand Up @@ -3217,8 +3219,7 @@ describe('GeminiChat', async () => {
}

it('should enter recovery loop when escalated response is also truncated', async () => {
// Three streams: initial (MAX_TOKENS) → escalated (MAX_TOKENS) →
// recovery (STOP).
// Three streams: initial (MAX_TOKENS) → escalated (MAX_TOKENS) →
// recovery (STOP).
const streams = [
makeStream([makeChunk([{ text: 'Hello' }], 'MAX_TOKENS')]),
makeStream([makeChunk([{ text: ' world' }], 'MAX_TOKENS')]),
Expand Down Expand Up @@ -3472,7 +3473,7 @@ describe('GeminiChat', async () => {
}

const history = chat.getHistory();
// Exactly one user turn + one model turn — the recovery pairs should
// Exactly one user turn + one model turn — the recovery pairs should
// be folded back into the preceding model entry.
expect(history.length).toBe(2);
expect(history[0]!.role).toBe('user');
Expand All @@ -3484,14 +3485,189 @@ describe('GeminiChat', async () => {
expect(flattened).not.toContain('Output token limit hit');

// All escalation + recovery content must be preserved in the merged
// model turn, in order (B escalation → C recovery-1 → D recovery-2).
// model turn, in order (B escalation → C recovery-1 → D recovery-2).
const mergedText = (history[1]!.parts ?? [])
.map((p) => ('text' in p ? ((p as { text?: string }).text ?? '') : ''))
.join('');
expect(mergedText).toBe('BCD');
});
});

describe('stream idle watchdog', () => {
/**
 * Builds a minimal streamed response holding a single model candidate.
 *
 * @param parts Content parts placed (by reference) in the candidate's content.
 * @param finishReason Optional finish reason; the key is omitted entirely
 *   when the value is falsy (undefined or an empty string).
 * @returns An object shaped `{ candidates: [{ content, finishReason? }] }`,
 *   cast to `GenerateContentResponse`.
 */
function makeChunk(
  parts: Array<{ text?: string; functionCall?: unknown }>,
  finishReason?: string,
): GenerateContentResponse {
  const content = { role: 'model', parts };
  // Only attach `finishReason` when one was actually supplied.
  const candidate = finishReason ? { content, finishReason } : { content };
  return { candidates: [candidate] } as unknown as GenerateContentResponse;
}

/**
 * Waits for `ms` milliseconds, or fails early if `signal` aborts first.
 *
 * @param ms Delay passed to `setTimeout`.
 * @param signal Optional abort signal. When it is already aborted, or aborts
 *   before the delay elapses, the promise rejects with `Error('aborted')`.
 * @returns A promise that resolves with no value once the delay elapses.
 */
function waitForDelayOrAbort(ms: number, signal?: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // Already aborted: nothing was scheduled, so fail straight away.
    if (signal?.aborted) {
      reject(new Error('aborted'));
      return;
    }

    let pendingTimer: ReturnType<typeof setTimeout> | undefined;

    // Drops both the timer and the abort listener so neither outlives
    // the settled promise.
    function release(): void {
      if (pendingTimer !== undefined) {
        clearTimeout(pendingTimer);
        pendingTimer = undefined;
      }
      signal?.removeEventListener('abort', handleAbort);
    }

    function handleAbort(): void {
      release();
      reject(new Error('aborted'));
    }

    function handleElapsed(): void {
      release();
      resolve();
    }

    signal?.addEventListener('abort', handleAbort, { once: true });
    pendingTimer = setTimeout(handleElapsed, ms);
  });
}

/**
 * Async generator that produces nothing until it is aborted.
 *
 * The first `next()` call waits on a promise that can only reject:
 * - with no `signal`, it never settles, so the stream stays silent forever;
 * - with an already-aborted `signal`, it rejects at once with `Error('aborted')`;
 * - otherwise it rejects with `Error('aborted')` when `signal` aborts.
 *
 * @param signal Optional abort signal used to end the silence.
 */
async function* makeSilentStream(signal?: AbortSignal) {
  const silence = new Promise<never>((_resolve, reject) => {
    if (!signal) {
      return;
    }
    const failAsAborted = () => reject(new Error('aborted'));
    if (signal.aborted) {
      failAsAborted();
    } else {
      signal.addEventListener('abort', failAsAborted, { once: true });
    }
  });
  await silence;

  // The promise above never resolves, so this is not reached at runtime;
  // it gives the generator its element type.
  yield {} as GenerateContentResponse;
}

it('should reject a silent stream after the configured idle timeout retries exhaust', async () => {
  // Fake timers let the stubbed 50 idle-timeout value elapse without real
  // waiting.
  vi.useFakeTimers();
  vi.stubEnv('QWEN_CODE_STREAM_IDLE_TIMEOUT_MS', '50');

  const abortController = new AbortController();
  // Every attempt receives a stream that yields nothing and only ends when
  // the abort signal carried on the request fires.
  vi.mocked(mockContentGenerator.generateContentStream).mockImplementation(
    async (request) => makeSilentStream(request.config?.abortSignal),
  );

  // The whole exercise lives inside `try` so that a failure while opening
  // the stream still restores real timers and env vars; otherwise fake
  // timers would leak into the tests that run afterwards.
  try {
    const stream = await chat.sendMessageStream(
      'gemini-3-pro',
      {
        message: 'hangs forever',
        config: { abortSignal: abortController.signal },
      },
      'prompt-stream-idle-timeout',
    );

    // NOTE(review): `expectStreamExhaustion` is defined elsewhere in this
    // file; presumably it drives the fake timers and asserts the stream
    // ends in failure — confirm against its definition.
    await expectStreamExhaustion(stream);
    // Three generator calls are expected in total before giving up.
    expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(3);
  } finally {
    // Abort so any still-pending silent stream settles.
    abortController.abort();
    vi.unstubAllEnvs();
    vi.useRealTimers();
  }
});

it('should reset the idle timer whenever a new chunk arrives', async () => {
  vi.useFakeTimers();
  vi.stubEnv('QWEN_CODE_STREAM_IDLE_TIMEOUT_MS', '50');

  // Two chunks, each preceded by a 30 delay: every gap is below the stubbed
  // 50 idle timeout, while the total (60) exceeds it. The stream can only
  // finish if the idle timer restarts when a chunk arrives.
  const chunkScript: Array<{ text: string; finishReason?: string }> = [
    { text: 'Hel' },
    { text: 'lo', finishReason: 'STOP' },
  ];

  vi.mocked(mockContentGenerator.generateContentStream).mockImplementation(
    async (request) => {
      const steadyStream = async function* () {
        for (const { text, finishReason } of chunkScript) {
          await waitForDelayOrAbort(30, request.config?.abortSignal);
          yield makeChunk([{ text }], finishReason);
        }
      };
      return steadyStream();
    },
  );

  try {
    const stream = await chat.sendMessageStream(
      'gemini-3-pro',
      { message: 'stream slowly but steadily' },
      'prompt-stream-idle-progress',
    );

    // NOTE(review): `collectStreamWithFakeTimers` is defined elsewhere in
    // this file; presumably it advances fake time while draining the
    // stream — confirm the meaning of the `100` argument there.
    const events = await collectStreamWithFakeTimers(stream, 100);

    const eventTypes = events.map(({ type }) => type);
    expect(eventTypes).toEqual([StreamEventType.CHUNK, StreamEventType.CHUNK]);

    // Both text fragments must end up joined in the last history entry.
    const lastTurn = chat.getHistory().at(-1);
    const replyText = lastTurn?.parts?.map((part) => part.text ?? '').join('');
    expect(replyText).toBe('Hello');
  } finally {
    vi.unstubAllEnvs();
    vi.useRealTimers();
  }
});

it('should allow disabling the idle watchdog via environment variable', async () => {
  vi.useFakeTimers();
  // The idle timeout is stubbed to 50 and the disable flag is set as well;
  // the assertions below check that the flag wins.
  vi.stubEnv('QWEN_CODE_STREAM_IDLE_TIMEOUT_MS', '50');
  vi.stubEnv('QWEN_CODE_DISABLE_STREAM_WATCHDOG', 'true');

  const abortController = new AbortController();
  // Signal the chat hands to the content generator, captured for inspection.
  let internalSignal: AbortSignal | undefined;
  // Flips to true once the consumer loop finishes for any reason.
  let settled = false;
  // Stays undefined if opening the stream fails before consumption starts.
  let consumePromise: Promise<void> | undefined;

  vi.mocked(mockContentGenerator.generateContentStream).mockImplementation(
    async (request) => {
      internalSignal = request.config?.abortSignal;
      return makeSilentStream(internalSignal);
    },
  );

  // Opening the stream happens inside `try` so that a setup failure still
  // restores real timers and env vars instead of leaking them into the
  // tests that run afterwards.
  try {
    const stream = await chat.sendMessageStream(
      'gemini-3-pro',
      {
        message: 'keep waiting',
        config: { abortSignal: abortController.signal },
      },
      'prompt-stream-idle-disabled',
    );

    // Drain in the background. The rejection handler is attached right away
    // so an early stream failure is never reported as an unhandled
    // rejection; `settled` still records that the loop ended.
    consumePromise = (async () => {
      for await (const _ of stream) {
        /* consume */
      }
    })()
      .catch(() => undefined)
      .finally(() => {
        settled = true;
      });

    // Advance past the stubbed 50 timeout: with the watchdog disabled the
    // stream must still be pending, un-aborted, and not retried.
    await vi.advanceTimersByTimeAsync(60);
    await Promise.resolve();
    expect(settled).toBe(false);
    expect(internalSignal?.aborted).toBe(false);
    expect(mockContentGenerator.generateContentStream).toHaveBeenCalledTimes(1);
  } finally {
    // Abort so the silent stream rejects and the consumer loop can finish.
    abortController.abort();
    await consumePromise;
    vi.unstubAllEnvs();
    vi.useRealTimers();
  }
});
});
describe('redactStructuredOutputArgsForRecording', () => {
// The chat-recording JSONL persists assistant turns to disk and re-feeds
// them on `--continue` / `--resume`. For `--json-schema` runs the
Expand Down Expand Up @@ -3578,7 +3754,7 @@ describe('GeminiChat', async () => {
/**
* Mock a successful compression: the service returns COMPRESSED with a
* fresh history. We don't go through the real
* `config.getContentGenerator().generateContent` path here — the service
* `config.getContentGenerator().generateContent` path here — the service
* is mocked at the boundary.
*/
function mockCompressionService(
Expand Down
Loading
Loading