-
-
Notifications
You must be signed in to change notification settings - Fork 1.8k
fix: mcp stream timeout #5860
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: next
Are you sure you want to change the base?
fix: mcp stream timeout #5860
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,6 +3,7 @@ import cluster from 'node:cluster'; | |||||||||||||||||||||||||||||||
| import querystring from 'node:querystring'; | ||||||||||||||||||||||||||||||||
| import { Readable } from 'node:stream'; | ||||||||||||||||||||||||||||||||
| import url from 'node:url'; | ||||||||||||||||||||||||||||||||
| import { fetch, Agent } from 'undici'; | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| import { MCPControllerRegister } from '@eggjs/controller-plugin/lib/impl/mcp/MCPControllerRegister'; | ||||||||||||||||||||||||||||||||
| import type { MCPControllerHook } from '@eggjs/controller-plugin/lib/impl/mcp/MCPControllerRegister'; | ||||||||||||||||||||||||||||||||
|
|
@@ -343,11 +344,10 @@ export class MCPProxyApiClient extends APIClientBase { | |||||||||||||||||||||||||||||||
| ctx.req.headers['mcp-proxy-type'] = action; | ||||||||||||||||||||||||||||||||
| ctx.req.headers['mcp-proxy-sessionid'] = sessionId; | ||||||||||||||||||||||||||||||||
| const resp = await fetch(`http://localhost:${detail.port}/mcp/message?sessionId=${sessionId}`, { | ||||||||||||||||||||||||||||||||
| // dispatcher: new Agent({ | ||||||||||||||||||||||||||||||||
| // connect: { | ||||||||||||||||||||||||||||||||
| // socketPath, | ||||||||||||||||||||||||||||||||
| // }, | ||||||||||||||||||||||||||||||||
| // }), | ||||||||||||||||||||||||||||||||
| dispatcher: new Agent({ | ||||||||||||||||||||||||||||||||
| bodyTimeout: 0, | ||||||||||||||||||||||||||||||||
| headersTimeout: 0, | ||||||||||||||||||||||||||||||||
| }), | ||||||||||||||||||||||||||||||||
|
Comment on lines
+347
to
+350
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||||
| headers: ctx.req.headers as unknown as Record<string, string>, | ||||||||||||||||||||||||||||||||
| body: body as string, | ||||||||||||||||||||||||||||||||
| method: ctx.req.method, | ||||||||||||||||||||||||||||||||
|
|
@@ -389,11 +389,10 @@ export class MCPProxyApiClient extends APIClientBase { | |||||||||||||||||||||||||||||||
| ctx.req.headers['mcp-proxy-type'] = action; | ||||||||||||||||||||||||||||||||
| ctx.req.headers['mcp-proxy-sessionid'] = sessionId; | ||||||||||||||||||||||||||||||||
| const response = await fetch(`http://localhost:${detail.port}`, { | ||||||||||||||||||||||||||||||||
| // dispatcher: new Agent({ | ||||||||||||||||||||||||||||||||
| // connect: { | ||||||||||||||||||||||||||||||||
| // socketPath, | ||||||||||||||||||||||||||||||||
| // }, | ||||||||||||||||||||||||||||||||
| // }), | ||||||||||||||||||||||||||||||||
| dispatcher: new Agent({ | ||||||||||||||||||||||||||||||||
| bodyTimeout: 0, | ||||||||||||||||||||||||||||||||
| headersTimeout: 0, | ||||||||||||||||||||||||||||||||
| }), | ||||||||||||||||||||||||||||||||
|
Comment on lines
+392
to
+395
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||||||||
| headers: ctx.req.headers as unknown as Record<string, string>, | ||||||||||||||||||||||||||||||||
| method: ctx.req.method, | ||||||||||||||||||||||||||||||||
| ...(ctx.req.method !== 'GET' | ||||||||||||||||||||||||||||||||
|
|
@@ -413,7 +412,14 @@ export class MCPProxyApiClient extends APIClientBase { | |||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| ctx.set(headers); | ||||||||||||||||||||||||||||||||
| ctx.res.statusCode = response.status; | ||||||||||||||||||||||||||||||||
| Readable.fromWeb(response.body! as any).pipe(ctx.res); | ||||||||||||||||||||||||||||||||
| const readable = Readable.fromWeb(response.body!); | ||||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The use of the non-null assertion operator (!) on response.body could lead to a runtime error if the response has no body (e.g., a 204 No Content response). It is safer to check if the body exists before attempting to create a readable stream from it. if (!response.body) {
ctx.res.end();
break;
}
const readable = Readable.fromWeb(response.body); |
||||||||||||||||||||||||||||||||
| readable.on('error', err => { | ||||||||||||||||||||||||||||||||
| this.logger.error('[mcp-proxy] stream proxy error: %s', err.message); | ||||||||||||||||||||||||||||||||
| if (!ctx.res.writableEnded) { | ||||||||||||||||||||||||||||||||
| ctx.res.end(); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||||
| readable.pipe(ctx.res); | ||||||||||||||||||||||||||||||||
|
Comment on lines
+415
to
+422
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Guard against The non-null assertion 🛡️ Proposed fix: Add null check ctx.res.statusCode = response.status;
+ if (!response.body) {
+ ctx.res.end();
+ break;
+ }
const readable = Readable.fromWeb(response.body!);
readable.on('error', err => {
this.logger.error('[mcp-proxy] stream proxy error: %s', err.message);🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||
| break; | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
|
@@ -464,8 +470,10 @@ export class MCPProxyApiClient extends APIClientBase { | |||||||||||||||||||||||||||||||
| ctx.res.write('event: terminate'); | ||||||||||||||||||||||||||||||||
| } catch (error) { | ||||||||||||||||||||||||||||||||
| ctx.res.statusCode = 500; | ||||||||||||||||||||||||||||||||
| ctx.res.write(`see stream error ${error}`); | ||||||||||||||||||||||||||||||||
| ctx.res.end(); | ||||||||||||||||||||||||||||||||
| if (!ctx.res.writableEnded) { | ||||||||||||||||||||||||||||||||
| ctx.res.statusCode = 500; | ||||||||||||||||||||||||||||||||
| ctx.res.end(); | ||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
Comment on lines
472
to
+476
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The status code is being set redundantly. Furthermore, attempting to set the status code after headers have been sent (which is likely in a streaming response) will result in errors or warnings. It is also good practice to log the error for observability. this.logger.error('[mcp-proxy] sse stream error: %s', error);
if (!ctx.res.headersSent) {
ctx.res.statusCode = 500;
}
if (!ctx.res.writableEnded) {
ctx.res.end();
} |
||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||
|
Comment on lines
471
to
477
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Remove redundant
🧹 Proposed fix } catch (error) {
- ctx.res.statusCode = 500;
if (!ctx.res.writableEnded) {
ctx.res.statusCode = 500;
ctx.res.end();
}
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||
| processStream(); | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To improve performance and enable connection pooling, it is recommended to create a single Agent instance and reuse it across requests instead of instantiating a new one for every fetch call.