11import type { IncomingMessage , ServerResponse , OutgoingHttpHeaders } from 'node:http'
2- import { Http2ServerRequest } from 'node:http2'
2+ import { Http2ServerRequest , constants as h2constants } from 'node:http2'
33import type { Http2ServerResponse } from 'node:http2'
44import type { Writable } from 'node:stream'
55import type { IncomingMessageWithWrapBodyStream } from './request'
@@ -22,9 +22,68 @@ import {
2222import { X_ALREADY_SENT } from './utils/response/constants'
2323
2424const outgoingEnded = Symbol ( 'outgoingEnded' )
25+ const incomingDraining = Symbol ( 'incomingDraining' )
2526type OutgoingHasOutgoingEnded = Http2ServerResponse & {
2627 [ outgoingEnded ] ?: ( ) => void
2728}
29+ type IncomingHasDrainState = ( IncomingMessage | Http2ServerRequest ) & {
30+ [ incomingDraining ] ?: boolean
31+ }
32+
33+ const DRAIN_TIMEOUT_MS = 500
34+ const MAX_DRAIN_BYTES = 64 * 1024 * 1024
35+
36+ const drainIncoming = ( incoming : IncomingMessage | Http2ServerRequest ) : void => {
37+ const incomingWithDrainState = incoming as IncomingHasDrainState
38+ if ( incoming . destroyed || incomingWithDrainState [ incomingDraining ] ) {
39+ return
40+ }
41+ incomingWithDrainState [ incomingDraining ] = true
42+
43+ // HTTP/2: streams are multiplexed, so we can close immediately
44+ // without risking TCP RST racing the response.
45+ if ( incoming instanceof Http2ServerRequest ) {
46+ try {
47+ // eslint-disable-next-line @typescript-eslint/no-explicit-any
48+ ; ( incoming as any ) . stream ?. close ?.( h2constants . NGHTTP2_NO_ERROR )
49+ } catch {
50+ // stream may already be closed
51+ }
52+ return
53+ }
54+
55+ let bytesRead = 0
56+ const cleanup = ( ) => {
57+ clearTimeout ( timer )
58+ incoming . off ( 'data' , onData )
59+ incoming . off ( 'end' , cleanup )
60+ incoming . off ( 'error' , cleanup )
61+ }
62+
63+ const forceClose = ( ) => {
64+ cleanup ( )
65+ const socket = incoming . socket
66+ if ( socket && ! socket . destroyed ) {
67+ socket . destroySoon ( )
68+ }
69+ }
70+
71+ const timer = setTimeout ( forceClose , DRAIN_TIMEOUT_MS )
72+ timer . unref ?.( )
73+
74+ const onData = ( chunk : Buffer ) => {
75+ bytesRead += chunk . length
76+ if ( bytesRead > MAX_DRAIN_BYTES ) {
77+ forceClose ( )
78+ }
79+ }
80+
81+ incoming . on ( 'data' , onData )
82+ incoming . on ( 'end' , cleanup )
83+ incoming . on ( 'error' , cleanup )
84+
85+ incoming . resume ( )
86+ }
2887
2988const handleRequestError = ( ) : Response =>
3089 new Response ( null , {
@@ -264,15 +323,21 @@ export const getRequestListener = (
264323 // and end is called at this point. At that point, nothing is done.
265324 if ( ! incomingEnded ) {
266325 setTimeout ( ( ) => {
267- incoming . destroy ( )
268- // a Http2ServerResponse instance will not terminate without also calling outgoing.destroy()
269- outgoing . destroy ( )
326+ drainIncoming ( incoming )
270327 } )
271328 }
272329 } )
273330 }
274331 }
275332 }
333+
334+ // Drain incoming as soon as the response is flushed to the OS,
335+ // before the socket is closed, to prevent TCP RST racing the response.
336+ outgoing . on ( 'finish' , ( ) => {
337+ if ( ! incomingEnded ) {
338+ drainIncoming ( incoming )
339+ }
340+ } )
276341 }
277342
278343 // Detect if request was aborted.
@@ -294,7 +359,7 @@ export const getRequestListener = (
294359 // and end is called at this point. At that point, nothing is done.
295360 if ( ! incomingEnded ) {
296361 setTimeout ( ( ) => {
297- incoming . destroy ( )
362+ drainIncoming ( incoming )
298363 } )
299364 }
300365 } )
0 commit comments